#!/usr/bin/env python3 """ 多项目管理器:处理多个知识库项目的合并 """ import os import shutil import json from pathlib import Path from typing import List, Dict, Optional from datetime import datetime from utils.file_utils import get_document_preview def generate_robot_directory_tree(robot_dir: str, robot_id: str, max_depth: int = 3) -> str: """ 生成机器人项目的目录树结构 Args: robot_dir: 机器人项目目录路径 robot_id: 机器人ID max_depth: 最大深度 Returns: str: 目录树字符串 """ def _build_tree(path: str, prefix: str = "", is_last: bool = True, depth: int = 0) -> List[str]: if depth > max_depth: return [] lines = [] try: entries = sorted(os.listdir(path)) # 分离目录和文件 dirs = [e for e in entries if os.path.isdir(os.path.join(path, e)) and not e.startswith('.')] files = [e for e in entries if os.path.isfile(os.path.join(path, e)) and not e.startswith('.')] entries = dirs + files for i, entry in enumerate(entries): entry_path = os.path.join(path, entry) is_dir = os.path.isdir(entry_path) is_last_entry = i == len(entries) - 1 # 选择适当的树形符号 if is_last_entry: connector = "└── " new_prefix = prefix + " " else: connector = "├── " new_prefix = prefix + "│ " # 添加条目行 line = prefix + connector + entry if is_dir: line += "/" lines.append(line) # 递归添加子目录 if is_dir and depth < max_depth: sub_lines = _build_tree(entry_path, new_prefix, is_last_entry, depth + 1) lines.extend(sub_lines) except PermissionError: lines.append(prefix + "└── [Permission Denied]") except Exception as e: lines.append(prefix + "└── [Error: " + str(e) + "]") return lines # 从dataset目录开始构建树 dataset_dir = os.path.join(robot_dir, "dataset") tree_lines = [] if not os.path.exists(dataset_dir): return "└── [No dataset directory found]" try: entries = sorted(os.listdir(dataset_dir)) dirs = [e for e in entries if os.path.isdir(os.path.join(dataset_dir, e)) and not e.startswith('.')] files = [e for e in entries if os.path.isfile(os.path.join(dataset_dir, e)) and not e.startswith('.')] entries = dirs + files if not entries: tree_lines.append("└── [Empty dataset directory]") else: for i, entry in enumerate(entries): entry_path = os.path.join(dataset_dir, entry) is_dir = os.path.isdir(entry_path) is_last_entry = i == len(entries) - 1 if is_last_entry: connector = "└── " prefix = " " else: connector = "├── " prefix = "│ " line = connector + entry if is_dir: line += "/" tree_lines.append(line) # 递归添加子目录 if is_dir: sub_lines = _build_tree(entry_path, prefix, is_last_entry, 1) tree_lines.extend(sub_lines) except Exception as e: tree_lines.append(f"└── [Error generating tree: {str(e)}]") return "\n".join(tree_lines) def get_unique_folder_name(target_dir: Path, original_name: str) -> str: """ 获取唯一的文件夹名称,如果存在重名则添加数字后缀 Args: target_dir: 目标目录 original_name: 原始文件夹名称 Returns: str: 唯一的文件夹名称 """ if not (target_dir / original_name).exists(): return original_name # 存在重名,添加数字后缀 counter = 1 while True: new_name = f"{original_name}_{counter}" if not (target_dir / new_name).exists(): return new_name counter += 1 def copy_dataset_folder(source_project_id: str, target_dataset_dir: Path, folder_name: str) -> Dict: """ 复制单个项目的dataset文件夹到目标目录 Args: source_project_id: 源项目ID target_dataset_dir: 目标dataset目录 folder_name: 要复制的文件夹名称 Returns: Dict: 复制结果 """ result = { "success": False, "source_path": "", "target_path": "", "original_folder_name": folder_name, "final_folder_name": folder_name, "error": None } try: source_folder = Path("projects") / "data" / source_project_id / "dataset" / folder_name result["source_path"] = str(source_folder) if not source_folder.exists(): result["error"] = f"Source folder does not exist: {source_folder}" return result # 处理重名冲突 unique_folder_name = get_unique_folder_name(target_dataset_dir, folder_name) result["final_folder_name"] = unique_folder_name target_folder = target_dataset_dir / unique_folder_name result["target_path"] = str(target_folder) # 复制文件夹 shutil.copytree(source_folder, target_folder) result["success"] = True print(f" Copied: {source_folder} -> {target_folder}") except Exception as e: result["error"] = str(e) print(f" Error copying {folder_name}: {str(e)}") return result def generate_robot_readme(robot_id: str, dataset_ids: List[str], copy_results: List[Dict]) -> str: """ 生成机器人项目的README.md文件 Args: robot_id: 机器人ID dataset_ids: 源项目ID列表 copy_results: 复制结果列表 Returns: str: README.md文件路径 """ readme_path = Path("projects") / "robot" / robot_id / "README.md" readme_path.parent.mkdir(parents=True, exist_ok=True) robot_dir = Path("projects") / "robot" / robot_id # 统计信息 total_folders = len(copy_results) successful_copies = sum(1 for r in copy_results if r["success"]) failed_copies = total_folders - successful_copies # 按源项目分组 project_groups = {} for result in copy_results: if result["success"]: source_project = result["source_path"].split("/")[2] # projects/data/{project_id}/dataset/... if source_project not in project_groups: project_groups[source_project] = [] project_groups[source_project].append(result) readme_content = "\n## 目录结构\n\n" # 生成实际目录树 readme_content += "```\n" readme_content += generate_robot_directory_tree(str(robot_dir), robot_id) readme_content += "\n```\n\n" readme_content += "## 数据集详情\n\n" dataset_dir = robot_dir / "dataset" if not dataset_dir.exists(): readme_content += "No dataset files available.\n" else: # 获取所有文档目录 doc_dirs = [] try: for item in sorted(os.listdir(dataset_dir)): item_path = dataset_dir / item if item_path.is_dir(): doc_dirs.append(item) except Exception as e: print(f"Error listing dataset directories: {str(e)}") if not doc_dirs: readme_content += "No document directories found.\n" else: # 按源项目分组显示文档 for project_id, folders in project_groups.items(): for folder in folders: folder_name = folder["final_folder_name"] doc_path = dataset_dir / folder_name readme_content += f"#### {folder_name}\n\n" readme_content += f"**Files:**\n" # 检查文件存在性 document_file = doc_path / "document.txt" pagination_file = doc_path / "pagination.txt" embeddings_file = doc_path / "embedding.pkl" readme_content += f"- `{folder_name}/document.txt`" if document_file.exists(): readme_content += " ✓" readme_content += "\n" readme_content += f"- `{folder_name}/pagination.txt`" if pagination_file.exists(): readme_content += " ✓" readme_content += "\n" readme_content += f"- `{folder_name}/embedding.pkl`" if embeddings_file.exists(): readme_content += " ✓" readme_content += "\n\n" # 添加文档预览 if document_file.exists(): readme_content += f"**内容预览 (前10行):**\n\n```\n" preview = get_document_preview(str(document_file), 10) readme_content += preview readme_content += "\n```\n\n" else: readme_content += f"**内容预览:** 不可用\n\n" # 显示重命名信息 original_name = folder["original_folder_name"] if original_name != folder_name: readme_content += f"**原始名称:** `{original_name}` → `{folder_name}`\n\n" readme_content += "---\n\n" # 写入README文件 with open(readme_path, 'w', encoding='utf-8') as f: f.write(readme_content) print(f"Generated README: {readme_path}") return str(readme_path) def should_rebuild_robot_project(dataset_ids: List[str], bot_id: str) -> bool: """ 检查是否需要重建机器人项目 1. 检查机器人项目是否存在 2. 检查是否有新增的dataset_id 3. 检查processing_log.json文件是否更新 Args: dataset_ids: 源项目ID列表 bot_id: 机器人ID Returns: bool: 是否需要重建 """ robot_dir = Path("projects") / "robot" / bot_id # 如果机器人项目不存在,需要创建 if not robot_dir.exists(): print(f"Robot project does not exist, need to create: {bot_id}") return True # 检查机器人项目的配置信息 config_file = robot_dir / "robot_config.json" if not config_file.exists(): print(f"Robot config file not found, need to rebuild: {bot_id}") return True # 读取配置信息 try: with open(config_file, 'r', encoding='utf-8') as f: config = json.load(f) cached_dataset_ids = set(config.get("dataset_ids", [])) except Exception as e: print(f"Error reading robot config: {e}, need to rebuild") return True # 检查dataset_ids是否有变化 current_dataset_ids = set(dataset_ids) # 如果有新增的dataset_id new_ids = current_dataset_ids - cached_dataset_ids if new_ids: print(f"Found new dataset_ids: {new_ids}, need to rebuild") return True # 如果有删除的dataset_id removed_ids = cached_dataset_ids - current_dataset_ids if removed_ids: print(f"Removed dataset_ids: {removed_ids}, need to rebuild") return True # 获取机器人项目的最后修改时间 robot_mod_time = robot_dir.stat().st_mtime # 检查每个源项目的processing_log.json文件 for source_project_id in dataset_ids: log_file = Path("projects") / "data" / source_project_id / "processing_log.json" if not log_file.exists(): print(f"Processing log file not found for project {source_project_id}, will rebuild") return True log_mod_time = log_file.stat().st_mtime # 如果任何一个processing_log.json文件比机器人项目新,需要重建 if log_mod_time > robot_mod_time: print(f"Processing log updated for project {source_project_id}, need to rebuild") return True print(f"Robot project {bot_id} is up to date, no rebuild needed") return False def create_robot_project(dataset_ids: List[str], bot_id: str, force_rebuild: bool = False) -> str: """ 创建机器人项目,合并多个源项目的dataset文件夹 Args: dataset_ids: 源项目ID列表 bot_id: 机器人ID force_rebuild: 是否强制重建 Returns: str: 机器人项目目录路径 """ print(f"Creating robot project: {bot_id} from sources: {dataset_ids}") # 检查是否需要重建 if not force_rebuild and not should_rebuild_robot_project(dataset_ids, bot_id): robot_dir = Path("projects") / "robot" / bot_id print(f"Using existing robot project: {robot_dir}") return str(robot_dir) # 创建机器人目录结构 robot_dir = Path("projects") / "robot" / bot_id dataset_dir = robot_dir / "dataset" # 清理已存在的目录(如果需要) if robot_dir.exists(): print(f"Robot directory already exists, cleaning up: {robot_dir}") shutil.rmtree(robot_dir) robot_dir.mkdir(parents=True, exist_ok=True) dataset_dir.mkdir(parents=True, exist_ok=True) copy_results = [] # 遍历每个源项目 for source_project_id in dataset_ids: print(f"\nProcessing source project: {source_project_id}") source_dataset_dir = Path("projects") / "data" / source_project_id / "dataset" if not source_dataset_dir.exists(): print(f" Warning: Dataset directory not found for project {source_project_id}") continue # 获取所有子文件夹 folders = [f for f in source_dataset_dir.iterdir() if f.is_dir()] if not folders: print(f" Warning: No folders found in dataset directory for project {source_project_id}") continue # 复制每个文件夹 for folder in folders: result = copy_dataset_folder(source_project_id, dataset_dir, folder.name) copy_results.append(result) # 保存配置信息 config_file = robot_dir / "robot_config.json" config_data = { "dataset_ids": dataset_ids, "created_at": datetime.now().isoformat(), "total_folders": len(copy_results), "successful_copies": sum(1 for r in copy_results if r["success"]) } with open(config_file, 'w', encoding='utf-8') as f: json.dump(config_data, f, ensure_ascii=False, indent=2) # 生成README readme_path = generate_robot_readme(bot_id, dataset_ids, copy_results) # 统计信息 successful_copies = sum(1 for r in copy_results if r["success"]) print(f"\nRobot project creation completed:") print(f" Robot directory: {robot_dir}") print(f" Total folders processed: {len(copy_results)}") print(f" Successful copies: {successful_copies}") print(f" Config saved: {config_file}") print(f" README generated: {readme_path}") return str(robot_dir) def get_robot_project_info(bot_id: str) -> Dict: """ 获取机器人项目信息 Args: bot_id: 机器人ID Returns: Dict: 机器人项目信息 """ robot_dir = Path("projects") / "robot" / bot_id if not robot_dir.exists(): return { "exists": False, "bot_id": bot_id, "error": "Robot project does not exist" } dataset_dir = robot_dir / "dataset" readme_path = robot_dir / "README.md" # 统计文件夹数量 folder_count = 0 total_size = 0 if dataset_dir.exists(): for item in dataset_dir.iterdir(): if item.is_dir(): folder_count += 1 # 计算文件夹大小 for file_path in item.rglob('*'): if file_path.is_file(): total_size += file_path.stat().st_size return { "exists": True, "bot_id": bot_id, "robot_dir": str(robot_dir), "dataset_dir": str(dataset_dir), "readme_exists": readme_path.exists(), "folder_count": folder_count, "total_size_bytes": total_size, "total_size_mb": round(total_size / (1024 * 1024), 2) } def cleanup_robot_project(bot_id: str) -> bool: """ 清理机器人项目 Args: bot_id: 机器人ID Returns: bool: 清理是否成功 """ try: robot_dir = Path("projects") / "robot" / bot_id if robot_dir.exists(): shutil.rmtree(robot_dir) print(f"Cleaned up robot project: {bot_id}") return True else: print(f"Robot project does not exist: {bot_id}") return True except Exception as e: print(f"Error cleaning up robot project {bot_id}: {str(e)}") return False if __name__ == "__main__": # 测试代码 test_dataset_ids = ["test-project-1", "test-project-2"] test_bot_id = "test-robot-001" robot_dir = create_robot_project(test_dataset_ids, test_bot_id) print(f"Created robot project at: {robot_dir}") info = get_robot_project_info(test_bot_id) print(f"Robot project info: {json.dumps(info, indent=2, ensure_ascii=False)}")