#!/usr/bin/env python3
"""
Multi-project manager for merging multiple knowledge-base projects.
"""

import json
import logging
import os
import re
import shutil
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

# Configure logger
logger = logging.getLogger('app')

from utils.file_utils import get_document_preview
from utils import settings


def _list_visible_entries(path: str) -> List[str]:
    """
    List the non-hidden entries of a directory, directories first.

    Args:
        path: Directory to list

    Returns:
        List[str]: Names not starting with '.', sorted directories followed
        by sorted regular files. Entries that are neither a directory nor a
        regular file (for example broken symlinks) are omitted.
    """
    names = [name for name in sorted(os.listdir(path)) if not name.startswith('.')]
    dirs = [name for name in names if os.path.isdir(os.path.join(path, name))]
    files = [name for name in names if os.path.isfile(os.path.join(path, name))]
    return dirs + files


def generate_robot_directory_tree(robot_dir: str, robot_id: str, max_depth: int = 3) -> str:
    """
    Generate the directory tree for a robot project.

    Only the "datasets" sub-directory of the robot project is rendered.
    Hidden entries (names starting with '.') are skipped.

    Args:
        robot_dir: Path to the robot project directory
        robot_id: Robot ID (currently unused, kept for interface compatibility)
        max_depth: Maximum depth

    Returns:
        str: Directory tree string
    """
    # Child prefixes are four columns wide so nested entries line up under
    # the four-column connectors "├── " / "└── ".
    last_connector, last_indent = "└── ", "    "
    mid_connector, mid_indent = "├── ", "│   "

    def _build_tree(path: str, prefix: str = "", depth: int = 0) -> List[str]:
        if depth > max_depth:
            return []

        lines = []
        try:
            entries = _list_visible_entries(path)
            for index, entry in enumerate(entries):
                entry_path = os.path.join(path, entry)
                is_dir = os.path.isdir(entry_path)
                is_last_entry = index == len(entries) - 1

                # Choose the appropriate tree connector
                connector = last_connector if is_last_entry else mid_connector
                child_prefix = prefix + (last_indent if is_last_entry else mid_indent)

                # Add the entry line
                lines.append(prefix + connector + entry + ("/" if is_dir else ""))

                # Recursively add subdirectories
                if is_dir and depth < max_depth:
                    lines.extend(_build_tree(entry_path, child_prefix, depth + 1))
        except PermissionError:
            lines.append(prefix + "└── [Permission Denied]")
        except Exception as e:
            # Lines collected before the failure are kept; the error is
            # reported in-line instead of aborting the whole tree.
            lines.append(prefix + "└── [Error: " + str(e) + "]")
        return lines

    # Build the tree starting from the datasets directory
    dataset_dir = os.path.join(robot_dir, "datasets")
    if not os.path.exists(dataset_dir):
        return "└── [No dataset directory found]"

    tree_lines = []
    try:
        entries = _list_visible_entries(dataset_dir)
        if not entries:
            tree_lines.append("└── [Empty dataset directory]")
        for index, entry in enumerate(entries):
            entry_path = os.path.join(dataset_dir, entry)
            is_dir = os.path.isdir(entry_path)
            is_last_entry = index == len(entries) - 1

            connector = last_connector if is_last_entry else mid_connector
            child_prefix = last_indent if is_last_entry else mid_indent

            tree_lines.append(connector + entry + ("/" if is_dir else ""))

            # Recursively add subdirectories (the top level counts as depth 0)
            if is_dir:
                tree_lines.extend(_build_tree(entry_path, child_prefix, 1))
    except Exception as e:
        tree_lines.append(f"└── [Error generating tree: {str(e)}]")

    return "\n".join(tree_lines)


def get_unique_folder_name(target_dir: Path, original_name: str) -> str:
    """
    Get a unique folder name by appending a numeric suffix when needed.

    Args:
        target_dir: Target directory
        original_name: Original folder name

    Returns:
        str: original_name if nothing with that name exists in target_dir,
        otherwise the first free "{original_name}_{n}" with n = 1, 2, ...
    """
    if not (target_dir / original_name).exists():
        return original_name

    # Add a numeric suffix when a name collision exists
    counter = 1
    while (target_dir / f"{original_name}_{counter}").exists():
        counter += 1
    return f"{original_name}_{counter}"


def copy_dataset_folder(source_project_id: str, target_dataset_dir: Path, folder_name: str,
                        project_path: Path) -> Dict:
    """
    Copy a single project's dataset folder to the target directory.

    The source is project_path/data/{source_project_id}/datasets/{folder_name}.
    Failures are reported through the returned dict, not raised.

    Args:
        source_project_id: Source project ID
        target_dataset_dir: Target datasets directory
        folder_name: Folder name to copy
        project_path: Project path

    Returns:
        Dict: Copy result with keys "success", "source_path", "target_path",
        "original_folder_name", "final_folder_name" and "error"
    """
    result = {
        "success": False,
        "source_path": "",
        "target_path": "",
        "original_folder_name": folder_name,
        "final_folder_name": folder_name,
        "error": None,
    }

    try:
        source_folder = project_path / "data" / source_project_id / "datasets" / folder_name
        result["source_path"] = str(source_folder)

        if not source_folder.exists():
            result["error"] = f"Source folder does not exist: {source_folder}"
            return result

        # Handle name collisions
        unique_folder_name = get_unique_folder_name(target_dataset_dir, folder_name)
        result["final_folder_name"] = unique_folder_name

        target_folder = target_dataset_dir / unique_folder_name
        result["target_path"] = str(target_folder)

        # Copy the folder
        shutil.copytree(source_folder, target_folder)
        result["success"] = True
        logger.info(f" Copied: {source_folder} -> {target_folder}")
    except Exception as e:
        result["error"] = str(e)
        logger.error(f" Error copying {folder_name}: {str(e)}")

    return result


def _source_project_id(source_path: str, data_root: Path) -> str:
    """
    Extract the source project ID from a copy result's source path.

    Args:
        source_path: Path of the form {data_root}/{project_id}/datasets/...
        data_root: The project's "data" directory

    Returns:
        str: First path component below data_root. If source_path is not
        below data_root, falls back to the third "/"-separated component,
        or to source_path itself when there are fewer than three.
    """
    try:
        return Path(source_path).relative_to(data_root).parts[0]
    except (ValueError, IndexError):
        # Legacy layout assumption: projects/data/{project_id}/datasets/...
        pieces = source_path.split("/")
        return pieces[2] if len(pieces) > 2 else source_path


def generate_robot_readme(robot_id: str, dataset_ids: List[str], copy_results: List[Dict],
                          project_path: Path) -> str:
    """
    Generate the README.md file for a robot project.

    The README contains the directory tree of the robot's datasets and, for
    every successfully copied folder, the presence of its expected files and
    a preview of document.txt.

    Args:
        robot_id: Robot ID
        dataset_ids: List of source project IDs (currently unused)
        copy_results: List of copy results as returned by copy_dataset_folder
        project_path: Project path

    Returns:
        str: Path to the README.md file
    """
    robot_dir = project_path / "robot" / robot_id
    readme_path = robot_dir / "README.md"
    readme_path.parent.mkdir(parents=True, exist_ok=True)

    # Group successful copies by source project
    data_root = project_path / "data"
    project_groups: Dict[str, List[Dict]] = {}
    for result in copy_results:
        if not result["success"]:
            continue
        source_project = _source_project_id(result["source_path"], data_root)
        project_groups.setdefault(source_project, []).append(result)

    sections = ["\n## Directory Structure\n\n"]

    # Generate the actual directory tree
    sections.append("```\n")
    sections.append(generate_robot_directory_tree(str(robot_dir), robot_id))
    sections.append("\n```\n\n")

    sections.append("## Dataset Details\n\n")

    dataset_dir = robot_dir / "datasets"
    if not dataset_dir.exists():
        sections.append("No dataset files available.\n")
    else:
        # Check whether any document directory exists at all
        has_doc_dirs = False
        try:
            has_doc_dirs = any(item.is_dir() for item in dataset_dir.iterdir())
        except Exception as e:
            logger.error(f"Error listing dataset directories: {str(e)}")

        if not has_doc_dirs:
            sections.append("No document directories found.\n")
        else:
            # Display documents grouped by source project
            for folders in project_groups.values():
                for folder in folders:
                    folder_name = folder["final_folder_name"]
                    doc_path = dataset_dir / folder_name

                    sections.append(f"#### {folder_name}\n\n")
                    sections.append("**Files:**\n")

                    # One line per expected file, with a check mark if present
                    expected_files = ("document.txt", "pagination.txt", "embedding.pkl")
                    for file_name in expected_files:
                        mark = " ✓" if (doc_path / file_name).exists() else ""
                        sections.append(f"- `{folder_name}/{file_name}`{mark}\n")
                    sections.append("\n")

                    # Add document preview
                    document_file = doc_path / "document.txt"
                    if document_file.exists():
                        sections.append("**Content Preview (first 10 lines):**\n\n```\n")
                        sections.append(get_document_preview(str(document_file), 10))
                        sections.append("\n```\n\n")
                    else:
                        sections.append("**Content Preview:** Not available\n\n")

                    # Show rename information
                    original_name = folder["original_folder_name"]
                    if original_name != folder_name:
                        sections.append(
                            f"**Original Name:** `{original_name}` → `{folder_name}`\n\n"
                        )

                    sections.append("---\n\n")

    # Write the README file
    readme_path.write_text("".join(sections), encoding='utf-8')

    logger.info(f"Generated README: {readme_path}")
    return str(readme_path)


def _get_robot_dir(project_path: Path, bot_id: str) -> Path:
    """Return the robot project directory: project_path/robot/{bot_id}."""
    return project_path / "robot" / bot_id


def create_robot_project(dataset_ids: List[str], bot_id: str, force_rebuild: bool = False,
                         project_path: Path = Path("projects"),
                         skills: Optional[List[str]] = None) -> str:
    """
    Ensure the robot project directory exists without rebuilding it.

    Creates the robot directory with its "datasets", "scripts" and
    "download" sub-directories, replaces the symlinks in "datasets" with
    links to project_path/docs/datasets/{dataset_id}, adds every autoload
    skill not already matched by the requested skills, and syncs the
    resulting skills into the robot project.

    Args:
        dataset_ids: Dataset IDs to symlink into the robot's datasets directory
        bot_id: Robot ID
        force_rebuild: Ignored and kept for compatibility
        project_path: Project path
        skills: List of skill filenames

    Returns:
        str: Robot project directory path
    """
    def _skill_matches_autoload(skill: str, autoload_skill_name: str) -> bool:
        # A requested skill suppresses an autoload skill when its directory
        # name contains the autoload name, or the part of the autoload name
        # before the first "-" (case-insensitive).
        normalized_skill = Path(skill.lstrip("@")).name.lower()
        normalized_autoload = autoload_skill_name.lower()
        if normalized_autoload in normalized_skill:
            return True
        autoload_prefix = normalized_autoload.split("-")[0]
        return bool(autoload_prefix) and autoload_prefix in normalized_skill

    # Work on a copy so the caller's list is never mutated
    skills = list(skills or [])

    if os.path.isabs(settings.SKILLS_DIR):
        autoload_skills_dir = Path(settings.SKILLS_DIR) / "autoload" / settings.PROJECT_NAME
    else:
        autoload_skills_dir = (
            project_path.parent / settings.SKILLS_DIR / "autoload" / settings.PROJECT_NAME
        )

    if autoload_skills_dir.exists():
        for item in sorted(autoload_skills_dir.iterdir()):
            if not item.is_dir():
                continue
            if any(_skill_matches_autoload(skill, item.name) for skill in skills):
                continue
            skill_path = f"@skills/autoload/{settings.PROJECT_NAME}/{item.name}"
            skills.append(skill_path)
            logger.info(f"Auto loaded skill '{skill_path}' from {autoload_skills_dir}")
    else:
        logger.warning(f"Autoload skills directory does not exist: {autoload_skills_dir}")

    logger.info(f"Ensuring robot project exists: {bot_id}, skills: {skills}")

    # Create the robot directory structure if it does not exist
    robot_dir = _get_robot_dir(project_path, bot_id)
    dataset_dir = robot_dir / "datasets"
    scripts_dir = robot_dir / "scripts"
    download_dir = robot_dir / "download"

    # Create directories without deleting existing content
    for directory in (robot_dir, dataset_dir, scripts_dir, download_dir):
        directory.mkdir(parents=True, exist_ok=True)

    # Remove all symlinks under dataset_dir
    for item in dataset_dir.iterdir():
        if item.is_symlink():
            item.unlink()
            logger.info(f"Removed from dataset_dir: {item}")

    # Create symlinks for dataset_ids
    docs_datasets_dir = project_path / "docs" / "datasets"
    for dataset_id in dataset_ids or []:
        source = docs_datasets_dir / dataset_id
        target = dataset_dir / dataset_id
        if not source.exists():
            logger.warning(f"Dataset source not found, skipping symlink: {source}")
            continue
        # Only symlinks were removed above; a real directory/file with the
        # same name, or a duplicate ID, would make os.symlink raise.
        if target.exists() or target.is_symlink():
            logger.warning(f"Dataset target already exists, skipping symlink: {target}")
            continue
        os.symlink(source.resolve(), target)
        logger.info(f"Created symlink: {target} -> {source.resolve()}")

    # Process skills and keep them updated each time
    if skills:
        _extract_skills_to_robot(bot_id, skills, project_path)

    logger.info(f"Robot project ready: {robot_dir}")
    return str(robot_dir)


def _extract_skills_to_robot(bot_id: str, skills: List[str], project_path: Path) -> None:
    """
    Copy skills into the robot project's skills directory.

    Each skill is resolved as follows:
    - If a repository-relative path starting with @ is provided (for example
      "@skills/autoload/support/rag-retrieve"), resolve it from the repo root
    - Otherwise, or if the @ path does not exist, search in this priority order:
        1. projects/uploads/{bot_id}/skills/
        2. skills/common/
        3. skills/{PROJECT_NAME}/
    - If it is still not found and a multi-component path was provided (for
      example "projects/uploads/xxx/skills/rag-retrieve"), use that directory
      directly

    Directories in the target that are no longer requested are removed only
    if a directory of the same name exists in one of the managed source
    directories; all other directories are left untouched.

    Args:
        bot_id: Robot ID
        skills: List of skill filenames (for example ["rag-retrieve",
            "@skills/autoload/support/rag-retrieve",
            "projects/uploads/{bot_id}/skills/rag-retrieve"])
        project_path: Project path
    """
    # Skill source directories in priority order
    repo_root = Path(__file__).resolve().parent.parent
    official_skills_dir = repo_root / "skills" / settings.PROJECT_NAME
    autoload_skills_dir = repo_root / "skills" / "autoload" / settings.PROJECT_NAME
    if not official_skills_dir.exists():
        logger.warning(f"Official skills directory does not exist: {official_skills_dir}")

    skills_source_dirs = [
        project_path / "uploads" / bot_id / "skills",
        repo_root / "skills" / "common",
        official_skills_dir,
    ]
    managed_skill_dirs = [*skills_source_dirs, autoload_skills_dir]

    skills_target_dir = project_path / "robot" / bot_id / "skills"
    skills_target_dir.mkdir(parents=True, exist_ok=True)
    logger.info(f"Copying skills to {skills_target_dir}")

    managed_skill_names = {
        item.name
        for base_dir in managed_skill_dirs
        if base_dir.exists()
        for item in base_dir.iterdir()
        if item.is_dir()
    }

    # Remove extra managed skill directories that are no longer expected
    expected_skill_names = {Path(skill.lstrip("@")).name for skill in skills}
    for item in skills_target_dir.iterdir():
        if not item.is_dir() or item.name in expected_skill_names:
            continue
        if item.name in managed_skill_names:
            logger.info(f" Removing managed stale skill directory: {item}")
            shutil.rmtree(item)
        else:
            logger.info(f" Keeping unmanaged skill directory: {item}")

    for skill in skills:
        skill_name = Path(skill.lstrip("@")).name
        target_dir = skills_target_dir / skill_name

        source_dir = None

        if skill.startswith("@"):
            candidate_dir = repo_root / skill.lstrip("@")
            if candidate_dir.exists():
                source_dir = candidate_dir
                logger.info(f" Found skill '{skill}' at {candidate_dir}")

        # For simple names, search multiple directories in priority order
        if source_dir is None:
            for base_dir in skills_source_dirs:
                candidate_dir = base_dir / skill
                if candidate_dir.exists():
                    source_dir = candidate_dir
                    logger.info(f" Found skill '{skill}' in {base_dir}")
                    break

        # Last resort: a multi-component path given as-is. Tried after the
        # priority search so existing resolutions are unchanged.
        if source_dir is None and not skill.startswith("@"):
            direct_dir = Path(skill)
            if len(direct_dir.parts) > 1 and direct_dir.is_dir():
                source_dir = direct_dir
                logger.info(f" Found skill '{skill}' at {direct_dir}")

        if source_dir is None:
            logger.warning(
                f" Skill directory '{skill}' not found in any source directory: "
                f"{[str(d) for d in skills_source_dirs]}"
            )
            continue

        try:
            shutil.copytree(source_dir, target_dir, dirs_exist_ok=True)
            logger.info(f" Synced: {source_dir} -> {target_dir}")
        except Exception as e:
            logger.error(f" Failed to copy {source_dir}: {e}")


# Environment variable names that are never reported by scan_skill_env_keys
_COMMON_ENV_KEYS = frozenset({
    'TMPDIR', 'PATH', 'HOME', 'USER', 'SHELL', 'LANG', 'TERM', 'PWD', 'OLDPWD',
    'NODE_ENV', 'MASTERKEY', 'ASSISTANT_ID', 'USER_IDENTIFIER', 'TRACE_ID',
})

# Patterns capturing the variable name from JavaScript and Python env lookups
_ENV_PATTERNS = [
    re.compile(r'process\.env\.(\w+)'),
    re.compile(r'os\.getenv\([\'"](\w+)'),
    re.compile(r'os\.environ\.get\([\'"](\w+)'),
    re.compile(r'os\.environ\[[\'"](\w+)'),
]

# File suffixes (lower-case) that are scanned for environment variable usage
_SCAN_EXTENSIONS = {'.js', '.py', '.ts', '.sh', '.md', '.jsx', '.tsx', '.mjs', '.cjs'}


def scan_skill_env_keys(bot_id: str, skills: List[str], project_path: Path) -> set:
    """
    Scan all files of the given skills and extract the referenced
    environment variable keys.

    Simple names are looked up under project_path/uploads/{bot_id}/skills/
    and then under skills/ (relative to the current working directory).
    Names starting with @ are first tried as a path relative to the current
    working directory with the @ removed. Skills that cannot be found and
    files that cannot be read are skipped silently.

    Args:
        bot_id: Robot ID
        skills: List of skill names
        project_path: Project path (for example Path("projects"))

    Returns:
        set: Environment variable keys (common variables excluded)
    """
    skills_source_dirs = [
        project_path / "uploads" / bot_id / "skills",
        Path("skills"),
    ]

    env_keys = set()
    for skill in skills:
        candidates = [base_dir / skill for base_dir in skills_source_dirs]
        if skill.startswith("@"):
            # Repository-relative skill path; the legacy lookups stay as fallback
            candidates.insert(0, Path(skill.lstrip("@")))

        source_dir = next((c for c in candidates if c.exists()), None)
        if source_dir is None:
            continue

        for file_path in source_dir.rglob('*'):
            if not file_path.is_file():
                continue
            if file_path.suffix.lower() not in _SCAN_EXTENSIONS:
                continue
            try:
                content = file_path.read_text(encoding='utf-8', errors='ignore')
            except Exception:
                # Best effort: unreadable files are ignored
                continue
            for pattern in _ENV_PATTERNS:
                env_keys.update(pattern.findall(content))

    env_keys -= _COMMON_ENV_KEYS
    return env_keys


# Kept at the end of the module so every function used by
# create_robot_project is defined before the script entry point runs.
if __name__ == "__main__":
    # Test code
    test_dataset_ids = ["test-project-1", "test-project-2"]
    test_bot_id = "test-robot-001"

    robot_dir = create_robot_project(test_dataset_ids, test_bot_id)
    logger.info(f"Created robot project at: {robot_dir}")