#!/usr/bin/env python3
"""
Queue tasks for file processing integration.
"""
import os
import json
import time
from typing import Dict, List, Optional, Any

from task_queue.config import huey
from task_queue.manager import queue_manager
from task_queue.task_status import task_status_store
from utils import download_dataset_files, save_processed_files_log, load_processed_files_log
from utils.dataset_manager import remove_dataset_directory_by_key


@huey.task()
def process_files_async(
    unique_id: str,
    files: Optional[Dict[str, List[str]]] = None,
    system_prompt: Optional[str] = None,
    mcp_settings: Optional[List[Dict]] = None,
    task_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Asynchronously process files - compatible with the existing files/process API.

    Args:
        unique_id: Unique project ID
        files: Dictionary of file paths grouped by key
        system_prompt: System prompt
        mcp_settings: MCP settings
        task_id: Task ID (used for status tracking)

    Returns:
        Dictionary with the processing result
    """
    try:
        print(f"Starting async file processing task, project ID: {unique_id}")

        # If a task_id was provided, record the initial status
        if task_id:
            task_status_store.set_status(
                task_id=task_id,
                unique_id=unique_id,
                status="running"
            )

        # Make sure the project directory exists
        project_dir = os.path.join("projects", "data", unique_id)
        if not os.path.exists(project_dir):
            os.makedirs(project_dir, exist_ok=True)

        # Process files using the key-grouped format
        processed_files_by_key = {}

        if files:
            # Use the files from the request (grouped by key).
            # The download helper is a coroutine, so drive it synchronously here.
            import asyncio
            try:
                loop = asyncio.get_event_loop()
            except RuntimeError:
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)

            processed_files_by_key = loop.run_until_complete(download_dataset_files(unique_id, files))
            total_files = sum(len(files_list) for files_list in processed_files_by_key.values())
            print(f"Asynchronously processed {total_files} dataset files across {len(processed_files_by_key)} keys, project ID: {unique_id}")
        else:
            print(f"No files provided in the request, project ID: {unique_id}")

        # Collect every document.txt file under the project directory
        document_files = []
        for root, dirs, files_list in os.walk(project_dir):
            for file in files_list:
                if file == "document.txt":
                    document_files.append(os.path.join(root, file))

        # Save system_prompt and mcp_settings to the project directory (if provided)
        if system_prompt:
            system_prompt_file = os.path.join(project_dir, "system_prompt.md")
            with open(system_prompt_file, 'w', encoding='utf-8') as f:
                f.write(system_prompt)
            print(f"Saved system_prompt, project ID: {unique_id}")

        if mcp_settings:
            mcp_settings_file = os.path.join(project_dir, "mcp_settings.json")
            with open(mcp_settings_file, 'w', encoding='utf-8') as f:
                json.dump(mcp_settings, f, ensure_ascii=False, indent=2)
            print(f"Saved mcp_settings, project ID: {unique_id}")

        # Generate the project README.md file
        try:
            from utils.project_manager import save_project_readme
            save_project_readme(unique_id)
            print(f"Generated README.md, project ID: {unique_id}")
        except Exception as e:
            print(f"Failed to generate README.md, project ID: {unique_id}, error: {str(e)}")
            # Do not interrupt the main processing flow; keep going

        # Build the list of result files
        result_files = []
        for key in processed_files_by_key.keys():
            # Add the corresponding dataset document.txt path
            document_path = os.path.join("projects", "data", unique_id, "dataset", key, "document.txt")
            if os.path.exists(document_path):
                result_files.append(document_path)

        # Also include document.txt files that exist but are not covered by processed_files_by_key
        existing_document_paths = set(result_files)  # avoid duplicates
        for doc_file in document_files:
            if doc_file not in existing_document_paths:
                result_files.append(doc_file)

        result = {
            "status": "success",
            "message": f"Successfully processed {len(result_files)} document files asynchronously across {len(processed_files_by_key)} keys",
            "unique_id": unique_id,
            "processed_files": result_files,
            "processed_files_by_key": processed_files_by_key,
            "document_files": document_files,
            "total_files_processed": sum(len(files_list) for files_list in processed_files_by_key.values()),
            "processing_time": time.time()
        }

        # Mark the task as completed
        if task_id:
            task_status_store.update_status(
                task_id=task_id,
                status="completed",
                result=result
            )

        print(f"Async file processing task finished: {unique_id}")
        return result

    except Exception as e:
        error_msg = f"Error during async file processing: {str(e)}"
        print(error_msg)

        # Mark the task as failed
        if task_id:
            task_status_store.update_status(
                task_id=task_id,
                status="failed",
                error=error_msg
            )

        return {
            "status": "error",
            "message": error_msg,
            "unique_id": unique_id,
            "error": str(e)
        }


@huey.task()
def process_files_incremental_async(
    dataset_id: str,
    files_to_add: Optional[Dict[str, List[str]]] = None,
    files_to_remove: Optional[Dict[str, List[str]]] = None,
    system_prompt: Optional[str] = None,
    mcp_settings: Optional[List[Dict]] = None,
    task_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Incrementally process files - supports adding and removing files.

    Args:
        dataset_id: Unique project ID
        files_to_add: Dictionary of file paths to add, grouped by key
        files_to_remove: Dictionary of file paths to remove, grouped by key
        system_prompt: System prompt
        mcp_settings: MCP settings
        task_id: Task ID (used for status tracking)

    Returns:
        Dictionary with the processing result
    """
    try:
        print(f"Starting incremental file processing task, project ID: {dataset_id}")

        # If a task_id was provided, record the initial status
        if task_id:
            task_status_store.set_status(
                task_id=task_id,
                unique_id=dataset_id,
                status="running"
            )

        # Make sure the project directory exists
        project_dir = os.path.join("projects", "data", dataset_id)
        if not os.path.exists(project_dir):
            os.makedirs(project_dir, exist_ok=True)

        # Load the existing processing log
        processed_log = load_processed_files_log(dataset_id)
        print(f"Loaded existing processing log with {len(processed_log)} file records")

        removed_files = []
        added_files = []

        # 1. Handle removal operations
        if files_to_remove:
            print(f"Starting removal operations for {len(files_to_remove)} key groups")
            for key, file_list in files_to_remove.items():
                if not file_list:
                    # An empty file list means the whole key group should be removed
                    print(f"Removing entire key group: {key}")
                    if remove_dataset_directory_by_key(dataset_id, key):
                        removed_files.append(f"dataset/{key}")
                        # Drop every log record that belongs to this key
                        keys_to_remove = [file_hash for file_hash, file_info in processed_log.items()
                                          if file_info.get('key') == key]
                        for file_hash in keys_to_remove:
                            del processed_log[file_hash]
                            removed_files.append(f"log_entry:{file_hash}")
                else:
                    # Remove specific files
                    for file_path in file_list:
                        print(f"Removing specific file: {key}/{file_path}")
                        # Compute the file hash to look it up in the log
                        import hashlib
                        file_hash = hashlib.md5(file_path.encode('utf-8')).hexdigest()

                        # Remove it from the processing log
                        if file_hash in processed_log:
                            del processed_log[file_hash]
                            removed_files.append(f"log_entry:{file_hash}")
        # 2. Handle add operations
        processed_files_by_key = {}

        if files_to_add:
            print(f"Starting add operations for {len(files_to_add)} key groups")

            # Download the files via the async helper
            import asyncio
            try:
                loop = asyncio.get_event_loop()
            except RuntimeError:
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)

            processed_files_by_key = loop.run_until_complete(download_dataset_files(dataset_id, files_to_add))
            total_added_files = sum(len(files_list) for files_list in processed_files_by_key.values())
            print(f"Asynchronously processed {total_added_files} dataset files across {len(processed_files_by_key)} keys, project ID: {dataset_id}")

            # Record the added files
            for key, files_list in processed_files_by_key.items():
                for file_path in files_list:
                    added_files.append(f"{key}/{file_path}")
        else:
            print(f"No files to add were provided in the request, project ID: {dataset_id}")

        # Persist the updated processing log
        save_processed_files_log(dataset_id, processed_log)
        print(f"Updated processing log, now containing {len(processed_log)} file records")

        # Save system_prompt and mcp_settings to the project directory (if provided)
        if system_prompt:
            system_prompt_file = os.path.join(project_dir, "system_prompt.md")
            with open(system_prompt_file, 'w', encoding='utf-8') as f:
                f.write(system_prompt)
            print(f"Saved system_prompt, project ID: {dataset_id}")

        if mcp_settings:
            mcp_settings_file = os.path.join(project_dir, "mcp_settings.json")
            with open(mcp_settings_file, 'w', encoding='utf-8') as f:
                json.dump(mcp_settings, f, ensure_ascii=False, indent=2)
            print(f"Saved mcp_settings, project ID: {dataset_id}")

        # Generate the project README.md file
        try:
            from utils.project_manager import save_project_readme
            save_project_readme(dataset_id)
            print(f"Generated README.md, project ID: {dataset_id}")
        except Exception as e:
            print(f"Failed to generate README.md, project ID: {dataset_id}, error: {str(e)}")
            # Do not interrupt the main processing flow; keep going

        # Collect every document.txt file under the project directory
        document_files = []
        for root, dirs, files_list in os.walk(project_dir):
            for file in files_list:
                if file == "document.txt":
                    document_files.append(os.path.join(root, file))

        # Build the list of result files
        result_files = []
        for key in processed_files_by_key.keys():
            # Add the corresponding dataset document.txt path
            document_path = os.path.join("projects", "data", dataset_id, "dataset", key, "document.txt")
            if os.path.exists(document_path):
                result_files.append(document_path)

        # Also include document.txt files that exist but are not covered by processed_files_by_key
        existing_document_paths = set(result_files)  # avoid duplicates
        for doc_file in document_files:
            if doc_file not in existing_document_paths:
                result_files.append(doc_file)

        result = {
            "status": "success",
            "message": f"Incremental processing finished - added {len(added_files)} files, removed {len(removed_files)} files, {len(result_files)} document files retained",
            "dataset_id": dataset_id,
            "removed_files": removed_files,
            "added_files": added_files,
            "processed_files": result_files,
            "processed_files_by_key": processed_files_by_key,
            "document_files": document_files,
            "total_files_added": sum(len(files_list) for files_list in processed_files_by_key.values()),
            "total_files_removed": len(removed_files),
            "final_files_count": len(result_files),
            "processing_time": time.time()
        }

        # Mark the task as completed
        if task_id:
            task_status_store.update_status(
                task_id=task_id,
                status="completed",
                result=result
            )

        print(f"Incremental file processing task finished: {dataset_id}")
        return result

    except Exception as e:
        error_msg = f"Error during incremental file processing: {str(e)}"
        print(error_msg)

        # Mark the task as failed
        if task_id:
            task_status_store.update_status(
                task_id=task_id,
                status="failed",
                error=error_msg
            )

        return {
            "status": "error",
            "message": error_msg,
            "dataset_id": dataset_id,
            "error": str(e)
        }


@huey.task()
def cleanup_project_async(
    unique_id: str,
    remove_all: bool = False
) -> Dict[str, Any]:
    """
    Asynchronously clean up project files.

    Args:
        unique_id: Unique project ID
        remove_all: Whether to delete the entire project directory

    Returns:
        Dictionary with the cleanup result
    """
    try:
        print(f"Starting async project cleanup, project ID: {unique_id}")

        project_dir = os.path.join("projects", "data", unique_id)
        removed_items = []
        if remove_all and os.path.exists(project_dir):
            import shutil
            shutil.rmtree(project_dir)
            removed_items.append(project_dir)

            result = {
                "status": "success",
                "message": f"Removed entire project directory: {project_dir}",
                "unique_id": unique_id,
                "removed_items": removed_items,
                "action": "remove_all"
            }
        else:
            # Only clean up the processing log
            log_file = os.path.join(project_dir, "processed_files.json")
            if os.path.exists(log_file):
                os.remove(log_file)
                removed_items.append(log_file)

            result = {
                "status": "success",
                "message": f"Cleaned up project processing log, project ID: {unique_id}",
                "unique_id": unique_id,
                "removed_items": removed_items,
                "action": "cleanup_logs"
            }

        print(f"Async cleanup task finished: {unique_id}")
        return result

    except Exception as e:
        error_msg = f"Error during async project cleanup: {str(e)}"
        print(error_msg)
        return {
            "status": "error",
            "message": error_msg,
            "unique_id": unique_id,
            "error": str(e)
        }
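

# --- Usage sketch ---
# A minimal, illustrative example of how a caller might enqueue one of these tasks.
# It assumes a Huey consumer is running against the same `huey` instance from
# task_queue.config; the project ID, file URL, and prompt below are placeholders,
# not values from this codebase. Calling a @huey.task()-decorated function enqueues
# it and returns a Result handle whose .get(blocking=True) waits for the worker.
if __name__ == "__main__":
    import uuid

    example_task_id = uuid.uuid4().hex  # placeholder task ID for status tracking
    result_handle = process_files_async(
        "example-project-id",                           # placeholder project ID
        files={"docs": ["https://example.com/a.pdf"]},  # placeholder key-grouped files
        system_prompt="You are a helpful assistant.",   # placeholder prompt
        task_id=example_task_id,
    )
    # Block until the worker finishes, then print the result dictionary it returned.
    print(result_handle.get(blocking=True))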