qwen_agent/task_queue/integration_tasks.py

#!/usr/bin/env python3
"""
Queue tasks for file processing integration.
"""
import asyncio
import hashlib
import json
import os
import shutil
import time
from typing import Any, Dict, List, Optional

from task_queue.config import huey
from task_queue.manager import queue_manager
from task_queue.task_status import task_status_store
from utils import download_dataset_files, load_processed_files_log, save_processed_files_log
from utils.dataset_manager import remove_dataset_directory_by_key


@huey.task()
def process_files_async(
    unique_id: str,
    files: Optional[Dict[str, List[str]]] = None,
    system_prompt: Optional[str] = None,
    mcp_settings: Optional[List[Dict]] = None,
    task_id: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Process files asynchronously - compatible with the existing files/process API.

    Args:
        unique_id: Unique project ID.
        files: Dict of file paths grouped by key.
        system_prompt: System prompt text.
        mcp_settings: MCP settings.
        task_id: Task ID used for status tracking.

    Returns:
        Dict describing the processing result.
    """
    try:
        print(f"Starting async file-processing task, project ID: {unique_id}")
        # If a task_id was provided, record the initial status
        if task_id:
            task_status_store.set_status(
                task_id=task_id,
                unique_id=unique_id,
                status="running"
            )
        # Make sure the project directory exists
        project_dir = os.path.join("projects", "data", unique_id)
        os.makedirs(project_dir, exist_ok=True)
        # Process files using the key-grouped format
        processed_files_by_key = {}
        if files:
            # Files from the request are grouped by key. This is a synchronous
            # Huey task, so drive the async download helper with an explicit
            # event loop.
            try:
                loop = asyncio.get_event_loop()
            except RuntimeError:
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
            processed_files_by_key = loop.run_until_complete(download_dataset_files(unique_id, files))
            total_files = sum(len(files_list) for files_list in processed_files_by_key.values())
            print(f"Asynchronously processed {total_files} dataset files across {len(processed_files_by_key)} keys, project ID: {unique_id}")
        else:
            print(f"No files provided in the request, project ID: {unique_id}")
        # Collect every document.txt file under the project directory
        document_files = []
        for root, dirs, files_list in os.walk(project_dir):
            for file in files_list:
                if file == "document.txt":
                    document_files.append(os.path.join(root, file))
        # Save system_prompt and mcp_settings to the project directory, if provided
        if system_prompt:
            system_prompt_file = os.path.join(project_dir, "system_prompt.md")
            with open(system_prompt_file, 'w', encoding='utf-8') as f:
                f.write(system_prompt)
            print(f"Saved system_prompt, project ID: {unique_id}")
        if mcp_settings:
            mcp_settings_file = os.path.join(project_dir, "mcp_settings.json")
            with open(mcp_settings_file, 'w', encoding='utf-8') as f:
                json.dump(mcp_settings, f, ensure_ascii=False, indent=2)
            print(f"Saved mcp_settings, project ID: {unique_id}")
        # Generate the project's README.md
        try:
            # Imported lazily so a missing helper does not break the main flow
            from utils.project_manager import save_project_readme
            save_project_readme(unique_id)
            print(f"Generated README.md, project ID: {unique_id}")
        except Exception as e:
            print(f"Failed to generate README.md, project ID: {unique_id}, error: {str(e)}")
            # Not fatal for the main processing flow; continue
        # Build the result file list
        result_files = []
        for key in processed_files_by_key.keys():
            # Add the corresponding dataset document.txt path
            document_path = os.path.join("projects", "data", unique_id, "dataset", key, "document.txt")
            if os.path.exists(document_path):
                result_files.append(document_path)
        # Also include document.txt files that exist on disk but are not in processed_files_by_key
        existing_document_paths = set(result_files)  # avoid duplicates
        for doc_file in document_files:
            if doc_file not in existing_document_paths:
                result_files.append(doc_file)
        result = {
            "status": "success",
            "message": f"Successfully processed {len(result_files)} document files across {len(processed_files_by_key)} keys",
            "unique_id": unique_id,
            "processed_files": result_files,
            "processed_files_by_key": processed_files_by_key,
            "document_files": document_files,
            "total_files_processed": sum(len(files_list) for files_list in processed_files_by_key.values()),
            "processing_time": time.time()
        }
        # Mark the task as completed
        if task_id:
            task_status_store.update_status(
                task_id=task_id,
                status="completed",
                result=result
            )
        print(f"Async file-processing task finished: {unique_id}")
        return result
    except Exception as e:
        error_msg = f"Error while processing files asynchronously: {str(e)}"
        print(error_msg)
        # Mark the task as failed
        if task_id:
            task_status_store.update_status(
                task_id=task_id,
                status="failed",
                error=error_msg
            )
        return {
            "status": "error",
            "message": error_msg,
            "unique_id": unique_id,
            "error": str(e)
        }
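
# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not executed on import). Calling a
# @huey.task() function enqueues it and returns a huey Result handle;
# Result.get(blocking=True) waits for a consumer to finish the work. The
# project ID, file URL, task ID, and timeout below are hypothetical
# placeholder values, not taken from this codebase.
#
#   result_handle = process_files_async(
#       unique_id="demo-project",
#       files={"reports": ["https://example.com/a.pdf"]},
#       task_id="task-0001",
#   )
#   outcome = result_handle.get(blocking=True, timeout=300)
#   print(outcome["status"], outcome["total_files_processed"])
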
@huey.task()
def process_files_incremental_async(
    dataset_id: str,
    files_to_add: Optional[Dict[str, List[str]]] = None,
    files_to_remove: Optional[Dict[str, List[str]]] = None,
    system_prompt: Optional[str] = None,
    mcp_settings: Optional[List[Dict]] = None,
    task_id: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Incremental file-processing task - supports adding and removing files.

    Args:
        dataset_id: Unique project ID.
        files_to_add: Dict of file paths to add, grouped by key.
        files_to_remove: Dict of file paths to remove, grouped by key.
        system_prompt: System prompt text.
        mcp_settings: MCP settings.
        task_id: Task ID used for status tracking.

    Returns:
        Dict describing the processing result.
    """
    try:
        print(f"Starting incremental file-processing task, project ID: {dataset_id}")
        # If a task_id was provided, record the initial status
        if task_id:
            task_status_store.set_status(
                task_id=task_id,
                unique_id=dataset_id,
                status="running"
            )
        # Make sure the project directory exists
        project_dir = os.path.join("projects", "data", dataset_id)
        os.makedirs(project_dir, exist_ok=True)
        # Load the existing processing log
        processed_log = load_processed_files_log(dataset_id)
        print(f"Loaded existing processing log with {len(processed_log)} file records")
        removed_files = []
        added_files = []
        # 1. Handle removals
        if files_to_remove:
            print(f"Starting removals across {len(files_to_remove)} key groups")
            for key, file_list in files_to_remove.items():
                if not file_list:  # an empty file list means: remove the whole key group
                    print(f"Removing entire key group: {key}")
                    if remove_dataset_directory_by_key(dataset_id, key):
                        removed_files.append(f"dataset/{key}")
                    # Drop every log record belonging to this key
                    keys_to_remove = [file_hash for file_hash, file_info in processed_log.items()
                                      if file_info.get('key') == key]
                    for file_hash in keys_to_remove:
                        del processed_log[file_hash]
                        removed_files.append(f"log_entry:{file_hash}")
                else:
                    # Remove specific files: only their log entries are dropped,
                    # looked up by the md5 hash of the file path
                    for file_path in file_list:
                        print(f"Removing specific file: {key}/{file_path}")
                        file_hash = hashlib.md5(file_path.encode('utf-8')).hexdigest()
                        if file_hash in processed_log:
                            del processed_log[file_hash]
                            removed_files.append(f"log_entry:{file_hash}")
        # 2. Handle additions
        processed_files_by_key = {}
        if files_to_add:
            print(f"Starting additions across {len(files_to_add)} key groups")
            # Download the files by driving the async helper from this sync task
            try:
                loop = asyncio.get_event_loop()
            except RuntimeError:
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
            processed_files_by_key = loop.run_until_complete(download_dataset_files(dataset_id, files_to_add))
            total_added_files = sum(len(files_list) for files_list in processed_files_by_key.values())
            print(f"Asynchronously processed {total_added_files} dataset files across {len(processed_files_by_key)} keys, project ID: {dataset_id}")
            # Record the added files
            for key, files_list in processed_files_by_key.items():
                for file_path in files_list:
                    added_files.append(f"{key}/{file_path}")
        else:
            print(f"No files to add provided in the request, project ID: {dataset_id}")
        # Persist the updated processing log
        save_processed_files_log(dataset_id, processed_log)
        print(f"Processing log updated, now contains {len(processed_log)} file records")
        # Save system_prompt and mcp_settings to the project directory, if provided
        if system_prompt:
            system_prompt_file = os.path.join(project_dir, "system_prompt.md")
            with open(system_prompt_file, 'w', encoding='utf-8') as f:
                f.write(system_prompt)
            print(f"Saved system_prompt, project ID: {dataset_id}")
        if mcp_settings:
            mcp_settings_file = os.path.join(project_dir, "mcp_settings.json")
            with open(mcp_settings_file, 'w', encoding='utf-8') as f:
                json.dump(mcp_settings, f, ensure_ascii=False, indent=2)
            print(f"Saved mcp_settings, project ID: {dataset_id}")
        # Generate the project's README.md
        try:
            # Imported lazily so a missing helper does not break the main flow
            from utils.project_manager import save_project_readme
            save_project_readme(dataset_id)
            print(f"Generated README.md, project ID: {dataset_id}")
        except Exception as e:
            print(f"Failed to generate README.md, project ID: {dataset_id}, error: {str(e)}")
            # Not fatal for the main processing flow; continue
        # Collect every document.txt file under the project directory
        document_files = []
        for root, dirs, files_list in os.walk(project_dir):
            for file in files_list:
                if file == "document.txt":
                    document_files.append(os.path.join(root, file))
        # Build the result file list
        result_files = []
        for key in processed_files_by_key.keys():
            # Add the corresponding dataset document.txt path
            document_path = os.path.join("projects", "data", dataset_id, "dataset", key, "document.txt")
            if os.path.exists(document_path):
                result_files.append(document_path)
        # Also include document.txt files that exist on disk but are not in processed_files_by_key
        existing_document_paths = set(result_files)  # avoid duplicates
        for doc_file in document_files:
            if doc_file not in existing_document_paths:
                result_files.append(doc_file)
        result = {
            "status": "success",
            "message": f"Incremental processing finished - added {len(added_files)} files, removed {len(removed_files)} files, {len(result_files)} document files remain",
            "dataset_id": dataset_id,
            "removed_files": removed_files,
            "added_files": added_files,
            "processed_files": result_files,
            "processed_files_by_key": processed_files_by_key,
            "document_files": document_files,
            "total_files_added": sum(len(files_list) for files_list in processed_files_by_key.values()),
            "total_files_removed": len(removed_files),
            "final_files_count": len(result_files),
            "processing_time": time.time()
        }
        # Mark the task as completed
        if task_id:
            task_status_store.update_status(
                task_id=task_id,
                status="completed",
                result=result
            )
        print(f"Incremental file-processing task finished: {dataset_id}")
        return result
    except Exception as e:
        error_msg = f"Error during incremental file processing: {str(e)}"
        print(error_msg)
        # Mark the task as failed
        if task_id:
            task_status_store.update_status(
                task_id=task_id,
                status="failed",
                error=error_msg
            )
        return {
            "status": "error",
            "message": error_msg,
            "dataset_id": dataset_id,
            "error": str(e)
        }
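
# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not executed on import). An empty list
# under a key in files_to_remove removes the whole key group; a non-empty
# list only drops the matching entries from the processed-files log. The
# dataset ID and file paths below are hypothetical placeholders.
#
#   result_handle = process_files_incremental_async(
#       dataset_id="demo-project",
#       files_to_add={"manuals": ["https://example.com/new.pdf"]},
#       files_to_remove={
#           "reports": [],                  # remove the entire "reports" group
#           "manuals": ["old/manual.pdf"],  # drop this file's log entry only
#       },
#       task_id="task-0002",
#   )
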
@huey.task()
def cleanup_project_async(
    unique_id: str,
    remove_all: bool = False
) -> Dict[str, Any]:
    """
    Clean up project files asynchronously.

    Args:
        unique_id: Unique project ID.
        remove_all: Whether to remove the entire project directory.

    Returns:
        Dict describing the cleanup result.
    """
    try:
        print(f"Starting async project cleanup, project ID: {unique_id}")
        project_dir = os.path.join("projects", "data", unique_id)
        removed_items = []
        if remove_all and os.path.exists(project_dir):
            shutil.rmtree(project_dir)
            removed_items.append(project_dir)
            result = {
                "status": "success",
                "message": f"Removed entire project directory: {project_dir}",
                "unique_id": unique_id,
                "removed_items": removed_items,
                "action": "remove_all"
            }
        else:
            # Only clear the processing log
            log_file = os.path.join(project_dir, "processed_files.json")
            if os.path.exists(log_file):
                os.remove(log_file)
                removed_items.append(log_file)
            result = {
                "status": "success",
                "message": f"Cleared project processing log, project ID: {unique_id}",
                "unique_id": unique_id,
                "removed_items": removed_items,
                "action": "cleanup_logs"
            }
        print(f"Async cleanup task finished: {unique_id}")
        return result
    except Exception as e:
        error_msg = f"Error during async project cleanup: {str(e)}"
        print(error_msg)
        return {
            "status": "error",
            "message": error_msg,
            "unique_id": unique_id,
            "error": str(e)
        }
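
# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not executed on import). Besides an
# immediate call, huey can defer a task: task.schedule(args=..., kwargs=...,
# delay=...) enqueues it for later execution. The project ID and delay are
# hypothetical placeholders.
#
#   cleanup_project_async("demo-project")  # clear the processing log only
#   cleanup_project_async.schedule(
#       args=("demo-project",),
#       kwargs={"remove_all": True},
#       delay=3600,  # run one hour from now
#   )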