# Source: qwen_agent/utils/multi_project_manager.py
# NOTE: web-viewer chrome (timestamp, line/size counters, "ambiguous Unicode"
# banner) removed; the banner suggests box-drawing characters were present
# in the original file.
#!/usr/bin/env python3
"""
多项目管理器:处理多个知识库项目的合并
"""
import os
import shutil
import json
from pathlib import Path
from typing import List, Dict, Optional
from datetime import datetime
from utils.file_utils import get_document_preview
def generate_robot_directory_tree(robot_dir: str, robot_id: str, max_depth: int = 3) -> str:
    """Render the ``dataset`` subtree of a robot project as a text tree.

    Args:
        robot_dir: Robot project directory path.
        robot_id: Robot ID (kept for interface compatibility; not used when
            rendering).
        max_depth: Maximum recursion depth below the dataset directory.

    Returns:
        str: Directory tree as a newline-joined string, or a single
        placeholder line when the dataset directory is missing/empty or an
        error occurs.
    """

    def _list_entries(path: str) -> List[str]:
        # Directories first, then files; hidden (dot-prefixed) entries skipped.
        entries = sorted(os.listdir(path))
        dirs = [e for e in entries
                if os.path.isdir(os.path.join(path, e)) and not e.startswith('.')]
        files = [e for e in entries
                 if os.path.isfile(os.path.join(path, e)) and not e.startswith('.')]
        return dirs + files

    def _build_tree(path: str, prefix: str = "", depth: int = 0) -> List[str]:
        if depth > max_depth:
            return []
        lines: List[str] = []
        try:
            entries = _list_entries(path)
            for i, entry in enumerate(entries):
                entry_path = os.path.join(path, entry)
                is_dir = os.path.isdir(entry_path)
                is_last = i == len(entries) - 1
                # Box-drawing connectors; the continuation prefix keeps the
                # vertical guide ("│") aligned under non-last entries.
                # (The original had these prefixes garbled to ""/" ".)
                connector = "└── " if is_last else "├── "
                child_prefix = prefix + ("    " if is_last else "│   ")
                lines.append(prefix + connector + entry + ("/" if is_dir else ""))
                if is_dir and depth < max_depth:
                    lines.extend(_build_tree(entry_path, child_prefix, depth + 1))
        except PermissionError:
            lines.append(prefix + "└── [Permission Denied]")
        except Exception as e:
            lines.append(prefix + "└── [Error: " + str(e) + "]")
        return lines

    dataset_dir = os.path.join(robot_dir, "dataset")
    if not os.path.exists(dataset_dir):
        return "└── [No dataset directory found]"
    try:
        if not _list_entries(dataset_dir):
            return "└── [Empty dataset directory]"
        return "\n".join(_build_tree(dataset_dir))
    except Exception as e:
        return f"└── [Error generating tree: {str(e)}]"
def get_unique_folder_name(target_dir: Path, original_name: str) -> str:
    """Return a folder name that does not yet exist in *target_dir*.

    If *original_name* is taken, probe ``name_1``, ``name_2``, ... until a
    free candidate is found.

    Args:
        target_dir: Directory to check for collisions.
        original_name: Desired folder name.

    Returns:
        str: The first non-colliding name.
    """
    candidate = original_name
    suffix = 0
    while (target_dir / candidate).exists():
        suffix += 1
        candidate = f"{original_name}_{suffix}"
    return candidate
def copy_dataset_folder(source_project_id: str, target_dataset_dir: Path, folder_name: str) -> Dict:
    """Copy one dataset folder from a source project into the target dataset dir.

    The source is resolved under ``projects/data/{source_project_id}/dataset``
    (relative to the current working directory). Name collisions in the target
    are resolved by appending a numeric suffix.

    Args:
        source_project_id: Source project ID.
        target_dataset_dir: Destination dataset directory.
        folder_name: Name of the folder to copy.

    Returns:
        Dict: Result record with ``success`` flag, resolved source/target
        paths, original and final (possibly de-duplicated) folder names, and
        an ``error`` message on failure.
    """
    outcome: Dict = {
        "success": False,
        "source_path": "",
        "target_path": "",
        "original_folder_name": folder_name,
        "final_folder_name": folder_name,
        "error": None,
    }
    try:
        source_folder = Path("projects") / "data" / source_project_id / "dataset" / folder_name
        outcome["source_path"] = str(source_folder)
        if not source_folder.exists():
            outcome["error"] = f"Source folder does not exist: {source_folder}"
            return outcome
        # Resolve any name clash in the target before copying.
        final_name = get_unique_folder_name(target_dataset_dir, folder_name)
        outcome["final_folder_name"] = final_name
        destination = target_dataset_dir / final_name
        outcome["target_path"] = str(destination)
        shutil.copytree(source_folder, destination)
        outcome["success"] = True
        print(f" Copied: {source_folder} -> {destination}")
    except Exception as e:
        outcome["error"] = str(e)
        print(f" Error copying {folder_name}: {str(e)}")
    return outcome
def generate_robot_readme(robot_id: str, dataset_ids: List[str], copy_results: List[Dict]) -> str:
    """Generate the README.md for a robot project.

    Writes a markdown file containing the project's directory tree and, per
    copied dataset folder, a file checklist, a content preview, and rename
    information.

    Args:
        robot_id: Robot ID.
        dataset_ids: Source project ID list (kept for interface
            compatibility; not read by the current body).
        copy_results: Per-folder results from copy_dataset_folder().

    Returns:
        str: Path of the written README.md file.
    """
    robot_dir = Path("projects") / "robot" / robot_id
    readme_path = robot_dir / "README.md"
    readme_path.parent.mkdir(parents=True, exist_ok=True)
    # Group successful copies by their source project id.
    # Path layout: projects/data/{project_id}/dataset/... — use Path.parts
    # instead of split("/") so this also works with Windows separators.
    project_groups: Dict[str, List[Dict]] = {}
    for result in copy_results:
        if result["success"]:
            source_project = Path(result["source_path"]).parts[2]
            project_groups.setdefault(source_project, []).append(result)
    readme_content = "\n## 目录结构\n\n"
    readme_content += "```\n"
    readme_content += generate_robot_directory_tree(str(robot_dir), robot_id)
    readme_content += "\n```\n\n"
    readme_content += "## 数据集详情\n\n"
    dataset_dir = robot_dir / "dataset"
    if not dataset_dir.exists():
        readme_content += "No dataset files available.\n"
    else:
        # Collect document directories; only their presence is checked below.
        doc_dirs = []
        try:
            for item in sorted(os.listdir(dataset_dir)):
                if (dataset_dir / item).is_dir():
                    doc_dirs.append(item)
        except Exception as e:
            print(f"Error listing dataset directories: {str(e)}")
        if not doc_dirs:
            readme_content += "No document directories found.\n"
        else:
            # List documents grouped by source project.
            for project_id, folders in project_groups.items():
                for folder in folders:
                    folder_name = folder["final_folder_name"]
                    doc_path = dataset_dir / folder_name
                    readme_content += f"#### {folder_name}\n\n"
                    readme_content += f"**Files:**\n"
                    document_file = doc_path / "document.txt"
                    pagination_file = doc_path / "pagination.txt"
                    embeddings_file = doc_path / "embedding.pkl"
                    # NOTE(review): the original appended an empty string when
                    # a file existed — almost certainly a check mark stripped
                    # by encoding; restored here as " ✓".
                    readme_content += f"- `{folder_name}/document.txt`"
                    if document_file.exists():
                        readme_content += " ✓"
                    readme_content += "\n"
                    readme_content += f"- `{folder_name}/pagination.txt`"
                    if pagination_file.exists():
                        readme_content += " ✓"
                    readme_content += "\n"
                    readme_content += f"- `{folder_name}/embedding.pkl`"
                    if embeddings_file.exists():
                        readme_content += " ✓"
                    readme_content += "\n\n"
                    # Content preview (first 10 lines) when document.txt exists.
                    if document_file.exists():
                        readme_content += f"**内容预览 (前10行):**\n\n```\n"
                        readme_content += get_document_preview(str(document_file), 10)
                        readme_content += "\n```\n\n"
                    else:
                        readme_content += f"**内容预览:** 不可用\n\n"
                    # Record renames caused by name-collision resolution.
                    original_name = folder["original_folder_name"]
                    if original_name != folder_name:
                        readme_content += f"**原始名称:** `{original_name}` → `{folder_name}`\n\n"
                    readme_content += "---\n\n"
    with open(readme_path, 'w', encoding='utf-8') as f:
        f.write(readme_content)
    print(f"Generated README: {readme_path}")
    return str(readme_path)
def should_rebuild_robot_project(dataset_ids: List[str], bot_id: str) -> bool:
    """Decide whether a robot project must be (re)built.

    Rebuild triggers:
      1. the robot project directory or its ``robot_config.json`` is missing
         or unreadable;
      2. the set of dataset_ids changed (additions or removals);
      3. any source project's ``processing_log.json`` is missing or newer
         than the robot project directory.

    Args:
        dataset_ids: Source project ID list.
        bot_id: Robot ID.

    Returns:
        bool: True when a rebuild is required, False otherwise.
    """
    robot_dir = Path("projects") / "robot" / bot_id
    if not robot_dir.exists():
        print(f"Robot project does not exist, need to create: {bot_id}")
        return True
    config_file = robot_dir / "robot_config.json"
    if not config_file.exists():
        print(f"Robot config file not found, need to rebuild: {bot_id}")
        return True
    try:
        cached_ids = set(
            json.loads(config_file.read_text(encoding='utf-8')).get("dataset_ids", [])
        )
    except Exception as e:
        print(f"Error reading robot config: {e}, need to rebuild")
        return True
    current_ids = set(dataset_ids)
    added = current_ids - cached_ids
    if added:
        print(f"Found new dataset_ids: {added}, need to rebuild")
        return True
    removed = cached_ids - current_ids
    if removed:
        print(f"Removed dataset_ids: {removed}, need to rebuild")
        return True
    # Compare each source's processing log against the robot dir's mtime.
    robot_mtime = robot_dir.stat().st_mtime
    for source_project_id in dataset_ids:
        log_file = Path("projects") / "data" / source_project_id / "processing_log.json"
        if not log_file.exists():
            print(f"Processing log file not found for project {source_project_id}, will rebuild")
            return True
        if log_file.stat().st_mtime > robot_mtime:
            print(f"Processing log updated for project {source_project_id}, need to rebuild")
            return True
    print(f"Robot project {bot_id} is up to date, no rebuild needed")
    return False
def create_robot_project(dataset_ids: List[str], bot_id: str, force_rebuild: bool = False) -> str:
    """Build a robot project by merging the dataset folders of source projects.

    The existing project is reused when nothing changed, unless
    *force_rebuild* is set; otherwise the robot directory is recreated, all
    source dataset folders are copied in, a config file is written and a
    README is generated.

    Args:
        dataset_ids: Source project ID list.
        bot_id: Robot ID.
        force_rebuild: When True, always rebuild.

    Returns:
        str: Robot project directory path.
    """
    print(f"Creating robot project: {bot_id} from sources: {dataset_ids}")
    robot_dir = Path("projects") / "robot" / bot_id
    # Short-circuit: keep the current project when no rebuild is required.
    if not force_rebuild and not should_rebuild_robot_project(dataset_ids, bot_id):
        print(f"Using existing robot project: {robot_dir}")
        return str(robot_dir)
    dataset_dir = robot_dir / "dataset"
    # Start from a clean directory.
    if robot_dir.exists():
        print(f"Robot directory already exists, cleaning up: {robot_dir}")
        shutil.rmtree(robot_dir)
    robot_dir.mkdir(parents=True, exist_ok=True)
    dataset_dir.mkdir(parents=True, exist_ok=True)
    copy_results: List[Dict] = []
    for source_project_id in dataset_ids:
        print(f"\nProcessing source project: {source_project_id}")
        source_dataset_dir = Path("projects") / "data" / source_project_id / "dataset"
        if not source_dataset_dir.exists():
            print(f" Warning: Dataset directory not found for project {source_project_id}")
            continue
        subfolders = [entry for entry in source_dataset_dir.iterdir() if entry.is_dir()]
        if not subfolders:
            print(f" Warning: No folders found in dataset directory for project {source_project_id}")
            continue
        for subfolder in subfolders:
            copy_results.append(
                copy_dataset_folder(source_project_id, dataset_dir, subfolder.name)
            )
    # Persist the merge configuration for change detection on later calls.
    successful_copies = sum(1 for r in copy_results if r["success"])
    config_file = robot_dir / "robot_config.json"
    config_data = {
        "dataset_ids": dataset_ids,
        "created_at": datetime.now().isoformat(),
        "total_folders": len(copy_results),
        "successful_copies": successful_copies,
    }
    with open(config_file, 'w', encoding='utf-8') as f:
        json.dump(config_data, f, ensure_ascii=False, indent=2)
    readme_path = generate_robot_readme(bot_id, dataset_ids, copy_results)
    print(f"\nRobot project creation completed:")
    print(f" Robot directory: {robot_dir}")
    print(f" Total folders processed: {len(copy_results)}")
    print(f" Successful copies: {successful_copies}")
    print(f" Config saved: {config_file}")
    print(f" README generated: {readme_path}")
    return str(robot_dir)
def get_robot_project_info(bot_id: str) -> Dict:
    """Collect summary information about a robot project.

    Args:
        bot_id: Robot ID.

    Returns:
        Dict: ``{"exists": False, ...}`` when the project is missing,
        otherwise paths, README presence, dataset folder count, and total
        size of files under the dataset folders (bytes and MB).
    """
    robot_dir = Path("projects") / "robot" / bot_id
    if not robot_dir.exists():
        return {
            "exists": False,
            "bot_id": bot_id,
            "error": "Robot project does not exist",
        }
    dataset_dir = robot_dir / "dataset"
    readme_path = robot_dir / "README.md"
    folder_count = 0
    total_size = 0
    if dataset_dir.exists():
        subdirs = [entry for entry in dataset_dir.iterdir() if entry.is_dir()]
        folder_count = len(subdirs)
        # Sum file sizes recursively across all dataset folders.
        total_size = sum(
            file_path.stat().st_size
            for subdir in subdirs
            for file_path in subdir.rglob('*')
            if file_path.is_file()
        )
    return {
        "exists": True,
        "bot_id": bot_id,
        "robot_dir": str(robot_dir),
        "dataset_dir": str(dataset_dir),
        "readme_exists": readme_path.exists(),
        "folder_count": folder_count,
        "total_size_bytes": total_size,
        "total_size_mb": round(total_size / (1024 * 1024), 2),
    }
def cleanup_robot_project(bot_id: str) -> bool:
    """Remove a robot project directory.

    Args:
        bot_id: Robot ID.

    Returns:
        bool: True when the directory was removed or did not exist;
        False when removal failed.
    """
    try:
        robot_dir = Path("projects") / "robot" / bot_id
        if not robot_dir.exists():
            print(f"Robot project does not exist: {bot_id}")
            return True
        shutil.rmtree(robot_dir)
        print(f"Cleaned up robot project: {bot_id}")
        return True
    except Exception as e:
        print(f"Error cleaning up robot project {bot_id}: {str(e)}")
        return False
if __name__ == "__main__":
    # Smoke test: merge two sample source projects into one robot project,
    # then print the resulting project summary.
    sample_sources = ["test-project-1", "test-project-2"]
    sample_bot = "test-robot-001"
    created_dir = create_robot_project(sample_sources, sample_bot)
    print(f"Created robot project at: {created_dir}")
    summary = get_robot_project_info(sample_bot)
    print(f"Robot project info: {json.dumps(summary, indent=2, ensure_ascii=False)}")