#!/usr/bin/env python3
"""
Multi-project manager: handles merging the datasets of multiple
knowledge-base projects into a single robot project.
"""

import os
import shutil
import json
import logging
import zipfile
from pathlib import Path
from typing import List, Dict, Optional
from datetime import datetime

from utils.file_utils import get_document_preview
from utils import settings

# Configure logger
logger = logging.getLogger('app')

def generate_robot_directory_tree(robot_dir: str, robot_id: str, max_depth: int = 3) -> str:
    """
    Generate the directory tree of a robot project.

    Args:
        robot_dir: Robot project directory path
        robot_id: Robot ID
        max_depth: Maximum recursion depth

    Returns:
        str: The directory tree as a string
    """
    def _build_tree(path: str, prefix: str = "", is_last: bool = True, depth: int = 0) -> List[str]:
        if depth > max_depth:
            return []

        lines = []
        try:
            entries = sorted(os.listdir(path))
            # Separate directories from files, skipping hidden entries
            dirs = [e for e in entries if os.path.isdir(os.path.join(path, e)) and not e.startswith('.')]
            files = [e for e in entries if os.path.isfile(os.path.join(path, e)) and not e.startswith('.')]

            entries = dirs + files

            for i, entry in enumerate(entries):
                entry_path = os.path.join(path, entry)
                is_dir = os.path.isdir(entry_path)
                is_last_entry = i == len(entries) - 1

                # Pick the appropriate tree connector
                if is_last_entry:
                    connector = "└── "
                    new_prefix = prefix + "    "
                else:
                    connector = "├── "
                    new_prefix = prefix + "│   "

                # Append the entry line
                line = prefix + connector + entry
                if is_dir:
                    line += "/"
                lines.append(line)

                # Recurse into subdirectories
                if is_dir and depth < max_depth:
                    sub_lines = _build_tree(entry_path, new_prefix, is_last_entry, depth + 1)
                    lines.extend(sub_lines)

        except PermissionError:
            lines.append(prefix + "└── [Permission Denied]")
        except Exception as e:
            lines.append(prefix + "└── [Error: " + str(e) + "]")

        return lines

    # Build the tree starting from the dataset directory
    dataset_dir = os.path.join(robot_dir, "dataset")
    tree_lines = []

    if not os.path.exists(dataset_dir):
        return "└── [No dataset directory found]"

    try:
        entries = sorted(os.listdir(dataset_dir))
        dirs = [e for e in entries if os.path.isdir(os.path.join(dataset_dir, e)) and not e.startswith('.')]
        files = [e for e in entries if os.path.isfile(os.path.join(dataset_dir, e)) and not e.startswith('.')]

        entries = dirs + files

        if not entries:
            tree_lines.append("└── [Empty dataset directory]")
        else:
            for i, entry in enumerate(entries):
                entry_path = os.path.join(dataset_dir, entry)
                is_dir = os.path.isdir(entry_path)
                is_last_entry = i == len(entries) - 1

                if is_last_entry:
                    connector = "└── "
                    prefix = "    "
                else:
                    connector = "├── "
                    prefix = "│   "

                line = connector + entry
                if is_dir:
                    line += "/"
                tree_lines.append(line)

                # Recurse into subdirectories
                if is_dir:
                    sub_lines = _build_tree(entry_path, prefix, is_last_entry, 1)
                    tree_lines.extend(sub_lines)

    except Exception as e:
        tree_lines.append(f"└── [Error generating tree: {str(e)}]")

    return "\n".join(tree_lines)
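
# Example (illustrative; assumes a populated projects/robot/<bot_id>/dataset tree):
#
#     print(generate_robot_directory_tree("projects/robot/bot-001", "bot-001"))
#     # ├── manual_a/
#     # │   ├── document.txt
#     # │   └── embedding.pkl
#     # └── manual_b/
#     #     └── document.txt
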
def get_unique_folder_name(target_dir: Path, original_name: str) -> str:
    """
    Return a unique folder name, appending a numeric suffix on collision.

    Args:
        target_dir: Target directory
        original_name: Original folder name

    Returns:
        str: A unique folder name
    """
    if not (target_dir / original_name).exists():
        return original_name

    # The name is taken; append a numeric suffix
    counter = 1
    while True:
        new_name = f"{original_name}_{counter}"
        if not (target_dir / new_name).exists():
            return new_name
        counter += 1
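
# Example (illustrative): if "manual" already exists under the target directory,
# the helper returns the first free name — "manual_1", or "manual_2" if that is
# also taken:
#
#     name = get_unique_folder_name(Path("projects/robot/bot-001/dataset"), "manual")
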
def copy_dataset_folder(source_project_id: str, target_dataset_dir: Path, folder_name: str, project_path: Path) -> Dict:
    """
    Copy a single project's dataset folder into the target directory.

    Args:
        source_project_id: Source project ID
        target_dataset_dir: Target dataset directory
        folder_name: Name of the folder to copy
        project_path: Project root path

    Returns:
        Dict: Copy result
    """
    result = {
        "success": False,
        "source_path": "",
        "target_path": "",
        "original_folder_name": folder_name,
        "final_folder_name": folder_name,
        "error": None
    }

    try:
        source_folder = project_path / "data" / source_project_id / "dataset" / folder_name
        result["source_path"] = str(source_folder)

        if not source_folder.exists():
            result["error"] = f"Source folder does not exist: {source_folder}"
            return result

        # Resolve name collisions in the target directory
        unique_folder_name = get_unique_folder_name(target_dataset_dir, folder_name)
        result["final_folder_name"] = unique_folder_name

        target_folder = target_dataset_dir / unique_folder_name
        result["target_path"] = str(target_folder)

        # Copy the folder
        shutil.copytree(source_folder, target_folder)
        result["success"] = True

        logger.info(f"  Copied: {source_folder} -> {target_folder}")

    except Exception as e:
        result["error"] = str(e)
        logger.error(f"  Error copying {folder_name}: {str(e)}")

    return result
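
# Illustrative shape of a successful result (paths are hypothetical):
#
#     {
#         "success": True,
#         "source_path": "projects/data/proj-1/dataset/manual",
#         "target_path": "projects/robot/bot-001/dataset/manual_1",
#         "original_folder_name": "manual",
#         "final_folder_name": "manual_1",
#         "error": None,
#     }
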
def generate_robot_readme(robot_id: str, dataset_ids: List[str], copy_results: List[Dict], project_path: Path) -> str:
    """
    Generate the README.md file for a robot project.

    Args:
        robot_id: Robot ID
        dataset_ids: List of source project IDs
        copy_results: List of copy results
        project_path: Project root path

    Returns:
        str: Path to the README.md file
    """
    readme_path = project_path / "robot" / robot_id / "README.md"
    readme_path.parent.mkdir(parents=True, exist_ok=True)

    robot_dir = project_path / "robot" / robot_id

    # Statistics
    total_folders = len(copy_results)
    successful_copies = sum(1 for r in copy_results if r["success"])
    failed_copies = total_folders - successful_copies

    # Group results by source project
    project_groups = {}
    for result in copy_results:
        if result["success"]:
            # source_path has the shape {project_path}/data/{project_id}/dataset/{folder},
            # so the project ID is two levels above the copied folder
            source_project = Path(result["source_path"]).parents[1].name
            if source_project not in project_groups:
                project_groups[source_project] = []
            project_groups[source_project].append(result)

    readme_content = "\n## Directory Structure\n\n"
    # Render the actual directory tree
    readme_content += "```\n"
    readme_content += generate_robot_directory_tree(str(robot_dir), robot_id)
    readme_content += "\n```\n\n"

    readme_content += "## Dataset Details\n\n"

    dataset_dir = robot_dir / "dataset"
    if not dataset_dir.exists():
        readme_content += "No dataset files available.\n"
    else:
        # Collect all document directories
        doc_dirs = []
        try:
            for item in sorted(os.listdir(dataset_dir)):
                item_path = dataset_dir / item
                if item_path.is_dir():
                    doc_dirs.append(item)
        except Exception as e:
            logger.error(f"Error listing dataset directories: {str(e)}")

        if not doc_dirs:
            readme_content += "No document directories found.\n"
        else:
            # List documents grouped by source project
            for project_id, folders in project_groups.items():
                for folder in folders:
                    folder_name = folder["final_folder_name"]
                    doc_path = dataset_dir / folder_name

                    readme_content += f"#### {folder_name}\n\n"
                    readme_content += "**Files:**\n"

                    # Check which files exist
                    document_file = doc_path / "document.txt"
                    pagination_file = doc_path / "pagination.txt"
                    embeddings_file = doc_path / "embedding.pkl"

                    readme_content += f"- `{folder_name}/document.txt`"
                    if document_file.exists():
                        readme_content += " ✓"
                    readme_content += "\n"

                    readme_content += f"- `{folder_name}/pagination.txt`"
                    if pagination_file.exists():
                        readme_content += " ✓"
                    readme_content += "\n"

                    readme_content += f"- `{folder_name}/embedding.pkl`"
                    if embeddings_file.exists():
                        readme_content += " ✓"
                    readme_content += "\n\n"

                    # Add a document preview
                    if document_file.exists():
                        readme_content += "**Content preview (first 10 lines):**\n\n```\n"
                        preview = get_document_preview(str(document_file), 10)
                        readme_content += preview
                        readme_content += "\n```\n\n"
                    else:
                        readme_content += "**Content preview:** not available\n\n"

                    # Note any rename that was applied to avoid collisions
                    original_name = folder["original_folder_name"]
                    if original_name != folder_name:
                        readme_content += f"**Original name:** `{original_name}` → `{folder_name}`\n\n"

                    readme_content += "---\n\n"

    # Write the README file
    with open(readme_path, 'w', encoding='utf-8') as f:
        f.write(readme_content)

    logger.info(f"Generated README: {readme_path}")
    return str(readme_path)
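
# Example call (illustrative; copy_results comes from copy_dataset_folder):
#
#     path = generate_robot_readme("bot-001", ["proj-1"], copy_results, Path("projects"))
#     # -> "projects/robot/bot-001/README.md"
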
def _get_robot_dir(project_path: Path, bot_id: str) -> Path:
    """Return the directory of a robot project under project_path."""
    return project_path / "robot" / bot_id
def should_rebuild_robot_project(dataset_ids: List[str], bot_id: str, project_path: Path) -> bool:
    """
    Decide whether a robot project needs to be rebuilt:
    1. Check whether the robot project exists
    2. Check whether any dataset_id was added or removed
    3. Check whether any processing_log.json file has been updated

    Args:
        dataset_ids: List of source project IDs
        bot_id: Robot ID
        project_path: Project root path

    Returns:
        bool: Whether a rebuild is needed
    """
    robot_dir = _get_robot_dir(project_path, bot_id)

    # If the robot project does not exist, it must be created
    if not robot_dir.exists():
        logger.info(f"Robot project does not exist, need to create: {bot_id}")
        return True

    # Check the robot project's config file
    config_file = robot_dir / "robot_config.json"
    if not config_file.exists():
        logger.info(f"Robot config file not found, need to rebuild: {bot_id}")
        return True

    # Read the cached configuration
    try:
        with open(config_file, 'r', encoding='utf-8') as f:
            config = json.load(f)
        cached_dataset_ids = set(config.get("dataset_ids", []))
    except Exception as e:
        logger.error(f"Error reading robot config: {e}, need to rebuild")
        return True

    # Compare the current dataset_ids with the cached ones
    current_dataset_ids = set(dataset_ids)

    # Any newly added dataset_id triggers a rebuild
    new_ids = current_dataset_ids - cached_dataset_ids
    if new_ids:
        logger.info(f"Found new dataset_ids: {new_ids}, need to rebuild")
        return True

    # Any removed dataset_id triggers a rebuild
    removed_ids = cached_dataset_ids - current_dataset_ids
    if removed_ids:
        logger.info(f"Removed dataset_ids: {removed_ids}, need to rebuild")
        return True

    # Last modification time of the robot project directory
    robot_mod_time = robot_dir.stat().st_mtime

    # Check each source project's processing_log.json
    for source_project_id in dataset_ids:
        log_file = project_path / "data" / source_project_id / "processing_log.json"

        if not log_file.exists():
            logger.info(f"Processing log file not found for project {source_project_id}, will rebuild")
            return True

        log_mod_time = log_file.stat().st_mtime

        # Rebuild if any processing_log.json is newer than the robot project
        if log_mod_time > robot_mod_time:
            logger.info(f"Processing log updated for project {source_project_id}, need to rebuild")
            return True

    logger.info(f"Robot project {bot_id} is up to date, no rebuild needed")
    return False
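
# Example (illustrative): a rebuild is triggered when the dataset_id set changes
# or any source's processing_log.json is newer than the robot directory:
#
#     if should_rebuild_robot_project(["proj-1", "proj-2"], "bot-001", Path("projects")):
#         create_robot_project(["proj-1", "proj-2"], "bot-001", force_rebuild=True)
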
def create_robot_project(dataset_ids: List[str], bot_id: str, force_rebuild: bool = False, project_path: Path = Path("projects"), skills: Optional[List[str]] = None, robot_type: str = "catalog_agent") -> str:
    """
    Create a robot project by merging the dataset folders of several source projects.

    Args:
        dataset_ids: List of source project IDs
        bot_id: Robot ID
        force_rebuild: Whether to force a rebuild
        project_path: Project root path
        skills: List of skill file names (e.g. ["rag-retrieve", "device_controller.zip"])
        robot_type: Robot type (catalog_agent, deep_agent, ...)

    Returns:
        str: Path to the robot project directory
    """
    # If skills is empty or None and robot_type is catalog_agent or deep_agent,
    # fall back to the default rag-retrieve skill
    if not skills and robot_type in ("catalog_agent", "deep_agent"):
        skills = ["rag-retrieve"]
        logger.info(f"No skills provided, using default skill 'rag-retrieve' for {robot_type}")

    logger.info(f"Creating robot project: {bot_id} from sources: {dataset_ids}, skills: {skills}")

    # Check whether a rebuild is needed
    if not force_rebuild and not should_rebuild_robot_project(dataset_ids, bot_id, project_path):
        robot_dir = _get_robot_dir(project_path, bot_id)
        logger.info(f"Using existing robot project: {robot_dir}")
        # Even when reusing an existing project, still process skills (if provided)
        if skills:
            _extract_skills_to_robot(robot_dir, skills, project_path)
        return str(robot_dir)

    # Create the robot directory structure
    robot_dir = _get_robot_dir(project_path, bot_id)
    dataset_dir = robot_dir / "dataset"

    # Clean up an existing directory if necessary
    if robot_dir.exists():
        logger.info(f"Robot directory already exists, cleaning up: {robot_dir}")
        shutil.rmtree(robot_dir)

    robot_dir.mkdir(parents=True, exist_ok=True)
    dataset_dir.mkdir(parents=True, exist_ok=True)

    copy_results = []

    # Walk through each source project
    for source_project_id in dataset_ids:
        logger.info(f"\nProcessing source project: {source_project_id}")

        source_dataset_dir = project_path / "data" / source_project_id / "dataset"

        if not source_dataset_dir.exists():
            logger.warning(f"  Warning: Dataset directory not found for project {source_project_id}")
            continue

        # Collect all subfolders
        folders = [f for f in source_dataset_dir.iterdir() if f.is_dir()]

        if not folders:
            logger.warning(f"  Warning: No folders found in dataset directory for project {source_project_id}")
            continue

        # Copy each folder
        for folder in folders:
            result = copy_dataset_folder(source_project_id, dataset_dir, folder.name, project_path)
            copy_results.append(result)

    # Persist the configuration
    config_file = robot_dir / "robot_config.json"
    config_data = {
        "dataset_ids": dataset_ids,
        "bot_id": bot_id,
        "env": {
            "backend_host": settings.BACKEND_HOST,
            "masterkey": settings.MASTERKEY
        },
        "created_at": datetime.now().isoformat(),
        "total_folders": len(copy_results),
        "successful_copies": sum(1 for r in copy_results if r["success"])
    }

    with open(config_file, 'w', encoding='utf-8') as f:
        json.dump(config_data, f, ensure_ascii=False, indent=2)

    # Generate the README
    readme_path = generate_robot_readme(bot_id, dataset_ids, copy_results, project_path)

    # Summary statistics
    successful_copies = sum(1 for r in copy_results if r["success"])
    logger.info("\nRobot project creation completed:")
    logger.info(f"  Robot directory: {robot_dir}")
    logger.info(f"  Total folders processed: {len(copy_results)}")
    logger.info(f"  Successful copies: {successful_copies}")
    logger.info(f"  Config saved: {config_file}")
    logger.info(f"  README generated: {readme_path}")

    # Extract skills into the project
    if skills:
        _extract_skills_to_robot(robot_dir, skills, project_path)

    return str(robot_dir)
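
# Example (illustrative; project IDs and the skill archive name are hypothetical):
#
#     robot_dir = create_robot_project(
#         dataset_ids=["proj-1", "proj-2"],
#         bot_id="bot-001",
#         skills=["rag-retrieve", "device_controller.zip"],
#         robot_type="catalog_agent",
#     )
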
def _extract_skills_to_robot(robot_dir: Path, skills: List[str], project_path: Path) -> None:
    """
    Extract skill archives into the robot project's skills folder.

    Args:
        robot_dir: Robot project directory
        skills: List of skill file names (e.g. ["rag-retrieve", "device_controller.zip"])
        project_path: Project root path
    """
    # Skill archives live under project_path / "skills"
    skills_source_dir = project_path / "skills"
    skills_target_dir = robot_dir / "skills"

    # Clear skills_target_dir first, then re-extract
    if skills_target_dir.exists():
        logger.info(f"  Removing existing skills directory: {skills_target_dir}")
        shutil.rmtree(skills_target_dir)

    skills_target_dir.mkdir(parents=True, exist_ok=True)
    logger.info(f"Extracting skills to {skills_target_dir}")

    for skill in skills:
        # Normalize the file name (ensure a .zip suffix)
        if not skill.endswith(".zip"):
            skill_file = skill + ".zip"
        else:
            skill_file = skill

        skill_source_path = skills_source_dir / skill_file

        if not skill_source_path.exists():
            logger.warning(f"  Skill file not found: {skill_source_path}")
            continue

        # Name of the extraction folder (drop the .zip suffix)
        folder_name = skill_file[:-len(".zip")]
        extract_target = skills_target_dir / folder_name

        # Extract the archive
        try:
            with zipfile.ZipFile(skill_source_path, 'r') as zip_ref:
                zip_ref.extractall(extract_target)
            logger.info(f"  Extracted: {skill_file} -> {extract_target}")
        except Exception as e:
            logger.error(f"  Failed to extract {skill_file}: {e}")


if __name__ == "__main__":
    # Test code
    test_dataset_ids = ["test-project-1", "test-project-2"]
    test_bot_id = "test-robot-001"

    robot_dir = create_robot_project(test_dataset_ids, test_bot_id, robot_type="catalog_agent")
    logger.info(f"Created robot project at: {robot_dir}")