#!/usr/bin/env python3 """ 多项目管理器:处理多个知识库项目的合并 """ import os import shutil import json import logging from pathlib import Path from typing import List, Dict, Optional from datetime import datetime # Configure logger logger = logging.getLogger('app') from utils.file_utils import get_document_preview from utils import settings def generate_robot_directory_tree(robot_dir: str, robot_id: str, max_depth: int = 3) -> str: """ 生成机器人项目的目录树结构 Args: robot_dir: 机器人项目目录路径 robot_id: 机器人ID max_depth: 最大深度 Returns: str: 目录树字符串 """ def _build_tree(path: str, prefix: str = "", is_last: bool = True, depth: int = 0) -> List[str]: if depth > max_depth: return [] lines = [] try: entries = sorted(os.listdir(path)) # 分离目录和文件 dirs = [e for e in entries if os.path.isdir(os.path.join(path, e)) and not e.startswith('.')] files = [e for e in entries if os.path.isfile(os.path.join(path, e)) and not e.startswith('.')] entries = dirs + files for i, entry in enumerate(entries): entry_path = os.path.join(path, entry) is_dir = os.path.isdir(entry_path) is_last_entry = i == len(entries) - 1 # 选择适当的树形符号 if is_last_entry: connector = "└── " new_prefix = prefix + " " else: connector = "├── " new_prefix = prefix + "│ " # 添加条目行 line = prefix + connector + entry if is_dir: line += "/" lines.append(line) # 递归添加子目录 if is_dir and depth < max_depth: sub_lines = _build_tree(entry_path, new_prefix, is_last_entry, depth + 1) lines.extend(sub_lines) except PermissionError: lines.append(prefix + "└── [Permission Denied]") except Exception as e: lines.append(prefix + "└── [Error: " + str(e) + "]") return lines # 从dataset目录开始构建树 dataset_dir = os.path.join(robot_dir, "dataset") tree_lines = [] if not os.path.exists(dataset_dir): return "└── [No dataset directory found]" try: entries = sorted(os.listdir(dataset_dir)) dirs = [e for e in entries if os.path.isdir(os.path.join(dataset_dir, e)) and not e.startswith('.')] files = [e for e in entries if os.path.isfile(os.path.join(dataset_dir, e)) and not e.startswith('.')] entries = dirs + files if not entries: tree_lines.append("└── [Empty dataset directory]") else: for i, entry in enumerate(entries): entry_path = os.path.join(dataset_dir, entry) is_dir = os.path.isdir(entry_path) is_last_entry = i == len(entries) - 1 if is_last_entry: connector = "└── " prefix = " " else: connector = "├── " prefix = "│ " line = connector + entry if is_dir: line += "/" tree_lines.append(line) # 递归添加子目录 if is_dir: sub_lines = _build_tree(entry_path, prefix, is_last_entry, 1) tree_lines.extend(sub_lines) except Exception as e: tree_lines.append(f"└── [Error generating tree: {str(e)}]") return "\n".join(tree_lines) def get_unique_folder_name(target_dir: Path, original_name: str) -> str: """ 获取唯一的文件夹名称,如果存在重名则添加数字后缀 Args: target_dir: 目标目录 original_name: 原始文件夹名称 Returns: str: 唯一的文件夹名称 """ if not (target_dir / original_name).exists(): return original_name # 存在重名,添加数字后缀 counter = 1 while True: new_name = f"{original_name}_{counter}" if not (target_dir / new_name).exists(): return new_name counter += 1 def copy_dataset_folder(source_project_id: str, target_dataset_dir: Path, folder_name: str, project_path: Path) -> Dict: """ 复制单个项目的dataset文件夹到目标目录 Args: source_project_id: 源项目ID target_dataset_dir: 目标dataset目录 folder_name: 要复制的文件夹名称 project_path: 项目路径 Returns: Dict: 复制结果 """ result = { "success": False, "source_path": "", "target_path": "", "original_folder_name": folder_name, "final_folder_name": folder_name, "error": None } try: source_folder = project_path / "data" / source_project_id / "dataset" / folder_name result["source_path"] = str(source_folder) if not source_folder.exists(): result["error"] = f"Source folder does not exist: {source_folder}" return result # 处理重名冲突 unique_folder_name = get_unique_folder_name(target_dataset_dir, folder_name) result["final_folder_name"] = unique_folder_name target_folder = target_dataset_dir / unique_folder_name result["target_path"] = str(target_folder) # 复制文件夹 shutil.copytree(source_folder, target_folder) result["success"] = True logger.info(f" Copied: {source_folder} -> {target_folder}") except Exception as e: result["error"] = str(e) logger.error(f" Error copying {folder_name}: {str(e)}") return result def generate_robot_readme(robot_id: str, dataset_ids: List[str], copy_results: List[Dict], project_path: Path) -> str: """ 生成机器人项目的README.md文件 Args: robot_id: 机器人ID dataset_ids: 源项目ID列表 copy_results: 复制结果列表 Returns: str: README.md文件路径 """ readme_path = project_path / "robot" / robot_id / "README.md" readme_path.parent.mkdir(parents=True, exist_ok=True) robot_dir = project_path / "robot" / robot_id # 统计信息 total_folders = len(copy_results) successful_copies = sum(1 for r in copy_results if r["success"]) failed_copies = total_folders - successful_copies # 按源项目分组 project_groups = {} for result in copy_results: if result["success"]: source_project = result["source_path"].split("/")[2] # projects/data/{project_id}/dataset/... if source_project not in project_groups: project_groups[source_project] = [] project_groups[source_project].append(result) readme_content = "\n## 目录结构\n\n" # 生成实际目录树 readme_content += "```\n" readme_content += generate_robot_directory_tree(str(robot_dir), robot_id) readme_content += "\n```\n\n" readme_content += "## 数据集详情\n\n" dataset_dir = robot_dir / "dataset" if not dataset_dir.exists(): readme_content += "No dataset files available.\n" else: # 获取所有文档目录 doc_dirs = [] try: for item in sorted(os.listdir(dataset_dir)): item_path = dataset_dir / item if item_path.is_dir(): doc_dirs.append(item) except Exception as e: logger.error(f"Error listing dataset directories: {str(e)}") if not doc_dirs: readme_content += "No document directories found.\n" else: # 按源项目分组显示文档 for project_id, folders in project_groups.items(): for folder in folders: folder_name = folder["final_folder_name"] doc_path = dataset_dir / folder_name readme_content += f"#### {folder_name}\n\n" readme_content += f"**Files:**\n" # 检查文件存在性 document_file = doc_path / "document.txt" pagination_file = doc_path / "pagination.txt" embeddings_file = doc_path / "embedding.pkl" readme_content += f"- `{folder_name}/document.txt`" if document_file.exists(): readme_content += " ✓" readme_content += "\n" readme_content += f"- `{folder_name}/pagination.txt`" if pagination_file.exists(): readme_content += " ✓" readme_content += "\n" readme_content += f"- `{folder_name}/embedding.pkl`" if embeddings_file.exists(): readme_content += " ✓" readme_content += "\n\n" # 添加文档预览 if document_file.exists(): readme_content += f"**内容预览 (前10行):**\n\n```\n" preview = get_document_preview(str(document_file), 10) readme_content += preview readme_content += "\n```\n\n" else: readme_content += f"**内容预览:** 不可用\n\n" # 显示重命名信息 original_name = folder["original_folder_name"] if original_name != folder_name: readme_content += f"**原始名称:** `{original_name}` → `{folder_name}`\n\n" readme_content += "---\n\n" # 写入README文件 with open(readme_path, 'w', encoding='utf-8') as f: f.write(readme_content) logger.info(f"Generated README: {readme_path}") return str(readme_path) def _get_robot_dir(project_path: Path, bot_id: str) -> Path: return project_path / "robot" / bot_id def should_rebuild_robot_project(dataset_ids: List[str], bot_id: str, project_path: Path) -> bool: """ 检查是否需要重建机器人项目 1. 检查机器人项目是否存在 2. 检查是否有新增的dataset_id 3. 检查processing_log.json文件是否更新 Args: dataset_ids: 源项目ID列表 bot_id: 机器人ID project_path: 项目路径 Returns: bool: 是否需要重建 """ robot_dir = _get_robot_dir(project_path, bot_id) # 如果机器人项目不存在,需要创建 if not robot_dir.exists(): logger.info(f"Robot project does not exist, need to create: {bot_id}") return True # 检查机器人项目的配置信息 config_file = robot_dir / "robot_config.json" if not config_file.exists(): logger.info(f"Robot config file not found, need to rebuild: {bot_id}") return True # 读取配置信息 try: with open(config_file, 'r', encoding='utf-8') as f: config = json.load(f) cached_dataset_ids = set(config.get("dataset_ids", [])) except Exception as e: logger.error(f"Error reading robot config: {e}, need to rebuild") return True # 检查dataset_ids是否有变化 current_dataset_ids = set(dataset_ids) # 如果有新增的dataset_id new_ids = current_dataset_ids - cached_dataset_ids if new_ids: logger.info(f"Found new dataset_ids: {new_ids}, need to rebuild") return True # 如果有删除的dataset_id removed_ids = cached_dataset_ids - current_dataset_ids if removed_ids: logger.info(f"Removed dataset_ids: {removed_ids}, need to rebuild") return True # 获取机器人项目的最后修改时间 robot_mod_time = robot_dir.stat().st_mtime # 检查每个源项目的processing_log.json文件 for source_project_id in dataset_ids: log_file = project_path / "data" / source_project_id / "processing_log.json" if not log_file.exists(): logger.info(f"Processing log file not found for project {source_project_id}, will rebuild") return True log_mod_time = log_file.stat().st_mtime # 如果任何一个processing_log.json文件比机器人项目新,需要重建 if log_mod_time > robot_mod_time: logger.info(f"Processing log updated for project {source_project_id}, need to rebuild") return True logger.info(f"Robot project {bot_id} is up to date, no rebuild needed") return False def create_robot_project(dataset_ids: List[str], bot_id: str, force_rebuild: bool = False, project_path: Path = Path("projects"), skills: Optional[List[str]] = None, robot_type: str = "catalog_agent") -> str: """ 创建机器人项目,合并多个源项目的dataset文件夹 Args: dataset_ids: 源项目ID列表 bot_id: 机器人ID force_rebuild: 是否强制重建 skills: 技能文件名列表(如 ["rag-retrieve", "device_controller.zip"]) robot_type: 机器人类型 (catalog_agent, deep_agent 等) Returns: str: 机器人项目目录路径 """ # 如果 skills 为空或 None,且 robot_type 是 catalog_agent 或 deep_agent,默认加载 rag-retrieve if not skills and robot_type in ("catalog_agent", "deep_agent"): skills = ["rag-retrieve"] logger.info(f"No skills provided, using default skill 'rag-retrieve' for {robot_type}") logger.info(f"Creating robot project: {bot_id} from sources: {dataset_ids}, skills: {skills}") # 检查是否需要重建 if not force_rebuild and not should_rebuild_robot_project(dataset_ids, bot_id, project_path): robot_dir = project_path / "robot" / bot_id logger.info(f"Using existing robot project: {robot_dir}") # 即使使用现有项目,也要处理 skills(如果提供了) if skills: _extract_skills_to_robot(robot_dir, skills, project_path) return str(robot_dir) # 创建机器人目录结构 robot_dir = _get_robot_dir(project_path, bot_id) dataset_dir = robot_dir / "dataset" # 清理已存在的目录(如果需要) if robot_dir.exists(): logger.info(f"Robot directory already exists, cleaning up: {robot_dir}") shutil.rmtree(robot_dir) robot_dir.mkdir(parents=True, exist_ok=True) dataset_dir.mkdir(parents=True, exist_ok=True) copy_results = [] # 遍历每个源项目 for source_project_id in dataset_ids: logger.info(f"\nProcessing source project: {source_project_id}") source_dataset_dir = project_path / "data" / source_project_id / "dataset" if not source_dataset_dir.exists(): logger.warning(f" Warning: Dataset directory not found for project {source_project_id}") continue # 获取所有子文件夹 folders = [f for f in source_dataset_dir.iterdir() if f.is_dir()] if not folders: logger.warning(f" Warning: No folders found in dataset directory for project {source_project_id}") continue # 复制每个文件夹 for folder in folders: result = copy_dataset_folder(source_project_id, dataset_dir, folder.name, project_path) copy_results.append(result) # 保存配置信息 config_file = robot_dir / "robot_config.json" config_data = { "dataset_ids": dataset_ids, "bot_id": bot_id, "created_at": datetime.now().isoformat(), "total_folders": len(copy_results), "successful_copies": sum(1 for r in copy_results if r["success"]) } with open(config_file, 'w', encoding='utf-8') as f: json.dump(config_data, f, ensure_ascii=False, indent=2) # 生成README readme_path = generate_robot_readme(bot_id, dataset_ids, copy_results, project_path) # 统计信息 successful_copies = sum(1 for r in copy_results if r["success"]) logger.info(f"\nRobot project creation completed:") logger.info(f" Robot directory: {robot_dir}") logger.info(f" Total folders processed: {len(copy_results)}") logger.info(f" Successful copies: {successful_copies}") logger.info(f" Config saved: {config_file}") logger.info(f" README generated: {readme_path}") # 处理 skills 解压 if skills: _extract_skills_to_robot(robot_dir, skills, project_path) return str(robot_dir) if __name__ == "__main__": # 测试代码 test_dataset_ids = ["test-project-1", "test-project-2"] test_bot_id = "test-robot-001" robot_dir = create_robot_project(test_dataset_ids, test_bot_id, robot_type="catalog_agent") logger.info(f"Created robot project at: {robot_dir}") def _extract_skills_to_robot(robot_dir: Path, skills: List[str], project_path: Path) -> None: """ 解压 skills 到 robot 项目的 skills 文件夹 Args: robot_dir: 机器人项目目录 skills: 技能文件名列表(如 ["rag-retrieve", "device_controller.zip"]) project_path: 项目路径 """ import zipfile # skills 源目录在 projects/skills,需要通过解析软链接获取正确路径 # project_path 可能是 ~/.deepagents (软链接 -> projects/robot) # 所以 skills 源目录是 project_path.resolve().parent / "skills" skills_source_dir = project_path / "skills" skills_target_dir = robot_dir / "skills" # 先清空 skills_target_dir,然后重新解压 if skills_target_dir.exists(): logger.info(f" Removing existing skills directory: {skills_target_dir}") shutil.rmtree(skills_target_dir) skills_target_dir.mkdir(parents=True, exist_ok=True) logger.info(f"Extracting skills to {skills_target_dir}") for skill in skills: # 规范化文件名(确保有 .zip 后缀) if not skill.endswith(".zip"): skill_file = skill + ".zip" else: skill_file = skill skill_source_path = skills_source_dir / skill_file if not skill_source_path.exists(): logger.warning(f" Skill file not found: {skill_source_path}") continue # 获取解压后的文件夹名称(去掉 .zip 后缀) folder_name = skill_file.replace(".zip", "") extract_target = skills_target_dir / folder_name # 解压文件 try: with zipfile.ZipFile(skill_source_path, 'r') as zip_ref: zip_ref.extractall(extract_target) logger.info(f" Extracted: {skill_file} -> {extract_target}") except Exception as e: logger.error(f" Failed to extract {skill_file}: {e}")