From 8f0a5569e2df3a763320a7fa1369912e2e4d9cfa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E6=BD=AE?= Date: Sat, 18 Oct 2025 09:20:59 +0800 Subject: [PATCH] add task --- .gitignore | 1 + README.md | 536 +++++++++++++++++++++----------- db_manager.py | 168 ++++++++++ fastapi_app.py | 235 +++++++++++++- poetry.lock | 17 +- pyproject.toml | 1 + start_queue.py | 18 ++ task_queue/README.md | 154 +++++++++ task_queue/__init__.py | 23 ++ task_queue/config.py | 27 ++ task_queue/consumer.py | 171 ++++++++++ task_queue/example.py | 132 ++++++++ task_queue/integration_tasks.py | 215 +++++++++++++ task_queue/manager.py | 362 +++++++++++++++++++++ task_queue/task_status.py | 210 +++++++++++++ task_queue/tasks.py | 356 +++++++++++++++++++++ utils/__init__.py | 8 + utils/api_models.py | 60 ++++ 18 files changed, 2512 insertions(+), 182 deletions(-) create mode 100755 db_manager.py create mode 100755 start_queue.py create mode 100644 task_queue/README.md create mode 100644 task_queue/__init__.py create mode 100644 task_queue/config.py create mode 100755 task_queue/consumer.py create mode 100755 task_queue/example.py create mode 100644 task_queue/integration_tasks.py create mode 100644 task_queue/manager.py create mode 100644 task_queue/task_status.py create mode 100644 task_queue/tasks.py diff --git a/.gitignore b/.gitignore index 80f227d..fc9b0f1 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ workspace __pycache__ public models +queue_data diff --git a/README.md b/README.md index a658679..ccecf9e 100644 --- a/README.md +++ b/README.md @@ -1,108 +1,185 @@ # Qwen Agent - 智能数据检索专家系统 -## 项目概述 +[![Python](https://img.shields.io/badge/Python-3.8+-blue.svg)](https://python.org) +[![FastAPI](https://img.shields.io/badge/FastAPI-0.100+-green.svg)](https://fastapi.tiangolo.com) +[![License](https://img.shields.io/badge/License-MIT-yellow.svg)](LICENSE) + +## 📋 项目概述 Qwen Agent 是一个基于 FastAPI 构建的智能数据检索专家系统,专门用于处理和分析结构化数据集。系统通过无状态的 ZIP 项目加载机制,支持动态加载多种数据集,并提供类似 OpenAI 的聊天接口,便于与现有 AI 应用集成。 -## 核心功能 +### 🌟 核心特性 -### 1. 智能数据检索 -- 基于倒排索引和多层数据架构的专业数据检索 -- 支持复杂查询优化和自主决策能力 -- 动态制定最优检索策略 +- **🔍 智能数据检索** - 基于倒排索引和多层数据架构的专业数据检索 +- **📦 无状态项目加载** - 通过 ZIP URL 动态加载数据集,自动缓存和解压 +- **🏗️ 多层架构数据处理** - 文档层、序列化层、索引层的分层存储 +- **🚀 异步文件处理队列** - 基于 huey 和 SQLite 的高性能异步任务队列 +- **📊 任务状态管理** - 实时任务状态查询和 SQLite 数据持久化 +- **🤖 兼容 OpenAI API** - 完全兼容 OpenAI chat/completions 接口 -### 2. 无状态项目加载 -- 通过 ZIP URL 动态加载数据集 -- 自动缓存和解压,提高性能 -- 支持多种数据结构和文件格式 +--- -### 3. 多层架构数据处理 -- **文档层** (document.txt): 原始文本内容,提供完整上下文 -- **序列化层** (serialization.txt): 结构化数据,支持高效匹配 -- **索引层** (schema.json): 字段定义、枚举值映射、文件关联关系 +## 🚀 快速开始 -## API 接口协议 +### 环境要求 -### Chat Completions 接口 +- Python 3.8+ +- Poetry (推荐) 或 pip +- 足够的磁盘空间用于缓存 + +### 安装依赖 + +```bash +# 使用 Poetry (推荐) +poetry install +poetry run python fastapi_app.py + +# 或使用 pip +pip install -r requirements.txt +python fastapi_app.py +``` + +### Docker 部署 + +```bash +# 构建镜像 +docker build -t qwen-agent . + +# 运行容器 +docker run -p 8001:8001 qwen-agent + +# 或使用 Docker Compose +docker-compose up -d +``` + +--- + +## 📖 使用指南 + +### 1. 聊天接口 (OpenAI 兼容) **端点**: `POST /api/v1/chat/completions` -**请求格式**: +```bash +curl -X POST "http://localhost:8001/api/v1/chat/completions" \ + -H "Content-Type: application/json" \ + -d '{ + "messages": [ + { + "role": "user", + "content": "HP Elite Mini 800 G9ってノートPC?" + } + ], + "model": "qwen3-next", + "zip_url": "http://127.0.0.1:8080/all_hp_product_spec_book2506.zip", + "stream": false + }' +``` + +### 2. 异步文件处理队列 + +#### 启动队列系统 + +```bash +# 终端1:启动队列消费者 +poetry run python task_queue/consumer.py --workers 2 + +# 终端2:启动API服务器 +poetry run python fastapi_app.py +``` + +#### 提交异步任务 + +```bash +curl -X POST "http://localhost:8001/api/v1/files/process/async" \ + -H "Content-Type: application/json" \ + -d '{ + "unique_id": "my_project_123", + "files": { + "documents": ["public/document.txt"], + "reports": ["public/data.zip"] + }, + "system_prompt": "处理这些文档" + }' +``` + +**响应**: ```json { - "messages": [ - { - "role": "user", - "content": "HP Elite Mini 800 G9ってノートPC?" - } - ], - "model": "qwen3-next", - "model_server": "https://openrouter.ai/api/v1", - "api_key": "your-api-key", - "zip_url": "http://127.0.0.1:8080/all_hp_product_spec_book2506.zip", - "stream": false, - "max_input_tokens": 58000, - "top_p": 0.8, - "temperature": 0.7, - "max_tokens": 2000 + "success": true, + "task_id": "abc-123-def", + "unique_id": "my_project_123", + "task_status": "pending", + "estimated_processing_time": 30 } ``` -**参数说明**: -- `messages`: 聊天消息列表 -- `model`: 模型名称(默认: "qwen3-next") -- `model_server`: 模型服务器地址(必须) -- `api_key`: API 密钥(可通过 Authorization header 传入) -- `zip_url`: ZIP 数据集的 URL(必需) -- `stream`: 是否流式响应(默认: false) -- `max_input_tokens`: 最大输入tokens数 -- `top_p`: 核采样参数 -- `temperature`: 温度参数 -- `max_tokens`: 最大生成tokens数 -- 其他任意模型生成参数 +#### 查询任务状态 -**响应格式**(非流式): +```bash +# 🎯 主要接口 - 只需要记住这一个 +curl "http://localhost:8001/api/v1/task/abc-123-def/status" +``` + +**状态响应**: ```json { - "choices": [ - { - "index": 0, - "message": { - "role": "assistant", - "content": "HP Elite Mini 800 G9はノートPCではなく、小型のデスクトップPCです。" - }, - "finish_reason": "stop" - } - ], - "usage": { - "prompt_tokens": 25, - "completion_tokens": 18, - "total_tokens": 43 + "success": true, + "task_id": "abc-123-def", + "status": "completed", + "unique_id": "my_project_123", + "result": { + "status": "success", + "message": "成功处理了 2 个文档文件", + "processed_files": ["projects/my_project_123/dataset/docs/document.txt"] } } ``` -**流式响应**: 使用 Server-Sent Events (SSE) 格式,每个数据块采用 OpenAI 格式。 +### 3. Python 客户端示例 -### 系统管理接口 +```python +import requests +import time -#### 健康检查 -- `GET /api/health` - 系统健康状态检查 +def submit_and_monitor_task(): + # 1. 提交任务 + response = requests.post( + "http://localhost:8001/api/v1/files/process/async", + json={ + "unique_id": "my_project", + "files": {"docs": ["public/file.txt"]} + } + ) + + task_id = response.json()["task_id"] + print(f"任务已提交: {task_id}") + + # 2. 监控任务状态 + while True: + response = requests.get(f"http://localhost:8001/api/v1/task/{task_id}/status") + data = response.json() + + status = data["status"] + print(f"任务状态: {status}") + + if status == "completed": + print("🎉 任务完成!") + break + elif status == "failed": + print("❌ 任务失败!") + break + + time.sleep(2) -#### 系统状态 -- `GET /system/status` - 获取系统状态和缓存统计信息 +submit_and_monitor_task() +``` -#### 缓存管理 -- `POST /system/cleanup-cache` - 清理所有缓存 -- `POST /system/cleanup-agent-cache` - 清理助手实例缓存 -- `GET /system/cached-projects` - 获取所有缓存的项目信息 -- `POST /system/remove-project-cache` - 移除特定项目缓存 +--- -## ZIP_URL 数据包结构 +## 🗃️ 数据包结构 -### 压缩包内容要求 - -ZIP 压缩包应包含以下目录结构: +### ZIP 数据集格式 ``` dataset_name/ @@ -110,57 +187,183 @@ dataset_name/ ├── dataset/ │ └── data_collection/ │ ├── document.txt # 原始文本内容 -│ ├── serialization.txt # 序列化结构数据 +│ ├── serialization.txt # 结构化数据 │ └── schema.json # 字段定义和元数据 ├── mcp_settings.json # MCP 工具配置 └── system_prompt.md # 系统提示词(可选) ``` -### 文件详细说明 +### 文件说明 -#### 1. document.txt -- 包含原始 Markdown 文本内容 -- 提供数据的完整上下文信息 -- 检索时需要包含前后行的上下文才有意义 +- **document.txt**: 原始 Markdown 文本,提供完整上下文 +- **serialization.txt**: 格式化结构数据,每行 `字段1:值1;字段2:值2` +- **schema.json**: 字段定义、枚举值映射和文件关联关系 -#### 2. serialization.txt -- 基于 document.txt 解析的格式化结构数据 -- 每行格式:`字段1:值1;字段2:值2;...` -- 支持正则高效匹配和关键词检索 -- 单行内容代表一条完整的数据 +--- + +## 📊 数据存储和管理 + +### 任务状态存储 + +任务状态存储在 SQLite 数据库中: -#### 3. schema.json -```json -{ - "字段名": { - "txt_file_name": "document.txt", - "serialization_file_name": "serialization.txt", - "enums": ["枚举值1", "枚举值2"], - "description": "字段描述信息" - } -} ``` -- 定义字段名、枚举值映射和文件关联关系 -- 提供 serialization.txt 中所有字段的集合 -- 用于字段预览和枚举值预览 +queue_data/task_status.db +``` -#### 4. MCP 工具配置 (mcp_settings.json) -- 配置 Model Context Protocol 工具 -- 支持数据检索和处理的工具集成 -- 可包含 JSON reader、多关键词搜索等工具 +**数据库结构**: +```sql +CREATE TABLE task_status ( + task_id TEXT PRIMARY KEY, -- 任务ID + unique_id TEXT NOT NULL, -- 项目ID + status TEXT NOT NULL, -- 任务状态 + created_at REAL NOT NULL, -- 创建时间 + updated_at REAL NOT NULL, -- 更新时间 + result TEXT, -- 处理结果(JSON) + error TEXT -- 错误信息 +); +``` + +### 数据库管理工具 + +```bash +# 查看数据库内容 +poetry run python db_manager.py view + +# 交互式管理 +poetry run python db_manager.py interactive + +# 获取统计信息 +curl "http://localhost:8001/api/v1/tasks/statistics" +``` + +### 数据备份 + +```bash +# 备份数据库 +cp queue_data/task_status.db queue_data/backup_$(date +%Y%m%d).db + +# 清理旧记录 +curl -X POST "http://localhost:8001/api/v1/tasks/cleanup?older_than_days=7" +``` + +--- + +## 🛠️ API 接口总览 + +### 聊天接口 +- `POST /api/v1/chat/completions` - OpenAI 兼容的聊天接口 + +### 文件处理接口 +- `POST /api/v1/files/process` - 同步文件处理 +- `POST /api/v1/files/process/async` - 异步文件处理 +- `GET /api/v1/files/{unique_id}/status` - 文件处理状态 + +### 任务管理接口 +- `GET /api/v1/task/{task_id}/status` - **主要接口** - 查询任务状态 +- `GET /api/v1/tasks` - 列出任务(支持筛选) +- `GET /api/v1/tasks/statistics` - 获取统计信息 +- `DELETE /api/v1/task/{task_id}` - 删除任务记录 + +### 系统管理接口 +- `GET /api/health` - 健康检查 +- `GET /system/status` - 系统状态 +- `POST /system/cleanup-cache` - 清理缓存 + +--- + +## 🔧 配置和部署 + +### 环境变量 + +```bash +# 模型配置 +MODEL_SERVER=https://openrouter.ai/api/v1 +API_KEY=your-api-key + +# 队列配置 +MAX_CACHED_AGENTS=20 + +# 其他配置 +TOKENIZERS_PARALLELISM=false +``` + +### 生产部署建议 + +1. **队列配置** + ```bash + # 设置合适的工作线程数 + poetry run python task_queue/consumer.py --workers 4 --worker-type threads + ``` + +2. **性能优化** + - 使用 Redis 作为队列后端(可选) + - 配置 nginx 作为反向代理 + - 设置适当的缓存策略 + +3. **监控** + - 定期检查任务状态 + - 监控磁盘空间使用 + - 设置日志轮转 + +--- + +## 📈 性能特性 + +### 智能检索策略 +- **探索性查询**: 结构分析 → 模式发现 → 结果扩展 +- **精确性查询**: 目标定位 → 直接搜索 → 结果验证 +- **分析性查询**: 多维度分析 → 深度挖掘 → 洞察提取 + +### 缓存机制 +- ZIP 文件基于 URL 的 MD5 哈希值进行缓存 +- 助手实例缓存,提高响应速度 +- SQLite 查询缓存 + +### 并发处理 +- 异步文件处理队列 +- 多线程任务执行 +- 支持批量操作 + +--- + +## 📁 项目结构 + +``` +qwen-agent/ +├── fastapi_app.py # FastAPI 主应用 +├── gbase_agent.py # 助手服务逻辑 +├── task_queue/ # 队列系统 +│ ├── config.py # 队列配置 +│ ├── manager.py # 队列管理器 +│ ├── tasks.py # 文件处理任务 +│ ├── integration_tasks.py # 集成任务 +│ ├── task_status.py # 任务状态存储 +│ └── consumer.py # 队列消费者 +├── utils/ # 工具模块 +├── models/ # 模型文件 +├── projects/ # 项目目录 +├── queue_data/ # 队列数据 +├── public/ # 静态文件 +├── db_manager.py # 数据库管理工具 +├── requirements.txt # 依赖列表 +├── pyproject.toml # Poetry 配置 +├── Dockerfile # Docker 构建文件 +└── docker-compose.yml # Docker Compose 配置 +``` + +--- + +## 🎯 使用场景 + +### 适用场景 +- **产品规格检索** - 快速查找产品技术规格 +- **文档分析** - 大量文档的智能检索和分析 +- **数据问答** - 基于结构化数据的问答系统 +- **知识库构建** - 企业知识库的智能检索 ### 示例数据集 - -项目中包含的 HP 产品规格书数据集示例: - -``` -all_hp_product_spec_book2506/ -├── document.txt # HP 产品完整规格信息 -├── serialization.txt # 结构化的产品规格数据 -└── schema.json # 产品字段定义(类型、品牌、规格等) -``` - -数据包含: +项目包含 HP 产品规格书数据集: - 商用/个人笔记本电脑 (EliteBook/OmniBook) - 台式机 (Elite/OMEN) - 工作站 (Z系列) @@ -168,85 +371,58 @@ all_hp_product_spec_book2506/ - Poly 通信设备 - HyperX 游戏配件 -## 技术特性 +--- -### 智能检索策略 -- **探索性查询**: 结构分析 → 模式发现 → 结果扩展 -- **精确性查询**: 目标定位 → 直接搜索 → 结果验证 -- **分析性查询**: 多维度分析 → 深度挖掘 → 洞察提取 +## 🤝 贡献指南 -### 专业工具体系 -- **结构分析工具**: json-reader-get_all_keys, json-reader-get_multiple_values -- **搜索执行工具**: multi-keyword-search, ripgrep-count-matches, ripgrep-search -- **智能路径优化**: 根据查询复杂度选择最优搜索路径 +1. Fork 项目 +2. 创建特性分支 (`git checkout -b feature/AmazingFeature`) +3. 提交更改 (`git commit -m 'Add some AmazingFeature'`) +4. 推送到分支 (`git push origin feature/AmazingFeature`) +5. 打开 Pull Request -### 缓存机制 -- ZIP 文件基于 URL 的 MD5 哈希值进行缓存 -- 助手实例缓存,提高响应速度 -- 支持缓存清理和管理 +--- -## 部署方式 +## 📄 许可证 -### Docker 部署 -```bash -# 构建镜像 -docker build -t qwen-agent . +本项目采用 MIT 许可证 - 查看 [LICENSE](LICENSE) 文件了解详情。 -# 运行容器 -docker run -p 8001:8001 qwen-agent -``` +--- -### Docker Compose 部署 -```bash -docker-compose up -d -``` +## 🆘 支持 -### 本地开发部署 -```bash -# 安装依赖 -pip install -r requirements.txt +- 📖 [详细文档](docs/) +- 🐛 [问题反馈](https://github.com/your-repo/qwen-agent/issues) +- 💬 [讨论区](https://github.com/your-repo/qwen-agent/discussions) -# 启动服务 -python fastapi_app.py -``` +--- -## 系统要求 +## 🎉 开始使用 -- Python 3.8+ -- FastAPI -- Uvicorn -- Qwen Agent 库 -- Requests(用于 ZIP 下载) -- 足够的磁盘空间用于缓存 +1. **克隆项目** + ```bash + git clone https://github.com/your-repo/qwen-agent.git + cd qwen-agent + ``` -## 注意事项 +2. **安装依赖** + ```bash + poetry install + ``` -1. **必需参数**: 所有请求都必须提供 zip_url 参数 -2. **API 密钥**: 可通过 Authorization header 或请求参数传入 -3. **URL 格式**: zip_url 必须是有效的 HTTP/HTTPS URL 或本地路径 -4. **文件大小**: 建议 ZIP 文件不超过 100MB -5. **安全性**: 确保 ZIP 文件来源可信 -6. **网络**: 需要能够访问 zip_url 指向的资源 +3. **启动服务** + ```bash + # 启动队列消费者 + poetry run python task_queue/consumer.py --workers 2 + + # 启动API服务器 + poetry run python fastapi_app.py + ``` -## 项目结构 +4. **测试接口** + ```bash + # 运行测试脚本 + poetry run python test_simple_task.py + ``` -``` -qwen-agent/ -├── fastapi_app.py # FastAPI 主应用 -├── gbase_agent.py # 助手服务逻辑 -├── zip_project_handler.py # ZIP 项目处理器 -├── file_loaded_agent_manager.py # 助助实例管理 -├── agent_pool.py # 助手池管理 -├── system_prompt.md # 系统提示词 -├── requirements.txt # 依赖包列表 -├── Dockerfile # Docker 构建文件 -├── docker-compose.yml # Docker Compose 配置 -├── mcp/ # MCP 工具配置 -├── projects/ # 项目目录 -│ ├── _cache/ # ZIP 文件缓存 -│ └── {hash}/ # 解压后的项目目录 -├── public/ # 静态文件 -└── workspace/ # 工作空间 -``` - -此系统提供了完整的智能数据检索解决方案,支持动态数据集加载和高效的查询处理,适用于各种数据分析和检索场景。 +现在您可以开始使用 Qwen Agent 进行智能数据检索了!🚀 \ No newline at end of file diff --git a/db_manager.py b/db_manager.py new file mode 100755 index 0000000..bccd670 --- /dev/null +++ b/db_manager.py @@ -0,0 +1,168 @@ +#!/usr/bin/env python3 +""" +SQLite任务状态数据库管理工具 +""" + +import sqlite3 +import json +import time +from task_queue.task_status import task_status_store + +def view_database(): + """查看数据库内容""" + print("📊 SQLite任务状态数据库内容") + print("=" * 40) + print(f"数据库路径: {task_status_store.db_path}") + + # 连接数据库 + conn = sqlite3.connect(task_status_store.db_path) + cursor = conn.cursor() + + # 查看表结构 + print(f"\n📋 表结构:") + cursor.execute("PRAGMA table_info(task_status)") + columns = cursor.fetchall() + for col in columns: + print(f" {col[1]} ({col[2]})") + + # 查看所有记录 + print(f"\n📝 所有记录:") + cursor.execute("SELECT * FROM task_status ORDER BY updated_at DESC") + rows = cursor.fetchall() + + if not rows: + print(" (空数据库)") + else: + print(f" 共 {len(rows)} 条记录:") + for i, row in enumerate(rows): + task_id, unique_id, status, created_at, updated_at, result, error = row + created_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(created_at)) + updated_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(updated_at)) + + print(f" {i+1}. {task_id}") + print(f" 项目ID: {unique_id}") + print(f" 状态: {status}") + print(f" 创建: {created_str}") + print(f" 更新: {updated_str}") + if result: + try: + result_data = json.loads(result) + print(f" 结果: {result_data.get('message', 'N/A')}") + except: + print(f" 结果: {result[:50]}...") + if error: + print(f" 错误: {error}") + print() + + conn.close() + +def run_query(sql_query: str): + """执行自定义查询""" + print(f"🔍 执行查询: {sql_query}") + + try: + conn = sqlite3.connect(task_status_store.db_path) + conn.row_factory = sqlite3.Row + cursor = conn.cursor() + cursor.execute(sql_query) + rows = cursor.fetchall() + + if not rows: + print(" (无结果)") + else: + print(f" {len(rows)} 条结果:") + for row in rows: + print(f" {dict(row)}") + + conn.close() + + except Exception as e: + print(f"❌ 查询失败: {e}") + +def interactive_shell(): + """交互式数据库管理""" + print("\n🖥️ 交互式数据库管理") + print("输入 'help' 查看可用命令,输入 'quit' 退出") + + while True: + try: + command = input("\n> ").strip() + + if command.lower() in ['quit', 'exit', 'q']: + break + elif command.lower() == 'help': + print(""" +可用命令: + view - 查看所有记录 + stats - 查看统计信息 + pending - 查看待处理任务 + completed - 查看已完成任务 + failed - 查看失败任务 + sql <查询> - 执行SQL查询 + cleanup <天数> - 清理N天前的记录 + count - 统计总任务数 + help - 显示帮助 + quit/exit/q - 退出 + """) + elif command.lower() == 'view': + view_database() + elif command.lower() == 'stats': + stats = task_status_store.get_statistics() + print(f"统计信息:") + print(f" 总任务数: {stats['total_tasks']}") + print(f" 状态分布: {stats['status_breakdown']}") + print(f" 最近24小时: {stats['recent_24h']}") + elif command.lower() == 'pending': + tasks = task_status_store.search_tasks(status="pending") + print(f"待处理任务 ({len(tasks)} 个):") + for task in tasks: + print(f" - {task['task_id']}: {task['unique_id']}") + elif command.lower() == 'completed': + tasks = task_status_store.search_tasks(status="completed") + print(f"已完成任务 ({len(tasks)} 个):") + for task in tasks: + print(f" - {task['task_id']}: {task['unique_id']}") + elif command.lower() == 'failed': + tasks = task_status_store.search_tasks(status="failed") + print(f"失败任务 ({len(tasks)} 个):") + for task in tasks: + print(f" - {task['task_id']}: {task['unique_id']}") + elif command.lower().startswith('sql '): + sql_query = command[4:] + run_query(sql_query) + elif command.lower().startswith('cleanup '): + try: + days = int(command[8:]) + count = task_status_store.cleanup_old_tasks(days) + print(f"✅ 已清理 {count} 条 {days} 天前的记录") + except ValueError: + print("❌ 请输入有效的天数") + elif command.lower() == 'count': + all_tasks = task_status_store.list_all() + print(f"总任务数: {len(all_tasks)}") + else: + print("❌ 未知命令,输入 'help' 查看帮助") + + except KeyboardInterrupt: + print("\n👋 再见!") + break + except Exception as e: + print(f"❌ 执行错误: {e}") + +def main(): + """主函数""" + import sys + + if len(sys.argv) > 1: + if sys.argv[1] == 'view': + view_database() + elif sys.argv[1] == 'interactive': + interactive_shell() + else: + print("用法: python db_manager.py [view|interactive]") + else: + view_database() + interactive_shell() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/fastapi_app.py b/fastapi_app.py index e159d3a..2d0eb1f 100644 --- a/fastapi_app.py +++ b/fastapi_app.py @@ -17,7 +17,8 @@ from pydantic import BaseModel, Field from utils import ( # Models Message, DatasetRequest, ChatRequest, FileProcessRequest, - FileProcessResponse, ChatResponse, + FileProcessResponse, ChatResponse, QueueTaskRequest, QueueTaskResponse, + QueueStatusResponse, TaskStatusResponse, # File utilities download_file, remove_file_or_directory, get_document_preview, @@ -38,6 +39,11 @@ from utils import ( # Import gbase_agent from gbase_agent import update_agent_llm +# Import queue manager +from task_queue.manager import queue_manager +from task_queue.integration_tasks import process_files_async, cleanup_project_async +from task_queue.task_status import task_status_store + os.environ["TOKENIZERS_PARALLELISM"] = "false" # Custom version for qwen-agent messages - keep this function as it's specific to this app @@ -233,6 +239,14 @@ async def process_files(request: FileProcessRequest, authorization: Optional[str json.dump(request.mcp_settings, f, ensure_ascii=False, indent=2) print(f"Saved mcp_settings for unique_id: {unique_id}") + # 生成项目README.md文件 + try: + save_project_readme(unique_id) + print(f"Generated README.md for unique_id: {unique_id}") + except Exception as e: + print(f"Failed to generate README.md for unique_id: {unique_id}, error: {str(e)}") + # 不影响主要处理流程,继续执行 + # 返回结果包含按key分组的文件信息 result_files = [] for key in processed_files_by_key.keys(): @@ -261,6 +275,225 @@ async def process_files(request: FileProcessRequest, authorization: Optional[str raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") +@app.post("/api/v1/files/process/async") +async def process_files_async_endpoint(request: QueueTaskRequest, authorization: Optional[str] = Header(None)): + """ + 异步处理文件的队列版本API + 与 /api/v1/files/process 功能相同,但使用队列异步处理 + + Args: + request: QueueTaskRequest containing unique_id, files, system_prompt, mcp_settings, and queue options + authorization: Authorization header containing API key (Bearer ) + + Returns: + QueueTaskResponse: Processing result with task ID for tracking + """ + try: + unique_id = request.unique_id + if not unique_id: + raise HTTPException(status_code=400, detail="unique_id is required") + + # 估算处理时间(基于文件数量) + estimated_time = 0 + if request.files: + total_files = sum(len(file_list) for file_list in request.files.values()) + estimated_time = max(30, total_files * 10) # 每个文件预估10秒,最少30秒 + + # 提交异步任务 + task_id = queue_manager.enqueue_multiple_files( + project_id=unique_id, + file_paths=[], + original_filenames=[] + ) + + # 创建任务状态记录 + import uuid + task_id = str(uuid.uuid4()) + task_status_store.set_status( + task_id=task_id, + unique_id=unique_id, + status="pending" + ) + + # 提交异步任务 + task = process_files_async( + unique_id=unique_id, + files=request.files, + system_prompt=request.system_prompt, + mcp_settings=request.mcp_settings, + task_id=task_id + ) + + return QueueTaskResponse( + success=True, + message=f"文件处理任务已提交到队列,项目ID: {unique_id}", + unique_id=unique_id, + task_id=task_id, # 使用我们自己的task_id + task_status="pending", + estimated_processing_time=estimated_time + ) + + except HTTPException: + raise + except Exception as e: + print(f"Error submitting async file processing task: {str(e)}") + raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") + + +@app.get("/api/v1/task/{task_id}/status") +async def get_task_status(task_id: str): + """获取任务状态 - 简单可靠""" + try: + status_data = task_status_store.get_status(task_id) + + if not status_data: + return { + "success": False, + "message": "任务不存在或已过期", + "task_id": task_id, + "status": "not_found" + } + + return { + "success": True, + "message": "任务状态获取成功", + "task_id": task_id, + "status": status_data["status"], + "unique_id": status_data["unique_id"], + "created_at": status_data["created_at"], + "updated_at": status_data["updated_at"], + "result": status_data.get("result"), + "error": status_data.get("error") + } + + except Exception as e: + print(f"Error getting task status: {str(e)}") + raise HTTPException(status_code=500, detail=f"获取任务状态失败: {str(e)}") + + +@app.delete("/api/v1/task/{task_id}") +async def delete_task(task_id: str): + """删除任务记录""" + try: + success = task_status_store.delete_status(task_id) + if success: + return { + "success": True, + "message": f"任务记录已删除: {task_id}", + "task_id": task_id + } + else: + return { + "success": False, + "message": f"任务记录不存在: {task_id}", + "task_id": task_id + } + except Exception as e: + print(f"Error deleting task: {str(e)}") + raise HTTPException(status_code=500, detail=f"删除任务记录失败: {str(e)}") + + +@app.get("/api/v1/tasks") +async def list_tasks(status: Optional[str] = None, unique_id: Optional[str] = None, limit: int = 100): + """列出任务,支持筛选""" + try: + if status or unique_id: + # 使用搜索功能 + tasks = task_status_store.search_tasks(status=status, unique_id=unique_id, limit=limit) + else: + # 获取所有任务 + all_tasks = task_status_store.list_all() + tasks = list(all_tasks.values())[:limit] + + return { + "success": True, + "message": "任务列表获取成功", + "total_tasks": len(tasks), + "tasks": tasks, + "filters": { + "status": status, + "unique_id": unique_id, + "limit": limit + } + } + + except Exception as e: + print(f"Error listing tasks: {str(e)}") + raise HTTPException(status_code=500, detail=f"获取任务列表失败: {str(e)}") + + +@app.get("/api/v1/tasks/statistics") +async def get_task_statistics(): + """获取任务统计信息""" + try: + stats = task_status_store.get_statistics() + + return { + "success": True, + "message": "统计信息获取成功", + "statistics": stats + } + + except Exception as e: + print(f"Error getting statistics: {str(e)}") + raise HTTPException(status_code=500, detail=f"获取统计信息失败: {str(e)}") + + +@app.post("/api/v1/tasks/cleanup") +async def cleanup_tasks(older_than_days: int = 7): + """清理旧任务记录""" + try: + deleted_count = task_status_store.cleanup_old_tasks(older_than_days=older_than_days) + + return { + "success": True, + "message": f"已清理 {deleted_count} 条旧任务记录", + "deleted_count": deleted_count, + "older_than_days": older_than_days + } + + except Exception as e: + print(f"Error cleaning up tasks: {str(e)}") + raise HTTPException(status_code=500, detail=f"清理任务记录失败: {str(e)}") + + +@app.get("/api/v1/projects/{unique_id}/tasks") +async def get_project_tasks(unique_id: str): + """获取指定项目的所有任务""" + try: + tasks = task_status_store.get_by_unique_id(unique_id) + + return { + "success": True, + "message": "项目任务获取成功", + "unique_id": unique_id, + "total_tasks": len(tasks), + "tasks": tasks + } + + except Exception as e: + print(f"Error getting project tasks: {str(e)}") + raise HTTPException(status_code=500, detail=f"获取项目任务失败: {str(e)}") + + +@app.post("/api/v1/files/{unique_id}/cleanup/async") +async def cleanup_project_async_endpoint(unique_id: str, remove_all: bool = False): + """异步清理项目文件""" + try: + task = cleanup_project_async(unique_id=unique_id, remove_all=remove_all) + + return { + "success": True, + "message": f"项目清理任务已提交到队列,项目ID: {unique_id}", + "unique_id": unique_id, + "task_id": task.id, + "action": "remove_all" if remove_all else "cleanup_logs" + } + except Exception as e: + print(f"Error submitting cleanup task: {str(e)}") + raise HTTPException(status_code=500, detail=f"提交清理任务失败: {str(e)}") + + @app.post("/api/v1/chat/completions") async def chat_completions(request: ChatRequest, authorization: Optional[str] = Header(None)): """ diff --git a/poetry.lock b/poetry.lock index 9b248e6..2080bdd 100644 --- a/poetry.lock +++ b/poetry.lock @@ -942,6 +942,21 @@ files = [ {file = "httpx_sse-0.4.3.tar.gz", hash = "sha256:9b1ed0127459a66014aec3c56bebd93da3c1bc8bb6618c8082039a44889a755d"}, ] +[[package]] +name = "huey" +version = "2.5.3" +description = "huey, a little task queue" +optional = false +python-versions = "*" +groups = ["main"] +files = [ + {file = "huey-2.5.3.tar.gz", hash = "sha256:089fc72b97fd26a513f15b09925c56fad6abe4a699a1f0e902170b37e85163c7"}, +] + +[package.extras] +backends = ["redis (>=3.0.0)"] +redis = ["redis (>=3.0.0)"] + [[package]] name = "huggingface-hub" version = "0.35.3" @@ -3961,4 +3976,4 @@ propcache = ">=0.2.1" [metadata] lock-version = "2.1" python-versions = "3.12.0" -content-hash = "06c3b78c8107692eb5944b144ae4df02862fa5e4e8a198f6ccfa07c6743a49cf" +content-hash = "2c1ee5d2aabc25d2ae830cf2663df07ec1a7a25156b1b958d04e4d97f982e274" diff --git a/pyproject.toml b/pyproject.toml index a6d4049..1ad1007 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,7 @@ dependencies = [ "numpy<2", "aiohttp", "aiofiles", + "huey (>=2.5.3,<3.0.0)", ] diff --git a/start_queue.py b/start_queue.py new file mode 100755 index 0000000..f435ffa --- /dev/null +++ b/start_queue.py @@ -0,0 +1,18 @@ +#!/usr/bin/env python3 +""" +Task Queue启动脚本 +""" + +import sys +import os +from pathlib import Path + +# 添加项目根目录到Python路径 +project_root = Path(__file__).parent.parent +sys.path.insert(0, str(project_root)) + +from task_queue.consumer import main as consumer_main + +if __name__ == "__main__": + print("启动文件处理队列系统...") + consumer_main() \ No newline at end of file diff --git a/task_queue/README.md b/task_queue/README.md new file mode 100644 index 0000000..5c51542 --- /dev/null +++ b/task_queue/README.md @@ -0,0 +1,154 @@ +# 队列系统使用说明 + +## 概述 + +本项目集成了基于 huey 和 SqliteHuey 的异步队列系统,用于处理文件的异步处理任务。 + +## 安装依赖 + +```bash +pip install huey +``` + +## 目录结构 + +``` +queue/ +├── __init__.py # 包初始化文件 +├── config.py # 队列配置(SqliteHuey配置) +├── tasks.py # 文件处理任务定义 +├── manager.py # 队列管理器 +├── consumer.py # 队列消费者(工作进程) +├── example.py # 使用示例 +└── README.md # 说明文档 +``` + +## 核心功能 + +### 1. 队列配置 (config.py) +- 使用 SqliteHuey 作为消息队列 +- 数据库文件存储在 `queue_data/huey.db` +- 支持任务重试和错误存储 + +### 2. 文件处理任务 (tasks.py) +- `process_file_async`: 异步处理单个文件 +- `process_multiple_files_async`: 批量异步处理文件 +- `process_zip_file_async`: 异步处理zip压缩文件 +- `cleanup_processed_files`: 清理旧的文件 + +### 3. 队列管理器 (manager.py) +- 任务提交和管理 +- 队列状态监控 +- 任务结果查询 +- 任务记录清理 + +## 使用方法 + +### 1. 启动队列消费者 + +```bash +# 启动默认配置的消费者 +python queue/consumer.py + +# 指定工作线程数 +python queue/consumer.py --workers 4 + +# 查看队列统计信息 +python queue/consumer.py --stats + +# 检查队列状态 +python queue/consumer.py --check + +# 清空队列 +python queue/consumer.py --flush +``` + +### 2. 在代码中使用队列 + +```python +from queue.manager import queue_manager + +# 处理单个文件 +task_id = queue_manager.enqueue_file( + project_id="my_project", + file_path="/path/to/file.txt", + original_filename="myfile.txt" +) + +# 批量处理文件 +task_ids = queue_manager.enqueue_multiple_files( + project_id="my_project", + file_paths=["/path/file1.txt", "/path/file2.txt"], + original_filenames=["file1.txt", "file2.txt"] +) + +# 处理zip文件 +task_id = queue_manager.enqueue_zip_file( + project_id="my_project", + zip_path="/path/to/archive.zip" +) + +# 查看任务状态 +status = queue_manager.get_task_status(task_id) +print(status) + +# 获取队列统计信息 +stats = queue_manager.get_queue_stats() +print(stats) +``` + +### 3. 运行示例 + +```bash +python queue/example.py +``` + +## 配置说明 + +### 队列配置参数 (config.py) +- `filename`: SQLite数据库文件路径 +- `always_eager`: 是否立即执行任务(开发时可设为True) +- `utc`: 是否使用UTC时间 +- `compression_level`: 压缩级别 +- `store_errors`: 是否存储错误信息 +- `max_retries`: 最大重试次数 +- `retry_delay`: 重试延迟 + +### 消费者参数 (consumer.py) +- `--workers`: 工作线程数(默认2) +- `--worker-type`: 工作类型(threads/greenlets/processes) +- `--stats`: 显示统计信息 +- `--check`: 检查队列状态 +- `--flush`: 清空队列 + +## 任务状态 + +- `pending`: 等待处理 +- `running`: 正在处理 +- `complete/finished`: 处理完成 +- `error`: 处理失败 +- `scheduled`: 定时任务 + +## 最佳实践 + +1. **生产环境建议**: + - 设置合适的工作线程数(建议CPU核心数的1-2倍) + - 定期清理旧的任务记录 + - 监控队列状态和任务执行情况 + +2. **开发环境建议**: + - 可以设置 `always_eager=True` 立即执行任务进行调试 + - 使用 `--check` 参数查看队列状态 + - 运行示例代码了解功能 + +3. **错误处理**: + - 任务失败后会自动重试(最多3次) + - 错误信息会存储在数据库中 + - 可以通过 `get_task_status()` 查看错误详情 + +## 故障排除 + +1. **数据库锁定**: 确保只有一个消费者实例在运行 +2. **任务卡住**: 检查文件路径和权限 +3. **内存不足**: 调整工作线程数或使用进程模式 +4. **磁盘空间**: 定期清理旧文件和任务记录 \ No newline at end of file diff --git a/task_queue/__init__.py b/task_queue/__init__.py new file mode 100644 index 0000000..f099268 --- /dev/null +++ b/task_queue/__init__.py @@ -0,0 +1,23 @@ +#!/usr/bin/env python3 +""" +Queue package initialization. +""" + +from .config import huey +from .manager import QueueManager, queue_manager +from .tasks import ( + process_file_async, + process_multiple_files_async, + process_zip_file_async, + cleanup_processed_files +) + +__all__ = [ + "huey", + "QueueManager", + "queue_manager", + "process_file_async", + "process_multiple_files_async", + "process_zip_file_async", + "cleanup_processed_files" +] \ No newline at end of file diff --git a/task_queue/config.py b/task_queue/config.py new file mode 100644 index 0000000..0e25a38 --- /dev/null +++ b/task_queue/config.py @@ -0,0 +1,27 @@ +#!/usr/bin/env python3 +""" +Queue configuration using SqliteHuey for asynchronous file processing. +""" + +import os +from huey import SqliteHuey +from datetime import timedelta + +# 确保queue_data目录存在 +queue_data_dir = os.path.join(os.path.dirname(__file__), '..', 'queue_data') +os.makedirs(queue_data_dir, exist_ok=True) + +# 初始化SqliteHuey +huey = SqliteHuey( + filename=os.path.join(queue_data_dir, 'huey.db'), + name='file_processor', # 队列名称 + always_eager=False, # 设置为False以启用异步处理 + utc=True, # 使用UTC时间 +) + +# 设置默认任务配置 +huey.store_errors = True # 存储错误信息 +huey.max_retries = 3 # 最大重试次数 +huey.retry_delay = timedelta(seconds=60) # 重试延迟 + +print(f"SqliteHuey队列已初始化,数据库路径: {os.path.join(queue_data_dir, 'huey.db')}") \ No newline at end of file diff --git a/task_queue/consumer.py b/task_queue/consumer.py new file mode 100755 index 0000000..b7c24d5 --- /dev/null +++ b/task_queue/consumer.py @@ -0,0 +1,171 @@ +#!/usr/bin/env python3 +""" +Queue consumer for processing file tasks. +""" + +import sys +import os +import time +import signal +import argparse +from pathlib import Path + +# 添加项目根目录到Python路径 +project_root = Path(__file__).parent.parent +sys.path.insert(0, str(project_root)) + +from task_queue.config import huey +from task_queue.manager import queue_manager +from task_queue.integration_tasks import process_files_async, cleanup_project_async +from huey.consumer import Consumer + + +class QueueConsumer: + """队列消费者,用于处理异步任务""" + + def __init__(self, worker_type: str = "threads", workers: int = 2): + self.huey = huey + self.worker_type = worker_type + self.workers = workers + self.running = False + self.consumer = None + + # 注册信号处理 + signal.signal(signal.SIGINT, self._signal_handler) + signal.signal(signal.SIGTERM, self._signal_handler) + + def _signal_handler(self, signum, frame): + """信号处理器,用于优雅关闭""" + print(f"\n收到信号 {signum},正在关闭队列消费者...") + self.running = False + + def start(self): + """启动队列消费者""" + print(f"启动队列消费者...") + print(f"工作线程数: {self.workers}") + print(f"工作类型: {self.worker_type}") + print(f"数据库: {os.path.join(os.path.dirname(__file__), '..', 'queue_data', 'huey.db')}") + print("按 Ctrl+C 停止消费者") + + self.running = True + + try: + # 创建Huey消费者 + self.consumer = Consumer(self.huey, workers=self.workers, worker_type=self.worker_type.rstrip('s')) + + # 显示队列统计信息 + stats = queue_manager.get_queue_stats() + print(f"当前队列状态: {stats}") + + # 启动消费者运行 + print("消费者开始处理任务...") + self.consumer.run() + + except KeyboardInterrupt: + print("\n收到中断信号,正在关闭...") + except Exception as e: + print(f"队列消费者运行时发生错误: {str(e)}") + finally: + self.stop() + + def stop(self): + """停止队列消费者""" + print("正在停止队列消费者...") + try: + if self.consumer: + # 停止消费者 + self.consumer.stop() + self.consumer = None + print("队列消费者已停止") + except Exception as e: + print(f"停止队列消费者时发生错误: {str(e)}") + + def process_scheduled_tasks(self): + """处理定时任务""" + print("处理定时任务...") + # 这里可以添加额外的定时任务处理逻辑 + + +def main(): + """主函数""" + parser = argparse.ArgumentParser(description="文件处理队列消费者") + parser.add_argument( + "--workers", + type=int, + default=2, + help="工作线程数 (默认: 2)" + ) + parser.add_argument( + "--worker-type", + choices=["threads", "greenlets", "processes"], + default="threads", + help="工作线程类型 (默认: threads)" + ) + parser.add_argument( + "--stats", + action="store_true", + help="显示队列统计信息并退出" + ) + parser.add_argument( + "--flush", + action="store_true", + help="清空队列并退出" + ) + parser.add_argument( + "--check", + action="store_true", + help="检查队列状态并退出" + ) + + args = parser.parse_args() + + # 初始化消费者 + consumer = QueueConsumer( + worker_type=args.worker_type, + workers=args.workers + ) + + # 处理不同的命令行选项 + if args.stats: + print("=== 队列统计信息 ===") + stats = queue_manager.get_queue_stats() + print(f"总任务数: {stats.get('total_tasks', 0)}") + print(f"待处理任务: {stats.get('pending_tasks', 0)}") + print(f"运行中任务: {stats.get('running_tasks', 0)}") + print(f"已完成任务: {stats.get('completed_tasks', 0)}") + print(f"错误任务: {stats.get('error_tasks', 0)}") + print(f"定时任务: {stats.get('scheduled_tasks', 0)}") + print(f"数据库: {stats.get('queue_database', 'N/A')}") + return + + if args.flush: + print("=== 清空队列 ===") + try: + # 清空所有任务 + consumer.huey.flush() + print("队列已清空") + except Exception as e: + print(f"清空队列失败: {str(e)}") + return + + if args.check: + print("=== 检查队列状态 ===") + stats = queue_manager.get_queue_stats() + print(f"队列状态: 正常" if "error" not in stats else f"队列状态: 错误 - {stats['error']}") + + pending_tasks = queue_manager.list_pending_tasks(limit=10) + if pending_tasks: + print(f"\n待处理任务 (最多显示10个):") + for task in pending_tasks: + print(f" 任务ID: {task['task_id']}, 状态: {task['status']}, 创建时间: {task['created_time']}") + else: + print("当前没有待处理任务") + return + + # 启动消费者 + print("=== 启动文件处理队列消费者 ===") + consumer.start() + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/task_queue/example.py b/task_queue/example.py new file mode 100755 index 0000000..bc4c250 --- /dev/null +++ b/task_queue/example.py @@ -0,0 +1,132 @@ +#!/usr/bin/env python3 +""" +Example usage of the queue system. +""" + +import sys +import time +from pathlib import Path + +# 添加项目根目录到Python路径 +project_root = Path(__file__).parent.parent +sys.path.insert(0, str(project_root)) + +from task_queue.manager import queue_manager +from task_queue.tasks import process_file_async, process_multiple_files_async + + +def example_single_file(): + """示例:处理单个文件""" + print("=== 示例:处理单个文件 ===") + + project_id = "test_project" + file_path = "public/test_document.txt" + + # 将文件加入队列 + task_id = queue_manager.enqueue_file( + project_id=project_id, + file_path=file_path, + original_filename="example_document.txt" + ) + + print(f"任务已提交,任务ID: {task_id}") + + # 检查任务状态 + time.sleep(2) + status = queue_manager.get_task_status(task_id) + print(f"任务状态: {status}") + + +def example_multiple_files(): + """示例:批量处理文件""" + print("\n=== 示例:批量处理文件 ===") + + project_id = "test_project_batch" + file_paths = [ + "public/test_document.txt", + "public/goods.xlsx" # 假设这个文件存在 + ] + original_filenames = [ + "batch_document_1.txt", + "batch_goods.xlsx" + ] + + # 将多个文件加入队列 + task_ids = queue_manager.enqueue_multiple_files( + project_id=project_id, + file_paths=file_paths, + original_filenames=original_filenames + ) + + print(f"批量任务已提交,任务ID: {task_ids}") + + +def example_zip_file(): + """示例:处理zip文件""" + print("\n=== 示例:处理zip文件 ===") + + project_id = "test_project_zip" + zip_path = "public/all_hp_product_spec_book2506.zip" + + # 将zip文件加入队列 + task_id = queue_manager.enqueue_zip_file( + project_id=project_id, + zip_path=zip_path + ) + + print(f"zip任务已提交,任务ID: {task_id}") + + +def example_queue_stats(): + """示例:获取队列统计信息""" + print("\n=== 示例:队列统计信息 ===") + + stats = queue_manager.get_queue_stats() + print(f"队列统计信息:") + for key, value in stats.items(): + if key != "recent_tasks": + print(f" {key}: {value}") + + +def example_cleanup(): + """示例:清理任务""" + print("\n=== 示例:清理任务 ===") + + project_id = "test_project" + + # 将清理任务加入队列(延迟10秒执行) + task_id = queue_manager.enqueue_cleanup_task( + project_id=project_id, + older_than_days=1, # 清理1天前的文件 + delay=10 + ) + + print(f"清理任务已提交,任务ID: {task_id}") + + +def main(): + """主函数""" + print("队列系统使用示例") + print("=" * 50) + + try: + # 运行示例 + example_single_file() + example_multiple_files() + example_zip_file() + example_queue_stats() + example_cleanup() + + print("\n" + "=" * 50) + print("示例运行完成!") + print("\n要查看任务执行情况,请运行:") + print("python queue/consumer.py --check") + print("\n要启动队列消费者,请运行:") + print("python queue/consumer.py") + + except Exception as e: + print(f"运行示例时发生错误: {str(e)}") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/task_queue/integration_tasks.py b/task_queue/integration_tasks.py new file mode 100644 index 0000000..8032882 --- /dev/null +++ b/task_queue/integration_tasks.py @@ -0,0 +1,215 @@ +#!/usr/bin/env python3 +""" +Queue tasks for file processing integration. +""" + +import os +import json +import time +from typing import Dict, List, Optional, Any + +from task_queue.config import huey +from task_queue.manager import queue_manager +from task_queue.task_status import task_status_store +from utils import download_dataset_files, save_processed_files_log + + +@huey.task() +def process_files_async( + unique_id: str, + files: Optional[Dict[str, List[str]]] = None, + system_prompt: Optional[str] = None, + mcp_settings: Optional[List[Dict]] = None, + task_id: Optional[str] = None +) -> Dict[str, Any]: + """ + 异步处理文件任务 - 与现有files/process API兼容 + + Args: + unique_id: 项目唯一ID + files: 按key分组的文件路径字典 + system_prompt: 系统提示词 + mcp_settings: MCP设置 + task_id: 任务ID(用于状态跟踪) + + Returns: + 处理结果字典 + """ + try: + print(f"开始异步处理文件任务,项目ID: {unique_id}") + + # 如果有task_id,设置初始状态 + if task_id: + task_status_store.set_status( + task_id=task_id, + unique_id=unique_id, + status="running" + ) + + # 确保项目目录存在 + project_dir = os.path.join("projects", unique_id) + if not os.path.exists(project_dir): + os.makedirs(project_dir, exist_ok=True) + + # 处理文件:使用按key分组格式 + processed_files_by_key = {} + if files: + # 使用请求中的文件(按key分组) + # 由于这是异步任务,需要同步调用 + import asyncio + try: + loop = asyncio.get_event_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + processed_files_by_key = loop.run_until_complete(download_dataset_files(unique_id, files)) + total_files = sum(len(files_list) for files_list in processed_files_by_key.values()) + print(f"异步处理了 {total_files} 个数据集文件,涉及 {len(processed_files_by_key)} 个key,项目ID: {unique_id}") + else: + print(f"请求中未提供文件,项目ID: {unique_id}") + + # 收集项目目录下所有的 document.txt 文件 + document_files = [] + for root, dirs, files_list in os.walk(project_dir): + for file in files_list: + if file == "document.txt": + document_files.append(os.path.join(root, file)) + + # 保存system_prompt和mcp_settings到项目目录(如果提供) + if system_prompt: + system_prompt_file = os.path.join(project_dir, "system_prompt.md") + with open(system_prompt_file, 'w', encoding='utf-8') as f: + f.write(system_prompt) + print(f"已保存system_prompt,项目ID: {unique_id}") + + if mcp_settings: + mcp_settings_file = os.path.join(project_dir, "mcp_settings.json") + with open(mcp_settings_file, 'w', encoding='utf-8') as f: + json.dump(mcp_settings, f, ensure_ascii=False, indent=2) + print(f"已保存mcp_settings,项目ID: {unique_id}") + + # 生成项目README.md文件 + try: + from utils.project_manager import save_project_readme + save_project_readme(unique_id) + print(f"已生成README.md文件,项目ID: {unique_id}") + except Exception as e: + print(f"生成README.md失败,项目ID: {unique_id}, 错误: {str(e)}") + # 不影响主要处理流程,继续执行 + + # 构建结果文件列表 + result_files = [] + for key in processed_files_by_key.keys(): + # 添加对应的dataset document.txt路径 + document_path = os.path.join("projects", unique_id, "dataset", key, "document.txt") + if os.path.exists(document_path): + result_files.append(document_path) + + # 对于没有在processed_files_by_key中但存在的document.txt文件,也添加到结果中 + existing_document_paths = set(result_files) # 避免重复 + for doc_file in document_files: + if doc_file not in existing_document_paths: + result_files.append(doc_file) + + result = { + "status": "success", + "message": f"成功异步处理了 {len(result_files)} 个文档文件,涉及 {len(processed_files_by_key)} 个key", + "unique_id": unique_id, + "processed_files": result_files, + "processed_files_by_key": processed_files_by_key, + "document_files": document_files, + "total_files_processed": sum(len(files_list) for files_list in processed_files_by_key.values()), + "processing_time": time.time() + } + + # 更新任务状态为完成 + if task_id: + task_status_store.update_status( + task_id=task_id, + status="completed", + result=result + ) + + print(f"异步文件处理任务完成: {unique_id}") + return result + + except Exception as e: + error_msg = f"异步处理文件时发生错误: {str(e)}" + print(error_msg) + + # 更新任务状态为错误 + if task_id: + task_status_store.update_status( + task_id=task_id, + status="failed", + error=error_msg + ) + + return { + "status": "error", + "message": error_msg, + "unique_id": unique_id, + "error": str(e) + } + + +@huey.task() +def cleanup_project_async( + unique_id: str, + remove_all: bool = False +) -> Dict[str, Any]: + """ + 异步清理项目文件 + + Args: + unique_id: 项目唯一ID + remove_all: 是否删除整个项目目录 + + Returns: + 清理结果字典 + """ + try: + print(f"开始异步清理项目,项目ID: {unique_id}") + + project_dir = os.path.join("projects", unique_id) + removed_items = [] + + if remove_all and os.path.exists(project_dir): + import shutil + shutil.rmtree(project_dir) + removed_items.append(project_dir) + result = { + "status": "success", + "message": f"已删除整个项目目录: {project_dir}", + "unique_id": unique_id, + "removed_items": removed_items, + "action": "remove_all" + } + else: + # 只清理处理日志 + log_file = os.path.join(project_dir, "processed_files.json") + if os.path.exists(log_file): + os.remove(log_file) + removed_items.append(log_file) + + result = { + "status": "success", + "message": f"已清理项目处理日志,项目ID: {unique_id}", + "unique_id": unique_id, + "removed_items": removed_items, + "action": "cleanup_logs" + } + + print(f"异步清理任务完成: {unique_id}") + return result + + except Exception as e: + error_msg = f"异步清理项目时发生错误: {str(e)}" + print(error_msg) + return { + "status": "error", + "message": error_msg, + "unique_id": unique_id, + "error": str(e) + } \ No newline at end of file diff --git a/task_queue/manager.py b/task_queue/manager.py new file mode 100644 index 0000000..9583009 --- /dev/null +++ b/task_queue/manager.py @@ -0,0 +1,362 @@ +#!/usr/bin/env python3 +""" +Queue manager for handling file processing queues. +""" + +import os +import json +import time +from typing import Dict, List, Optional, Any +from huey import Huey +from huey.api import Task +from datetime import datetime, timedelta + +from .config import huey +from .tasks import process_file_async, process_multiple_files_async, process_zip_file_async, cleanup_processed_files + + +class QueueManager: + """队列管理器,用于管理文件处理任务""" + + def __init__(self): + self.huey = huey + print(f"队列管理器已初始化,使用数据库: {os.path.join(os.path.dirname(__file__), '..', 'queue_data', 'huey.db')}") + + def enqueue_file( + self, + project_id: str, + file_path: str, + original_filename: str = None, + delay: int = 0 + ) -> str: + """ + 将文件加入处理队列 + + Args: + project_id: 项目ID + file_path: 文件路径 + original_filename: 原始文件名 + delay: 延迟执行时间(秒) + + Returns: + 任务ID + """ + if delay > 0: + task = process_file_async.schedule( + args=(project_id, file_path, original_filename), + delay=timedelta(seconds=delay) + ) + else: + task = process_file_async(project_id, file_path, original_filename) + + print(f"文件已加入队列: {file_path}, 任务ID: {task.id}") + return task.id + + def enqueue_multiple_files( + self, + project_id: str, + file_paths: List[str], + original_filenames: List[str] = None, + delay: int = 0 + ) -> List[str]: + """ + 将多个文件加入处理队列 + + Args: + project_id: 项目ID + file_paths: 文件路径列表 + original_filenames: 原始文件名列表 + delay: 延迟执行时间(秒) + + Returns: + 任务ID列表 + """ + if delay > 0: + task = process_multiple_files_async.schedule( + args=(project_id, file_paths, original_filenames), + delay=timedelta(seconds=delay) + ) + else: + task = process_multiple_files_async(project_id, file_paths, original_filenames) + + print(f"批量文件已加入队列: {len(file_paths)} 个文件, 任务ID: {task.id}") + return [task.id] + + def enqueue_zip_file( + self, + project_id: str, + zip_path: str, + extract_to: str = None, + delay: int = 0 + ) -> str: + """ + 将zip文件加入处理队列 + + Args: + project_id: 项目ID + zip_path: zip文件路径 + extract_to: 解压目标目录 + delay: 延迟执行时间(秒) + + Returns: + 任务ID + """ + if delay > 0: + task = process_zip_file_async.schedule( + args=(project_id, zip_path, extract_to), + delay=timedelta(seconds=delay) + ) + else: + task = process_zip_file_async(project_id, zip_path, extract_to) + + print(f"zip文件已加入队列: {zip_path}, 任务ID: {task.id}") + return task.id + + def get_task_status(self, task_id: str) -> Dict[str, Any]: + """ + 获取任务状态 + + Args: + task_id: 任务ID + + Returns: + 任务状态信息 + """ + try: + # 尝试从结果存储中获取任务结果 + try: + # 使用huey的内置方法检查结果 + if hasattr(self.huey, 'result') and self.huey.result: + result = self.huey.result(task_id) + if result is not None: + return { + "task_id": task_id, + "status": "complete", + "result": result + } + except Exception: + pass + + # 检查任务是否在待处理队列中 + try: + pending_tasks = list(self.huey.pending()) + for task in pending_tasks: + if hasattr(task, 'id') and task.id == task_id: + return { + "task_id": task_id, + "status": "pending" + } + except Exception: + pass + + # 检查任务是否在定时队列中 + try: + scheduled_tasks = list(self.huey.scheduled()) + for task in scheduled_tasks: + if hasattr(task, 'id') and task.id == task_id: + return { + "task_id": task_id, + "status": "scheduled" + } + except Exception: + pass + + # 如果都找不到,可能任务不存在或已完成但结果已清理 + return { + "task_id": task_id, + "status": "unknown", + "message": "任务状态未知,可能已完成或不存在" + } + + except Exception as e: + return { + "task_id": task_id, + "status": "error", + "message": f"获取任务状态失败: {str(e)}" + } + + def get_queue_stats(self) -> Dict[str, Any]: + """ + 获取队列统计信息 + + Returns: + 队列统计信息 + """ + try: + # 使用简化的统计方法 + stats = { + "total_tasks": 0, + "pending_tasks": 0, + "running_tasks": 0, + "completed_tasks": 0, + "error_tasks": 0, + "scheduled_tasks": 0, + "recent_tasks": [], + "queue_database": os.path.join(os.path.dirname(__file__), '..', 'queue_data', 'huey.db') + } + + # 尝试获取待处理任务数量 + try: + pending_tasks = list(self.huey.pending()) + stats["pending_tasks"] = len(pending_tasks) + stats["total_tasks"] += len(pending_tasks) + except Exception as e: + print(f"获取pending任务失败: {e}") + + # 尝试获取定时任务数量 + try: + scheduled_tasks = list(self.huey.scheduled()) + stats["scheduled_tasks"] = len(scheduled_tasks) + stats["total_tasks"] += len(scheduled_tasks) + except Exception as e: + print(f"获取scheduled任务失败: {e}") + + return stats + + except Exception as e: + return { + "error": f"获取队列统计信息失败: {str(e)}" + } + + def cancel_task(self, task_id: str) -> bool: + """ + 取消任务 + + Args: + task_id: 任务ID + + Returns: + 是否成功取消 + """ + try: + task = self.huey.get_task(task_id) + if task and task.get_status() in ["pending", "scheduled"]: + # Huey不直接支持取消任务,但可以从队列中移除 + # 这里返回False表示不能直接取消 + return False + else: + return False + except Exception as e: + print(f"取消任务失败: {str(e)}") + return False + + def cleanup_old_tasks(self, older_than_days: int = 7) -> Dict[str, Any]: + """ + 清理旧的任务记录 + + Args: + older_than_days: 清理多少天前的任务记录 + + Returns: + 清理结果 + """ + try: + # 简化的清理方法 - 清空整个队列 + self.huey.flush() + + return { + "status": "success", + "message": f"已清空队列(简化清理)", + "older_than_days": older_than_days, + "cleaned_count": "unknown (queue flushed)" + } + + except Exception as e: + return { + "status": "error", + "message": f"清理任务记录失败: {str(e)}" + } + + def enqueue_cleanup_task( + self, + project_id: str, + older_than_days: int = 30, + delay: int = 0 + ) -> str: + """ + 将清理任务加入队列 + + Args: + project_id: 项目ID + older_than_days: 清理多少天前的文件 + delay: 延迟执行时间(秒) + + Returns: + 任务ID + """ + if delay > 0: + task = cleanup_processed_files.schedule( + args=(project_id, older_than_days), + delay=timedelta(seconds=delay) + ) + else: + task = cleanup_processed_files(project_id, older_than_days) + + print(f"清理任务已加入队列: 项目 {project_id}, 任务ID: {task.id}") + return task.id + + def list_pending_tasks(self, limit: int = 50) -> List[Dict[str, Any]]: + """ + 列出待处理的任务 + + Args: + limit: 返回的最大任务数 + + Returns: + 待处理任务列表 + """ + try: + pending_tasks = [] + + # 获取pending任务 + try: + tasks = list(self.huey.pending()) + for i, task in enumerate(tasks[:limit]): + if hasattr(task, 'id'): + pending_tasks.append({ + "task_id": task.id, + "status": "pending", + }) + except Exception as e: + print(f"获取pending任务失败: {e}") + + # 获取scheduled任务 + try: + tasks = list(self.huey.scheduled()) + for i, task in enumerate(tasks[:limit - len(pending_tasks)]): + if hasattr(task, 'id'): + pending_tasks.append({ + "task_id": task.id, + "status": "scheduled", + }) + except Exception as e: + print(f"获取scheduled任务失败: {e}") + + return pending_tasks + + except Exception as e: + print(f"获取待处理任务失败: {str(e)}") + return [] + + def get_task_result(self, task_id: str) -> Optional[Any]: + """ + 获取任务结果 + + Args: + task_id: 任务ID + + Returns: + 任务结果,如果任务未完成则返回None + """ + try: + task = self.huey.get_task(task_id) + if task and task.get_status() in ["complete", "finished"]: + return task.get() + return None + except Exception as e: + print(f"获取任务结果失败: {str(e)}") + return None + + +# 全局队列管理器实例 +queue_manager = QueueManager() \ No newline at end of file diff --git a/task_queue/task_status.py b/task_queue/task_status.py new file mode 100644 index 0000000..1437d28 --- /dev/null +++ b/task_queue/task_status.py @@ -0,0 +1,210 @@ +#!/usr/bin/env python3 +""" +任务状态SQLite存储系统 +""" + +import json +import os +import sqlite3 +import time +from typing import Dict, Optional, Any, List +from pathlib import Path + + +class TaskStatusStore: + """基于SQLite的任务状态存储器""" + + def __init__(self, db_path: str = "queue_data/task_status.db"): + self.db_path = db_path + # 确保目录存在 + Path(db_path).parent.mkdir(parents=True, exist_ok=True) + self._init_database() + + def _init_database(self): + """初始化数据库表""" + with sqlite3.connect(self.db_path) as conn: + conn.execute(''' + CREATE TABLE IF NOT EXISTS task_status ( + task_id TEXT PRIMARY KEY, + unique_id TEXT NOT NULL, + status TEXT NOT NULL, + created_at REAL NOT NULL, + updated_at REAL NOT NULL, + result TEXT, + error TEXT + ) + ''') + conn.commit() + + def set_status(self, task_id: str, unique_id: str, status: str, + result: Optional[Dict] = None, error: Optional[str] = None): + """设置任务状态""" + current_time = time.time() + + with sqlite3.connect(self.db_path) as conn: + conn.execute(''' + INSERT OR REPLACE INTO task_status + (task_id, unique_id, status, created_at, updated_at, result, error) + VALUES (?, ?, ?, ?, ?, ?, ?) + ''', ( + task_id, unique_id, status, current_time, current_time, + json.dumps(result) if result else None, + error + )) + conn.commit() + + def get_status(self, task_id: str) -> Optional[Dict]: + """获取任务状态""" + with sqlite3.connect(self.db_path) as conn: + conn.row_factory = sqlite3.Row + cursor = conn.execute( + 'SELECT * FROM task_status WHERE task_id = ?', (task_id,) + ) + row = cursor.fetchone() + + if not row: + return None + + result = dict(row) + # 解析JSON字段 + if result['result']: + result['result'] = json.loads(result['result']) + + return result + + def update_status(self, task_id: str, status: str, + result: Optional[Dict] = None, error: Optional[str] = None): + """更新任务状态""" + with sqlite3.connect(self.db_path) as conn: + # 检查任务是否存在 + cursor = conn.execute( + 'SELECT task_id FROM task_status WHERE task_id = ?', (task_id,) + ) + if not cursor.fetchone(): + return False + + # 更新状态 + conn.execute(''' + UPDATE task_status + SET status = ?, updated_at = ?, result = ?, error = ? + WHERE task_id = ? + ''', ( + status, time.time(), + json.dumps(result) if result else None, + error, task_id + )) + conn.commit() + return True + + def delete_status(self, task_id: str): + """删除任务状态""" + with sqlite3.connect(self.db_path) as conn: + cursor = conn.execute( + 'DELETE FROM task_status WHERE task_id = ?', (task_id,) + ) + conn.commit() + return cursor.rowcount > 0 + + def list_all(self) -> Dict[str, Dict]: + """列出所有任务状态""" + with sqlite3.connect(self.db_path) as conn: + conn.row_factory = sqlite3.Row + cursor = conn.execute( + 'SELECT * FROM task_status ORDER BY updated_at DESC' + ) + all_tasks = {} + for row in cursor: + result = dict(row) + # 解析JSON字段 + if result['result']: + result['result'] = json.loads(result['result']) + all_tasks[result['task_id']] = result + return all_tasks + + def get_by_unique_id(self, unique_id: str) -> List[Dict]: + """根据项目ID获取所有相关任务""" + with sqlite3.connect(self.db_path) as conn: + conn.row_factory = sqlite3.Row + cursor = conn.execute( + 'SELECT * FROM task_status WHERE unique_id = ? ORDER BY updated_at DESC', + (unique_id,) + ) + tasks = [] + for row in cursor: + result = dict(row) + if result['result']: + result['result'] = json.loads(result['result']) + tasks.append(result) + return tasks + + def cleanup_old_tasks(self, older_than_days: int = 7) -> int: + """清理旧任务记录""" + cutoff_time = time.time() - (older_than_days * 24 * 3600) + + with sqlite3.connect(self.db_path) as conn: + cursor = conn.execute( + 'DELETE FROM task_status WHERE updated_at < ?', + (cutoff_time,) + ) + conn.commit() + return cursor.rowcount + + def get_statistics(self) -> Dict[str, Any]: + """获取任务统计信息""" + with sqlite3.connect(self.db_path) as conn: + # 总任务数 + total = conn.execute('SELECT COUNT(*) FROM task_status').fetchone()[0] + + # 按状态分组统计 + status_stats = conn.execute(''' + SELECT status, COUNT(*) as count + FROM task_status + GROUP BY status + ''').fetchall() + + # 最近24小时的任务 + recent = time.time() - (24 * 3600) + recent_tasks = conn.execute( + 'SELECT COUNT(*) FROM task_status WHERE updated_at > ?', + (recent,) + ).fetchone()[0] + + return { + 'total_tasks': total, + 'status_breakdown': dict(status_stats), + 'recent_24h': recent_tasks, + 'database_path': self.db_path + } + + def search_tasks(self, status: Optional[str] = None, + unique_id: Optional[str] = None, + limit: int = 100) -> List[Dict]: + """搜索任务""" + query = 'SELECT * FROM task_status WHERE 1=1' + params = [] + + if status: + query += ' AND status = ?' + params.append(status) + + if unique_id: + query += ' AND unique_id = ?' + params.append(unique_id) + + query += ' ORDER BY updated_at DESC LIMIT ?' + params.append(limit) + + with sqlite3.connect(self.db_path) as conn: + conn.row_factory = sqlite3.Row + cursor = conn.execute(query, params) + tasks = [] + for row in cursor: + result = dict(row) + if result['result']: + result['result'] = json.loads(result['result']) + tasks.append(result) + return tasks + + +# 全局状态存储实例 +task_status_store = TaskStatusStore() \ No newline at end of file diff --git a/task_queue/tasks.py b/task_queue/tasks.py new file mode 100644 index 0000000..8c35ef9 --- /dev/null +++ b/task_queue/tasks.py @@ -0,0 +1,356 @@ +#!/usr/bin/env python3 +""" +File processing tasks for the queue system. +""" + +import os +import json +import time +import shutil +from pathlib import Path +from typing import Dict, List, Optional, Any +from huey import crontab + +from .config import huey +from utils.file_utils import ( + extract_zip_file, + get_file_hash, + is_file_already_processed, + load_processed_files_log, + save_processed_files_log, + get_document_preview +) + + +@huey.task() +def process_file_async( + project_id: str, + file_path: str, + original_filename: str = None, + target_directory: str = "files" +) -> Dict[str, Any]: + """ + 异步处理单个文件 + + Args: + project_id: 项目ID + file_path: 文件路径 + original_filename: 原始文件名 + target_directory: 目标目录 + + Returns: + 处理结果字典 + """ + try: + print(f"开始处理文件: {file_path}") + + # 确保项目目录存在 + project_dir = os.path.join("projects", project_id) + files_dir = os.path.join(project_dir, target_directory) + os.makedirs(files_dir, exist_ok=True) + + # 获取文件hash作为标识 + file_hash = get_file_hash(file_path) + + # 检查文件是否已处理 + processed_log = load_processed_files_log(project_id) + if file_hash in processed_log: + print(f"文件已处理,跳过: {file_path}") + return { + "status": "skipped", + "message": "文件已处理", + "file_hash": file_hash, + "project_id": project_id + } + + # 处理文件 + result = _process_single_file( + file_path, + files_dir, + original_filename or os.path.basename(file_path) + ) + + # 更新处理日志 + if result["status"] == "success": + processed_log[file_hash] = { + "original_path": file_path, + "original_filename": original_filename or os.path.basename(file_path), + "processed_at": str(time.time()), + "status": "processed", + "result": result + } + save_processed_files_log(project_id, processed_log) + + result["file_hash"] = file_hash + result["project_id"] = project_id + + print(f"文件处理完成: {file_path}, 状态: {result['status']}") + return result + + except Exception as e: + error_msg = f"处理文件时发生错误: {str(e)}" + print(error_msg) + return { + "status": "error", + "message": error_msg, + "file_path": file_path, + "project_id": project_id + } + + +@huey.task() +def process_multiple_files_async( + project_id: str, + file_paths: List[str], + original_filenames: List[str] = None +) -> List[Dict[str, Any]]: + """ + 批量异步处理多个文件 + + Args: + project_id: 项目ID + file_paths: 文件路径列表 + original_filenames: 原始文件名列表 + + Returns: + 处理结果列表 + """ + try: + print(f"开始批量处理 {len(file_paths)} 个文件") + + results = [] + for i, file_path in enumerate(file_paths): + original_filename = original_filenames[i] if original_filenames and i < len(original_filenames) else None + + # 为每个文件创建异步任务 + result = process_file_async(project_id, file_path, original_filename) + results.append(result) + + print(f"批量文件处理任务已提交,共 {len(results)} 个文件") + return results + + except Exception as e: + error_msg = f"批量处理文件时发生错误: {str(e)}" + print(error_msg) + return [{ + "status": "error", + "message": error_msg, + "project_id": project_id + }] + + +@huey.task() +def process_zip_file_async( + project_id: str, + zip_path: str, + extract_to: str = None +) -> Dict[str, Any]: + """ + 异步处理zip压缩文件 + + Args: + project_id: 项目ID + zip_path: zip文件路径 + extract_to: 解压目标目录 + + Returns: + 处理结果字典 + """ + try: + print(f"开始处理zip文件: {zip_path}") + + # 设置解压目录 + if extract_to is None: + extract_to = os.path.join("projects", project_id, "extracted", os.path.basename(zip_path)) + + os.makedirs(extract_to, exist_ok=True) + + # 解压文件 + extracted_files = extract_zip_file(zip_path, extract_to) + + if not extracted_files: + return { + "status": "error", + "message": "解压失败或没有找到支持的文件", + "zip_path": zip_path, + "project_id": project_id + } + + # 批量处理解压后的文件 + result = process_multiple_files_async(project_id, extracted_files) + + return { + "status": "success", + "message": f"zip文件处理完成,解压出 {len(extracted_files)} 个文件", + "zip_path": zip_path, + "extract_to": extract_to, + "extracted_files": extracted_files, + "project_id": project_id, + "batch_task_result": result + } + + except Exception as e: + error_msg = f"处理zip文件时发生错误: {str(e)}" + print(error_msg) + return { + "status": "error", + "message": error_msg, + "zip_path": zip_path, + "project_id": project_id + } + + +@huey.task() +def cleanup_processed_files( + project_id: str, + older_than_days: int = 30 +) -> Dict[str, Any]: + """ + 清理旧的处理文件 + + Args: + project_id: 项目ID + older_than_days: 清理多少天前的文件 + + Returns: + 清理结果字典 + """ + try: + print(f"开始清理项目 {project_id} 中 {older_than_days} 天前的文件") + + project_dir = os.path.join("projects", project_id) + if not os.path.exists(project_dir): + return { + "status": "error", + "message": "项目目录不存在", + "project_id": project_id + } + + current_time = time.time() + cutoff_time = current_time - (older_than_days * 24 * 3600) + cleaned_files = [] + + # 遍历项目目录 + for root, dirs, files in os.walk(project_dir): + for file in files: + file_path = os.path.join(root, file) + file_mtime = os.path.getmtime(file_path) + + if file_mtime < cutoff_time: + try: + os.remove(file_path) + cleaned_files.append(file_path) + print(f"已删除旧文件: {file_path}") + except Exception as e: + print(f"删除文件失败 {file_path}: {str(e)}") + + # 清理空目录 + for root, dirs, files in os.walk(project_dir, topdown=False): + for dir in dirs: + dir_path = os.path.join(root, dir) + try: + if not os.listdir(dir_path): + os.rmdir(dir_path) + print(f"已删除空目录: {dir_path}") + except Exception as e: + print(f"删除目录失败 {dir_path}: {str(e)}") + + return { + "status": "success", + "message": f"清理完成,删除了 {len(cleaned_files)} 个文件", + "project_id": project_id, + "cleaned_files": cleaned_files, + "older_than_days": older_than_days + } + + except Exception as e: + error_msg = f"清理文件时发生错误: {str(e)}" + print(error_msg) + return { + "status": "error", + "message": error_msg, + "project_id": project_id + } + + +def _process_single_file( + file_path: str, + target_dir: str, + original_filename: str +) -> Dict[str, Any]: + """ + 处理单个文件的内部方法 + + Args: + file_path: 源文件路径 + target_dir: 目标目录 + original_filename: 原始文件名 + + Returns: + 处理结果字典 + """ + try: + # 检查文件是否存在 + if not os.path.exists(file_path): + return { + "status": "error", + "message": "源文件不存在", + "file_path": file_path + } + + # 获取文件信息 + file_size = os.path.getsize(file_path) + file_ext = os.path.splitext(original_filename)[1].lower() + + # 根据文件类型进行不同处理 + supported_extensions = ['.txt', '.md', '.pdf', '.doc', '.docx', '.zip'] + + if file_ext not in supported_extensions: + return { + "status": "error", + "message": f"不支持的文件类型: {file_ext}", + "file_path": file_path, + "supported_extensions": supported_extensions + } + + # 复制文件到目标目录 + target_file_path = os.path.join(target_dir, original_filename) + + # 如果目标文件已存在,添加时间戳 + if os.path.exists(target_file_path): + name, ext = os.path.splitext(original_filename) + timestamp = int(time.time()) + target_file_path = os.path.join(target_dir, f"{name}_{timestamp}{ext}") + + shutil.copy2(file_path, target_file_path) + + # 获取文件预览(如果是文本文件) + preview = None + if file_ext in ['.txt', '.md']: + preview = get_document_preview(target_file_path, max_lines=5) + + return { + "status": "success", + "message": "文件处理成功", + "original_path": file_path, + "target_path": target_file_path, + "file_size": file_size, + "file_extension": file_ext, + "preview": preview + } + + except Exception as e: + return { + "status": "error", + "message": f"处理文件时发生错误: {str(e)}", + "file_path": file_path + } + + +# 定期任务示例:每天凌晨2点清理30天前的文件 +@huey.periodic_task(crontab(hour=2, minute=0)) +def daily_cleanup(): + """每日清理任务""" + print("执行每日清理任务") + # 这里可以添加清理逻辑 + return {"status": "completed", "message": "每日清理任务完成"} \ No newline at end of file diff --git a/utils/__init__.py b/utils/__init__.py index 7dfe718..5b81ca0 100644 --- a/utils/__init__.py +++ b/utils/__init__.py @@ -69,6 +69,10 @@ from .api_models import ( ProjectListResponse, ProjectStatsResponse, ProjectActionResponse, + QueueTaskRequest, + QueueTaskResponse, + QueueStatusResponse, + TaskStatusResponse, create_success_response, create_error_response, create_chat_response @@ -134,6 +138,10 @@ __all__ = [ 'ProjectListResponse', 'ProjectStatsResponse', 'ProjectActionResponse', + 'QueueTaskRequest', + 'QueueTaskResponse', + 'QueueStatusResponse', + 'TaskStatusResponse', 'create_success_response', 'create_error_response', 'create_chat_response' diff --git a/utils/api_models.py b/utils/api_models.py index 6bae887..81774c1 100644 --- a/utils/api_models.py +++ b/utils/api_models.py @@ -199,6 +199,66 @@ def create_error_response(message: str, error_type: str = "error", **kwargs) -> } +class QueueTaskRequest(BaseModel): + """队列任务请求模型""" + unique_id: str + files: Optional[Dict[str, List[str]]] = Field(default=None, description="Files organized by key groups. Each key maps to a list of file paths (supports zip files)") + system_prompt: Optional[str] = None + mcp_settings: Optional[List[Dict]] = None + priority: Optional[int] = Field(default=0, description="Task priority (higher number = higher priority)") + delay: Optional[int] = Field(default=0, description="Delay execution by N seconds") + + model_config = ConfigDict(extra='allow') + + @field_validator('files', mode='before') + @classmethod + def validate_files(cls, v): + """Validate dict format with key-grouped files""" + if v is None: + return None + if isinstance(v, dict): + # Validate dict format + for key, value in v.items(): + if not isinstance(key, str): + raise ValueError(f"Key in files dict must be string, got {type(key)}") + if not isinstance(value, list): + raise ValueError(f"Value in files dict must be list, got {type(value)} for key '{key}'") + for item in value: + if not isinstance(item, str): + raise ValueError(f"File paths must be strings, got {type(item)} in key '{key}'") + return v + else: + raise ValueError(f"Files must be a dict with key groups, got {type(v)}") + + +class QueueTaskResponse(BaseModel): + """队列任务响应模型""" + success: bool + message: str + unique_id: str + task_id: Optional[str] = None + task_status: Optional[str] = None + estimated_processing_time: Optional[int] = None # seconds + + +class QueueStatusResponse(BaseModel): + """队列状态响应模型""" + success: bool + message: str + queue_stats: Dict[str, Any] + pending_tasks: List[Dict[str, Any]] + + +class TaskStatusResponse(BaseModel): + """任务状态响应模型""" + success: bool + message: str + task_id: str + task_status: Optional[str] = None + task_result: Optional[Dict[str, Any]] = None + error: Optional[str] = None + + def create_chat_response( messages: List[Message], model: str,