qwen_agent/utils/multi_project_manager.py
朱潮 425f3c5bb4 chore: replace Chinese comments and log messages with English
Convert all Chinese comments, docstrings, logger/print output,
HTTPException detail messages, and API response messages to English
across the entire codebase. Functional zh/ja localized strings
(e.g. prompt templates, timezone display names, date formats) are
preserved as-is.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-30 19:45:35 +08:00

479 lines
18 KiB
Python

#!/usr/bin/env python3
"""
Multi-project manager for merging multiple knowledge-base projects.
"""
import os
import shutil
import json
import logging
import re
from pathlib import Path
from typing import List, Dict, Optional
from datetime import datetime
# Configure logger
logger = logging.getLogger('app')
from utils.file_utils import get_document_preview
from utils import settings
def generate_robot_directory_tree(robot_dir: str, robot_id: str, max_depth: int = 3) -> str:
    """
    Generate the directory tree for a robot project.

    Only the ``datasets`` sub-directory of ``robot_dir`` is rendered. Hidden
    entries (names starting with ".") are skipped. Within each directory,
    sub-directories are listed before files, each group in sorted order.

    Args:
        robot_dir: Path to the robot project directory
        robot_id: Robot ID (not used for rendering; kept for API compatibility)
        max_depth: Maximum number of levels rendered below the top-level
            entries of the datasets directory

    Returns:
        str: Directory tree string
    """
    # Indentation handed to the children of an entry. While the parent still
    # has siblings below it, the vertical rule must keep running so that
    # nested entries line up under their parent.
    indent_after_last = "    "
    indent_after_open = "│   "

    def _visible_entries(path: str) -> List[str]:
        # Directories first, then regular files; anything else is dropped.
        names = [n for n in sorted(os.listdir(path)) if not n.startswith('.')]
        dirs = [n for n in names if os.path.isdir(os.path.join(path, n))]
        files = [n for n in names if os.path.isfile(os.path.join(path, n))]
        return dirs + files

    def _render_entries(path: str, entries: List[str], prefix: str, depth: int) -> List[str]:
        lines = []
        last_index = len(entries) - 1
        for index, name in enumerate(entries):
            entry_path = os.path.join(path, name)
            is_dir = os.path.isdir(entry_path)
            is_last_entry = index == last_index
            connector = "└── " if is_last_entry else "├── "
            lines.append(prefix + connector + name + ("/" if is_dir else ""))
            # Descend only while the depth budget allows it
            if is_dir and depth < max_depth:
                child_indent = indent_after_last if is_last_entry else indent_after_open
                lines.extend(_build_tree(entry_path, prefix + child_indent, depth + 1))
        return lines

    def _build_tree(path: str, prefix: str, depth: int) -> List[str]:
        # A sub-directory that cannot be listed is reported inline instead of
        # aborting the whole tree.
        try:
            entries = _visible_entries(path)
        except PermissionError:
            return [prefix + "└── [Permission Denied]"]
        except Exception as e:
            return [prefix + "└── [Error: " + str(e) + "]"]
        return _render_entries(path, entries, prefix, depth)

    # Build the tree starting from the datasets directory
    dataset_dir = os.path.join(robot_dir, "datasets")
    if not os.path.exists(dataset_dir):
        return "└── [No dataset directory found]"
    try:
        entries = _visible_entries(dataset_dir)
    except Exception as e:
        return f"└── [Error generating tree: {str(e)}]"
    if not entries:
        return "└── [Empty dataset directory]"
    return "\n".join(_render_entries(dataset_dir, entries, "", 0))
def get_unique_folder_name(target_dir: Path, original_name: str) -> str:
    """
    Return a folder name that does not yet exist inside the target directory.

    The original name is returned when it is free. Otherwise "_1", "_2", ...
    are appended in turn until an unused name is found.

    Args:
        target_dir: Target directory
        original_name: Original folder name

    Returns:
        str: Unique folder name
    """
    candidate = original_name
    suffix = 0
    # Keep bumping the numeric suffix while the candidate is already taken
    while (target_dir / candidate).exists():
        suffix += 1
        candidate = f"{original_name}_{suffix}"
    return candidate
def copy_dataset_folder(source_project_id: str, target_dataset_dir: Path, folder_name: str, project_path: Path) -> Dict:
    """
    Copy one dataset folder of a source project into the target directory.

    The source is ``project_path / "data" / source_project_id / "datasets" /
    folder_name``. If the folder name is already taken in the target
    directory, a numeric suffix is appended. Failures are reported through
    the returned dict instead of being raised.

    Args:
        source_project_id: Source project ID
        target_dataset_dir: Target datasets directory
        folder_name: Folder name to copy
        project_path: Project path

    Returns:
        Dict: Copy result with the keys "success", "source_path",
        "target_path", "original_folder_name", "final_folder_name" and "error"
    """
    outcome = {
        "success": False,
        "source_path": "",
        "target_path": "",
        "original_folder_name": folder_name,
        "final_folder_name": folder_name,
        "error": None
    }
    try:
        src = project_path / "data" / source_project_id / "datasets" / folder_name
        outcome["source_path"] = str(src)
        if src.exists():
            # Resolve name collisions before copying
            final_name = get_unique_folder_name(target_dataset_dir, folder_name)
            outcome["final_folder_name"] = final_name
            dst = target_dataset_dir / final_name
            outcome["target_path"] = str(dst)
            shutil.copytree(src, dst)
            outcome["success"] = True
            logger.info(f" Copied: {src} -> {dst}")
        else:
            outcome["error"] = f"Source folder does not exist: {src}"
    except Exception as exc:
        outcome["error"] = str(exc)
        logger.error(f" Error copying {folder_name}: {str(exc)}")
    return outcome
def generate_robot_readme(robot_id: str, dataset_ids: List[str], copy_results: List[Dict], project_path: Path) -> str:
    """
    Generate the README.md file for a robot project.

    The README holds the directory tree of the robot's ``datasets`` folder,
    followed by one section per successfully copied dataset folder: the file
    listing, a preview of ``document.txt`` and, if the folder was renamed on
    copy, the original name.

    Args:
        robot_id: Robot ID
        dataset_ids: List of source project IDs (not used for rendering;
            kept for API compatibility)
        copy_results: List of copy results; each needs the keys "success",
            "source_path", "original_folder_name" and "final_folder_name"
        project_path: Project path that contains the ``robot`` directory

    Returns:
        str: Path to the README.md file
    """
    robot_dir = project_path / "robot" / robot_id
    readme_path = robot_dir / "README.md"
    readme_path.parent.mkdir(parents=True, exist_ok=True)

    # Group successful copies by source project. The source path has the
    # layout <project_path>/data/<project_id>/datasets/<folder>, so the
    # project ID is the grandparent directory name. Deriving it from the path
    # structure (instead of a fixed "/" split index) also works when
    # project_path is absolute or uses a different separator.
    project_groups: Dict[str, List[Dict]] = {}
    for result in copy_results:
        if not result["success"]:
            continue
        source_project = Path(result["source_path"]).parent.parent.name
        project_groups.setdefault(source_project, []).append(result)

    parts = ["\n## Directory Structure\n\n"]
    # Generate the actual directory tree
    parts.append("```\n")
    parts.append(generate_robot_directory_tree(str(robot_dir), robot_id))
    parts.append("\n```\n\n")
    parts.append("## Dataset Details\n\n")

    dataset_dir = robot_dir / "datasets"
    if not dataset_dir.exists():
        parts.append("No dataset files available.\n")
    else:
        # Get all document directories
        try:
            doc_dirs = [
                item for item in sorted(os.listdir(dataset_dir))
                if (dataset_dir / item).is_dir()
            ]
        except Exception as e:
            logger.error(f"Error listing dataset directories: {str(e)}")
            doc_dirs = []

        if not doc_dirs:
            parts.append("No document directories found.\n")
        else:
            # Display documents grouped by source project
            for folders in project_groups.values():
                for folder in folders:
                    folder_name = folder["final_folder_name"]
                    document_file = dataset_dir / folder_name / "document.txt"

                    parts.append(f"#### {folder_name}\n\n")
                    parts.append("**Files:**\n")
                    # The three expected files are listed unconditionally;
                    # only document.txt is checked, to decide on the preview.
                    parts.append(f"- `{folder_name}/document.txt`\n")
                    parts.append(f"- `{folder_name}/pagination.txt`\n")
                    parts.append(f"- `{folder_name}/embedding.pkl`\n\n")

                    # Add document preview
                    if document_file.exists():
                        parts.append("**Content Preview (first 10 lines):**\n\n```\n")
                        parts.append(get_document_preview(str(document_file), 10))
                        parts.append("\n```\n\n")
                    else:
                        parts.append("**Content Preview:** Not available\n\n")

                    # Show rename information
                    original_name = folder["original_folder_name"]
                    if original_name != folder_name:
                        parts.append(f"**Original Name:** `{original_name}` → `{folder_name}`\n\n")
                    parts.append("---\n\n")

    # Write the README file
    with open(readme_path, 'w', encoding='utf-8') as f:
        f.write("".join(parts))
    logger.info(f"Generated README: {readme_path}")
    return str(readme_path)
def _get_robot_dir(project_path: Path, bot_id: str) -> Path:
    """Return the directory of the given robot: ``<project_path>/robot/<bot_id>``."""
    return project_path.joinpath("robot", bot_id)
def create_robot_project(dataset_ids: List[str], bot_id: str, force_rebuild: bool = False, project_path: Path = Path("projects"), skills: Optional[List[str]] = None) -> str:
    """
    Ensure the robot project directory exists without rebuilding it.

    Existing content is kept. Symlinks directly under the robot's ``datasets``
    directory are removed and recreated from ``dataset_ids``, each pointing at
    ``<project_path>/docs/datasets/<dataset_id>``. Skill directories found in
    the autoload directory are appended to ``skills`` unless one of the
    requested skills already matches them, and the resulting skills are then
    synced into the project.

    Args:
        dataset_ids: Dataset IDs to link into the robot's datasets directory
        bot_id: Robot ID
        force_rebuild: Ignored and kept for compatibility
        project_path: Project path that contains the ``robot`` and ``docs``
            directories
        skills: List of skill filenames

    Returns:
        str: Robot project directory path
    """
    def _skill_matches_autoload(skill: str, autoload_skill_name: str) -> bool:
        # A requested skill suppresses an autoload skill when its final path
        # component contains the autoload name, or just the part of that name
        # before the first "-" (case-insensitive substring match).
        normalized_skill = Path(skill.lstrip("@")).name.lower()
        normalized_autoload_skill_name = autoload_skill_name.lower()
        if normalized_autoload_skill_name in normalized_skill:
            return True
        autoload_prefix = normalized_autoload_skill_name.split("-")[0]
        return bool(autoload_prefix) and autoload_prefix in normalized_skill

    # Work on a copy so the caller's list is never mutated
    skills = list(skills or [])
    if os.path.isabs(settings.SKILLS_DIR):
        autoload_skills_dir = Path(settings.SKILLS_DIR) / "autoload" / settings.PROJECT_NAME
    else:
        autoload_skills_dir = project_path.parent / settings.SKILLS_DIR / "autoload" / settings.PROJECT_NAME

    if autoload_skills_dir.exists():
        for item in sorted(autoload_skills_dir.iterdir()):
            if not item.is_dir() or any(_skill_matches_autoload(skill, item.name) for skill in skills):
                continue
            skill_path = f"@skills/autoload/{settings.PROJECT_NAME}/{item.name}"
            skills.append(skill_path)
            logger.info(f"Auto loaded skill '{skill_path}' from {autoload_skills_dir}")
    else:
        logger.warning(f"Autoload skills directory does not exist: {autoload_skills_dir}")

    logger.info(f"Ensuring robot project exists: {bot_id}, skills: {skills}")

    # Create the robot directory structure if it does not exist
    robot_dir = _get_robot_dir(project_path, bot_id)
    dataset_dir = robot_dir / "datasets"
    scripts_dir = robot_dir / "scripts"
    download_dir = robot_dir / "download"

    # Create directories without deleting existing content
    for directory in (robot_dir, dataset_dir, scripts_dir, download_dir):
        directory.mkdir(parents=True, exist_ok=True)

    # Remove all symlinks under dataset_dir
    for item in dataset_dir.iterdir():
        if item.is_symlink():
            item.unlink()
            logger.info(f"Removed from dataset_dir: {item}")

    # Create symlinks for dataset_ids
    docs_datasets_dir = project_path / "docs" / "datasets"
    for dataset_id in dataset_ids or []:
        source = docs_datasets_dir / dataset_id
        target = dataset_dir / dataset_id
        if not source.exists():
            logger.warning(f"Dataset source not found, skipping symlink: {source}")
            continue
        resolved_source = source.resolve()
        try:
            os.symlink(resolved_source, target)
        except FileExistsError:
            # Only symlinks were cleared above, so a real file/directory with
            # the same name, or a duplicate dataset ID, still occupies the
            # target. Skip it instead of aborting the whole project setup.
            logger.warning(f"Dataset target already exists, skipping symlink: {target}")
            continue
        logger.info(f"Created symlink: {target} -> {resolved_source}")

    # Process skills and keep them updated each time
    if skills:
        _extract_skills_to_robot(bot_id, skills, project_path)

    logger.info(f"Robot project ready: {robot_dir}")
    return str(robot_dir)
def _extract_skills_to_robot(bot_id: str, skills: List[str], project_path: Path) -> None:
    """
    Copy skills into the robot project's skills directory.

    Each entry of ``skills`` is resolved to a source directory as follows:
    - A repository-relative path starting with @
      (for example "@skills/autoload/support/rag-retrieve") is resolved from
      the repo root.
    - Any other entry (for example "rag-retrieve"), and any @ entry that was
      not found at the repo root, is joined onto each source directory and
      the first existing match wins. Priority order:
        1. {project_path}/uploads/{bot_id}/skills/
        2. skills/common
        3. skills/{PROJECT_NAME}

    The skill is copied to ``{project_path}/robot/{bot_id}/skills/<name>``,
    where ``<name>`` is the final path component of the entry. Entries that
    cannot be resolved, or that fail to copy, are logged and skipped.

    Before copying, directories in the target that are not requested any
    more are removed, but only if a directory of the same name exists in one
    of the source directories above or in skills/autoload/{PROJECT_NAME}.
    Any other directory in the target is left alone.

    Args:
        bot_id: Robot ID
        skills: List of skill entries (for example ["rag-retrieve", "@skills/autoload/support/rag-retrieve"])
        project_path: Project path
    """
    # Skill source directories in priority order
    repo_root = Path(__file__).resolve().parent.parent
    official_skills_dir = repo_root / "skills" / settings.PROJECT_NAME
    autoload_skills_dir = repo_root / "skills" / "autoload" / settings.PROJECT_NAME
    if not official_skills_dir.exists():
        logger.warning(f"Official skills directory does not exist: {official_skills_dir}")
    skills_source_dirs = [
        project_path / "uploads" / bot_id / "skills",
        repo_root / "skills" / "common",
        official_skills_dir,
    ]
    managed_skill_dirs = [*skills_source_dirs, autoload_skills_dir]

    skills_target_dir = project_path / "robot" / bot_id / "skills"
    skills_target_dir.mkdir(parents=True, exist_ok=True)
    logger.info(f"Copying skills to {skills_target_dir}")

    # Names of every skill directory this function could have installed
    managed_skill_names = {
        item.name
        for base_dir in managed_skill_dirs
        if base_dir.exists()
        for item in base_dir.iterdir()
        if item.is_dir()
    }

    # Remove extra managed skill directories that are no longer expected
    expected_skill_names = {Path(skill.lstrip("@")).name for skill in skills}
    for item in skills_target_dir.iterdir():
        if not item.is_dir() or item.name in expected_skill_names:
            continue
        if item.name in managed_skill_names:
            logger.info(f" Removing managed stale skill directory: {item}")
            if item.is_symlink():
                # shutil.rmtree refuses symlinks; drop the link itself and
                # leave whatever it points to untouched.
                item.unlink()
            else:
                shutil.rmtree(item)
        else:
            logger.info(f" Keeping unmanaged skill directory: {item}")

    for skill in skills:
        skill_name = Path(skill.lstrip("@")).name
        target_dir = skills_target_dir / skill_name
        source_dir = None

        if skill.startswith("@"):
            candidate_dir = repo_root / skill.lstrip("@")
            if candidate_dir.exists():
                source_dir = candidate_dir
                logger.info(f" Found skill '{skill}' at {candidate_dir}")

        # For simple names, search multiple directories in priority order
        if source_dir is None:
            for base_dir in skills_source_dirs:
                candidate_dir = base_dir / skill
                if candidate_dir.exists():
                    source_dir = candidate_dir
                    logger.info(f" Found skill '{skill}' in {base_dir}")
                    break

        if source_dir is None:
            logger.warning(f" Skill directory '{skill}' not found in any source directory: {[str(d) for d in skills_source_dirs]}")
            continue

        try:
            # dirs_exist_ok lets an already installed skill be refreshed in place
            shutil.copytree(source_dir, target_dir, dirs_exist_ok=True)
            logger.info(f" Synced: {source_dir} -> {target_dir}")
        except Exception as e:
            logger.error(f" Failed to copy {source_dir}: {e}")


# The entry point sits below _extract_skills_to_robot because
# create_robot_project calls it whenever the skill list is non-empty; with the
# guard placed before the definition, running this module as a script raised
# NameError at that call.
if __name__ == "__main__":
    # Test code
    test_dataset_ids = ["test-project-1", "test-project-2"]
    test_bot_id = "test-robot-001"
    robot_dir = create_robot_project(test_dataset_ids, test_bot_id)
    logger.info(f"Created robot project at: {robot_dir}")