#!/usr/bin/env python3 """ 多项目管理器:处理多个知识库项目的合并 """ import os import shutil import json import logging from pathlib import Path from typing import List, Dict, Optional from datetime import datetime # Configure logger logger = logging.getLogger('app') from utils.file_utils import get_document_preview from utils import settings def generate_robot_directory_tree(robot_dir: str, robot_id: str, max_depth: int = 3) -> str: """ 生成机器人项目的目录树结构 Args: robot_dir: 机器人项目目录路径 robot_id: 机器人ID max_depth: 最大深度 Returns: str: 目录树字符串 """ def _build_tree(path: str, prefix: str = "", is_last: bool = True, depth: int = 0) -> List[str]: if depth > max_depth: return [] lines = [] try: entries = sorted(os.listdir(path)) # 分离目录和文件 dirs = [e for e in entries if os.path.isdir(os.path.join(path, e)) and not e.startswith('.')] files = [e for e in entries if os.path.isfile(os.path.join(path, e)) and not e.startswith('.')] entries = dirs + files for i, entry in enumerate(entries): entry_path = os.path.join(path, entry) is_dir = os.path.isdir(entry_path) is_last_entry = i == len(entries) - 1 # 选择适当的树形符号 if is_last_entry: connector = "└── " new_prefix = prefix + " " else: connector = "├── " new_prefix = prefix + "│ " # 添加条目行 line = prefix + connector + entry if is_dir: line += "/" lines.append(line) # 递归添加子目录 if is_dir and depth < max_depth: sub_lines = _build_tree(entry_path, new_prefix, is_last_entry, depth + 1) lines.extend(sub_lines) except PermissionError: lines.append(prefix + "└── [Permission Denied]") except Exception as e: lines.append(prefix + "└── [Error: " + str(e) + "]") return lines # 从dataset目录开始构建树 dataset_dir = os.path.join(robot_dir, "dataset") tree_lines = [] if not os.path.exists(dataset_dir): return "└── [No dataset directory found]" try: entries = sorted(os.listdir(dataset_dir)) dirs = [e for e in entries if os.path.isdir(os.path.join(dataset_dir, e)) and not e.startswith('.')] files = [e for e in entries if os.path.isfile(os.path.join(dataset_dir, e)) and not e.startswith('.')] entries = dirs + files if not entries: tree_lines.append("└── [Empty dataset directory]") else: for i, entry in enumerate(entries): entry_path = os.path.join(dataset_dir, entry) is_dir = os.path.isdir(entry_path) is_last_entry = i == len(entries) - 1 if is_last_entry: connector = "└── " prefix = " " else: connector = "├── " prefix = "│ " line = connector + entry if is_dir: line += "/" tree_lines.append(line) # 递归添加子目录 if is_dir: sub_lines = _build_tree(entry_path, prefix, is_last_entry, 1) tree_lines.extend(sub_lines) except Exception as e: tree_lines.append(f"└── [Error generating tree: {str(e)}]") return "\n".join(tree_lines) def get_unique_folder_name(target_dir: Path, original_name: str) -> str: """ 获取唯一的文件夹名称,如果存在重名则添加数字后缀 Args: target_dir: 目标目录 original_name: 原始文件夹名称 Returns: str: 唯一的文件夹名称 """ if not (target_dir / original_name).exists(): return original_name # 存在重名,添加数字后缀 counter = 1 while True: new_name = f"{original_name}_{counter}" if not (target_dir / new_name).exists(): return new_name counter += 1 def copy_dataset_folder(source_project_id: str, target_dataset_dir: Path, folder_name: str, project_path: Path) -> Dict: """ 复制单个项目的dataset文件夹到目标目录 Args: source_project_id: 源项目ID target_dataset_dir: 目标dataset目录 folder_name: 要复制的文件夹名称 project_path: 项目路径 Returns: Dict: 复制结果 """ result = { "success": False, "source_path": "", "target_path": "", "original_folder_name": folder_name, "final_folder_name": folder_name, "error": None } try: source_folder = project_path / "data" / source_project_id / "dataset" / folder_name result["source_path"] = str(source_folder) if not source_folder.exists(): result["error"] = f"Source folder does not exist: {source_folder}" return result # 处理重名冲突 unique_folder_name = get_unique_folder_name(target_dataset_dir, folder_name) result["final_folder_name"] = unique_folder_name target_folder = target_dataset_dir / unique_folder_name result["target_path"] = str(target_folder) # 复制文件夹 shutil.copytree(source_folder, target_folder) result["success"] = True logger.info(f" Copied: {source_folder} -> {target_folder}") except Exception as e: result["error"] = str(e) logger.error(f" Error copying {folder_name}: {str(e)}") return result def generate_robot_readme(robot_id: str, dataset_ids: List[str], copy_results: List[Dict], project_path: Path) -> str: """ 生成机器人项目的README.md文件 Args: robot_id: 机器人ID dataset_ids: 源项目ID列表 copy_results: 复制结果列表 Returns: str: README.md文件路径 """ readme_path = project_path / "robot" / robot_id / "README.md" readme_path.parent.mkdir(parents=True, exist_ok=True) robot_dir = project_path / "robot" / robot_id # 统计信息 total_folders = len(copy_results) successful_copies = sum(1 for r in copy_results if r["success"]) failed_copies = total_folders - successful_copies # 按源项目分组 project_groups = {} for result in copy_results: if result["success"]: source_project = result["source_path"].split("/")[2] # projects/data/{project_id}/dataset/... if source_project not in project_groups: project_groups[source_project] = [] project_groups[source_project].append(result) readme_content = "\n## 目录结构\n\n" # 生成实际目录树 readme_content += "```\n" readme_content += generate_robot_directory_tree(str(robot_dir), robot_id) readme_content += "\n```\n\n" readme_content += "## 数据集详情\n\n" dataset_dir = robot_dir / "dataset" if not dataset_dir.exists(): readme_content += "No dataset files available.\n" else: # 获取所有文档目录 doc_dirs = [] try: for item in sorted(os.listdir(dataset_dir)): item_path = dataset_dir / item if item_path.is_dir(): doc_dirs.append(item) except Exception as e: logger.error(f"Error listing dataset directories: {str(e)}") if not doc_dirs: readme_content += "No document directories found.\n" else: # 按源项目分组显示文档 for project_id, folders in project_groups.items(): for folder in folders: folder_name = folder["final_folder_name"] doc_path = dataset_dir / folder_name readme_content += f"#### {folder_name}\n\n" readme_content += f"**Files:**\n" # 检查文件存在性 document_file = doc_path / "document.txt" pagination_file = doc_path / "pagination.txt" embeddings_file = doc_path / "embedding.pkl" readme_content += f"- `{folder_name}/document.txt`" if document_file.exists(): readme_content += " ✓" readme_content += "\n" readme_content += f"- `{folder_name}/pagination.txt`" if pagination_file.exists(): readme_content += " ✓" readme_content += "\n" readme_content += f"- `{folder_name}/embedding.pkl`" if embeddings_file.exists(): readme_content += " ✓" readme_content += "\n\n" # 添加文档预览 if document_file.exists(): readme_content += f"**内容预览 (前10行):**\n\n```\n" preview = get_document_preview(str(document_file), 10) readme_content += preview readme_content += "\n```\n\n" else: readme_content += f"**内容预览:** 不可用\n\n" # 显示重命名信息 original_name = folder["original_folder_name"] if original_name != folder_name: readme_content += f"**原始名称:** `{original_name}` → `{folder_name}`\n\n" readme_content += "---\n\n" # 写入README文件 with open(readme_path, 'w', encoding='utf-8') as f: f.write(readme_content) logger.info(f"Generated README: {readme_path}") return str(readme_path) def _get_robot_dir(project_path: Path, bot_id: str) -> Path: return project_path / "robot" / bot_id def create_robot_project(dataset_ids: List[str], bot_id: str, force_rebuild: bool = False, project_path: Path = Path("projects"), skills: Optional[List[str]] = None) -> str: """ 确保机器人项目目录存在(不进行重建) Args: dataset_ids: 源项目ID列表(已废弃,保留兼容性) bot_id: 机器人ID force_rebuild: 忽略此参数(保留兼容性) skills: 技能文件名列表 Returns: str: 机器人项目目录路径 """ logger.info(f"Ensuring robot project exists: {bot_id}, skills: {skills}") # 创建机器人目录结构(如果不存在) robot_dir = _get_robot_dir(project_path, bot_id) dataset_dir = robot_dir / "dataset" scripts_dir = robot_dir / "scripts" download_dir = robot_dir / "download" # 创建目录(不删除已存在的内容) robot_dir.mkdir(parents=True, exist_ok=True) dataset_dir.mkdir(parents=True, exist_ok=True) scripts_dir.mkdir(parents=True, exist_ok=True) download_dir.mkdir(parents=True, exist_ok=True) # 清空 dataset_dir 下的所有软链接 for item in dataset_dir.iterdir(): if item.is_symlink(): item.unlink() logger.info(f"Removed from dataset_dir: {item}") # 为 dataset_ids 创建软链接 docs_datasets_dir = project_path / "docs" / "datasets" for dataset_id in dataset_ids: source = docs_datasets_dir / dataset_id target = dataset_dir / dataset_id if source.exists(): os.symlink(source.resolve(), target) logger.info(f"Created symlink: {target} -> {source.resolve()}") else: logger.warning(f"Dataset source not found, skipping symlink: {source}") # 处理 skills(每次都更新) if skills: _extract_skills_to_robot(bot_id, skills, project_path) logger.info(f"Robot project ready: {robot_dir}") return str(robot_dir) if __name__ == "__main__": # 测试代码 test_dataset_ids = ["test-project-1", "test-project-2"] test_bot_id = "test-robot-001" robot_dir = create_robot_project(test_dataset_ids, test_bot_id) logger.info(f"Created robot project at: {robot_dir}") def _extract_skills_to_robot(bot_id: str, skills: List[str], project_path: Path) -> None: """ 复制 skills 到 robot 项目的 skills 文件夹 - 如果是完整路径(如 "projects/uploads/xxx/skills/rag-retrieve_2.zip"),直接使用该路径 - 如果是简单名称(如 "rag-retrieve"),从以下目录按优先级顺序查找: 1. projects/uploads/{bot_id}/skills/ 2. skills/ 搜索目录优先级:先搜索 projects/uploads/{bot_id}/skills/,再搜索 skills/ Args: bot_id: 机器人 ID skills: 技能文件名列表(如 ["rag-retrieve", "projects/uploads/{bot_id}/skills/rag-retrieve"]) project_path: 项目路径 """ import zipfile # skills 源目录(按优先级顺序) skills_source_dirs = [ project_path / "uploads" / bot_id / "skills", Path("skills"), ] skills_target_dir = project_path / "robot" / bot_id / "skills" # 先清空 skills_target_dir,然后重新复制 if skills_target_dir.exists(): logger.info(f" Removing existing skills directory: {skills_target_dir}") shutil.rmtree(skills_target_dir) skills_target_dir.mkdir(parents=True, exist_ok=True) logger.info(f"Copying skills to {skills_target_dir}") for skill in skills: source_dir = None # 简单名称:按优先级顺序在多个目录中查找 for base_dir in skills_source_dirs: candidate_dir = base_dir / skill if candidate_dir.exists(): source_dir = candidate_dir logger.info(f" Found skill '{skill}' in {base_dir}") break if source_dir is None: logger.warning(f" Skill directory '{skill}' not found in any source directory: {[str(d) for d in skills_source_dirs]}") continue if not source_dir.exists(): logger.warning(f" Skill directory not found: {source_dir}") continue target_dir = skills_target_dir / os.path.basename(skill) try: shutil.copytree(source_dir, target_dir) logger.info(f" Copied: {source_dir} -> {target_dir}") except Exception as e: logger.error(f" Failed to copy {source_dir}: {e}")