qwen_agent/task_queue/integration_tasks.py
#!/usr/bin/env python3
"""
Queue tasks for file processing integration.
"""
import os
import json
import time
import hashlib
import shutil
from typing import Dict, List, Optional, Any
from task_queue.config import huey
from task_queue.manager import queue_manager
from task_queue.task_status import task_status_store
from utils import download_dataset_files, save_processed_files_log, load_processed_files_log
from utils.dataset_manager import remove_dataset_directory_by_key


def scan_upload_folder(upload_dir: str) -> List[str]:
    """
    Scan an upload folder for all files in supported formats.

    Args:
        upload_dir: Path to the upload folder.

    Returns:
        List[str]: List of supported file paths.
    """
    supported_extensions = {
        # Text files
        '.txt', '.md', '.rtf',
        # Document files
        '.doc', '.docx', '.pdf', '.odt',
        # Spreadsheet files
        '.xls', '.xlsx', '.csv', '.ods',
        # Presentation files
        '.ppt', '.pptx', '.odp',
        # E-books
        '.epub', '.mobi',
        # Web pages
        '.html', '.htm',
        # Configuration files
        '.json', '.xml', '.yaml', '.yml',
        # Code files
        '.py', '.js', '.java', '.cpp', '.c', '.go', '.rs',
        # Archives
        '.zip', '.rar', '.7z', '.tar', '.gz'
    }
    scanned_files = []
    if not os.path.exists(upload_dir):
        return scanned_files

    for root, dirs, files in os.walk(upload_dir):
        for file in files:
            # Skip hidden and temporary/system files
            if file.startswith('.') or file.startswith('~'):
                continue
            file_path = os.path.join(root, file)
            file_extension = os.path.splitext(file)[1].lower()
            # Check whether the file extension is supported
            if file_extension in supported_extensions:
                scanned_files.append(file_path)
            elif not file_extension:
                # Files without an extension may still be plain text, so try to read them
                try:
                    # Read the first 1 KB to check whether the file is valid UTF-8 text
                    with open(file_path, 'r', encoding='utf-8') as f:
                        f.read(1024)
                    scanned_files.append(file_path)
                except (UnicodeDecodeError, PermissionError):
                    # Not a text file, or unreadable; skip it
                    pass
    return scanned_files
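

# Illustrative sketch (not part of the original module): how scan_upload_folder
# might be exercised directly. The folder name 'my_project1' is hypothetical;
# the function simply walks the tree and returns paths whose extensions it supports.
def _example_scan_upload_folder() -> None:
    upload_dir = os.path.join("projects", "uploads", "my_project1")  # hypothetical folder
    found = scan_upload_folder(upload_dir)
    print(f"Found {len(found)} supported files under {upload_dir}")
    for path in found[:5]:
        print(" -", path)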


@huey.task()
def process_files_async(
    dataset_id: str,
    files: Optional[Dict[str, List[str]]] = None,
    upload_folder: Optional[Dict[str, str]] = None,
    task_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Asynchronous file processing task - compatible with the existing files/process API.

    Args:
        dataset_id: Unique project ID.
        files: Dictionary of file paths grouped by key.
        upload_folder: Dictionary of upload folders organized by group name,
            e.g. {'group1': 'my_project1', 'group2': 'my_project2'}.
        task_id: Task ID used for status tracking.

    Returns:
        Dictionary with the processing result.
    """
    try:
        print(f"Starting async file processing task, project ID: {dataset_id}")
        # If a task_id is given, set the initial status
        if task_id:
            task_status_store.set_status(
                task_id=task_id,
                unique_id=dataset_id,
                status="running"
            )

        # Make sure the project directory exists
        project_dir = os.path.join("projects", "data", dataset_id)
        if not os.path.exists(project_dir):
            os.makedirs(project_dir, exist_ok=True)

        # Process files using the key-grouped format
        processed_files_by_key = {}

        # If upload_folder is provided, scan those folders for files
        if upload_folder and not files:
            scanned_files_by_group = {}
            total_scanned_files = 0
            for group_name, folder_name in upload_folder.items():
                # Security check: prevent path traversal attacks
                safe_folder_name = os.path.basename(folder_name)
                upload_dir = os.path.join("projects", "uploads", safe_folder_name)
                if os.path.exists(upload_dir):
                    scanned_files = scan_upload_folder(upload_dir)
                    if scanned_files:
                        scanned_files_by_group[group_name] = scanned_files
                        total_scanned_files += len(scanned_files)
                        print(f"Scanned {len(scanned_files)} files from upload folder '{safe_folder_name}' (group: {group_name})")
                    else:
                        print(f"No supported files found in upload folder '{safe_folder_name}' (group: {group_name})")
                else:
                    print(f"Upload folder does not exist: {upload_dir} (group: {group_name})")
            if scanned_files_by_group:
                files = scanned_files_by_group
                print(f"Scanned {total_scanned_files} files in total from {len(scanned_files_by_group)} groups")
            else:
                print("No supported files found in any of the upload folders")

        if files:
            # Use the files from the request, grouped by key.
            # This task runs synchronously in the worker, so drive the async download with an event loop.
            import asyncio
            try:
                loop = asyncio.get_event_loop()
            except RuntimeError:
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
            processed_files_by_key = loop.run_until_complete(download_dataset_files(dataset_id, files))
            total_files = sum(len(files_list) for files_list in processed_files_by_key.values())
            print(f"Asynchronously processed {total_files} dataset files across {len(processed_files_by_key)} keys, project ID: {dataset_id}")
        else:
            print(f"No files provided in the request, project ID: {dataset_id}")

        # Collect all document.txt files under the project directory
        document_files = []
        for root, dirs, files_list in os.walk(project_dir):
            for file in files_list:
                if file == "document.txt":
                    document_files.append(os.path.join(root, file))

        # Generate the project README.md file
        try:
            from utils.project_manager import save_project_readme
            save_project_readme(dataset_id)
            print(f"Generated README.md, project ID: {dataset_id}")
        except Exception as e:
            print(f"Failed to generate README.md, project ID: {dataset_id}, error: {str(e)}")
            # Does not affect the main processing flow; continue

        # Build the list of result files
        result_files = []
        for key in processed_files_by_key.keys():
            # Add the corresponding dataset document.txt path
            document_path = os.path.join("projects", "data", dataset_id, "dataset", key, "document.txt")
            if os.path.exists(document_path):
                result_files.append(document_path)

        # Also include document.txt files that exist but are not covered by processed_files_by_key
        existing_document_paths = set(result_files)  # avoid duplicates
        for doc_file in document_files:
            if doc_file not in existing_document_paths:
                result_files.append(doc_file)

        result = {
            "status": "success",
            "message": f"Successfully processed {len(result_files)} document files asynchronously across {len(processed_files_by_key)} keys",
            "dataset_id": dataset_id,
            "processed_files": result_files,
            "processed_files_by_key": processed_files_by_key,
            "document_files": document_files,
            "total_files_processed": sum(len(files_list) for files_list in processed_files_by_key.values()),
            "processing_time": time.time()
        }

        # Update the task status to completed
        if task_id:
            task_status_store.update_status(
                task_id=task_id,
                status="completed",
                result=result
            )
        print(f"Async file processing task finished: {dataset_id}")
        return result
    except Exception as e:
        error_msg = f"Error while processing files asynchronously: {str(e)}"
        print(error_msg)
        # Update the task status to failed
        if task_id:
            task_status_store.update_status(
                task_id=task_id,
                status="failed",
                error=error_msg
            )
        return {
            "status": "error",
            "message": error_msg,
            "dataset_id": dataset_id,
            "error": str(e)
        }
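

# Illustrative sketch (not part of the original module): enqueueing the task with
# Huey and waiting for its result. Calling a @huey.task()-decorated function returns
# a Huey Result handle; the dataset ID, folder name and task ID below are hypothetical,
# and a running Huey consumer is assumed.
def _example_enqueue_process_files() -> None:
    handle = process_files_async(
        dataset_id="demo-dataset",                # hypothetical project ID
        upload_folder={"group1": "my_project1"},  # hypothetical upload folder
        task_id="demo-task-001",                  # hypothetical tracking ID
    )
    outcome = handle.get(blocking=True)  # block until the consumer finishes the task
    print(outcome["status"], outcome.get("total_files_processed"))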


@huey.task()
def process_files_incremental_async(
    dataset_id: str,
    files_to_add: Optional[Dict[str, List[str]]] = None,
    files_to_remove: Optional[Dict[str, List[str]]] = None,
    system_prompt: Optional[str] = None,
    mcp_settings: Optional[List[Dict]] = None,
    task_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Incremental file processing task - supports adding and removing files.

    Args:
        dataset_id: Unique project ID.
        files_to_add: Dictionary of file paths to add, grouped by key.
        files_to_remove: Dictionary of file paths to remove, grouped by key.
        system_prompt: System prompt text.
        mcp_settings: MCP settings.
        task_id: Task ID used for status tracking.

    Returns:
        Dictionary with the processing result.
    """
    try:
        print(f"Starting incremental file processing task, project ID: {dataset_id}")
        # If a task_id is given, set the initial status
        if task_id:
            task_status_store.set_status(
                task_id=task_id,
                unique_id=dataset_id,
                status="running"
            )

        # Make sure the project directory exists
        project_dir = os.path.join("projects", "data", dataset_id)
        if not os.path.exists(project_dir):
            os.makedirs(project_dir, exist_ok=True)

        # Load the existing processing log
        processed_log = load_processed_files_log(dataset_id)
        print(f"Loaded existing processing log with {len(processed_log)} file records")

        removed_files = []
        added_files = []

        # 1. Handle removals
        if files_to_remove:
            print(f"Starting removals across {len(files_to_remove)} key groups")
            for key, file_list in files_to_remove.items():
                if not file_list:  # an empty file list means remove the whole key group
                    print(f"Removing entire key group: {key}")
                    if remove_dataset_directory_by_key(dataset_id, key):
                        removed_files.append(f"dataset/{key}")
                    # Remove all records for this key from the processing log
                    keys_to_remove = [file_hash for file_hash, file_info in processed_log.items()
                                      if file_info.get('key') == key]
                    for file_hash in keys_to_remove:
                        del processed_log[file_hash]
                        removed_files.append(f"log_entry:{file_hash}")
                else:
                    # Remove specific files
                    for file_path in file_list:
                        print(f"Removing specific file: {key}/{file_path}")
                        # Actually delete the file
                        filename = os.path.basename(file_path)
                        # Delete the original file
                        source_file = os.path.join("projects", "data", dataset_id, "files", key, filename)
                        if os.path.exists(source_file):
                            os.remove(source_file)
                            removed_files.append(f"file:{key}/{filename}")
                        # Delete the processed file directory
                        processed_dir = os.path.join("projects", "data", dataset_id, "processed", key, filename)
                        if os.path.exists(processed_dir):
                            shutil.rmtree(processed_dir)
                            removed_files.append(f"processed:{key}/{filename}")
                        # Compute the file hash to look it up in the log
                        file_hash = hashlib.md5(file_path.encode('utf-8')).hexdigest()
                        # Remove it from the processing log
                        if file_hash in processed_log:
                            del processed_log[file_hash]
                            removed_files.append(f"log_entry:{file_hash}")

        # 2. Handle additions
        processed_files_by_key = {}
        if files_to_add:
            print(f"Starting additions across {len(files_to_add)} key groups")
            # Download the files via the async helper
            import asyncio
            try:
                loop = asyncio.get_event_loop()
            except RuntimeError:
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
            processed_files_by_key = loop.run_until_complete(download_dataset_files(dataset_id, files_to_add, incremental_mode=True))
            total_added_files = sum(len(files_list) for files_list in processed_files_by_key.values())
            print(f"Asynchronously processed {total_added_files} dataset files across {len(processed_files_by_key)} keys, project ID: {dataset_id}")
            # Record the added files
            for key, files_list in processed_files_by_key.items():
                for file_path in files_list:
                    added_files.append(f"{key}/{file_path}")
        else:
            print(f"No files to add provided in the request, project ID: {dataset_id}")

        # Save the updated processing log
        save_processed_files_log(dataset_id, processed_log)
        print(f"Updated processing log now contains {len(processed_log)} file records")

        # Save system_prompt and mcp_settings to the project directory, if provided
        if system_prompt:
            system_prompt_file = os.path.join(project_dir, "system_prompt.md")
            with open(system_prompt_file, 'w', encoding='utf-8') as f:
                f.write(system_prompt)
            print(f"Saved system_prompt, project ID: {dataset_id}")
        if mcp_settings:
            mcp_settings_file = os.path.join(project_dir, "mcp_settings.json")
            with open(mcp_settings_file, 'w', encoding='utf-8') as f:
                json.dump(mcp_settings, f, ensure_ascii=False, indent=2)
            print(f"Saved mcp_settings, project ID: {dataset_id}")

        # Generate the project README.md file
        try:
            from utils.project_manager import save_project_readme
            save_project_readme(dataset_id)
            print(f"Generated README.md, project ID: {dataset_id}")
        except Exception as e:
            print(f"Failed to generate README.md, project ID: {dataset_id}, error: {str(e)}")
            # Does not affect the main processing flow; continue

        # Collect all document.txt files under the project directory
        document_files = []
        for root, dirs, files_list in os.walk(project_dir):
            for file in files_list:
                if file == "document.txt":
                    document_files.append(os.path.join(root, file))

        # Build the list of result files
        result_files = []
        for key in processed_files_by_key.keys():
            # Add the corresponding dataset document.txt path
            document_path = os.path.join("projects", "data", dataset_id, "dataset", key, "document.txt")
            if os.path.exists(document_path):
                result_files.append(document_path)

        # Also include document.txt files that exist but are not covered by processed_files_by_key
        existing_document_paths = set(result_files)  # avoid duplicates
        for doc_file in document_files:
            if doc_file not in existing_document_paths:
                result_files.append(doc_file)

        result = {
            "status": "success",
            "message": f"Incremental processing finished - added {len(added_files)} files, removed {len(removed_files)} files, {len(result_files)} document files kept",
            "dataset_id": dataset_id,
            "removed_files": removed_files,
            "added_files": added_files,
            "processed_files": result_files,
            "processed_files_by_key": processed_files_by_key,
            "document_files": document_files,
            "total_files_added": sum(len(files_list) for files_list in processed_files_by_key.values()),
            "total_files_removed": len(removed_files),
            "final_files_count": len(result_files),
            "processing_time": time.time()
        }

        # Update the task status to completed
        if task_id:
            task_status_store.update_status(
                task_id=task_id,
                status="completed",
                result=result
            )
        print(f"Incremental file processing task finished: {dataset_id}")
        return result
    except Exception as e:
        error_msg = f"Error during incremental file processing: {str(e)}"
        print(error_msg)
        # Update the task status to failed
        if task_id:
            task_status_store.update_status(
                task_id=task_id,
                status="failed",
                error=error_msg
            )
        return {
            "status": "error",
            "message": error_msg,
            "dataset_id": dataset_id,
            "error": str(e)
        }
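

# Illustrative sketch (not part of the original module): an incremental update that
# removes one key group entirely (empty list), deletes a single file from another
# group, and adds a new file, without re-processing the rest of the dataset. All
# identifiers and paths below are hypothetical.
def _example_enqueue_incremental_update() -> None:
    handle = process_files_incremental_async(
        dataset_id="demo-dataset",
        files_to_add={"group1": ["/tmp/new_report.pdf"]},  # hypothetical new file
        files_to_remove={
            "group2": [],                  # empty list: remove the whole key group
            "group1": ["old_notes.txt"],   # remove a specific file from a group
        },
        task_id="demo-task-002",
    )
    print("Incremental task enqueued:", handle)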


@huey.task()
def cleanup_project_async(
    dataset_id: str,
    remove_all: bool = False
) -> Dict[str, Any]:
    """
    Asynchronously clean up project files.

    Args:
        dataset_id: Unique project ID.
        remove_all: Whether to delete the entire project directory.

    Returns:
        Dictionary with the cleanup result.
    """
    try:
        print(f"Starting async project cleanup, project ID: {dataset_id}")
        project_dir = os.path.join("projects", "data", dataset_id)
        removed_items = []

        if remove_all and os.path.exists(project_dir):
            shutil.rmtree(project_dir)
            removed_items.append(project_dir)
            result = {
                "status": "success",
                "message": f"Deleted the entire project directory: {project_dir}",
                "dataset_id": dataset_id,
                "removed_items": removed_items,
                "action": "remove_all"
            }
        else:
            # Only clean up the processing log
            log_file = os.path.join(project_dir, "processed_files.json")
            if os.path.exists(log_file):
                os.remove(log_file)
                removed_items.append(log_file)
            result = {
                "status": "success",
                "message": f"Cleaned up project processing log, project ID: {dataset_id}",
                "dataset_id": dataset_id,
                "removed_items": removed_items,
                "action": "cleanup_logs"
            }

        print(f"Async cleanup task finished: {dataset_id}")
        return result
    except Exception as e:
        error_msg = f"Error while cleaning up the project asynchronously: {str(e)}"
        print(error_msg)
        return {
            "status": "error",
            "message": error_msg,
            "dataset_id": dataset_id,
            "error": str(e)
        }
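

# Illustrative sketch (not part of the original module): a __main__ guard that
# enqueues a full cleanup of a hypothetical project. remove_all=True deletes the
# whole projects/data/<dataset_id> directory; the default only removes the
# processed_files.json log. A running Huey consumer is assumed.
if __name__ == "__main__":
    cleanup_handle = cleanup_project_async("demo-dataset", remove_all=True)
    print("Cleanup task enqueued:", cleanup_handle)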