catalog-agent/task_queue/integration_tasks.py
2025-10-18 09:20:59 +08:00

215 lines
7.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Queue tasks for file processing integration.
"""
import os
import json
import time
from typing import Dict, List, Optional, Any
from task_queue.config import huey
from task_queue.manager import queue_manager
from task_queue.task_status import task_status_store
from utils import download_dataset_files, save_processed_files_log
@huey.task()
def process_files_async(
    unique_id: str,
    files: Optional[Dict[str, List[str]]] = None,
    system_prompt: Optional[str] = None,
    mcp_settings: Optional[List[Dict]] = None,
    task_id: Optional[str] = None
) -> Dict[str, Any]:
    """Asynchronously process project files (compatible with the existing files/process API).

    Args:
        unique_id: Unique project ID.
        files: File paths grouped by key.
        system_prompt: System prompt text to persist into the project directory.
        mcp_settings: MCP settings to persist into the project directory.
        task_id: Optional task ID used for status tracking.

    Returns:
        Result dict: on success contains status, processed file lists
        (flat and by key), discovered document files, counts and a
        timestamp; on failure contains status "error" and the message.
    """
    try:
        print(f"开始异步处理文件任务项目ID: {unique_id}")
        # Mark the task as running if status tracking was requested.
        if task_id:
            task_status_store.set_status(
                task_id=task_id,
                unique_id=unique_id,
                status="running"
            )
        # Ensure the project directory exists (exist_ok makes the prior
        # os.path.exists() pre-check redundant and race-free).
        project_dir = os.path.join("projects", unique_id)
        os.makedirs(project_dir, exist_ok=True)
        # Download/process files grouped by key, if any were supplied.
        processed_files_by_key: Dict[str, List[str]] = {}
        total_files = 0
        if files:
            # download_dataset_files is a coroutine but Huey workers are
            # synchronous, so drive it with asyncio.run(), which creates
            # AND closes a fresh event loop. The previous
            # get_event_loop()/new_event_loop()/run_until_complete()
            # pattern raises in non-main worker threads on Python 3.10+
            # and leaked an unclosed loop on every fallback.
            import asyncio
            processed_files_by_key = asyncio.run(
                download_dataset_files(unique_id, files)
            )
            # Compute once and reuse below in the result payload.
            total_files = sum(len(paths) for paths in processed_files_by_key.values())
            print(f"异步处理了 {total_files} 个数据集文件,涉及 {len(processed_files_by_key)} 个key项目ID: {unique_id}")
        else:
            print(f"请求中未提供文件项目ID: {unique_id}")
        # Collect every document.txt found under the project directory.
        document_files = []
        for root, dirs, files_list in os.walk(project_dir):
            for file in files_list:
                if file == "document.txt":
                    document_files.append(os.path.join(root, file))
        # Persist system_prompt and mcp_settings into the project
        # directory when provided.
        if system_prompt:
            system_prompt_file = os.path.join(project_dir, "system_prompt.md")
            with open(system_prompt_file, 'w', encoding='utf-8') as f:
                f.write(system_prompt)
            print(f"已保存system_prompt项目ID: {unique_id}")
        if mcp_settings:
            mcp_settings_file = os.path.join(project_dir, "mcp_settings.json")
            with open(mcp_settings_file, 'w', encoding='utf-8') as f:
                json.dump(mcp_settings, f, ensure_ascii=False, indent=2)
            print(f"已保存mcp_settings项目ID: {unique_id}")
        # Generate the project README.md; failures here are deliberately
        # non-fatal so the main processing flow continues.
        try:
            from utils.project_manager import save_project_readme
            save_project_readme(unique_id)
            print(f"已生成README.md文件项目ID: {unique_id}")
        except Exception as e:
            print(f"生成README.md失败项目ID: {unique_id}, 错误: {str(e)}")
        # Build the flat result-file list: first the per-key dataset
        # document.txt paths that actually exist on disk...
        result_files = []
        for key in processed_files_by_key:
            document_path = os.path.join("projects", unique_id, "dataset", key, "document.txt")
            if os.path.exists(document_path):
                result_files.append(document_path)
        # ...then any document.txt files discovered by the walk that are
        # not already included (set lookup avoids O(n^2) dedup).
        seen_paths = set(result_files)
        for doc_file in document_files:
            if doc_file not in seen_paths:
                result_files.append(doc_file)
                seen_paths.add(doc_file)
        result = {
            "status": "success",
            "message": f"成功异步处理了 {len(result_files)} 个文档文件,涉及 {len(processed_files_by_key)} 个key",
            "unique_id": unique_id,
            "processed_files": result_files,
            "processed_files_by_key": processed_files_by_key,
            "document_files": document_files,
            "total_files_processed": total_files,
            "processing_time": time.time()
        }
        # Mark the task as completed.
        if task_id:
            task_status_store.update_status(
                task_id=task_id,
                status="completed",
                result=result
            )
        print(f"异步文件处理任务完成: {unique_id}")
        return result
    except Exception as e:
        error_msg = f"异步处理文件时发生错误: {str(e)}"
        print(error_msg)
        # Mark the task as failed so callers polling the status store
        # see the error instead of a stuck "running" state.
        if task_id:
            task_status_store.update_status(
                task_id=task_id,
                status="failed",
                error=error_msg
            )
        return {
            "status": "error",
            "message": error_msg,
            "unique_id": unique_id,
            "error": str(e)
        }
@huey.task()
def cleanup_project_async(
    unique_id: str,
    remove_all: bool = False
) -> Dict[str, Any]:
    """Asynchronously clean up a project's files.

    Args:
        unique_id: Unique project ID.
        remove_all: When True (and the project directory exists), delete
            the entire project directory; otherwise only remove the
            processed-files log.

    Returns:
        Result dict with status, a human-readable message, the list of
        removed filesystem items and the action taken; on failure a dict
        with status "error" and the error message.
    """
    try:
        print(f"开始异步清理项目项目ID: {unique_id}")
        project_dir = os.path.join("projects", unique_id)
        removed_items = []
        if remove_all and os.path.exists(project_dir):
            # Full wipe: remove the whole project tree.
            import shutil
            shutil.rmtree(project_dir)
            removed_items.append(project_dir)
            action = "remove_all"
            message = f"已删除整个项目目录: {project_dir}"
        else:
            # Log-only cleanup: drop the processed-files record if present.
            log_file = os.path.join(project_dir, "processed_files.json")
            if os.path.exists(log_file):
                os.remove(log_file)
                removed_items.append(log_file)
            action = "cleanup_logs"
            message = f"已清理项目处理日志项目ID: {unique_id}"
        result = {
            "status": "success",
            "message": message,
            "unique_id": unique_id,
            "removed_items": removed_items,
            "action": action
        }
        print(f"异步清理任务完成: {unique_id}")
        return result
    except Exception as e:
        error_msg = f"异步清理项目时发生错误: {str(e)}"
        print(error_msg)
        return {
            "status": "error",
            "message": error_msg,
            "unique_id": unique_id,
            "error": str(e)
        }