Add dataset logic

朱潮 2025-10-30 21:19:39 +08:00
parent 0ac79cba48
commit 29a3a17d28
7 changed files with 648 additions and 76 deletions

View File

@@ -201,16 +201,16 @@ async def process_files_async_endpoint(request: QueueTaskRequest, authorization:
     Same functionality as /api/v1/files/process, but uses queue-based asynchronous processing
     Args:
-        request: QueueTaskRequest containing unique_id, files, system_prompt, mcp_settings, and queue options
+        request: QueueTaskRequest containing dataset_id, files, system_prompt, mcp_settings, and queue options
         authorization: Authorization header containing API key (Bearer <API_KEY>)
     Returns:
         QueueTaskResponse: Processing result with task ID for tracking
     """
     try:
-        unique_id = request.unique_id
-        if not unique_id:
-            raise HTTPException(status_code=400, detail="unique_id is required")
+        dataset_id = request.dataset_id
+        if not dataset_id:
+            raise HTTPException(status_code=400, detail="dataset_id is required")
         # Estimate processing time (based on file count)
         estimated_time = 0
@@ -220,7 +220,7 @@ async def process_files_async_endpoint(request: QueueTaskRequest, authorization:
             # Submit the async task
             task_id = queue_manager.enqueue_multiple_files(
-                project_id=unique_id,
+                project_id=dataset_id,
                 file_paths=[],
                 original_filenames=[]
             )
@@ -230,13 +230,13 @@ async def process_files_async_endpoint(request: QueueTaskRequest, authorization:
             task_id = str(uuid.uuid4())
             task_status_store.set_status(
                 task_id=task_id,
-                unique_id=unique_id,
+                unique_id=dataset_id,
                 status="pending"
             )
             # Submit the async task
             task = process_files_async(
-                unique_id=unique_id,
+                unique_id=dataset_id,
                 files=request.files,
                 system_prompt=request.system_prompt,
                 mcp_settings=request.mcp_settings,
@@ -245,8 +245,8 @@ async def process_files_async_endpoint(request: QueueTaskRequest, authorization:
         return QueueTaskResponse(
             success=True,
-            message=f"File processing task submitted to the queue, project ID: {unique_id}",
-            unique_id=unique_id,
+            message=f"File processing task submitted to the queue, project ID: {dataset_id}",
+            unique_id=dataset_id,
             task_id=task_id,  # use our own task_id
             task_status="pending",
             estimated_processing_time=estimated_time
@@ -313,12 +313,12 @@ async def delete_task(task_id: str):
 @app.get("/api/v1/tasks")
-async def list_tasks(status: Optional[str] = None, unique_id: Optional[str] = None, limit: int = 100):
+async def list_tasks(status: Optional[str] = None, dataset_id: Optional[str] = None, limit: int = 100):
     """List tasks, with optional filtering"""
     try:
-        if status or unique_id:
+        if status or dataset_id:
             # Use the search facility
-            tasks = task_status_store.search_tasks(status=status, unique_id=unique_id, limit=limit)
+            tasks = task_status_store.search_tasks(status=status, unique_id=dataset_id, limit=limit)
         else:
             # Fetch all tasks
             all_tasks = task_status_store.list_all()
@@ -331,7 +331,7 @@ async def list_tasks(status: Optional[str] = None, unique_id: Optional[str] = No
             "tasks": tasks,
             "filters": {
                 "status": status,
-                "unique_id": unique_id,
+                "dataset_id": dataset_id,
                 "limit": limit
             }
         }
@@ -376,16 +376,16 @@ async def cleanup_tasks(older_than_days: int = 7):
         raise HTTPException(status_code=500, detail=f"Failed to clean up task records: {str(e)}")
-@app.get("/api/v1/projects/{unique_id}/tasks")
-async def get_project_tasks(unique_id: str):
+@app.get("/api/v1/projects/{dataset_id}/tasks")
+async def get_project_tasks(dataset_id: str):
     """Get all tasks for the given project"""
     try:
-        tasks = task_status_store.get_by_unique_id(unique_id)
+        tasks = task_status_store.get_by_unique_id(dataset_id)
         return {
             "success": True,
             "message": "Project tasks fetched successfully",
-            "unique_id": unique_id,
+            "dataset_id": dataset_id,
             "total_tasks": len(tasks),
             "tasks": tasks
         }
@@ -395,16 +395,16 @@ async def get_project_tasks(unique_id: str):
         raise HTTPException(status_code=500, detail=f"Failed to fetch project tasks: {str(e)}")
-@app.post("/api/v1/files/{unique_id}/cleanup/async")
-async def cleanup_project_async_endpoint(unique_id: str, remove_all: bool = False):
+@app.post("/api/v1/files/{dataset_id}/cleanup/async")
+async def cleanup_project_async_endpoint(dataset_id: str, remove_all: bool = False):
     """Asynchronously clean up project files"""
     try:
-        task = cleanup_project_async(unique_id=unique_id, remove_all=remove_all)
+        task = cleanup_project_async(unique_id=dataset_id, remove_all=remove_all)
         return {
             "success": True,
-            "message": f"Project cleanup task submitted to the queue, project ID: {unique_id}",
-            "unique_id": unique_id,
+            "message": f"Project cleanup task submitted to the queue, project ID: {dataset_id}",
+            "dataset_id": dataset_id,
             "task_id": task.id,
             "action": "remove_all" if remove_all else "cleanup_logs"
         }
@@ -419,11 +419,25 @@ async def chat_completions(request: ChatRequest, authorization: Optional[str] =
     Chat completions API similar to OpenAI, supports both streaming and non-streaming
     Args:
-        request: ChatRequest containing messages, model, dataset with unique_id, system_prompt, mcp_settings, and files
+        request: ChatRequest containing messages, model, dataset_ids (required list), bot_id (required), system_prompt, mcp_settings, and files
         authorization: Authorization header containing API key (Bearer <API_KEY>)
     Returns:
         Union[ChatResponse, StreamingResponse]: Chat completion response or stream
+    Notes:
+        - dataset_ids: required; must be a list of project IDs (use a list even for a single project)
+        - bot_id: required; robot ID used to create the project directory
+        - Every request creates a robot project directory: projects/robot/{bot_id}/
+        - Supports merging multiple knowledge bases; folder-name collisions are resolved automatically
+    Required Parameters:
+        - dataset_ids: List[str] - source knowledge-base project IDs (use a list even for a single project)
+        - bot_id: str - target robot project ID
+    Example:
+        {"dataset_ids": ["project-123"], "bot_id": "my-bot-001"}
+        {"dataset_ids": ["project-123", "project-456"], "bot_id": "my-bot-002"}
     """
     try:
         # Extract the API key from the Authorization header
@@ -435,23 +449,27 @@ async def chat_completions(request: ChatRequest, authorization: Optional[str] =
         else:
             api_key = authorization
-        # Get unique_id
-        unique_id = request.unique_id
-        if not unique_id:
-            raise HTTPException(status_code=400, detail="unique_id is required")
-        # Resolve the project directory from unique_id
-        project_dir = os.path.join("projects", "data", unique_id)
-        if not os.path.exists(project_dir):
-            project_dir = ""
+        # Get dataset_ids (required; must be a list)
+        dataset_ids_list = request.dataset_ids
+        if not dataset_ids_list:
+            raise HTTPException(status_code=400, detail="dataset_ids is required and must be a non-empty list")
+        # Get bot_id (required)
+        bot_id = request.bot_id
+        if not bot_id:
+            raise HTTPException(status_code=400, detail="bot_id is required")
+        # Create the robot directory and merge the data
+        from utils.multi_project_manager import create_robot_project
+        project_dir = create_robot_project(dataset_ids_list, bot_id)
         # Collect extra parameters as generate_cfg
-        exclude_fields = {'messages', 'model', 'model_server', 'unique_id', 'language', 'tool_response', 'system_prompt', 'mcp_settings', 'stream', 'robot_type'}
+        exclude_fields = {'messages', 'model', 'model_server', 'dataset_ids', 'language', 'tool_response', 'system_prompt', 'mcp_settings', 'stream', 'robot_type', 'bot_id'}
         generate_cfg = {k: v for k, v in request.model_dump().items() if k not in exclude_fields}
         # Get or create the assistant instance from the global manager (config loading is handled inside agent_manager)
         agent = await agent_manager.get_or_create_agent(
-            unique_id=unique_id,
+            bot_id=bot_id,
             project_dir=project_dir,
             model_name=request.model,
            api_key=api_key,
@@ -636,27 +654,27 @@ async def get_cached_projects():
 @app.post("/system/remove-project-cache")
-async def remove_project_cache(unique_id: str):
+async def remove_project_cache(dataset_id: str):
     """Remove the cache for a specific project"""
     try:
-        removed_count = agent_manager.remove_cache_by_unique_id(unique_id)
+        removed_count = agent_manager.remove_cache_by_unique_id(dataset_id)
         if removed_count > 0:
-            return {"message": f"Project cache removed: {unique_id}", "removed_count": removed_count}
+            return {"message": f"Project cache removed: {dataset_id}", "removed_count": removed_count}
         else:
-            return {"message": f"Project cache not found: {unique_id}", "removed_count": 0}
+            return {"message": f"Project cache not found: {dataset_id}", "removed_count": 0}
     except Exception as e:
         raise HTTPException(status_code=500, detail=f"Failed to remove project cache: {str(e)}")
-@app.get("/api/v1/files/{unique_id}/status")
-async def get_files_processing_status(unique_id: str):
+@app.get("/api/v1/files/{dataset_id}/status")
+async def get_files_processing_status(dataset_id: str):
     """Get the file-processing status of a project"""
     try:
         # Load processed files log
-        processed_log = load_processed_files_log(unique_id)
+        processed_log = load_processed_files_log(dataset_id)
         # Get project directory info
-        project_dir = os.path.join("projects", "data", unique_id)
+        project_dir = os.path.join("projects", "data", dataset_id)
         project_exists = os.path.exists(project_dir)
         # Collect document.txt files
@@ -668,27 +686,27 @@ async def get_files_processing_status(unique_id: str):
                     document_files.append(os.path.join(root, file))
         return {
-            "unique_id": unique_id,
+            "dataset_id": dataset_id,
             "project_exists": project_exists,
             "processed_files_count": len(processed_log),
             "processed_files": processed_log,
             "document_files_count": len(document_files),
             "document_files": document_files,
-            "log_file_exists": os.path.exists(os.path.join("projects", "data", unique_id, "processed_files.json"))
+            "log_file_exists": os.path.exists(os.path.join("projects", "data", dataset_id, "processed_files.json"))
         }
     except Exception as e:
         raise HTTPException(status_code=500, detail=f"Failed to fetch file-processing status: {str(e)}")
-@app.post("/api/v1/files/{unique_id}/reset")
-async def reset_files_processing(unique_id: str):
+@app.post("/api/v1/files/{dataset_id}/reset")
+async def reset_files_processing(dataset_id: str):
     """Reset a project's file-processing state: delete the processing log and all files"""
     try:
-        project_dir = os.path.join("projects", "data", unique_id)
-        log_file = os.path.join("projects", "data", unique_id, "processed_files.json")
+        project_dir = os.path.join("projects", "data", dataset_id)
+        log_file = os.path.join("projects", "data", dataset_id, "processed_files.json")
         # Load processed log to know what files to remove
-        processed_log = load_processed_files_log(unique_id)
+        processed_log = load_processed_files_log(dataset_id)
         removed_files = []
         # Remove all processed files and their dataset directories
@@ -702,12 +720,12 @@ async def reset_files_processing(unique_id: str):
                 if 'key' in file_info:
                     # Remove dataset directory by key
                     key = file_info['key']
-                    if remove_dataset_directory_by_key(unique_id, key):
+                    if remove_dataset_directory_by_key(dataset_id, key):
                         removed_files.append(f"dataset/{key}")
                 elif 'filename' in file_info:
                     # Fallback to old filename-based structure
                     filename_without_ext = os.path.splitext(file_info['filename'])[0]
-                    dataset_dir = os.path.join("projects", "data", unique_id, "dataset", filename_without_ext)
+                    dataset_dir = os.path.join("projects", "data", dataset_id, "dataset", filename_without_ext)
                     if remove_file_or_directory(dataset_dir):
                         removed_files.append(dataset_dir)
@@ -736,7 +754,7 @@ async def reset_files_processing(unique_id: str):
                 removed_files.append(readme_file)
         return {
-            "message": f"File-processing state reset successfully: {unique_id}",
+            "message": f"File-processing state reset successfully: {dataset_id}",
             "removed_files_count": len(removed_files),
             "removed_files": removed_files
         }
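
For reference, a minimal sketch of a client call that satisfies the new required fields. The host, port, endpoint path, and API key below are assumptions for illustration, not taken from this diff:

```python
import requests

# Hypothetical base URL; use the route actually registered for chat_completions.
url = "http://localhost:8000/v1/chat/completions"
headers = {"Authorization": "Bearer <API_KEY>"}
payload = {
    "model": "qwen3-next",
    "messages": [{"role": "user", "content": "Summarize the knowledge base."}],
    # Both fields are now required; dataset_ids is a list even for one project.
    "dataset_ids": ["project-123", "project-456"],
    "bot_id": "my-bot-002",
    "stream": False,
}
resp = requests.post(url, json=payload, headers=headers, timeout=60)
print(resp.json())
```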

View File

@@ -5,7 +5,7 @@
       "command": "python",
       "args": [
         "./mcp/rag_retrieve_server.py",
-        "{unique_id}"
+        "{bot_id}"
       ]
     }
   }

View File

@@ -80,6 +80,15 @@ from .prompt_loader import (
     load_system_prompt,
 )
+from .multi_project_manager import (
+    create_robot_project,
+    get_robot_project_info,
+    cleanup_robot_project,
+    get_unique_folder_name,
+    copy_dataset_folder,
+    generate_robot_readme
+)
 __all__ = [
     # file_utils
     'download_file',
@@ -148,4 +157,12 @@ __all__ = [
     # prompt_loader
     'load_system_prompt',
+    # multi_project_manager
+    'create_robot_project',
+    'get_robot_project_info',
+    'cleanup_robot_project',
+    'get_unique_folder_name',
+    'copy_dataset_folder',
+    'generate_robot_readme',
 ]

View File

@@ -43,7 +43,8 @@ class ChatRequest(BaseModel):
     messages: List[Message]
     model: str = "qwen3-next"
     model_server: str = ""
-    unique_id: Optional[str] = None
+    dataset_ids: List[str]
+    bot_id: str
     stream: Optional[bool] = False
     language: Optional[str] = "ja"
     tool_response: Optional[bool] = False
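
A quick sketch of constructing a valid request object against the updated model; the import path and the role/content fields of Message are assumptions:

```python
from schema import ChatRequest, Message  # hypothetical import path

# unique_id is gone; dataset_ids and bot_id are now required fields.
req = ChatRequest(
    messages=[Message(role="user", content="hello")],
    dataset_ids=["project-123"],
    bot_id="my-bot-001",
)
print(req.model_dump())
```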

View File

@@ -39,11 +39,11 @@ class FileLoadedAgentManager:
         self.creation_times: Dict[str, float] = {}  # creation-time records
         self.max_cached_agents = max_cached_agents
-    def _get_cache_key(self, unique_id: str, system_prompt: str = None, mcp_settings: List[Dict] = None) -> str:
-        """Build a cache key from the hash of unique_id, system_prompt, and mcp_settings
+    def _get_cache_key(self, bot_id: str, system_prompt: str = None, mcp_settings: List[Dict] = None) -> str:
+        """Build a cache key from the hash of bot_id, system_prompt, and mcp_settings
         Args:
-            unique_id: unique identifier of the project
+            bot_id: robot project ID
             system_prompt: the system prompt
             mcp_settings: list of MCP settings
@@ -52,7 +52,7 @@ class FileLoadedAgentManager:
         """
         # Build a string containing all relevant parameters
         cache_data = {
-            'unique_id': unique_id,
+            'bot_id': bot_id,
             'system_prompt': system_prompt or '',
             'mcp_settings': json.dumps(mcp_settings or [], sort_keys=True)
         }
@@ -91,7 +91,7 @@ class FileLoadedAgentManager:
         logger.info(f"Cleaned up {removed_count} expired assistant instance caches")
     async def get_or_create_agent(self,
-                                  unique_id: str,
+                                  bot_id: str,
                                   project_dir: str,
                                   model_name: str = "qwen3-next",
                                   api_key: Optional[str] = None,
@@ -104,7 +104,7 @@ class FileLoadedAgentManager:
         """Get or create an assistant instance with files preloaded
         Args:
-            unique_id: unique identifier of the project
+            bot_id: unique identifier of the project
             project_dir: project directory path, used to read system_prompt.md and mcp_settings.json
             model_name: model name
             api_key: API key
@@ -121,10 +121,10 @@ class FileLoadedAgentManager:
         import os
         # Parameter precedence: passed-in arguments > project config > default config
-        final_system_prompt = load_system_prompt(project_dir, language, system_prompt, robot_type, unique_id)
-        final_mcp_settings = load_mcp_settings(project_dir, mcp_settings, unique_id, robot_type)
-        cache_key = self._get_cache_key(unique_id, final_system_prompt, final_mcp_settings)
+        final_system_prompt = load_system_prompt(project_dir, language, system_prompt, robot_type, bot_id)
+        final_mcp_settings = load_mcp_settings(project_dir, mcp_settings, bot_id, robot_type)
+        cache_key = self._get_cache_key(bot_id, final_system_prompt, final_mcp_settings)
         # Check whether the assistant instance already exists
         if cache_key in self.agents:
@@ -134,14 +134,14 @@ class FileLoadedAgentManager:
             # Dynamically update the LLM config and system settings (if parameters changed)
             update_agent_llm(agent, model_name, api_key, model_server, generate_cfg)
-            logger.info(f"Reusing existing assistant instance cache: {cache_key} (unique_id: {unique_id})")
+            logger.info(f"Reusing existing assistant instance cache: {cache_key} (bot_id: {bot_id})")
             return agent
         # Clean up expired instances
         self._cleanup_old_agents()
         # Create a new assistant instance with files preloaded
-        logger.info(f"Creating new assistant instance cache: {cache_key}, unique_id: {unique_id}")
+        logger.info(f"Creating new assistant instance cache: {cache_key}, bot_id: {bot_id}")
         current_time = time.time()
         agent = init_modified_agent_service_with_files(
@@ -155,7 +155,7 @@ class FileLoadedAgentManager:
         # Cache the instance
         self.agents[cache_key] = agent
-        self.unique_ids[cache_key] = unique_id
+        self.unique_ids[cache_key] = bot_id
         self.access_times[cache_key] = current_time
         self.creation_times[cache_key] = current_time
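
The hunks above end before a digest is computed from cache_data. A minimal sketch of what the rest of _get_cache_key plausibly does, assuming a stable hash over the JSON-serialized dict (the actual digest function is not shown in this diff):

```python
import hashlib
import json

def make_cache_key(cache_data: dict) -> str:
    # Deterministic serialization, then a digest; MD5 here is an assumption —
    # any stable hash works for a cache key.
    serialized = json.dumps(cache_data, sort_keys=True, ensure_ascii=False)
    return hashlib.md5(serialized.encode("utf-8")).hexdigest()

key = make_cache_key({
    'bot_id': "my-bot-001",
    'system_prompt': '',
    'mcp_settings': json.dumps([], sort_keys=True),
})
print(key)
```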

View File

@@ -0,0 +1,536 @@
#!/usr/bin/env python3
"""
Multi-project manager: handles merging multiple knowledge-base projects
"""
import os
import shutil
import json
from pathlib import Path
from typing import List, Dict, Optional
from datetime import datetime
from utils.file_utils import get_document_preview

def generate_robot_directory_tree(robot_dir: str, robot_id: str, max_depth: int = 3) -> str:
    """
    Generate the directory tree of a robot project

    Args:
        robot_dir: robot project directory path
        robot_id: robot ID
        max_depth: maximum depth

    Returns:
        str: the directory tree as a string
    """
    def _build_tree(path: str, prefix: str = "", is_last: bool = True, depth: int = 0) -> List[str]:
        if depth > max_depth:
            return []
        lines = []
        try:
            entries = sorted(os.listdir(path))
            # Separate directories and files
            dirs = [e for e in entries if os.path.isdir(os.path.join(path, e)) and not e.startswith('.')]
            files = [e for e in entries if os.path.isfile(os.path.join(path, e)) and not e.startswith('.')]
            entries = dirs + files
            for i, entry in enumerate(entries):
                entry_path = os.path.join(path, entry)
                is_dir = os.path.isdir(entry_path)
                is_last_entry = i == len(entries) - 1
                # Pick the appropriate tree connector
                if is_last_entry:
                    connector = "└── "
                    new_prefix = prefix + "    "
                else:
                    connector = "├── "
                    new_prefix = prefix + "│   "
                # Append the entry line
                line = prefix + connector + entry
                if is_dir:
                    line += "/"
                lines.append(line)
                # Recurse into subdirectories
                if is_dir and depth < max_depth:
                    sub_lines = _build_tree(entry_path, new_prefix, is_last_entry, depth + 1)
                    lines.extend(sub_lines)
        except PermissionError:
            lines.append(prefix + "└── [Permission Denied]")
        except Exception as e:
            lines.append(prefix + "└── [Error: " + str(e) + "]")
        return lines
    # Build the tree starting from the dataset directory
    dataset_dir = os.path.join(robot_dir, "dataset")
    tree_lines = []
    if not os.path.exists(dataset_dir):
        return "└── [No dataset directory found]"
    try:
        entries = sorted(os.listdir(dataset_dir))
        dirs = [e for e in entries if os.path.isdir(os.path.join(dataset_dir, e)) and not e.startswith('.')]
        files = [e for e in entries if os.path.isfile(os.path.join(dataset_dir, e)) and not e.startswith('.')]
        entries = dirs + files
        if not entries:
            tree_lines.append("└── [Empty dataset directory]")
        else:
            for i, entry in enumerate(entries):
                entry_path = os.path.join(dataset_dir, entry)
                is_dir = os.path.isdir(entry_path)
                is_last_entry = i == len(entries) - 1
                if is_last_entry:
                    connector = "└── "
                    prefix = "    "
                else:
                    connector = "├── "
                    prefix = "│   "
                line = connector + entry
                if is_dir:
                    line += "/"
                tree_lines.append(line)
                # Recurse into subdirectories
                if is_dir:
                    sub_lines = _build_tree(entry_path, prefix, is_last_entry, 1)
                    tree_lines.extend(sub_lines)
    except Exception as e:
        tree_lines.append(f"└── [Error generating tree: {str(e)}]")
    return "\n".join(tree_lines)
def get_unique_folder_name(target_dir: Path, original_name: str) -> str:
    """
    Get a unique folder name; if the name already exists, append a numeric suffix

    Args:
        target_dir: target directory
        original_name: original folder name

    Returns:
        str: a unique folder name
    """
    if not (target_dir / original_name).exists():
        return original_name
    # Name collision: append a numeric suffix
    counter = 1
    while True:
        new_name = f"{original_name}_{counter}"
        if not (target_dir / new_name).exists():
            return new_name
        counter += 1
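# Example (hypothetical names): if target_dir already contains "manual" and
# "manual_1", get_unique_folder_name(target_dir, "manual") returns "manual_2".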
def copy_dataset_folder(source_project_id: str, target_dataset_dir: Path, folder_name: str) -> Dict:
    """
    Copy a single project's dataset folder into the target directory

    Args:
        source_project_id: source project ID
        target_dataset_dir: target dataset directory
        folder_name: name of the folder to copy

    Returns:
        Dict: the copy result
    """
    result = {
        "success": False,
        "source_path": "",
        "target_path": "",
        "original_folder_name": folder_name,
        "final_folder_name": folder_name,
        "error": None
    }
    try:
        source_folder = Path("projects") / "data" / source_project_id / "dataset" / folder_name
        result["source_path"] = str(source_folder)
        if not source_folder.exists():
            result["error"] = f"Source folder does not exist: {source_folder}"
            return result
        # Resolve name collisions
        unique_folder_name = get_unique_folder_name(target_dataset_dir, folder_name)
        result["final_folder_name"] = unique_folder_name
        target_folder = target_dataset_dir / unique_folder_name
        result["target_path"] = str(target_folder)
        # Copy the folder
        shutil.copytree(source_folder, target_folder)
        result["success"] = True
        print(f"  Copied: {source_folder} -> {target_folder}")
    except Exception as e:
        result["error"] = str(e)
        print(f"  Error copying {folder_name}: {str(e)}")
    return result
def generate_robot_readme(robot_id: str, dataset_ids: List[str], copy_results: List[Dict]) -> str:
    """
    Generate the README.md file for a robot project

    Args:
        robot_id: robot ID
        dataset_ids: list of source project IDs
        copy_results: list of copy results

    Returns:
        str: path of the README.md file
    """
    readme_path = Path("projects") / "robot" / robot_id / "README.md"
    readme_path.parent.mkdir(parents=True, exist_ok=True)
    robot_dir = Path("projects") / "robot" / robot_id
    # Statistics
    total_folders = len(copy_results)
    successful_copies = sum(1 for r in copy_results if r["success"])
    failed_copies = total_folders - successful_copies
    # Group by source project
    project_groups = {}
    for result in copy_results:
        if result["success"]:
            source_project = result["source_path"].split("/")[2]  # projects/data/{project_id}/dataset/...
            if source_project not in project_groups:
                project_groups[source_project] = []
            project_groups[source_project].append(result)
    readme_content = "\n## Directory Structure\n\n"
    # Generate the actual directory tree
    readme_content += "```\n"
    readme_content += generate_robot_directory_tree(str(robot_dir), robot_id)
    readme_content += "\n```\n\n"
    readme_content += "## Dataset Details\n\n"
    dataset_dir = robot_dir / "dataset"
    if not dataset_dir.exists():
        readme_content += "No dataset files available.\n"
    else:
        # Collect all document directories
        doc_dirs = []
        try:
            for item in sorted(os.listdir(dataset_dir)):
                item_path = dataset_dir / item
                if item_path.is_dir():
                    doc_dirs.append(item)
        except Exception as e:
            print(f"Error listing dataset directories: {str(e)}")
        if not doc_dirs:
            readme_content += "No document directories found.\n"
        else:
            # List documents grouped by source project
            for project_id, folders in project_groups.items():
                for folder in folders:
                    folder_name = folder["final_folder_name"]
                    doc_path = dataset_dir / folder_name
                    readme_content += f"#### {folder_name}\n\n"
                    readme_content += f"**Files:**\n"
                    # Check which files exist
                    document_file = doc_path / "document.txt"
                    pagination_file = doc_path / "pagination.txt"
                    embeddings_file = doc_path / "embedding.pkl"
                    readme_content += f"- `{folder_name}/document.txt`"
                    if document_file.exists():
                        readme_content += " ✓"
                    readme_content += "\n"
                    readme_content += f"- `{folder_name}/pagination.txt`"
                    if pagination_file.exists():
                        readme_content += " ✓"
                    readme_content += "\n"
                    readme_content += f"- `{folder_name}/embedding.pkl`"
                    if embeddings_file.exists():
                        readme_content += " ✓"
                    readme_content += "\n\n"
                    # Add a document preview
                    if document_file.exists():
                        readme_content += f"**Content preview (first 10 lines):**\n\n```\n"
                        preview = get_document_preview(str(document_file), 10)
                        readme_content += preview
                        readme_content += "\n```\n\n"
                    else:
                        readme_content += f"**Content preview:** not available\n\n"
                    # Show rename info
                    original_name = folder["original_folder_name"]
                    if original_name != folder_name:
                        readme_content += f"**Original name:** `{original_name}` → `{folder_name}`\n\n"
                    readme_content += "---\n\n"
    # Write the README file
    with open(readme_path, 'w', encoding='utf-8') as f:
        f.write(readme_content)
    print(f"Generated README: {readme_path}")
    return str(readme_path)
def should_rebuild_robot_project(dataset_ids: List[str], bot_id: str) -> bool:
    """
    Check whether the robot project needs to be rebuilt
    1. Check whether the robot project exists
    2. Check whether any dataset_ids were added
    3. Check whether any processing_log.json file has been updated

    Args:
        dataset_ids: list of source project IDs
        bot_id: robot ID

    Returns:
        bool: whether a rebuild is needed
    """
    robot_dir = Path("projects") / "robot" / bot_id
    # If the robot project does not exist, it must be created
    if not robot_dir.exists():
        print(f"Robot project does not exist, need to create: {bot_id}")
        return True
    # Check the robot project's config
    config_file = robot_dir / "robot_config.json"
    if not config_file.exists():
        print(f"Robot config file not found, need to rebuild: {bot_id}")
        return True
    # Read the config
    try:
        with open(config_file, 'r', encoding='utf-8') as f:
            config = json.load(f)
        cached_dataset_ids = set(config.get("dataset_ids", []))
    except Exception as e:
        print(f"Error reading robot config: {e}, need to rebuild")
        return True
    # Check whether dataset_ids changed
    current_dataset_ids = set(dataset_ids)
    # dataset_ids were added
    new_ids = current_dataset_ids - cached_dataset_ids
    if new_ids:
        print(f"Found new dataset_ids: {new_ids}, need to rebuild")
        return True
    # dataset_ids were removed
    removed_ids = cached_dataset_ids - current_dataset_ids
    if removed_ids:
        print(f"Removed dataset_ids: {removed_ids}, need to rebuild")
        return True
    # Last modification time of the robot project directory
    robot_mod_time = robot_dir.stat().st_mtime
    # Check each source project's processing_log.json
    for source_project_id in dataset_ids:
        log_file = Path("projects") / "data" / source_project_id / "processing_log.json"
        if not log_file.exists():
            print(f"Processing log file not found for project {source_project_id}, will rebuild")
            return True
        log_mod_time = log_file.stat().st_mtime
        # Rebuild if any processing_log.json is newer than the robot project
        if log_mod_time > robot_mod_time:
            print(f"Processing log updated for project {source_project_id}, need to rebuild")
            return True
    print(f"Robot project {bot_id} is up to date, no rebuild needed")
    return False
def create_robot_project(dataset_ids: List[str], bot_id: str, force_rebuild: bool = False) -> str:
    """
    Create a robot project by merging the dataset folders of multiple source projects

    Args:
        dataset_ids: list of source project IDs
        bot_id: robot ID
        force_rebuild: whether to force a rebuild

    Returns:
        str: path of the robot project directory
    """
    print(f"Creating robot project: {bot_id} from sources: {dataset_ids}")
    # Check whether a rebuild is needed
    if not force_rebuild and not should_rebuild_robot_project(dataset_ids, bot_id):
        robot_dir = Path("projects") / "robot" / bot_id
        print(f"Using existing robot project: {robot_dir}")
        return str(robot_dir)
    # Create the robot directory structure
    robot_dir = Path("projects") / "robot" / bot_id
    dataset_dir = robot_dir / "dataset"
    # Remove the existing directory (if any)
    if robot_dir.exists():
        print(f"Robot directory already exists, cleaning up: {robot_dir}")
        shutil.rmtree(robot_dir)
    robot_dir.mkdir(parents=True, exist_ok=True)
    dataset_dir.mkdir(parents=True, exist_ok=True)
    copy_results = []
    # Iterate over each source project
    for source_project_id in dataset_ids:
        print(f"\nProcessing source project: {source_project_id}")
        source_dataset_dir = Path("projects") / "data" / source_project_id / "dataset"
        if not source_dataset_dir.exists():
            print(f"  Warning: Dataset directory not found for project {source_project_id}")
            continue
        # Collect all subfolders
        folders = [f for f in source_dataset_dir.iterdir() if f.is_dir()]
        if not folders:
            print(f"  Warning: No folders found in dataset directory for project {source_project_id}")
            continue
        # Copy each folder
        for folder in folders:
            result = copy_dataset_folder(source_project_id, dataset_dir, folder.name)
            copy_results.append(result)
    # Save the config
    config_file = robot_dir / "robot_config.json"
    config_data = {
        "dataset_ids": dataset_ids,
        "created_at": datetime.now().isoformat(),
        "total_folders": len(copy_results),
        "successful_copies": sum(1 for r in copy_results if r["success"])
    }
    with open(config_file, 'w', encoding='utf-8') as f:
        json.dump(config_data, f, ensure_ascii=False, indent=2)
    # Generate the README
    readme_path = generate_robot_readme(bot_id, dataset_ids, copy_results)
    # Statistics
    successful_copies = sum(1 for r in copy_results if r["success"])
    print(f"\nRobot project creation completed:")
    print(f"  Robot directory: {robot_dir}")
    print(f"  Total folders processed: {len(copy_results)}")
    print(f"  Successful copies: {successful_copies}")
    print(f"  Config saved: {config_file}")
    print(f"  README generated: {readme_path}")
    return str(robot_dir)
def get_robot_project_info(bot_id: str) -> Dict:
    """
    Get information about a robot project

    Args:
        bot_id: robot ID

    Returns:
        Dict: robot project information
    """
    robot_dir = Path("projects") / "robot" / bot_id
    if not robot_dir.exists():
        return {
            "exists": False,
            "bot_id": bot_id,
            "error": "Robot project does not exist"
        }
    dataset_dir = robot_dir / "dataset"
    readme_path = robot_dir / "README.md"
    # Count folders
    folder_count = 0
    total_size = 0
    if dataset_dir.exists():
        for item in dataset_dir.iterdir():
            if item.is_dir():
                folder_count += 1
                # Compute the folder size
                for file_path in item.rglob('*'):
                    if file_path.is_file():
                        total_size += file_path.stat().st_size
    return {
        "exists": True,
        "bot_id": bot_id,
        "robot_dir": str(robot_dir),
        "dataset_dir": str(dataset_dir),
        "readme_exists": readme_path.exists(),
        "folder_count": folder_count,
        "total_size_bytes": total_size,
        "total_size_mb": round(total_size / (1024 * 1024), 2)
    }
def cleanup_robot_project(bot_id: str) -> bool:
    """
    Clean up a robot project

    Args:
        bot_id: robot ID

    Returns:
        bool: whether cleanup succeeded
    """
    try:
        robot_dir = Path("projects") / "robot" / bot_id
        if robot_dir.exists():
            shutil.rmtree(robot_dir)
            print(f"Cleaned up robot project: {bot_id}")
            return True
        else:
            print(f"Robot project does not exist: {bot_id}")
            return True
    except Exception as e:
        print(f"Error cleaning up robot project {bot_id}: {str(e)}")
        return False

if __name__ == "__main__":
    # Test code
    test_dataset_ids = ["test-project-1", "test-project-2"]
    test_bot_id = "test-robot-001"
    robot_dir = create_robot_project(test_dataset_ids, test_bot_id)
    print(f"Created robot project at: {robot_dir}")
    info = get_robot_project_info(test_bot_id)
    print(f"Robot project info: {json.dumps(info, indent=2, ensure_ascii=False)}")

View File

@@ -7,7 +7,7 @@ import json
 from typing import List, Dict, Optional
-def load_system_prompt(project_dir: str, language: str = None, system_prompt: str = None, robot_type: str = "agent", unique_id: str = "") -> str:
+def load_system_prompt(project_dir: str, language: str = None, system_prompt: str = None, robot_type: str = "agent", bot_id: str = "") -> str:
     # Get the display name for the language
     language_display_map = {
         'zh': '中文',
@@ -19,7 +19,7 @@ def load_system_prompt(project_dir: str, language: str = None, system_prompt: st
     # If the {language} placeholder is present, use system_prompt directly
     if system_prompt and "{language}" in system_prompt:
-        return system_prompt.replace("{language}", language_display).replace('{unique_id}', unique_id) or ""
+        return system_prompt.replace("{language}", language_display).replace('{bot_id}', bot_id) or ""
     elif robot_type == "agent" or robot_type == "catalog_agent":
         """
         Prefer system_prompt_catalog_agent.md from the project directory; fall back to the default system_prompt_default.md
@@ -51,13 +51,13 @@ def load_system_prompt(project_dir: str, language: str = None, system_prompt: st
             readme = f.read().strip()
             system_prompt_default = system_prompt_default.replace("{readme}", str(readme))
-        return system_prompt_default.replace("{language}", language_display).replace("{extra_prompt}", system_prompt or "").replace('{unique_id}', unique_id) or ""
+        return system_prompt_default.replace("{language}", language_display).replace("{extra_prompt}", system_prompt or "").replace('{bot_id}', bot_id) or ""
     else:
-        return system_prompt.replace("{language}", language_display).replace('{unique_id}', unique_id) or ""
+        return system_prompt.replace("{language}", language_display).replace('{bot_id}', bot_id) or ""
-def replace_mcp_placeholders(mcp_settings: List[Dict], dataset_dir: str, unique_id: str) -> List[Dict]:
+def replace_mcp_placeholders(mcp_settings: List[Dict], dataset_dir: str, bot_id: str) -> List[Dict]:
     """
     Replace placeholders in the MCP config
     """
@@ -70,21 +70,21 @@ def replace_mcp_placeholders(mcp_settings: List[Dict], dataset_dir: str, unique_
             for key, value in obj.items():
                 if key == 'args' and isinstance(value, list):
                     # Special-case the args list
-                    obj[key] = [item.replace('{dataset_dir}', dataset_dir).replace('{unique_id}', unique_id) if isinstance(item, str) else item
+                    obj[key] = [item.replace('{dataset_dir}', dataset_dir).replace('{bot_id}', bot_id) if isinstance(item, str) else item
                                 for item in value]
                 elif isinstance(value, (dict, list)):
                     obj[key] = replace_placeholders_in_obj(value)
                 elif isinstance(value, str):
-                    obj[key] = value.replace('{dataset_dir}', dataset_dir).replace('{unique_id}', unique_id)
+                    obj[key] = value.replace('{dataset_dir}', dataset_dir).replace('{bot_id}', bot_id)
         elif isinstance(obj, list):
             return [replace_placeholders_in_obj(item) if isinstance(item, (dict, list)) else
-                    item.replace('{dataset_dir}', dataset_dir).replace('{unique_id}', unique_id) if isinstance(item, str) else item
+                    item.replace('{dataset_dir}', dataset_dir).replace('{bot_id}', bot_id) if isinstance(item, str) else item
                     for item in obj]
         return obj
     return replace_placeholders_in_obj(mcp_settings)
-def load_mcp_settings(project_dir: str, mcp_settings: list = None, unique_id: str = "", robot_type: str = "agent") -> List[Dict]:
+def load_mcp_settings(project_dir: str, mcp_settings: list = None, bot_id: str = "", robot_type: str = "agent") -> List[Dict]:
     """
     Always read the default MCP settings, then merge with the passed-in mcp_settings by merging the [0].mcpServers objects
@@ -92,7 +92,7 @@ def load_mcp_settings(project_dir: str, mcp_settings: list=None, unique_id: str=
     Args:
         project_dir: project directory path
         mcp_settings: optional MCP settings, merged with the defaults
-        unique_id: unique identifier
+        bot_id: robot project ID
         robot_type: robot type, one of agent/catalog_agent
     Returns:
@@ -156,6 +156,6 @@ def load_mcp_settings(project_dir: str, mcp_settings: list=None, unique_id: str=
     # Compute dataset_dir, used to replace placeholders in the MCP config
     dataset_dir = os.path.join(project_dir, "dataset")
     # Replace the {dataset_dir} placeholder in the MCP config
-    merged_settings = replace_mcp_placeholders(merged_settings, dataset_dir, unique_id)
+    merged_settings = replace_mcp_placeholders(merged_settings, dataset_dir, bot_id)
     return merged_settings
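
To make the placeholder substitution concrete, here is a small self-contained sketch of the effect replace_mcp_placeholders has on a config shaped like the one updated in this commit (the "rag_retrieve" server name and the values are hypothetical):

```python
import copy

# Shape follows the updated mcp_settings.json; the server name is assumed.
settings = [{
    "mcpServers": {
        "rag_retrieve": {
            "command": "python",
            "args": ["./mcp/rag_retrieve_server.py", "{bot_id}"]
        }
    }
}]

def substitute(obj, dataset_dir: str, bot_id: str):
    # Walk dicts/lists and replace both placeholders in every string value,
    # mirroring replace_placeholders_in_obj above.
    if isinstance(obj, dict):
        return {k: substitute(v, dataset_dir, bot_id) for k, v in obj.items()}
    if isinstance(obj, list):
        return [substitute(v, dataset_dir, bot_id) for v in obj]
    if isinstance(obj, str):
        return obj.replace("{dataset_dir}", dataset_dir).replace("{bot_id}", bot_id)
    return obj

resolved = substitute(copy.deepcopy(settings), "projects/robot/my-bot-001/dataset", "my-bot-001")
print(resolved[0]["mcpServers"]["rag_retrieve"]["args"])
# -> ['./mcp/rag_retrieve_server.py', 'my-bot-001']
```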