qwen_agent/utils/multi_project_manager.py
2026-03-12 19:54:09 +08:00

429 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
多项目管理器:处理多个知识库项目的合并
"""
import os
import shutil
import json
import logging
from pathlib import Path
from typing import List, Dict, Optional
from datetime import datetime
# Configure logger
logger = logging.getLogger('app')
from utils.file_utils import get_document_preview
from utils import settings
def generate_robot_directory_tree(robot_dir: str, robot_id: str, max_depth: int = 3) -> str:
    """
    Render a box-drawing directory tree of a robot project's dataset directory.

    Args:
        robot_dir: Robot project directory path.
        robot_id: Robot ID (not used in rendering; kept for interface
            compatibility with existing callers).
        max_depth: Maximum recursion depth below the dataset directory.

    Returns:
        str: The rendered tree, or a single placeholder line when the
             dataset directory is missing, empty, or unreadable.
    """
    def _list_entries(path: str) -> List[str]:
        # Directories first, then files; hidden (dot-prefixed) entries skipped.
        entries = sorted(os.listdir(path))
        dirs = [e for e in entries if os.path.isdir(os.path.join(path, e)) and not e.startswith('.')]
        files = [e for e in entries if os.path.isfile(os.path.join(path, e)) and not e.startswith('.')]
        return dirs + files

    def _build_tree(path: str, prefix: str = "", depth: int = 0) -> List[str]:
        if depth > max_depth:
            return []
        lines: List[str] = []
        try:
            entries = _list_entries(path)
            for i, entry in enumerate(entries):
                entry_path = os.path.join(path, entry)
                is_dir = os.path.isdir(entry_path)
                is_last_entry = i == len(entries) - 1
                # Fix: continuation prefixes were stripped to " " / "" (the
                # box-drawing "│" had been escaped away); restore the standard
                # tree layout where a closed branch indents with spaces and an
                # open branch continues with a vertical bar.
                if is_last_entry:
                    connector = "└── "
                    new_prefix = prefix + "    "
                else:
                    connector = "├── "
                    new_prefix = prefix + "│   "
                line = prefix + connector + entry
                if is_dir:
                    line += "/"  # mark directories with a trailing slash
                lines.append(line)
                # Recurse into subdirectories up to max_depth.
                if is_dir and depth < max_depth:
                    lines.extend(_build_tree(entry_path, new_prefix, depth + 1))
        except PermissionError:
            lines.append(prefix + "└── [Permission Denied]")
        except Exception as e:
            lines.append(prefix + "└── [Error: " + str(e) + "]")
        return lines

    # The tree is rooted at the dataset directory, whose own name is omitted.
    dataset_dir = os.path.join(robot_dir, "dataset")
    if not os.path.exists(dataset_dir):
        return "└── [No dataset directory found]"
    tree_lines: List[str] = []
    try:
        if not _list_entries(dataset_dir):
            tree_lines.append("└── [Empty dataset directory]")
        else:
            # The former hand-duplicated root loop was identical to
            # _build_tree at depth 0; delegate instead.
            tree_lines = _build_tree(dataset_dir, "", 0)
    except Exception as e:
        tree_lines.append(f"└── [Error generating tree: {str(e)}]")
    return "\n".join(tree_lines)
def get_unique_folder_name(target_dir: Path, original_name: str) -> str:
    """
    Return a folder name that does not yet exist under *target_dir*.

    If *original_name* is free it is returned unchanged; otherwise a numeric
    suffix ("name_1", "name_2", ...) is appended until a free name is found.

    Args:
        target_dir: Directory in which the name must be unique.
        original_name: Desired folder name.

    Returns:
        str: A name guaranteed not to collide inside *target_dir*.
    """
    candidate = original_name
    suffix = 0
    while (target_dir / candidate).exists():
        suffix += 1
        candidate = f"{original_name}_{suffix}"
    return candidate
def copy_dataset_folder(source_project_id: str, target_dataset_dir: Path, folder_name: str, project_path: Path) -> Dict:
    """
    Copy one project's dataset folder into the target dataset directory.

    Name collisions in the target are resolved via get_unique_folder_name,
    so the copied folder may receive a numeric suffix.

    Args:
        source_project_id: ID of the project the folder is copied from.
        target_dataset_dir: Destination dataset directory.
        folder_name: Name of the folder to copy.
        project_path: Root path that holds project data.

    Returns:
        Dict: Outcome record with keys success, source_path, target_path,
              original_folder_name, final_folder_name and error.
    """
    outcome: Dict = {
        "success": False,
        "source_path": "",
        "target_path": "",
        "original_folder_name": folder_name,
        "final_folder_name": folder_name,
        "error": None,
    }
    try:
        src = project_path / "data" / source_project_id / "dataset" / folder_name
        outcome["source_path"] = str(src)
        if not src.exists():
            outcome["error"] = f"Source folder does not exist: {src}"
            return outcome
        # Resolve possible name collisions before copying.
        resolved_name = get_unique_folder_name(target_dataset_dir, folder_name)
        outcome["final_folder_name"] = resolved_name
        dst = target_dataset_dir / resolved_name
        outcome["target_path"] = str(dst)
        shutil.copytree(src, dst)
        outcome["success"] = True
        logger.info(f" Copied: {src} -> {dst}")
    except Exception as e:
        outcome["error"] = str(e)
        logger.error(f" Error copying {folder_name}: {str(e)}")
    return outcome
def generate_robot_readme(robot_id: str, dataset_ids: List[str], copy_results: List[Dict], project_path: Path) -> str:
    """
    Generate the README.md file for a robot project.

    Args:
        robot_id: Robot ID.
        dataset_ids: Source project IDs (not read directly here; grouping
            is derived from copy_results instead).
        copy_results: Per-folder copy result dicts as produced by
            copy_dataset_folder.
        project_path: Root project path under which robot/<robot_id>/ lives.

    Returns:
        str: Path of the written README.md file.
    """
    readme_path = project_path / "robot" / robot_id / "README.md"
    readme_path.parent.mkdir(parents=True, exist_ok=True)
    robot_dir = project_path / "robot" / robot_id
    # Summary statistics. NOTE(review): successful_copies/failed_copies are
    # computed but never rendered into the README body below — possibly
    # leftover from an earlier template.
    total_folders = len(copy_results)
    successful_copies = sum(1 for r in copy_results if r["success"])
    failed_copies = total_folders - successful_copies
    # Group successful copies by their source project.
    project_groups = {}
    for result in copy_results:
        if result["success"]:
            # NOTE(review): index [2] assumes source_path looks like
            # "projects/data/{project_id}/dataset/..." with "/" separators and
            # a single-segment project root — fragile on Windows or when
            # project_path has multiple components; verify against callers.
            source_project = result["source_path"].split("/")[2]  # projects/data/{project_id}/dataset/...
            if source_project not in project_groups:
                project_groups[source_project] = []
            project_groups[source_project].append(result)
    readme_content = "\n## 目录结构\n\n"
    # Render the actual on-disk directory tree of the robot project.
    readme_content += "```\n"
    readme_content += generate_robot_directory_tree(str(robot_dir), robot_id)
    readme_content += "\n```\n\n"
    readme_content += "## 数据集详情\n\n"
    dataset_dir = robot_dir / "dataset"
    if not dataset_dir.exists():
        readme_content += "No dataset files available.\n"
    else:
        # Collect all document directories under dataset/.
        doc_dirs = []
        try:
            for item in sorted(os.listdir(dataset_dir)):
                item_path = dataset_dir / item
                if item_path.is_dir():
                    doc_dirs.append(item)
        except Exception as e:
            logger.error(f"Error listing dataset directories: {str(e)}")
        if not doc_dirs:
            readme_content += "No document directories found.\n"
        else:
            # List documents grouped by their source project.
            for project_id, folders in project_groups.items():
                for folder in folders:
                    folder_name = folder["final_folder_name"]
                    doc_path = dataset_dir / folder_name
                    readme_content += f"#### {folder_name}\n\n"
                    readme_content += f"**Files:**\n"
                    # Probe which of the expected per-document files exist.
                    document_file = doc_path / "document.txt"
                    pagination_file = doc_path / "pagination.txt"
                    embeddings_file = doc_path / "embedding.pkl"
                    readme_content += f"- `{folder_name}/document.txt`"
                    # NOTE(review): the three `+= ""` branches below are no-ops;
                    # an existence marker (e.g. a check-mark character) appears
                    # to have been lost when ambiguous Unicode was escaped —
                    # confirm against version history.
                    if document_file.exists():
                        readme_content += ""
                    readme_content += "\n"
                    readme_content += f"- `{folder_name}/pagination.txt`"
                    if pagination_file.exists():
                        readme_content += ""
                    readme_content += "\n"
                    readme_content += f"- `{folder_name}/embedding.pkl`"
                    if embeddings_file.exists():
                        readme_content += ""
                    readme_content += "\n\n"
                    # Append a preview of the document's first 10 lines.
                    if document_file.exists():
                        readme_content += f"**内容预览 (前10行):**\n\n```\n"
                        preview = get_document_preview(str(document_file), 10)
                        readme_content += preview
                        readme_content += "\n```\n\n"
                    else:
                        readme_content += f"**内容预览:** 不可用\n\n"
                    # Record renames caused by name-collision handling.
                    original_name = folder["original_folder_name"]
                    if original_name != folder_name:
                        readme_content += f"**原始名称:** `{original_name}` → `{folder_name}`\n\n"
                    readme_content += "---\n\n"
    # Write the assembled README to disk.
    with open(readme_path, 'w', encoding='utf-8') as f:
        f.write(readme_content)
    logger.info(f"Generated README: {readme_path}")
    return str(readme_path)
def _get_robot_dir(project_path: Path, bot_id: str) -> Path:
return project_path / "robot" / bot_id
def create_robot_project(dataset_ids: List[str], bot_id: str, force_rebuild: bool = False, project_path: Path = Path("projects"), skills: Optional[List[str]] = None) -> str:
    """
    Ensure the robot project directory exists (no rebuild is performed).

    Refreshes dataset symlinks: every existing symlink under dataset/ is
    removed, then one symlink per entry in dataset_ids is created pointing
    at <project_path>/docs/datasets/<id>. Skills, when given, are
    re-extracted on every call.

    Args:
        dataset_ids: Dataset IDs to link into the robot's dataset directory.
        bot_id: Robot ID.
        force_rebuild: Ignored (kept for backward compatibility).
        project_path: Root project path (default: "projects").
        skills: Optional list of skill names to copy into the robot project.

    Returns:
        str: Path of the robot project directory.
    """
    logger.info(f"Ensuring robot project exists: {bot_id}, skills: {skills}")
    robot_dir = _get_robot_dir(project_path, bot_id)
    dataset_dir = robot_dir / "dataset"
    scripts_dir = robot_dir / "scripts"
    download_dir = robot_dir / "download"
    # Create the directory layout without deleting existing content.
    for directory in (robot_dir, dataset_dir, scripts_dir, download_dir):
        directory.mkdir(parents=True, exist_ok=True)
    # Remove every symlink under dataset_dir (stale links from earlier calls).
    for item in dataset_dir.iterdir():
        if item.is_symlink():
            item.unlink()
            logger.info(f"Removed from dataset_dir: {item}")
    # Re-create one symlink per requested dataset.
    docs_datasets_dir = project_path / "docs" / "datasets"
    for dataset_id in dataset_ids:
        source = docs_datasets_dir / dataset_id
        target = dataset_dir / dataset_id
        if not source.exists():
            logger.warning(f"Dataset source not found, skipping symlink: {source}")
            continue
        # Fix: only symlinks are cleaned above, so a leftover real directory
        # or file (e.g. from an older copy-based build) would make os.symlink
        # raise FileExistsError — skip it instead of crashing.
        if target.exists() or target.is_symlink():
            logger.warning(f"Symlink target already exists, skipping: {target}")
            continue
        os.symlink(source.resolve(), target)
        logger.info(f"Created symlink: {target} -> {source.resolve()}")
    # Skills are refreshed on every invocation.
    if skills:
        _extract_skills_to_robot(bot_id, skills, project_path)
    logger.info(f"Robot project ready: {robot_dir}")
    return str(robot_dir)
if __name__ == "__main__":
    # Smoke test: create a robot project from two sample dataset IDs.
    # NOTE(review): this guard runs before _extract_skills_to_robot is
    # defined below it; it works only because no skills are passed here.
    # Moving this block to the end of the file would be safer.
    test_dataset_ids = ["test-project-1", "test-project-2"]
    test_bot_id = "test-robot-001"
    robot_dir = create_robot_project(test_dataset_ids, test_bot_id)
    logger.info(f"Created robot project at: {robot_dir}")
def _extract_skills_to_robot(bot_id: str, skills: List[str], project_path: Path) -> None:
    """
    Copy skills into the robot project's skills folder.

    Each skill name is resolved as a directory inside the source
    directories, searched in priority order:
        1. projects/uploads/{bot_id}/skills/
        2. skills/
    The first match wins. The target skills directory is wiped and rebuilt
    on every call so removed skills disappear too.

    NOTE(review): an earlier docstring described handling full zip-file
    paths (e.g. "projects/uploads/.../rag-retrieve_2.zip"), but no unzip
    logic exists here — only directory copies are performed. The unused
    `import zipfile` supporting that claim has been removed; confirm
    whether zip support was intended.

    Args:
        bot_id: Robot ID.
        skills: Skill names (e.g. ["rag-retrieve"]).
        project_path: Root project path.
    """
    # Source directories, highest priority first.
    skills_source_dirs = [
        project_path / "uploads" / bot_id / "skills",
        Path("skills"),
    ]
    skills_target_dir = project_path / "robot" / bot_id / "skills"
    # Start from a clean target directory, then re-copy everything.
    if skills_target_dir.exists():
        logger.info(f" Removing existing skills directory: {skills_target_dir}")
        shutil.rmtree(skills_target_dir)
    skills_target_dir.mkdir(parents=True, exist_ok=True)
    logger.info(f"Copying skills to {skills_target_dir}")
    for skill in skills:
        # Resolve the skill directory by priority order.
        source_dir = None
        for base_dir in skills_source_dirs:
            candidate_dir = base_dir / skill
            if candidate_dir.exists():
                source_dir = candidate_dir
                logger.info(f" Found skill '{skill}' in {base_dir}")
                break
        if source_dir is None:
            logger.warning(f" Skill directory '{skill}' not found in any source directory: {[str(d) for d in skills_source_dirs]}")
            continue
        # (Removed a dead `if not source_dir.exists()` re-check: source_dir
        # is only ever set after a successful exists() test above.)
        # basename() guards against skill names given as paths.
        target_dir = skills_target_dir / os.path.basename(skill)
        try:
            shutil.copytree(source_dir, target_dir)
            logger.info(f" Copied: {source_dir} -> {target_dir}")
        except Exception as e:
            logger.error(f" Failed to copy {source_dir}: {e}")