#!/usr/bin/env python3
"""
多项目管理器:处理多个知识库项目的合并
"""
import json
import logging
import os
import shutil
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

from utils import settings
from utils.file_utils import get_document_preview

# Configure logger
logger = logging.getLogger('app')
def generate_robot_directory_tree(robot_dir: str, robot_id: str, max_depth: int = 3) -> str:
    """
    Generate the directory tree of a robot project.

    Args:
        robot_dir: Path to the robot project directory.
        robot_id: Robot ID.
        max_depth: Maximum recursion depth.

    Returns:
        str: The rendered directory tree.
    """
    def _build_tree(path: str, prefix: str = "", is_last: bool = True, depth: int = 0) -> List[str]:
        if depth > max_depth:
            return []
        lines = []
        try:
            entries = sorted(os.listdir(path))
            # Separate directories from files; skip hidden entries
            dirs = [e for e in entries if os.path.isdir(os.path.join(path, e)) and not e.startswith('.')]
            files = [e for e in entries if os.path.isfile(os.path.join(path, e)) and not e.startswith('.')]
            entries = dirs + files
            for i, entry in enumerate(entries):
                entry_path = os.path.join(path, entry)
                is_dir = os.path.isdir(entry_path)
                is_last_entry = i == len(entries) - 1
                # Pick the appropriate tree-drawing characters
                if is_last_entry:
                    connector = "└── "
                    new_prefix = prefix + "    "
                else:
                    connector = "├── "
                    new_prefix = prefix + "│   "
                # Append the entry line
                line = prefix + connector + entry
                if is_dir:
                    line += "/"
                lines.append(line)
                # Recurse into subdirectories
                if is_dir and depth < max_depth:
                    sub_lines = _build_tree(entry_path, new_prefix, is_last_entry, depth + 1)
                    lines.extend(sub_lines)
        except PermissionError:
            lines.append(prefix + "└── [Permission Denied]")
        except Exception as e:
            lines.append(prefix + "└── [Error: " + str(e) + "]")
        return lines
    # Build the tree starting from the dataset directory
    dataset_dir = os.path.join(robot_dir, "dataset")
    tree_lines = []
    if not os.path.exists(dataset_dir):
        return "└── [No dataset directory found]"
    try:
        entries = sorted(os.listdir(dataset_dir))
        dirs = [e for e in entries if os.path.isdir(os.path.join(dataset_dir, e)) and not e.startswith('.')]
        files = [e for e in entries if os.path.isfile(os.path.join(dataset_dir, e)) and not e.startswith('.')]
        entries = dirs + files
        if not entries:
            tree_lines.append("└── [Empty dataset directory]")
        else:
            for i, entry in enumerate(entries):
                entry_path = os.path.join(dataset_dir, entry)
                is_dir = os.path.isdir(entry_path)
                is_last_entry = i == len(entries) - 1
                if is_last_entry:
                    connector = "└── "
                    prefix = "    "
                else:
                    connector = "├── "
                    prefix = "│   "
                line = connector + entry
                if is_dir:
                    line += "/"
                tree_lines.append(line)
                # Recurse into subdirectories
                if is_dir:
                    sub_lines = _build_tree(entry_path, prefix, is_last_entry, 1)
                    tree_lines.extend(sub_lines)
    except Exception as e:
        tree_lines.append(f"└── [Error generating tree: {str(e)}]")
    return "\n".join(tree_lines)
def get_unique_folder_name(target_dir: Path, original_name: str) -> str:
    """
    Return a unique folder name, appending a numeric suffix on collision.

    Args:
        target_dir: Target directory.
        original_name: Original folder name.

    Returns:
        str: A folder name unique within target_dir.
    """
    if not (target_dir / original_name).exists():
        return original_name
    # Name collision: append a numeric suffix
    counter = 1
    while True:
        new_name = f"{original_name}_{counter}"
        if not (target_dir / new_name).exists():
            return new_name
        counter += 1
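
# Example: if target_dir already contains "manual", get_unique_folder_name(target_dir, "manual")
# returns "manual_1"; if "manual_1" also exists, "manual_2", and so on.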
def copy_dataset_folder(source_project_id: str, target_dataset_dir: Path, folder_name: str, project_path: Path) -> Dict:
    """
    Copy one dataset folder from a source project into the target directory.

    Args:
        source_project_id: Source project ID.
        target_dataset_dir: Target dataset directory.
        folder_name: Name of the folder to copy.
        project_path: Root path of the projects tree.

    Returns:
        Dict: Copy result.
    """
    result = {
        "success": False,
        "source_path": "",
        "target_path": "",
        "original_folder_name": folder_name,
        "final_folder_name": folder_name,
        "error": None
    }
    try:
        source_folder = project_path / "data" / source_project_id / "dataset" / folder_name
        result["source_path"] = str(source_folder)
        if not source_folder.exists():
            result["error"] = f"Source folder does not exist: {source_folder}"
            return result
        # Resolve name collisions in the target directory
        unique_folder_name = get_unique_folder_name(target_dataset_dir, folder_name)
        result["final_folder_name"] = unique_folder_name
        target_folder = target_dataset_dir / unique_folder_name
        result["target_path"] = str(target_folder)
        # Copy the folder
        shutil.copytree(source_folder, target_folder)
        result["success"] = True
        logger.info(f" Copied: {source_folder} -> {target_folder}")
    except Exception as e:
        result["error"] = str(e)
        logger.error(f" Error copying {folder_name}: {str(e)}")
    return result
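
# Example result dict (hypothetical paths), for a copy that hit a name collision:
#   {"success": True, "source_path": "projects/data/p1/dataset/manual",
#    "target_path": "projects/robot/bot-1/dataset/manual_1",
#    "original_folder_name": "manual", "final_folder_name": "manual_1", "error": None}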
def generate_robot_readme(robot_id: str, dataset_ids: List[str], copy_results: List[Dict], project_path: Path) -> str:
    """
    Generate the README.md for a robot project.

    Args:
        robot_id: Robot ID.
        dataset_ids: List of source project IDs.
        copy_results: List of copy results.
        project_path: Root path of the projects tree.

    Returns:
        str: Path to the README.md file.
    """
    readme_path = project_path / "robot" / robot_id / "README.md"
    readme_path.parent.mkdir(parents=True, exist_ok=True)
    robot_dir = project_path / "robot" / robot_id
    # Summary statistics
    total_folders = len(copy_results)
    successful_copies = sum(1 for r in copy_results if r["success"])
    failed_copies = total_folders - successful_copies
    # Group results by source project
    project_groups = {}
    for result in copy_results:
        if result["success"]:
            source_project = result["source_path"].split("/")[2]  # projects/data/{project_id}/dataset/...
            if source_project not in project_groups:
                project_groups[source_project] = []
            project_groups[source_project].append(result)
    readme_content = "\n## Directory Structure\n\n"
    # Render the actual directory tree
    readme_content += "```\n"
    readme_content += generate_robot_directory_tree(str(robot_dir), robot_id)
    readme_content += "\n```\n\n"
    readme_content += "## Dataset Details\n\n"
    dataset_dir = robot_dir / "dataset"
    if not dataset_dir.exists():
        readme_content += "No dataset files available.\n"
    else:
        # Collect all document directories
        doc_dirs = []
        try:
            for item in sorted(os.listdir(dataset_dir)):
                item_path = dataset_dir / item
                if item_path.is_dir():
                    doc_dirs.append(item)
        except Exception as e:
            logger.error(f"Error listing dataset directories: {str(e)}")
        if not doc_dirs:
            readme_content += "No document directories found.\n"
        else:
            # List documents grouped by source project
            for project_id, folders in project_groups.items():
                for folder in folders:
                    folder_name = folder["final_folder_name"]
                    doc_path = dataset_dir / folder_name
                    readme_content += f"#### {folder_name}\n\n"
                    readme_content += f"**Files:**\n"
                    # Check which files exist
                    document_file = doc_path / "document.txt"
                    pagination_file = doc_path / "pagination.txt"
                    embeddings_file = doc_path / "embedding.pkl"
                    readme_content += f"- `{folder_name}/document.txt`"
                    if document_file.exists():
                        readme_content += " ✓"
                    readme_content += "\n"
                    readme_content += f"- `{folder_name}/pagination.txt`"
                    if pagination_file.exists():
                        readme_content += " ✓"
                    readme_content += "\n"
                    readme_content += f"- `{folder_name}/embedding.pkl`"
                    if embeddings_file.exists():
                        readme_content += " ✓"
                    readme_content += "\n\n"
                    # Add a document preview
                    if document_file.exists():
                        readme_content += f"**Content preview (first 10 lines):**\n\n```\n"
                        preview = get_document_preview(str(document_file), 10)
                        readme_content += preview
                        readme_content += "\n```\n\n"
                    else:
                        readme_content += f"**Content preview:** not available\n\n"
                    # Note renames caused by name collisions
                    original_name = folder["original_folder_name"]
                    if original_name != folder_name:
                        readme_content += f"**Original name:** `{original_name}` → `{folder_name}`\n\n"
                    readme_content += "---\n\n"
    # Write the README file
    with open(readme_path, 'w', encoding='utf-8') as f:
        f.write(readme_content)
    logger.info(f"Generated README: {readme_path}")
    return str(readme_path)
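
# Sketch of one generated README entry (hypothetical folder name, all three files present):
#   #### manual_a
#   **Files:**
#   - `manual_a/document.txt` ✓
#   - `manual_a/pagination.txt` ✓
#   - `manual_a/embedding.pkl` ✓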
def _get_robot_dir(project_path: Path, bot_id: str) -> Path:
    return project_path / "robot" / bot_id
def should_rebuild_robot_project(dataset_ids: List[str], bot_id: str, project_path: Path) -> bool:
    """
    Decide whether the robot project needs to be rebuilt by checking:
    1. whether the robot project exists;
    2. whether any dataset_id was added or removed;
    3. whether any processing_log.json file has been updated.

    Args:
        dataset_ids: List of source project IDs.
        bot_id: Robot ID.
        project_path: Root path of the projects tree.

    Returns:
        bool: True if a rebuild is needed.
    """
    robot_dir = _get_robot_dir(project_path, bot_id)
    # If the robot project does not exist, it must be created
    if not robot_dir.exists():
        logger.info(f"Robot project does not exist, need to create: {bot_id}")
        return True
    # Check the robot project's config file
    config_file = robot_dir / "robot_config.json"
    if not config_file.exists():
        logger.info(f"Robot config file not found, need to rebuild: {bot_id}")
        return True
    # Read the cached config
    try:
        with open(config_file, 'r', encoding='utf-8') as f:
            config = json.load(f)
        cached_dataset_ids = set(config.get("dataset_ids", []))
    except Exception as e:
        logger.error(f"Error reading robot config: {e}, need to rebuild")
        return True
    # Compare the current dataset_ids against the cached set
    current_dataset_ids = set(dataset_ids)
    # Newly added dataset_ids
    new_ids = current_dataset_ids - cached_dataset_ids
    if new_ids:
        logger.info(f"Found new dataset_ids: {new_ids}, need to rebuild")
        return True
    # Removed dataset_ids
    removed_ids = cached_dataset_ids - current_dataset_ids
    if removed_ids:
        logger.info(f"Removed dataset_ids: {removed_ids}, need to rebuild")
        return True
    # Last modification time of the robot project directory
    robot_mod_time = robot_dir.stat().st_mtime
    # Check each source project's processing_log.json
    for source_project_id in dataset_ids:
        log_file = project_path / "data" / source_project_id / "processing_log.json"
        if not log_file.exists():
            logger.info(f"Processing log file not found for project {source_project_id}, will rebuild")
            return True
        log_mod_time = log_file.stat().st_mtime
        # Rebuild if any processing_log.json is newer than the robot project
        if log_mod_time > robot_mod_time:
            logger.info(f"Processing log updated for project {source_project_id}, need to rebuild")
            return True
    logger.info(f"Robot project {bot_id} is up to date, no rebuild needed")
    return False
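
# Example (hypothetical project id "p1"): touching projects/data/p1/processing_log.json
# after the robot directory was last built makes its mtime exceed robot_dir's, so the
# next call returns True and the project is rebuilt.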
def create_robot_project(dataset_ids: List[str], bot_id: str, force_rebuild: bool = False, project_path: Path = Path("projects"), skills: Optional[List[str]] = None) -> str:
    """
    Create a robot project by merging the dataset folders of multiple source projects.

    Args:
        dataset_ids: List of source project IDs.
        bot_id: Robot ID.
        force_rebuild: Whether to force a rebuild.
        project_path: Root path of the projects tree.
        skills: List of skill names or paths (e.g. ["rag-retrieve", "device_controller.zip"]).

    Returns:
        str: Path to the robot project directory.
    """
logger.info(f"Creating robot project: {bot_id} from sources: {dataset_ids}, skills: {skills}")
# 检查是否需要重建
if not force_rebuild and not should_rebuild_robot_project(dataset_ids, bot_id, project_path):
robot_dir = project_path / "robot" / bot_id
logger.info(f"Using existing robot project: {robot_dir}")
# 即使使用现有项目,也要处理 skills如果提供了
if skills:
_extract_skills_to_robot(bot_id, skills, project_path)
return str(robot_dir)
# 创建机器人目录结构
robot_dir = _get_robot_dir(project_path, bot_id)
dataset_dir = robot_dir / "dataset"
# 清理已存在的目录(如果需要)
if robot_dir.exists():
logger.info(f"Robot directory already exists, cleaning up: {robot_dir}")
shutil.rmtree(robot_dir)
robot_dir.mkdir(parents=True, exist_ok=True)
dataset_dir.mkdir(parents=True, exist_ok=True)
# 创建 scripts 和 download 目录
scripts_dir = robot_dir / "scripts"
download_dir = robot_dir / "download"
scripts_dir.mkdir(parents=True, exist_ok=True)
download_dir.mkdir(parents=True, exist_ok=True)
copy_results = []
# 遍历每个源项目
for source_project_id in dataset_ids:
logger.info(f"\nProcessing source project: {source_project_id}")
source_dataset_dir = project_path / "data" / source_project_id / "dataset"
if not source_dataset_dir.exists():
logger.warning(f" Warning: Dataset directory not found for project {source_project_id}")
continue
# 获取所有子文件夹
folders = [f for f in source_dataset_dir.iterdir() if f.is_dir()]
if not folders:
logger.warning(f" Warning: No folders found in dataset directory for project {source_project_id}")
continue
# 复制每个文件夹
for folder in folders:
result = copy_dataset_folder(source_project_id, dataset_dir, folder.name, project_path)
copy_results.append(result)
# 保存配置信息
config_file = robot_dir / "robot_config.json"
config_data = {
"dataset_ids": dataset_ids,
"bot_id": bot_id,
"created_at": datetime.now().isoformat(),
"total_folders": len(copy_results),
"successful_copies": sum(1 for r in copy_results if r["success"])
}
with open(config_file, 'w', encoding='utf-8') as f:
json.dump(config_data, f, ensure_ascii=False, indent=2)
# 生成README
readme_path = generate_robot_readme(bot_id, dataset_ids, copy_results, project_path)
# 统计信息
successful_copies = sum(1 for r in copy_results if r["success"])
logger.info(f"\nRobot project creation completed:")
logger.info(f" Robot directory: {robot_dir}")
logger.info(f" Total folders processed: {len(copy_results)}")
logger.info(f" Successful copies: {successful_copies}")
logger.info(f" Config saved: {config_file}")
logger.info(f" README generated: {readme_path}")
# 处理 skills 解压
if skills:
_extract_skills_to_robot(bot_id, skills, project_path)
return str(robot_dir)
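
# Resulting layout (sketch, derived from the steps above):
#   projects/robot/<bot_id>/
#   ├── dataset/            # merged dataset folders from all source projects
#   ├── scripts/
#   ├── download/
#   ├── skills/             # populated when skills are provided
#   ├── robot_config.json
#   └── README.md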
def _extract_skills_to_robot(bot_id: str, skills: List[str], project_path: Path) -> None:
    """
    Copy skills into the robot project's skills folder.

    - If a skill is a full path (e.g. "projects/uploads/xxx/skills/rag-retrieve_2.zip"),
      that path is used directly.
    - If a skill is a plain name (e.g. "rag-retrieve"), it is looked up in the following
      directories, in priority order:
        1. projects/uploads/{bot_id}/skills/
        2. skills/

    Args:
        bot_id: Robot ID.
        skills: List of skill names or paths (e.g. ["rag-retrieve", "projects/uploads/{bot_id}/skills/rag-retrieve"]).
        project_path: Root path of the projects tree.
    """
    import zipfile
    # Skill source directories, in priority order
    skills_source_dirs = [
        project_path / "uploads" / bot_id / "skills",
        Path("skills"),
    ]
    skills_target_dir = project_path / "robot" / bot_id / "skills"
    # Clear skills_target_dir first, then copy fresh
    if skills_target_dir.exists():
        logger.info(f" Removing existing skills directory: {skills_target_dir}")
        shutil.rmtree(skills_target_dir)
    skills_target_dir.mkdir(parents=True, exist_ok=True)
    logger.info(f"Copying skills to {skills_target_dir}")
    for skill in skills:
        source_dir = None
        skill_path = Path(skill)
        if "/" in skill and skill_path.exists():
            # Entries containing "/" are treated as explicit paths and used directly
            source_dir = skill_path
            logger.info(f" Using skill path directly: {skill_path}")
        else:
            # Plain name: search the source directories in priority order
            for base_dir in skills_source_dirs:
                candidate_dir = base_dir / skill
                if candidate_dir.exists():
                    source_dir = candidate_dir
                    logger.info(f" Found skill '{skill}' in {base_dir}")
                    break
        if source_dir is None:
            logger.warning(f" Skill '{skill}' not found in any source directory: {[str(d) for d in skills_source_dirs]}")
            continue
        target_dir = skills_target_dir / os.path.basename(skill)
        try:
            if source_dir.is_file() and source_dir.suffix == ".zip":
                # Zip archive: extract into a folder named after the archive
                extract_dir = skills_target_dir / source_dir.stem
                with zipfile.ZipFile(source_dir) as zf:
                    zf.extractall(extract_dir)
                logger.info(f" Extracted: {source_dir} -> {extract_dir}")
            else:
                shutil.copytree(source_dir, target_dir)
                logger.info(f" Copied: {source_dir} -> {target_dir}")
        except Exception as e:
            logger.error(f" Failed to copy {source_dir}: {e}")