diff --git a/.features/skill/MEMORY.md b/.features/skill/MEMORY.md index f7516e9..915fac2 100644 --- a/.features/skill/MEMORY.md +++ b/.features/skill/MEMORY.md @@ -1,6 +1,7 @@ # Skill Feature > Scope: skill package management service - core implementation +> Last updated: 2026-05-26 > Last updated: 2026-05-23 > Last updated: 2026-04-20 @@ -30,6 +31,10 @@ MCP UI skills have been reworked to follow the MCP Apps pattern: tools return data, static HTML ## Recent Highlights +- [2026-05-26](changelog/2026-Q2.md): skills gain a `category` field: `routes/skill_manager.py` adds `category` to `SkillItem` / `SkillValidationResult`, parsed from `plugin.json` and the `SKILL.md` frontmatter; official skills default to `"other"` and user skills to `"custom"`. A batch update also adds `category` to the metadata of many skills under the common/developing/onprem/support paths, and `data-dashboard` / `mcp-ui` are categorized as `Interactive UI` (`203dcf4`, `3ada55a`, `9658588`) +- [2026-05-26](changelog/2026-Q2.md): the large developing-branch merge adds several skills: `ai-ppt-generator` (Baidu AI PPT), `nfc-medicine-lookup` (NFC medicine lookup), `ppt-outline` (PPT outline / HTML presentation), `z-card-image` (illustrations / card images); the `skills/linggan/*` skill series also returns via the merge (`3ada55a`) +- [2026-05-23](changelog/2026-Q2.md): adds the MCP App-style `skills/developing/ecommerce-storefront/`, containing two HTML Apps (`product-list` / `order-confirm`) and its own `ecommerce_server.py` MCP server; also lands `docs/mcp-app-training.md` (about 1063 lines) as MCP App training material (`9d001c8`) +- [2026-05-21](changelog/2026-Q2.md): in Daytona sandbox mode, `init_agent` writes a `BASH_ENV` file inside the sandbox, injecting `ASSISTANT_ID` / `USER_IDENTIFIER` / `TRACE_ID` / `ENABLE_SELF_KNOWLEDGE` and the shell environment variables from `config.shell_env` (`776acc2`) - [2026-05-12](changelog/2026-Q2.md): batch-refined `retrieval-policy*.md` across 6→10 skill variants, unifying the policy wording across the onprem/support/autoload paths (`be96f24`, `7b4f03d`) - [2026-05-11](changelog/2026-Q2.md): adds sub-agent (SubAgent) support: skill packages expose sub-agents via `agents/*.md`, loaded by `SubAgentMiddleware`; includes the `pmda-drug-info` skill as an example (`5b634bc`) - [2026-05-11](changelog/2026-Q2.md): `pmda_server.py` in `pmda-drug-info` heavily reworked into a mock implementation (`a92096a`) @@ -77,6 +82,9 @@ MCP UI skills have been reworked to follow the MCP Apps pattern: tools return data, static HTML - ⚠️ **MCP `_meta.trace_id` is a global monkey-patch 
injection**: once `agent/mcp_trace_meta.patch_mcp_client_session_trace_meta()` has been called at the entry of `get_tools_from_mcp()`, it permanently wraps `mcp.ClientSession.call_tool`; `_meta.trace_id` is injected only for calls whose tool name is in the set `{"rag_retrieve", "table_rag_retrieve"}`, and extending the allowlist requires editing the `_TRACE_META_TOOL_NAMES` constant directly. - ⚠️ **PrePrompt hook content placement is determined by the template**: since 2026-04-23, hook output is injected into `prompt/system_prompt.md` through the `{hook_content}` placeholder instead of being appended to the end of the prompt; custom templates must include the `{hook_content}` placeholder, otherwise the hook content is lost. - ⚠️ **`init_agent` now returns 3 elements**: after the Daytona rework, `init_agent` returns `(agent, checkpointer, sandbox)`; callers must update their destructuring. +- ⚠️ **Default values of skill `category`**: for the `SkillItem.category` returned by the API, official skills fall back to `"other"` and user skills fall back to `"custom"`; a frontend category view must recognize both sentinels and must not assume official and user skills share the same default. +- ⚠️ **Two entry points for the `category` field**: the same skill can declare `category` in both `.claude-plugin/plugin.json` and the `SKILL.md` frontmatter; `get_skill_metadata` tries `parse_plugin_json` first and falls back to `parse_skill_frontmatter` only when the skill package has no plugin.json, so when the two disagree, plugin.json wins. +- ⚠️ **Daytona shell_env is injected via a file, not the process env**: `init_agent` writes `export VAR=...` lines via `cat > $REMOTE_BASH_ENV_PATH`; inside the sandbox they take effect only when loaded through the shell's (bash) `BASH_ENV`, so scripts in non-daytona mode or not started through bash do not see these variables. Extending the injected items requires editing the `_shell_env` dict in `init_agent` directly. ## Skill Directory Structure diff --git a/.features/skill/changelog/2026-Q2.md b/.features/skill/changelog/2026-Q2.md index 3d70869..ca34c8e 100644 --- a/.features/skill/changelog/2026-Q2.md +++ b/.features/skill/changelog/2026-Q2.md @@ -4,6 +4,126 @@ --- +## 2026-05-26: skill `category` field fully integrated + +**Type**: New feature + +**Background**: the number of skills keeps growing (dozens across the common / developing / onprem / support / linggan / autoload paths); the list API needs to let the frontend group skills by category, but the metadata lacked a `category` field. + +**Changes**: +- `routes/skill_manager.py`: + - `SkillItem` model adds `category: str = "other"`. + - `SkillValidationResult` dataclass adds optional `category: Optional[str]`. + - `parse_plugin_json` parses `plugin_config.get('category')`; `parse_skill_frontmatter` parses `metadata.get('category')` from the frontmatter. + - Fallback is `"other"` in `get_official_skills` and `"custom"` in `get_user_skills`. + - 
`get_skill_metadata_legacy` writes `category` into the returned dict when it is non-empty (preserving backward compatibility). +- Batch-added the `category` field to the `.claude-plugin/plugin.json` and `SKILL.md` frontmatter of multiple skills under common / developing / onprem / support. +- Corrected the `category` of `data-dashboard` and `mcp-ui` from `"Data & Retrieval"` to `"Interactive UI"` (a better fit for the rendering semantics of an MCP App). + +**Root cause**: N/A (new feature) + +**Impact**: +- Items returned by `GET /api/v1/skill/list` now include a `category` field; the frontend can group/filter by category. +- The skill metadata convention is extended: new skills should declare `category` in plugin.json or the SKILL.md frontmatter, otherwise they fall back to `"other"` / `"custom"`. +- When both `plugin.json.category` and `SKILL.md.category` exist, the former wins (`get_skill_metadata` prefers plugin.json). + +**Related files**: +- `routes/skill_manager.py` +- `skills/common/data-dashboard/.claude-plugin/plugin.json` +- `skills/common/mcp-ui/.claude-plugin/plugin.json` +- plus a batch of `skills/{common,developing,onprem,support}/*/SKILL.md` and `.claude-plugin/plugin.json` files + +**Commit/PR**: `203dcf4`, `3ada55a`, `9658588` + +--- + +## 2026-05-26: developing branch adds multiple kinds of skills in bulk + +**Type**: New feature + +**Background**: [to be added]. A batch of new skills, plus the return of the linggan skill series, landed together via the developing→staging merge. + +**Changes**: +- Added `skills/developing/ai-ppt-generator/`: calls Baidu AI to generate PPTs, automatically selecting a template by topic (business / tech / education / creative / Chinese style, etc.); `category: Document Processing`. +- Added `skills/developing/nfc-medicine-lookup/`: looks up medicine information by NFC chip ID or medicine name, with voice-assistant interaction wording aimed at elderly users; `category: Developer Tools`. +- Added `skills/developing/ppt-outline/`: generates PPT outlines and standalone HTML presentations (four styles: dark/light/tech/minimal); `category: Document Processing`. +- Added `skills/developing/z-card-image/`: generates illustrations, cover images, card images, social-media share images, and so on; depends on `python3` + `google-chrome`. +- `skills/developing/static-hosting/SKILL.md` expanded from a 1-line description into a full 80-line skill; a batch of existing SKILL.md / plugin.json files also gain `category`. +- The `skills/linggan/*` skill series (baidu-search / bot-self-modifier / caiyun-weather / competitor-news-intel / contract-document-generator / financial-report-generator / market-academic-insight / ragflow-loader / sales-decision-report / seedream / static-hosting / static-site-deploy / voice-notification / weather-china) returns to staging via the merge. + +**Root cause**: N/A + +**Impact**: 
+- The developing skill pool grows by about 5 new business skills; the linggan series reappears in staging. +- Most of the new skills are SKILL.md-style business skills that follow the pure-markdown "workflow + template" pattern; of these, `ai-ppt-generator` and `z-card-image` depend on an external `BAIDU_API_KEY` or the `google-chrome` binary. + +**Related files**: +- `skills/developing/ai-ppt-generator/SKILL.md` +- `skills/developing/nfc-medicine-lookup/SKILL.md` +- `skills/developing/ppt-outline/SKILL.md` +- `skills/developing/z-card-image/SKILL.md` +- `skills/developing/static-hosting/SKILL.md` +- `skills/linggan/**` (returned) + +**Commit/PR**: `3ada55a` + +--- + +## 2026-05-23: Added the ecommerce-storefront skill (MCP App style) + MCP App training document + +**Type**: New feature + +**Background**: the MCP App pattern (the host loads static HTML and passes data via postMessage) is already working in `mcp-ui` and `data-dashboard`; a sample skill for an e-commerce scenario is needed to demonstrate App rendering for multi-step interactions such as product browsing / selection / order confirmation, along with a written MCP App development guide. + +**Changes**: +- Added `skills/developing/ecommerce-storefront/`: + - Two static Apps: `apps/product-list.html` (288 lines) and `apps/order-confirm.html` (233 lines). + - `ecommerce_server.py` (213 lines) as the bundled MCP server, with `ecommerce_tools.json` defining the tool schemas. + - `hooks/ecommerce_guide.md` + `hooks/pre_prompt.py` inject the skill usage guide into the system prompt. + - `mcp_common.py` (252 lines) reuses the common MCP tool base class. + - `.claude-plugin/plugin.json` configures the PrePrompt hook and the stdio MCP server, `category: Developer Tools`. +- Added `docs/mcp-app-training.md` (about 1063 lines): development training material for the MCP App pattern. + +**Root cause**: N/A + +**Impact**: +- The developing skill pool gains one MCP App-style skill, structured consistently with `mcp-ui` / `data-dashboard`. +- MCP App developers have complete training material to refer to. + +**Related files**: +- `skills/developing/ecommerce-storefront/**` +- `docs/mcp-app-training.md` + +**Commit/PR**: `9d001c8` + +--- + +## 2026-05-21: Daytona sandbox injects shell_env into BASH_ENV + +**Type**: New feature + +**Background**: skill scripts inside the Daytona sandbox need to read runtime context such as `ASSISTANT_ID` / `USER_IDENTIFIER` / `TRACE_ID`, but the host process env cannot be passed directly into the sandbox. + +**Changes**: +- `agent/deep_assistant.py` `init_agent`: when `sandbox is not None and sandbox_type == "daytona"`, assembles the `_shell_env` dict (`ASSISTANT_ID` / `USER_IDENTIFIER` / `TRACE_ID` / `ENABLE_SELF_KNOWLEDGE` plus `config.shell_env`), builds `cd {REMOTE_WORKSPACE_ROOT}\n` + `export VAR="..."` 
lines, and writes them into the sandbox via `sandbox.execute("cat > $REMOTE_BASH_ENV_PATH << 'ENVEOF' ... ENVEOF")`. +- `utils/daytona_sync.py` provides the constants `REMOTE_BASH_ENV_PATH` / `REMOTE_WORKSPACE_ROOT`. +- `AgentConfig` adds `shell_env: Optional[Dict[str, str]]` (callers can append custom env vars). + +**Root cause**: N/A + +**Impact**: +- Skill scripts started through bash inside the sandbox can read the runtime context, e.g. via `os.environ.get("ASSISTANT_ID")`. +- Takes effect only in daytona sandbox mode; local processes or processes not started through bash do not receive the variables injected via `BASH_ENV`. +- Extending the injected items (adding fixed environment variables) requires editing the `_shell_env` dict in `init_agent` directly. + +**Related files**: +- `agent/deep_assistant.py` +- `utils/daytona_sync.py` + +**Commit/PR**: `776acc2` + +--- + ## 2026-05-12: Batch refinement of retrieval policy wording **Type**: Content adjustment diff --git a/db_manager.py b/db_manager.py deleted file mode 100755 index c7708fb..0000000 --- a/db_manager.py +++ /dev/null @@ -1,168 +0,0 @@ -#!/usr/bin/env python3 -""" -SQLite task status database management tool -""" - -import sqlite3 -import json -import time -from task_queue.task_status import task_status_store - -def view_database(): - """View database contents""" - print("SQLite task status database contents") - print("=" * 40) - print(f"Database path: {task_status_store.db_path}") - - # Connect to the database - conn = sqlite3.connect(task_status_store.db_path) - cursor = conn.cursor() - - # View table schema - print(f"\nTable schema:") - cursor.execute("PRAGMA table_info(task_status)") - columns = cursor.fetchall() - for col in columns: - print(f" {col[1]} ({col[2]})") - - # View all records - print(f"\nAll records:") - cursor.execute("SELECT * FROM task_status ORDER BY updated_at DESC") - rows = cursor.fetchall() - - if not rows: - print(" (empty database)") - else: - print(f" Total {len(rows)} records:") - for i, row in enumerate(rows): - task_id, unique_id, status, created_at, updated_at, result, error = row - created_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(created_at)) - updated_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(updated_at)) - - print(f" {i+1}. 
{task_id}") - print(f" Project ID: {unique_id}") - print(f" Status: {status}") - print(f" Created: {created_str}") - print(f" Updated: {updated_str}") - if result: - try: - result_data = json.loads(result) - print(f" Result: {result_data.get('message', 'N/A')}") - except: - print(f" Result: {result[:50]}...") - if error: - print(f" Error: {error}") - print() - - conn.close() - -def run_query(sql_query: str): - """Run a custom query""" - print(f"Running query: {sql_query}") - - try: - conn = sqlite3.connect(task_status_store.db_path) - conn.row_factory = sqlite3.Row - cursor = conn.cursor() - cursor.execute(sql_query) - rows = cursor.fetchall() - - if not rows: - print(" (no results)") - else: - print(f" {len(rows)} results:") - for row in rows: - print(f" {dict(row)}") - - conn.close() - - except Exception as e: - print(f"Query failed: {e}") - -def interactive_shell(): - """Interactive database management""" - print("\n🖥️ Interactive database management") - print("Type 'help' to view available commands, or 'quit' to exit") - - while True: - try: - command = input("\n> ").strip() - - if command.lower() in ['quit', 'exit', 'q']: - break - elif command.lower() == 'help': - print(""" -Available commands: - view - View all records - stats - View statistics - pending - View pending tasks - completed - View completed tasks - failed - View failed tasks - sql - Run an SQL query - cleanup - Clean up records older than N days - count - Count total tasks - help - Show help - quit/exit/q - Exit - """) - elif command.lower() == 'view': - view_database() - elif command.lower() == 'stats': - stats = task_status_store.get_statistics() - print(f"Statistics:") - print(f" Total tasks: {stats['total_tasks']}") - print(f" Status breakdown: {stats['status_breakdown']}") - print(f" Last 24 hours: {stats['recent_24h']}") - elif command.lower() == 'pending': - tasks = task_status_store.search_tasks(status="pending") - print(f"Pending tasks ({len(tasks)}):") - for task in tasks: - print(f" - 
{task['task_id']}: {task['unique_id']}") - elif command.lower() == 'completed': - tasks = task_status_store.search_tasks(status="completed") - print(f"Completed tasks ({len(tasks)}):") - for task in tasks: - print(f" - {task['task_id']}: {task['unique_id']}") - elif command.lower() == 'failed': - tasks = task_status_store.search_tasks(status="failed") - print(f"Failed tasks ({len(tasks)}):") - for task in tasks: - print(f" - {task['task_id']}: {task['unique_id']}") - elif command.lower().startswith('sql '): - sql_query = command[4:] - run_query(sql_query) - elif command.lower().startswith('cleanup '): - try: - days = int(command[8:]) - count = task_status_store.cleanup_old_tasks(days) - print(f"Cleaned up {count} records older than {days} days") - except ValueError: - print("Please enter a valid number of days") - elif command.lower() == 'count': - all_tasks = task_status_store.list_all() - print(f"Total tasks: {len(all_tasks)}") - else: - print("Unknown command. Type 'help' for help") - - except KeyboardInterrupt: - print("\nGoodbye!") - break - except Exception as e: - print(f"Execution error: {e}") - -def main(): - """Main function""" - import sys - - if len(sys.argv) > 1: - if sys.argv[1] == 'view': - view_database() - elif sys.argv[1] == 'interactive': - interactive_shell() - else: - print("Usage: python db_manager.py [view|interactive]") - else: - view_database() - interactive_shell() - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/poetry.lock b/poetry.lock index a6920a7..7c6806d 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1790,21 +1790,6 @@ files = [ {file = "httpx_sse-0.4.3.tar.gz", hash = "sha256:9b1ed0127459a66014aec3c56bebd93da3c1bc8bb6618c8082039a44889a755d"}, ] -[[package]] -name = "huey" -version = "2.5.3" -description = "huey, a little task queue" -optional = false -python-versions = "*" -groups = ["main"] -files = [ - {file = "huey-2.5.3.tar.gz", hash = 
"sha256:089fc72b97fd26a513f15b09925c56fad6abe4a699a1f0e902170b37e85163c7"}, -] - -[package.extras] -backends = ["redis (>=3.0.0)"] -redis = ["redis (>=3.0.0)"] - [[package]] name = "huggingface-hub" version = "0.35.3" @@ -4825,6 +4810,23 @@ urllib3 = ">=1.26.14,<3" fastembed = ["fastembed (>=0.7,<0.8)"] fastembed-gpu = ["fastembed-gpu (>=0.7,<0.8)"] +[[package]] +name = "redis" +version = "6.4.0" +description = "Python client for Redis database and key-value store" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "redis-6.4.0-py3-none-any.whl", hash = "sha256:f0544fa9604264e9464cdf4814e7d4830f74b165d52f2a330a760a88dd248b7f"}, + {file = "redis-6.4.0.tar.gz", hash = "sha256:b01bc7282b8444e28ec36b261df5375183bb47a07eb9c603f284e89cbc5ef010"}, +] + +[package.extras] +hiredis = ["hiredis (>=3.2.0)"] +jwt = ["pyjwt (>=2.9.0)"] +ocsp = ["cryptography (>=36.0.1)", "pyopenssl (>=20.0.1)", "requests (>=2.31.0)"] + [[package]] name = "referencing" version = "0.37.0" @@ -7132,4 +7134,4 @@ cffi = ["cffi (>=1.17,<2.0) ; platform_python_implementation != \"PyPy\" and pyt [metadata] lock-version = "2.1" python-versions = ">=3.12,<3.15" -content-hash = "dc130664802ad1344adc341931036a343f9892934a41bbc15c48663d0146696b" +content-hash = "f5b01d5a1e60672741f2c5e8cc6e2ec55534963a9a3791fd1cdf67d3c2fbd70b" diff --git a/pyproject.toml b/pyproject.toml index 3b30d87..d5163fd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,7 +19,7 @@ dependencies = [ "numpy<2", "aiohttp", "aiofiles", - "huey (>=2.5.3,<3.0.0)", + "redis (>=4.0,<7.0)", "pandas>=1.5.0", "openpyxl>=3.0.0", "xlrd>=2.0.0", diff --git a/requirements.txt b/requirements.txt index 0b6933b..4237427 100644 --- a/requirements.txt +++ b/requirements.txt @@ -58,7 +58,6 @@ httpcore==1.0.9 ; python_version >= "3.12" and python_version < "3.15" httptools==0.7.1 ; python_version >= "3.12" and python_version < "3.15" and platform_system != "Windows" httpx-sse==0.4.3 ; python_version >= "3.12" and 
python_version < "3.15" httpx==0.28.1 ; python_version >= "3.12" and python_version < "3.15" -huey==2.5.3 ; python_version >= "3.12" and python_version < "3.15" huggingface-hub==0.35.3 ; python_version >= "3.12" and python_version < "3.15" hyperframe==6.1.0 ; python_version >= "3.12" and python_version < "3.15" idna==3.11 ; python_version >= "3.12" and python_version < "3.15" @@ -161,6 +160,7 @@ pywin32==311 ; python_version >= "3.12" and python_version < "3.15" and (sys_pla pyyaml==6.0.3 ; python_version >= "3.12" and python_version < "3.15" qdrant-client==1.12.1 ; python_version >= "3.13" and python_version < "3.15" qdrant-client==1.16.2 ; python_version == "3.12" +redis==6.4.0 ; python_version >= "3.12" and python_version < "3.15" referencing==0.37.0 ; python_version >= "3.12" and python_version < "3.15" regex==2025.9.18 ; python_version >= "3.12" and python_version < "3.15" requests-toolbelt==1.0.0 ; python_version >= "3.12" and python_version < "3.15" diff --git a/routes/files.py b/routes/files.py index 943b5af..09c7813 100644 --- a/routes/files.py +++ b/routes/files.py @@ -1,273 +1,18 @@ import os import uuid import shutil -import zipfile from datetime import datetime -from typing import Optional, List -from fastapi import APIRouter, HTTPException, Header, UploadFile, File, Form -from pydantic import BaseModel +from typing import Optional +from fastapi import APIRouter, HTTPException, UploadFile, File, Form import logging logger = logging.getLogger('app') -from utils import ( - DatasetRequest, QueueTaskRequest, IncrementalTaskRequest, QueueTaskResponse, - load_processed_files_log, remove_file_or_directory, remove_dataset_directory_by_key -) from utils.fastapi_utils import get_versioned_filename -from task_queue.manager import queue_manager -from task_queue.integration_tasks import process_files_async, process_files_incremental_async, cleanup_project_async -from task_queue.task_status import task_status_store router = APIRouter() 
-@router.post("/api/v1/files/process/async") -async def process_files_async_endpoint(request: QueueTaskRequest, authorization: Optional[str] = Header(None)): - """ - Queue-based API for asynchronous file processing. - Same functionality as /api/v1/files/process, but processed asynchronously through the queue. - - Args: - request: QueueTaskRequest containing dataset_id, files, system_prompt, mcp_settings, and queue options - authorization: Authorization header containing API key (Bearer ) - - Returns: - QueueTaskResponse: Processing result with task ID for tracking - """ - try: - dataset_id = request.dataset_id - if not dataset_id: - raise HTTPException(status_code=400, detail="dataset_id is required") - - # Estimate processing time (based on file count) - estimated_time = 0 - if request.upload_folder: - # For upload_folder, file count cannot be estimated in advance, so use the default time - estimated_time = 120 # Default: 2 minutes - elif request.files: - total_files = sum(len(file_list) for file_list in request.files.values()) - estimated_time = max(30, total_files * 10) # Estimated 10 seconds per file, minimum 30 seconds - - # Create task status record - import uuid - task_id = str(uuid.uuid4()) - task_status_store.set_status( - task_id=task_id, - unique_id=dataset_id, - status="pending" - ) - - # Submit async task - task = process_files_async( - dataset_id=dataset_id, - files=request.files, - upload_folder=request.upload_folder, - task_id=task_id - ) - - # Build a more detailed message - message = f"File processing task has been submitted to the queue, project ID: {dataset_id}" - if request.upload_folder: - group_count = len(request.upload_folder) - message += f", files will be scanned automatically from {group_count} uploaded folders" - elif request.files: - total_files = sum(len(file_list) for file_list in request.files.values()) - message += f", including {total_files} files" - - return QueueTaskResponse( - success=True, - message=message, - 
dataset_id=dataset_id, - task_id=task_id, # Use our own task_id - task_status="pending", - estimated_processing_time=estimated_time - ) - - except HTTPException: - raise - except Exception as e: - logger.error(f"Error submitting async file processing task: {str(e)}") - raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") - - -@router.post("/api/v1/files/process/incremental") -async def process_files_incremental_endpoint(request: IncrementalTaskRequest, authorization: Optional[str] = Header(None)): - """ - Queue-based API for incremental file processing, supporting file additions and deletions. - - Args: - request: IncrementalTaskRequest containing dataset_id, files_to_add, files_to_remove, system_prompt, mcp_settings, and queue options - authorization: Authorization header containing API key (Bearer ) - - Returns: - QueueTaskResponse: Processing result with task ID for tracking - """ - try: - dataset_id = request.dataset_id - if not dataset_id: - raise HTTPException(status_code=400, detail="dataset_id is required") - - # Validate that there is at least one add or delete operation - if not request.files_to_add and not request.files_to_remove: - raise HTTPException(status_code=400, detail="At least one of files_to_add or files_to_remove must be provided") - - # Estimate processing time (based on file count) - estimated_time = 0 - total_add_files = sum(len(file_list) for file_list in (request.files_to_add or {}).values()) - total_remove_files = sum(len(file_list) for file_list in (request.files_to_remove or {}).values()) - total_files = total_add_files + total_remove_files - estimated_time = max(30, total_files * 10) # Estimated 10 seconds per file, minimum 30 seconds - - # Create task status record - import uuid - task_id = str(uuid.uuid4()) - task_status_store.set_status( - task_id=task_id, - unique_id=dataset_id, - status="pending" - ) - - # Submit incremental async task - task = process_files_incremental_async( - dataset_id=dataset_id, - 
files_to_add=request.files_to_add, - files_to_remove=request.files_to_remove, - system_prompt=request.system_prompt, - mcp_settings=request.mcp_settings, - task_id=task_id - ) - - return QueueTaskResponse( - success=True, - message=f"Incremental file processing task has been submitted to the queue - added {total_add_files} files, removed {total_remove_files} files, project ID: {dataset_id}", - dataset_id=dataset_id, - task_id=task_id, - task_status="pending", - estimated_processing_time=estimated_time - ) - - except HTTPException: - raise - except Exception as e: - logger.error(f"Error submitting incremental file processing task: {str(e)}") - raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") - - -@router.get("/api/v1/files/{dataset_id}/status") -async def get_files_processing_status(dataset_id: str): - """Get the file processing status for the project.""" - try: - # Load processed files log - processed_log = load_processed_files_log(dataset_id) - - # Get project directory info - project_dir = os.path.join("projects", "data", dataset_id) - project_exists = os.path.exists(project_dir) - - # Collect document.txt files - document_files = [] - if project_exists: - for root, dirs, files in os.walk(project_dir): - for file in files: - if file == "document.txt": - document_files.append(os.path.join(root, file)) - - return { - "dataset_id": dataset_id, - "project_exists": project_exists, - "processed_files_count": len(processed_log), - "processed_files": processed_log, - "document_files_count": len(document_files), - "document_files": document_files, - "log_file_exists": os.path.exists(os.path.join("projects", "data", dataset_id, "processed_files.json")) - } - except Exception as e: - raise HTTPException(status_code=500, detail=f"Failed to retrieve file processing status: {str(e)}") - - -@router.post("/api/v1/files/{dataset_id}/reset") -async def reset_files_processing(dataset_id: str): - """Reset the project's file processing status by 
deleting the processing log and all files.""" - try: - project_dir = os.path.join("projects", "data", dataset_id) - log_file = os.path.join("projects", "data", dataset_id, "processed_files.json") - - # Load processed log to know what files to remove - processed_log = load_processed_files_log(dataset_id) - - removed_files = [] - # Remove all processed files and their dataset directories - for file_hash, file_info in processed_log.items(): - # Remove local file in files directory - if 'local_path' in file_info: - if remove_file_or_directory(file_info['local_path']): - removed_files.append(file_info['local_path']) - - # Handle new key-based structure first - if 'key' in file_info: - # Remove dataset directory by key - key = file_info['key'] - if remove_dataset_directory_by_key(dataset_id, key): - removed_files.append(f"dataset/{key}") - elif 'filename' in file_info: - # Fallback to old filename-based structure - filename_without_ext = os.path.splitext(file_info['filename'])[0] - dataset_dir = os.path.join("projects", "data", dataset_id, "datasets", filename_without_ext) - if remove_file_or_directory(dataset_dir): - removed_files.append(dataset_dir) - - # Also remove any specific dataset path if exists (fallback) - if 'dataset_path' in file_info: - if remove_file_or_directory(file_info['dataset_path']): - removed_files.append(file_info['dataset_path']) - - # Remove the log file - if remove_file_or_directory(log_file): - removed_files.append(log_file) - - # Remove the entire files directory - files_dir = os.path.join(project_dir, "files") - if remove_file_or_directory(files_dir): - removed_files.append(files_dir) - - # Also remove the entire dataset directory (clean up any remaining files) - dataset_dir = os.path.join(project_dir, "datasets") - if remove_file_or_directory(dataset_dir): - removed_files.append(dataset_dir) - - # Remove README.md if exists - readme_file = os.path.join(project_dir, "README.md") - if remove_file_or_directory(readme_file): - 
removed_files.append(readme_file) - - return { - "message": f"File processing status reset successfully: {dataset_id}", - "removed_files_count": len(removed_files), - "removed_files": removed_files - } - except Exception as e: - raise HTTPException(status_code=500, detail=f"Failed to reset file processing status: {str(e)}") - - -@router.post("/api/v1/files/{dataset_id}/cleanup/async") -async def cleanup_project_async_endpoint(dataset_id: str, remove_all: bool = False): - """Asynchronously clean up project files.""" - try: - task = cleanup_project_async(dataset_id=dataset_id, remove_all=remove_all) - - return { - "success": True, - "message": f"Project cleanup task has been submitted to the queue, project ID: {dataset_id}", - "dataset_id": dataset_id, - "task_id": task.id, - "action": "remove_all" if remove_all else "cleanup_logs" - } - except Exception as e: - logger.error(f"Error submitting cleanup task: {str(e)}") - raise HTTPException(status_code=500, detail=f"Failed to submit cleanup task: {str(e)}") - - @router.post("/api/v1/upload") async def upload_file(file: UploadFile = File(...), folder: Optional[str] = Form(None)): """ @@ -348,121 +93,3 @@ async def upload_file(file: UploadFile = File(...), folder: Optional[str] = Form except Exception as e: logger.error(f"Error uploading file: {str(e)}") raise HTTPException(status_code=500, detail=f"File upload failed: {str(e)}") - - -# Task management routes that are related to file processing -@router.get("/api/v1/task/{task_id}/status") -async def get_task_status(task_id: str): - """Get task status - simple and reliable.""" - try: - status_data = task_status_store.get_status(task_id) - - if not status_data: - return { - "success": False, - "message": "Task does not exist or has expired", - "task_id": task_id, - "status": "not_found" - } - - return { - "success": True, - "message": "Task status retrieved successfully", - "task_id": task_id, - "status": status_data["status"], - "unique_id": status_data["unique_id"], - 
"created_at": status_data["created_at"], - "updated_at": status_data["updated_at"], - "result": status_data.get("result"), - "error": status_data.get("error") - } - - except Exception as e: - logger.error(f"Error getting task status: {str(e)}") - raise HTTPException(status_code=500, detail=f"Failed to retrieve task status: {str(e)}") - - -@router.delete("/api/v1/task/{task_id}") -async def delete_task(task_id: str): - """Delete task record.""" - try: - success = task_status_store.delete_status(task_id) - if success: - return { - "success": True, - "message": f"Task record deleted: {task_id}", - "task_id": task_id - } - else: - return { - "success": False, - "message": f"Task record does not exist: {task_id}", - "task_id": task_id - } - except Exception as e: - logger.error(f"Error deleting task: {str(e)}") - raise HTTPException(status_code=500, detail=f"Failed to delete task record: {str(e)}") - - -@router.get("/api/v1/tasks") -async def list_tasks(status: Optional[str] = None, dataset_id: Optional[str] = None, limit: int = 100): - """List tasks with optional filters.""" - try: - if status or dataset_id: - # Use search function - tasks = task_status_store.search_tasks(status=status, unique_id=dataset_id, limit=limit) - else: - # Get all tasks - all_tasks = task_status_store.list_all() - tasks = list(all_tasks.values())[:limit] - - return { - "success": True, - "message": "Task list retrieved successfully", - "total_tasks": len(tasks), - "tasks": tasks, - "filters": { - "status": status, - "dataset_id": dataset_id, - "limit": limit - } - } - - except Exception as e: - logger.error(f"Error listing tasks: {str(e)}") - raise HTTPException(status_code=500, detail=f"Failed to retrieve task list: {str(e)}") - - -@router.get("/api/v1/tasks/statistics") -async def get_task_statistics(): - """Get task statistics.""" - try: - stats = task_status_store.get_statistics() - - return { - "success": True, - "message": "Statistics retrieved successfully", - "statistics": stats - } - 
- except Exception as e: - logger.error(f"Error getting statistics: {str(e)}") - raise HTTPException(status_code=500, detail=f"Failed to retrieve statistics: {str(e)}") - - -@router.post("/api/v1/tasks/cleanup") -async def cleanup_tasks(older_than_days: int = 7): - """Clean up old task records.""" - try: - deleted_count = task_status_store.cleanup_old_tasks(older_than_days=older_than_days) - - return { - "success": True, - "message": f"Cleaned up {deleted_count} old task records", - "deleted_count": deleted_count, - "older_than_days": older_than_days - } - - except Exception as e: - logger.error(f"Error cleaning up tasks: {str(e)}") - raise HTTPException(status_code=500, detail=f"Failed to clean up task records: {str(e)}") diff --git a/routes/projects.py b/routes/projects.py index 2e4f3ba..a194668 100644 --- a/routes/projects.py +++ b/routes/projects.py @@ -6,8 +6,6 @@ import logging logger = logging.getLogger('app') -from task_queue.task_status import task_status_store - router = APIRouter() @@ -155,22 +153,3 @@ async def list_datasets(): except Exception as e: logger.error(f"Error listing datasets: {str(e)}") raise HTTPException(status_code=500, detail=f"Failed to retrieve dataset list: {str(e)}") - - -@router.get("/api/v1/projects/{dataset_id}/tasks") -async def get_project_tasks(dataset_id: str): - """Get all tasks for the specified project.""" - try: - tasks = task_status_store.get_by_unique_id(dataset_id) - - return { - "success": True, - "message": "Project tasks retrieved successfully", - "dataset_id": dataset_id, - "total_tasks": len(tasks), - "tasks": tasks - } - - except Exception as e: - logger.error(f"Error getting project tasks: {str(e)}") - raise HTTPException(status_code=500, detail=f"Failed to retrieve project tasks: {str(e)}") diff --git a/start_all_optimized.sh b/start_all_optimized.sh index 0e663d0..8933232 100755 --- a/start_all_optimized.sh +++ b/start_all_optimized.sh @@ -1,5 +1,5 @@ #!/bin/bash -# Optimized startup script - integrates the 
FastAPI application and queue consumer +# Optimized startup script for the FastAPI application set -e @@ -7,7 +7,6 @@ set -e DEFAULT_HOST="0.0.0.0" DEFAULT_PORT="8001" DEFAULT_API_WORKERS="4" -DEFAULT_QUEUE_WORKERS="2" DEFAULT_PROFILE="balanced" DEFAULT_LOG_LEVEL="info" DEFAULT_MAX_RESTARTS="3" @@ -17,7 +16,6 @@ DEFAULT_CHECK_INTERVAL="5" HOST=${HOST:-$DEFAULT_HOST} PORT=${PORT:-$DEFAULT_PORT} API_WORKERS=${API_WORKERS:-$DEFAULT_API_WORKERS} -QUEUE_WORKERS=${QUEUE_WORKERS:-$DEFAULT_QUEUE_WORKERS} PROFILE=${PROFILE:-$DEFAULT_PROFILE} LOG_LEVEL=${LOG_LEVEL:-$DEFAULT_LOG_LEVEL} MAX_RESTARTS=${MAX_RESTARTS:-$DEFAULT_MAX_RESTARTS} @@ -47,7 +45,6 @@ print_config() { print_color $GREEN "Startup configuration:" echo "- API server: http://$HOST:$PORT" echo "- API worker processes: $API_WORKERS" - echo "- Queue worker threads: $QUEUE_WORKERS" echo "- Performance profile: $PROFILE" echo "- Log level: $LOG_LEVEL" echo "- Maximum restarts: $MAX_RESTARTS" @@ -87,7 +84,6 @@ create_directories() { print_color $YELLOW "Creating project directories..." directories=( - "projects/queue_data" "projects/data" "projects/uploads" "projects/robot" @@ -161,16 +157,6 @@ start_services() { API_PID=$! echo "API server PID: $API_PID" - # Start the queue consumer - print_color $BLUE "Starting queue consumer..." - python3 task_queue/consumer.py \ - --workers=$QUEUE_WORKERS \ - --worker-type=threads \ - > queue_consumer.log 2>&1 & - - CONSUMER_PID=$! - echo "Queue consumer PID: $CONSUMER_PID" - echo print_color $GREEN "All services started successfully!" print_color $GREEN "API server: http://$HOST:$PORT" @@ -179,7 +165,7 @@ start_services() { } monitor_services() { - local restart_counts=(0 0) # API, Consumer + local restart_counts=(0) # API while true; do # Check the API server @@ -205,26 +191,6 @@ monitor_services() { fi fi - # Check the queue consumer - if ! 
kill -0 $CONSUMER_PID 2>/dev/null; then - print_color $RED "Queue consumer stopped unexpectedly" - - if [ ${restart_counts[1]} -lt $MAX_RESTARTS ]; then - print_color $YELLOW "Restarting queue consumer (${restart_counts[1]} + 1/$MAX_RESTARTS)..." - python3 task_queue/consumer.py \ - --workers=$QUEUE_WORKERS \ - --worker-type=threads \ - >> queue_consumer.log 2>&1 & - - CONSUMER_PID=$! - restart_counts[1]=$((restart_counts[1] + 1)) - print_color $GREEN "Queue consumer restarted successfully, PID: $CONSUMER_PID" - else - print_color $RED "Queue consumer restart limit reached, stopping all services" - break - fi - fi - # Wait for the next check interval sleep $CHECK_INTERVAL done @@ -253,25 +219,6 @@ cleanup() { fi fi - # Stop the queue consumer - if [ ! -z "$CONSUMER_PID" ] && kill -0 $CONSUMER_PID 2>/dev/null; then - print_color $BLUE "Stopping queue consumer (PID: $CONSUMER_PID)..." - kill $CONSUMER_PID 2>/dev/null || true - - # Wait for graceful shutdown - local count=0 - while kill -0 $CONSUMER_PID 2>/dev/null && [ $count -lt 10 ]; do - sleep 1 - count=$((count + 1)) - done - - # Force terminate if it is still running - if kill -0 $CONSUMER_PID 2>/dev/null; then - print_color $RED "Force stopping queue consumer..." 
- kill -9 $CONSUMER_PID 2>/dev/null || true - fi - fi - print_color $GREEN "All services have been stopped" exit 0 } @@ -288,7 +235,6 @@ main() { echo " HOST API bind host address (default: $DEFAULT_HOST)" echo " PORT API bind port (default: $DEFAULT_PORT)" echo " API_WORKERS Number of API worker processes (default: $DEFAULT_API_WORKERS)" - echo " QUEUE_WORKERS Number of queue worker threads (default: $DEFAULT_QUEUE_WORKERS)" echo " PROFILE Performance profile: low_memory, balanced, high_performance (default: $DEFAULT_PROFILE)" echo " LOG_LEVEL Log level: debug, info, warning, error (default: $DEFAULT_LOG_LEVEL)" echo " MAX_RESTARTS Maximum restart count (default: $DEFAULT_MAX_RESTARTS)" @@ -296,7 +242,7 @@ main() { echo echo "Examples:" echo " PROFILE=high_performance API_WORKERS=8 $0" - echo " PORT=8080 QUEUE_WORKERS=4 $0" + echo " PORT=8080 API_WORKERS=4 $0" exit 0 fi diff --git a/start_unified.py b/start_unified.py index 27efe83..74d7be0 100755 --- a/start_unified.py +++ b/start_unified.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 """ -Optimized unified startup script combining the FastAPI application and queue consumer. +Optimized unified startup script for the FastAPI application. Supports performance monitoring, automatic restart, graceful shutdown, and related features. 
""" @@ -17,7 +17,7 @@ from typing import List, Optional, Dict, Any class ProcessManager: - """Process manager that controls the API service and queue consumer.""" + """Process manager that controls the API service.""" def __init__(self): self.processes: Dict[str, subprocess.Popen] = {} @@ -78,44 +78,6 @@ class ProcessManager: print(f"Failed to start API server: {e}") return None - def start_queue_consumer(self, args) -> Optional[subprocess.Popen]: - """Start the queue consumer.""" - print("Starting queue consumer...") - - consumer_script = Path("task_queue/consumer.py") - if not consumer_script.exists(): - consumer_script = consumer_script.with_suffix(".pyc") - - # Build the queue consumer command - cmd = [ - sys.executable, - str(consumer_script), - "--workers", str(args.queue_workers), - "--worker-type", args.worker_type - ] - - try: - process = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - universal_newlines=True, - bufsize=1 - ) - - # Start the output monitoring thread - threading.Thread( - target=self._monitor_output, - args=(process, "Queue consumer"), - daemon=True - ).start() - - return process - - except Exception as e: - print(f"Failed to start queue consumer: {e}") - return None - def _monitor_output(self, process: subprocess.Popen, name: str): """Monitor process output.""" try: @@ -138,8 +100,6 @@ class ProcessManager: if name == "API server": new_process = self.start_api_server(args) - elif name == "Queue consumer": - new_process = self.start_queue_consumer(args) else: return False @@ -169,27 +129,19 @@ class ProcessManager: print("Failed to start API server; exiting") return False - queue_process = self.start_queue_consumer(args) - if not queue_process: - print("Failed to start queue consumer; exiting") - api_process.terminate() - return False - self.processes["API server"] = api_process - self.processes["Queue consumer"] = queue_process print("\n" + "=" * 70) print("All services started successfully!") print(f"API 
server: http://{args.host}:{args.port}") print(f"API PID: {api_process.pid}") - print(f"Queue consumer PID: {queue_process.pid}") print("Press Ctrl+C to stop all services") print("=" * 70 + "\n") self.running = True # Main monitoring loop - restart_counts = {"API server": 0, "Queue consumer": 0} + restart_counts = {"API server": 0} max_restarts = args.max_restarts while self.running and not self.shutdown_event.is_set(): @@ -262,7 +214,6 @@ class ProcessManager: def create_directories(self): """Create the required directories.""" directories = [ - "projects/queue_data", "projects/data", "projects/uploads", "projects/robot", @@ -313,11 +264,6 @@ def parse_args(): parser.add_argument("--log-level", type=str, default="info", choices=["debug", "info", "warning", "error"], help="Log level") - # Queue consumer configuration - parser.add_argument("--queue-workers", type=int, default=2, help="Number of queue consumer worker threads") - parser.add_argument("--worker-type", type=str, default="threads", - choices=["threads", "greenlets", "gevent"], help="Queue worker type") - # Performance profile parser.add_argument("--profile", type=str, default="low_memory", choices=["low_memory", "balanced", "high_performance"], help="Performance profile") diff --git a/task_queue/README.md b/task_queue/README.md deleted file mode 100644 index 5c51542..0000000 --- a/task_queue/README.md +++ /dev/null @@ -1,154 +0,0 @@ -# Queue System Usage Guide - -## Overview - -This project integrates an asynchronous queue system based on huey and SqliteHuey for processing files asynchronously. - -## Install Dependencies - -```bash -pip install huey -``` - -## Directory Structure - -``` -queue/ -├── __init__.py # Package initialization file -├── config.py # Queue configuration (SqliteHuey setup) -├── tasks.py # File processing task definitions -├── manager.py # Queue manager -├── consumer.py # Queue consumer (worker process) -├── example.py # Usage examples -└── README.md # Documentation -``` - -## Core Features - -### 1. Queue Configuration (config.py) -- Uses SqliteHuey as the message queue -- The database file is stored at `queue_data/huey.db` -- Supports task retries and error storage - -### 2. 
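File Processing Tasks (tasks.py) -- `process_file_async`: Process a single file asynchronously -- `process_multiple_files_async`: Process multiple files asynchronously in a batch -- `process_zip_file_async`: Process a zip archive asynchronously -- `cleanup_processed_files`: Clean up old files - -### 3. Queue Manager (manager.py) -- Task submission and management -- Queue status monitoring -- Task result lookup -- Task record cleanup - -## Usage - -### 1. Start the Queue Consumer - -```bash -# Start the consumer with the default configuration -python queue/consumer.py - -# Specify the number of worker threads -python queue/consumer.py --workers 4 - -# Show queue statistics -python queue/consumer.py --stats - -# Check queue status -python queue/consumer.py --check - -# Flush the queue -python queue/consumer.py --flush -``` - -### 2. Use the Queue in Code - -```python -from queue.manager import queue_manager - -# Process a single file -task_id = queue_manager.enqueue_file( - project_id="my_project", - file_path="/path/to/file.txt", - original_filename="myfile.txt" -) - -# Process multiple files in a batch -task_ids = queue_manager.enqueue_multiple_files( - project_id="my_project", - file_paths=["/path/file1.txt", "/path/file2.txt"], - original_filenames=["file1.txt", "file2.txt"] -) - -# Process a zip file -task_id = queue_manager.enqueue_zip_file( - project_id="my_project", - zip_path="/path/to/archive.zip" -) - -# Check task status -status = queue_manager.get_task_status(task_id) -print(status) - -# Get queue statistics -stats = queue_manager.get_queue_stats() -print(stats) -``` - -### 3. Run the Examples - -```bash -python queue/example.py -``` - -## Configuration - -### Queue Configuration Parameters (config.py) -- `filename`: Path to the SQLite database file -- `always_eager`: Whether to execute tasks immediately (can be set to True during development) -- `utc`: Whether to use UTC time -- `compression_level`: Compression level -- `store_errors`: Whether to store error information -- `max_retries`: Maximum number of retries -- `retry_delay`: Delay between retries - -### Consumer Parameters (consumer.py) -- `--workers`: Number of worker threads (default: 2) -- `--worker-type`: Worker type (threads/greenlets/processes) -- `--stats`: Show statistics -- `--check`: Check queue status -- `--flush`: Flush the queue - -## Task Statuses - -- `pending`: Waiting to be processed -- `running`: Being processed -- `complete/finished`: Processing finished -- `error`: Processing failed -- `scheduled`: Scheduled task - -## Best Practices - -1. **Production recommendations**: - - Set an appropriate number of worker threads (1-2x the number of CPU cores is recommended) - - Clean up old task records regularly - - Monitor queue status and task execution - -2. **Development recommendations**: - - Set `always_eager=True` to execute tasks immediately for debugging - - Use the `--check` option to inspect queue status - - Run the example code to learn the features - -3. 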
**Error handling**: - - Failed tasks are retried automatically (up to 3 times) - - Error information is stored in the database - - Error details can be viewed via `get_task_status()` - -## Troubleshooting - -1. **Database locked**: Make sure only one consumer instance is running -2. **Task stuck**: Check file paths and permissions -3. **Out of memory**: Adjust the number of worker threads or use process mode -4. **Disk space**: Clean up old files and task records regularly \ No newline at end of file diff --git a/task_queue/__init__.py b/task_queue/__init__.py deleted file mode 100644 index f099268..0000000 --- a/task_queue/__init__.py +++ /dev/null @@ -1,23 +0,0 @@ -#!/usr/bin/env python3 -""" -Queue package initialization. -""" - -from .config import huey -from .manager import QueueManager, queue_manager -from .tasks import ( - process_file_async, - process_multiple_files_async, - process_zip_file_async, - cleanup_processed_files -) - -__all__ = [ - "huey", - "QueueManager", - "queue_manager", - "process_file_async", - "process_multiple_files_async", - "process_zip_file_async", - "cleanup_processed_files" -] \ No newline at end of file diff --git a/task_queue/config.py b/task_queue/config.py deleted file mode 100644 index d1cbfec..0000000 --- a/task_queue/config.py +++ /dev/null @@ -1,31 +0,0 @@ -#!/usr/bin/env python3 -""" -Queue configuration using SqliteHuey for asynchronous file processing. 
-""" - -import os -import logging -from huey import SqliteHuey -from datetime import timedelta - -# Configure logging -logger = logging.getLogger('app') - -# Ensure projects/queue_data directory exists -queue_data_dir = os.path.join(os.path.dirname(__file__), '..', 'projects', 'queue_data') -os.makedirs(queue_data_dir, exist_ok=True) - -# Initialize SqliteHuey -huey = SqliteHuey( - filename=os.path.join(queue_data_dir, 'huey.db'), - name='file_processor', # Queue name - always_eager=False, # Set to False to enable async processing - utc=True, # Use UTC time -) - -# Set default task configuration -huey.store_errors = True # Store error information -huey.max_retries = 3 # Maximum retry count -huey.retry_delay = timedelta(seconds=60) # Retry delay - -logger.info(f"SqliteHuey queue initialized, database path: {os.path.join(queue_data_dir, 'huey.db')}") \ No newline at end of file diff --git a/task_queue/consumer.py b/task_queue/consumer.py deleted file mode 100755 index a13a7b8..0000000 --- a/task_queue/consumer.py +++ /dev/null @@ -1,171 +0,0 @@ -#!/usr/bin/env python3 -""" -Queue consumer for processing file tasks. 
-""" - -import sys -import os -import time -import signal -import argparse -from pathlib import Path - -# Add project root directory to Python path -project_root = Path(__file__).parent.parent -sys.path.insert(0, str(project_root)) - -from task_queue.config import huey -from task_queue.manager import queue_manager -from task_queue.integration_tasks import process_files_async, cleanup_project_async -from huey.consumer import Consumer - - -class QueueConsumer: - """Queue consumer for processing async tasks.""" - - def __init__(self, worker_type: str = "threads", workers: int = 2): - self.huey = huey - self.worker_type = worker_type - self.workers = workers - self.running = False - self.consumer = None - - # Register signal handlers - signal.signal(signal.SIGINT, self._signal_handler) - signal.signal(signal.SIGTERM, self._signal_handler) - - def _signal_handler(self, signum, frame): - """Signal handler for graceful shutdown.""" - print(f"\nReceived signal {signum}, shutting down queue consumer...") - self.running = False - - def start(self): - """Start the queue consumer.""" - print(f"Starting queue consumer...") - print(f"Worker threads: {self.workers}") - print(f"Worker type: {self.worker_type}") - print(f"Database: {os.path.join(os.path.dirname(__file__), '..', 'projects', 'queue_data', 'huey.db')}") - print("Press Ctrl+C to stop the consumer") - - self.running = True - - try: - # Create Huey consumer - self.consumer = Consumer(self.huey, workers=self.workers, worker_type=self.worker_type.rstrip('s')) - - # Display queue statistics - stats = queue_manager.get_queue_stats() - print(f"Current queue status: {stats}") - - # Start consumer run loop - print("Consumer starting task processing...") - self.consumer.run() - - except KeyboardInterrupt: - print("\nReceived interrupt signal, shutting down...") - except Exception as e: - print(f"Queue consumer runtime error: {str(e)}") - finally: - self.stop() - - def stop(self): - """Stop the queue consumer.""" - 
print("Stopping queue consumer...") - try: - if self.consumer: - # Stop the consumer - self.consumer.stop() - self.consumer = None - print("Queue consumer stopped") - except Exception as e: - print(f"Error stopping queue consumer: {str(e)}") - - def process_scheduled_tasks(self): - """Process scheduled tasks.""" - print("Processing scheduled tasks...") - # Additional scheduled task processing logic can be added here - - -def main(): - """Main entry point.""" - parser = argparse.ArgumentParser(description="File processing queue consumer") - parser.add_argument( - "--workers", - type=int, - default=2, - help="Number of worker threads (default: 2)" - ) - parser.add_argument( - "--worker-type", - choices=["threads", "greenlets", "processes"], - default="threads", - help="Worker thread type (default: threads)" - ) - parser.add_argument( - "--stats", - action="store_true", - help="Display queue statistics and exit" - ) - parser.add_argument( - "--flush", - action="store_true", - help="Flush the queue and exit" - ) - parser.add_argument( - "--check", - action="store_true", - help="Check queue status and exit" - ) - - args = parser.parse_args() - - # Initialize consumer - consumer = QueueConsumer( - worker_type=args.worker_type, - workers=args.workers - ) - - # Handle different command-line options - if args.stats: - print("=== Queue Statistics ===") - stats = queue_manager.get_queue_stats() - print(f"Total tasks: {stats.get('total_tasks', 0)}") - print(f"Pending tasks: {stats.get('pending_tasks', 0)}") - print(f"Running tasks: {stats.get('running_tasks', 0)}") - print(f"Completed tasks: {stats.get('completed_tasks', 0)}") - print(f"Error tasks: {stats.get('error_tasks', 0)}") - print(f"Scheduled tasks: {stats.get('scheduled_tasks', 0)}") - print(f"Database: {stats.get('queue_database', 'N/A')}") - return - - if args.flush: - print("=== Flushing Queue ===") - try: - # Flush all tasks - consumer.huey.flush() - print("Queue flushed") - except Exception as e: - print(f"Failed 
to flush queue: {str(e)}") - return - - if args.check: - print("=== Checking Queue Status ===") - stats = queue_manager.get_queue_stats() - print(f"Queue status: OK" if "error" not in stats else f"Queue status: ERROR - {stats['error']}") - - pending_tasks = queue_manager.list_pending_tasks(limit=10) - if pending_tasks: - print(f"\nPending tasks (showing up to 10):") - for task in pending_tasks: - print(f" Task ID: {task['task_id']}, Status: {task['status']}, Created: {task['created_time']}") - else: - print("No pending tasks") - return - - # Start consumer - print("=== Starting File Processing Queue Consumer ===") - consumer.start() - - -if __name__ == "__main__": - main() diff --git a/task_queue/example.py b/task_queue/example.py deleted file mode 100755 index 27b1f9d..0000000 --- a/task_queue/example.py +++ /dev/null @@ -1,132 +0,0 @@ -#!/usr/bin/env python3 -""" -Example usage of the queue system. -""" - -import sys -import time -from pathlib import Path - -# Add project root directory to Python path -project_root = Path(__file__).parent.parent -sys.path.insert(0, str(project_root)) - -from task_queue.manager import queue_manager -from task_queue.tasks import process_file_async, process_multiple_files_async - - -def example_single_file(): - """Example: Process a single file.""" - print("=== Example: Process a single file ===") - - project_id = "test_project" - file_path = "public/test_document.txt" - - # Enqueue file for processing - task_id = queue_manager.enqueue_file( - project_id=project_id, - file_path=file_path, - original_filename="example_document.txt" - ) - - print(f"Task submitted, task ID: {task_id}") - - # Check task status - time.sleep(2) - status = queue_manager.get_task_status(task_id) - print(f"Task status: {status}") - - -def example_multiple_files(): - """Example: Batch process files.""" - print("\n=== Example: Batch process files ===") - - project_id = "test_project_batch" - file_paths = [ - "public/test_document.txt", - "public/goods.xlsx" # 
Assuming this file exists - ] - original_filenames = [ - "batch_document_1.txt", - "batch_goods.xlsx" - ] - - # Enqueue multiple files for processing - task_ids = queue_manager.enqueue_multiple_files( - project_id=project_id, - file_paths=file_paths, - original_filenames=original_filenames - ) - - print(f"Batch tasks submitted, task IDs: {task_ids}") - - -def example_zip_file(): - """Example: Process a zip file.""" - print("\n=== Example: Process a zip file ===") - - project_id = "test_project_zip" - zip_path = "public/all_hp_product_spec_book2506.zip" - - # Enqueue zip file for processing - task_id = queue_manager.enqueue_zip_file( - project_id=project_id, - zip_path=zip_path - ) - - print(f"Zip task submitted, task ID: {task_id}") - - -def example_queue_stats(): - """Example: Get queue statistics.""" - print("\n=== Example: Queue statistics ===") - - stats = queue_manager.get_queue_stats() - print("Queue statistics:") - for key, value in stats.items(): - if key != "recent_tasks": - print(f" {key}: {value}") - - -def example_cleanup(): - """Example: Cleanup tasks.""" - print("\n=== Example: Cleanup tasks ===") - - project_id = "test_project" - - # Enqueue cleanup task (delayed 10 seconds) - task_id = queue_manager.enqueue_cleanup_task( - project_id=project_id, - older_than_days=1, # Clean files older than 1 day - delay=10 - ) - - print(f"Cleanup task submitted, task ID: {task_id}") - - -def main(): - """Main entry point.""" - print("Queue System Usage Examples") - print("=" * 50) - - try: - # Run examples - example_single_file() - example_multiple_files() - example_zip_file() - example_queue_stats() - example_cleanup() - - print("\n" + "=" * 50) - print("Examples completed!") - print("\nTo check task execution, run:") - print("python queue/consumer.py --check") - print("\nTo start the queue consumer, run:") - print("python queue/consumer.py") - - except Exception as e: - print(f"Error running examples: {str(e)}") - - -if __name__ == "__main__": - main() diff --git 
a/task_queue/integration_tasks.py b/task_queue/integration_tasks.py deleted file mode 100644 index 1520576..0000000 --- a/task_queue/integration_tasks.py +++ /dev/null @@ -1,499 +0,0 @@ -#!/usr/bin/env python3 -""" -Queue tasks for file processing integration. -""" - -import os -import json -import time -import hashlib -import shutil -from typing import Dict, List, Optional, Any - -from task_queue.config import huey -from task_queue.manager import queue_manager -from task_queue.task_status import task_status_store -from utils import download_dataset_files, save_processed_files_log, load_processed_files_log -from utils.dataset_manager import remove_dataset_directory_by_key - - -def scan_upload_folder(upload_dir: str) -> List[str]: - """ - Scan all supported file formats in the upload folder. - - Args: - upload_dir: Upload folder path - - Returns: - List[str]: List of supported file paths - """ - supported_extensions = { - # Text files - '.txt', '.md', '.rtf', - # Document files - '.doc', '.docx', '.pdf', '.odt', - # Spreadsheet files - '.xls', '.xlsx', '.csv', '.ods', - # Presentation files - '.ppt', '.pptx', '.odp', - # E-books - '.epub', '.mobi', - # Web files - '.html', '.htm', - # Config files - '.json', '.xml', '.yaml', '.yml', - # Code files - '.py', '.js', '.java', '.cpp', '.c', '.go', '.rs', - # Archive files - '.zip', '.rar', '.7z', '.tar', '.gz' - } - - scanned_files = [] - - if not os.path.exists(upload_dir): - return scanned_files - - for root, dirs, files in os.walk(upload_dir): - for file in files: - # Skip hidden files and system files - if file.startswith('.') or file.startswith('~'): - continue - - file_path = os.path.join(root, file) - file_extension = os.path.splitext(file)[1].lower() - - # Check if file extension is supported - if file_extension in supported_extensions: - scanned_files.append(file_path) - else: - # For files without extension, try to process them (may be text files) - if not file_extension: - try: - # Try reading the file header 
to determine if it's a text file - with open(file_path, 'r', encoding='utf-8') as f: - f.read(1024) # Read the first 1KB - scanned_files.append(file_path) - except (UnicodeDecodeError, PermissionError): - # Not a text file or unreadable, skip - pass - - return scanned_files - - -@huey.task() -def process_files_async( - dataset_id: str, - files: Optional[Dict[str, List[str]]] = None, - upload_folder: Optional[Dict[str, str]] = None, - task_id: Optional[str] = None -) -> Dict[str, Any]: - """ - Asynchronously process file tasks - compatible with existing files/process API. - - Args: - dataset_id: Unique project ID - files: Dictionary of file paths grouped by key - upload_folder: Upload folder dictionary organized by group name, e.g. {'group1': 'my_project1', 'group2': 'my_project2'} - task_id: Task ID (for status tracking) - - Returns: - Processing result dictionary - """ - try: - print(f"Starting async file processing task, project ID: {dataset_id}") - - # If task_id is provided, set initial status - if task_id: - task_status_store.set_status( - task_id=task_id, - unique_id=dataset_id, - status="running" - ) - - # Ensure project directory exists - project_dir = os.path.join("projects", "data", dataset_id) - if not os.path.exists(project_dir): - os.makedirs(project_dir, exist_ok=True) - - # Process files: use key-grouped format - processed_files_by_key = {} - - # If upload_folder is provided, scan files in those folders - if upload_folder and not files: - scanned_files_by_group = {} - total_scanned_files = 0 - - for group_name, folder_name in upload_folder.items(): - # Security check: prevent path traversal attacks - safe_folder_name = os.path.basename(folder_name) - upload_dir = os.path.join("projects", "uploads", safe_folder_name) - - if os.path.exists(upload_dir): - scanned_files = scan_upload_folder(upload_dir) - if scanned_files: - scanned_files_by_group[group_name] = scanned_files - total_scanned_files += len(scanned_files) - print(f"Scanned 
{len(scanned_files)} files from upload folder '{safe_folder_name}' (group: {group_name})") - else: - print(f"No supported files found in upload folder '{safe_folder_name}' (group: {group_name})") - else: - print(f"Upload folder does not exist: {upload_dir} (group: {group_name})") - - if scanned_files_by_group: - files = scanned_files_by_group - print(f"Total scanned {total_scanned_files} files from {len(scanned_files_by_group)} groups") - else: - print("No supported files found in any upload folder") - - if files: - # Use files from the request (grouped by key) - # Since this is an async task, call synchronously - import asyncio - try: - loop = asyncio.get_event_loop() - except RuntimeError: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - processed_files_by_key = loop.run_until_complete(download_dataset_files(dataset_id, files)) - total_files = sum(len(files_list) for files_list in processed_files_by_key.values()) - print(f"Async processed {total_files} dataset files across {len(processed_files_by_key)} keys, project ID: {dataset_id}") - else: - print(f"No files provided in request, project ID: {dataset_id}") - - # Collect all document.txt files in the project directory - document_files = [] - for root, dirs, files_list in os.walk(project_dir): - for file in files_list: - if file == "document.txt": - document_files.append(os.path.join(root, file)) - - # Generate project README.md file - try: - from utils.project_manager import save_project_readme - save_project_readme(dataset_id) - print(f"README.md generated, project ID: {dataset_id}") - except Exception as e: - print(f"Failed to generate README.md, project ID: {dataset_id}, error: {str(e)}") - # Does not affect main processing flow, continue - - # Build result file list - result_files = [] - for key in processed_files_by_key.keys(): - # Add corresponding dataset document.txt path - document_path = os.path.join("projects", "data", dataset_id, "datasets", key, "document.txt") - if 
os.path.exists(document_path): - result_files.append(document_path) - - # Also add document.txt files that exist but are not in processed_files_by_key - existing_document_paths = set(result_files) # Avoid duplicates - for doc_file in document_files: - if doc_file not in existing_document_paths: - result_files.append(doc_file) - - result = { - "status": "success", - "message": f"Successfully processed {len(result_files)} document files across {len(processed_files_by_key)} keys", - "dataset_id": dataset_id, - "processed_files": result_files, - "processed_files_by_key": processed_files_by_key, - "document_files": document_files, - "total_files_processed": sum(len(files_list) for files_list in processed_files_by_key.values()), - "processing_time": time.time() - } - - # Update task status to completed - if task_id: - task_status_store.update_status( - task_id=task_id, - status="completed", - result=result - ) - - print(f"Async file processing task completed: {dataset_id}") - return result - - except Exception as e: - error_msg = f"Error during async file processing: {str(e)}" - print(error_msg) - - # Update task status to error - if task_id: - task_status_store.update_status( - task_id=task_id, - status="failed", - error=error_msg - ) - - return { - "status": "error", - "message": error_msg, - "dataset_id": dataset_id, - "error": str(e) - } - - -@huey.task() -def process_files_incremental_async( - dataset_id: str, - files_to_add: Optional[Dict[str, List[str]]] = None, - files_to_remove: Optional[Dict[str, List[str]]] = None, - system_prompt: Optional[str] = None, - mcp_settings: Optional[List[Dict]] = None, - task_id: Optional[str] = None -) -> Dict[str, Any]: - """ - Incremental file processing task - supports adding and removing files. 
- - Args: - dataset_id: Unique project ID - files_to_add: Dictionary of file paths to add, grouped by key - files_to_remove: Dictionary of file paths to remove, grouped by key - system_prompt: System prompt - mcp_settings: MCP settings - task_id: Task ID (for status tracking) - - Returns: - Processing result dictionary - """ - try: - print(f"Starting incremental file processing task, project ID: {dataset_id}") - - # If task_id is provided, set initial status - if task_id: - task_status_store.set_status( - task_id=task_id, - unique_id=dataset_id, - status="running" - ) - - # Ensure project directory exists - project_dir = os.path.join("projects", "data", dataset_id) - if not os.path.exists(project_dir): - os.makedirs(project_dir, exist_ok=True) - - # Load existing processing log - processed_log = load_processed_files_log(dataset_id) - print(f"Loaded existing processing log with {len(processed_log)} file records") - - removed_files = [] - added_files = [] - - # 1. Process removals - if files_to_remove: - print(f"Starting removal processing across {len(files_to_remove)} key groups") - for key, file_list in files_to_remove.items(): - if not file_list: # If file list is empty, remove the entire key group - print(f"Removing entire key group: {key}") - if remove_dataset_directory_by_key(dataset_id, key): - removed_files.append(f"dataset/{key}") - - # Remove all records for this key from the processing log - keys_to_remove = [file_hash for file_hash, file_info in processed_log.items() - if file_info.get('key') == key] - for file_hash in keys_to_remove: - del processed_log[file_hash] - removed_files.append(f"log_entry:{file_hash}") - else: - # Remove specific files - for file_path in file_list: - print(f"Removing specific file: {key}/{file_path}") - - # Actually delete the file - filename = os.path.basename(file_path) - - # Delete original file - source_file = os.path.join("projects", "data", dataset_id, "files", key, filename) - if os.path.exists(source_file): - 
os.remove(source_file) - removed_files.append(f"file:{key}/{filename}") - - # Delete processed file directory - processed_dir = os.path.join("projects", "data", dataset_id, "processed", key, filename) - if os.path.exists(processed_dir): - shutil.rmtree(processed_dir) - removed_files.append(f"processed:{key}/{filename}") - - # Compute file hash to find in log - file_hash = hashlib.md5(file_path.encode('utf-8')).hexdigest() - - # Remove from processing log - if file_hash in processed_log: - del processed_log[file_hash] - removed_files.append(f"log_entry:{file_hash}") - - # 2. Process additions - processed_files_by_key = {} - if files_to_add: - print(f"Starting addition processing across {len(files_to_add)} key groups") - # Use async processing to download files - import asyncio - try: - loop = asyncio.get_event_loop() - except RuntimeError: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - processed_files_by_key = loop.run_until_complete(download_dataset_files(dataset_id, files_to_add, incremental_mode=True)) - total_added_files = sum(len(files_list) for files_list in processed_files_by_key.values()) - print(f"Async processed {total_added_files} dataset files across {len(processed_files_by_key)} keys, project ID: {dataset_id}") - - # Record added files - for key, files_list in processed_files_by_key.items(): - for file_path in files_list: - added_files.append(f"{key}/{file_path}") - else: - print(f"No files to add provided in request, project ID: {dataset_id}") - - # Save updated processing log - save_processed_files_log(dataset_id, processed_log) - print(f"Updated processing log, now contains {len(processed_log)} file records") - - # Save system_prompt and mcp_settings to project directory (if provided) - if system_prompt: - system_prompt_file = os.path.join(project_dir, "system_prompt.md") - with open(system_prompt_file, 'w', encoding='utf-8') as f: - f.write(system_prompt) - print(f"Saved system_prompt, project ID: {dataset_id}") - - if 
mcp_settings: - mcp_settings_file = os.path.join(project_dir, "mcp_settings.json") - with open(mcp_settings_file, 'w', encoding='utf-8') as f: - json.dump(mcp_settings, f, ensure_ascii=False, indent=2) - print(f"Saved mcp_settings, project ID: {dataset_id}") - - # Generate project README.md file - try: - from utils.project_manager import save_project_readme - save_project_readme(dataset_id) - print(f"README.md generated, project ID: {dataset_id}") - except Exception as e: - print(f"Failed to generate README.md, project ID: {dataset_id}, error: {str(e)}") - # Does not affect main processing flow, continue - - # Collect all document.txt files in the project directory - document_files = [] - for root, dirs, files_list in os.walk(project_dir): - for file in files_list: - if file == "document.txt": - document_files.append(os.path.join(root, file)) - - # Build result file list - result_files = [] - for key in processed_files_by_key.keys(): - # Add corresponding dataset document.txt path - document_path = os.path.join("projects", "data", dataset_id, "datasets", key, "document.txt") - if os.path.exists(document_path): - result_files.append(document_path) - - # Also add document.txt files that exist but are not in processed_files_by_key - existing_document_paths = set(result_files) # Avoid duplicates - for doc_file in document_files: - if doc_file not in existing_document_paths: - result_files.append(doc_file) - - result = { - "status": "success", - "message": f"Incremental processing complete - added {len(added_files)} files, removed {len(removed_files)} files, {len(result_files)} document files remaining", - "dataset_id": dataset_id, - "removed_files": removed_files, - "added_files": added_files, - "processed_files": result_files, - "processed_files_by_key": processed_files_by_key, - "document_files": document_files, - "total_files_added": sum(len(files_list) for files_list in processed_files_by_key.values()), - "total_files_removed": len(removed_files), - 
"final_files_count": len(result_files), - "processing_time": time.time() - } - - # Update task status to completed - if task_id: - task_status_store.update_status( - task_id=task_id, - status="completed", - result=result - ) - - print(f"Incremental file processing task completed: {dataset_id}") - return result - - except Exception as e: - error_msg = f"Error during incremental file processing: {str(e)}" - print(error_msg) - - # Update task status to error - if task_id: - task_status_store.update_status( - task_id=task_id, - status="failed", - error=error_msg - ) - - return { - "status": "error", - "message": error_msg, - "dataset_id": dataset_id, - "error": str(e) - } - - -@huey.task() -def cleanup_project_async( - dataset_id: str, - remove_all: bool = False -) -> Dict[str, Any]: - """ - Asynchronously clean up project files. - - Args: - dataset_id: Unique project ID - remove_all: Whether to remove the entire project directory - - Returns: - Cleanup result dictionary - """ - try: - print(f"Starting async project cleanup, project ID: {dataset_id}") - - project_dir = os.path.join("projects", "data", dataset_id) - removed_items = [] - - if remove_all and os.path.exists(project_dir): - import shutil - shutil.rmtree(project_dir) - removed_items.append(project_dir) - result = { - "status": "success", - "message": f"Deleted entire project directory: {project_dir}", - "dataset_id": dataset_id, - "removed_items": removed_items, - "action": "remove_all" - } - else: - # Only clean processing log - log_file = os.path.join(project_dir, "processed_files.json") - if os.path.exists(log_file): - os.remove(log_file) - removed_items.append(log_file) - - result = { - "status": "success", - "message": f"Cleaned project processing log, project ID: {dataset_id}", - "dataset_id": dataset_id, - "removed_items": removed_items, - "action": "cleanup_logs" - } - - print(f"Async cleanup task completed: {dataset_id}") - return result - - except Exception as e: - error_msg = f"Error during async 
project cleanup: {str(e)}" - print(error_msg) - return { - "status": "error", - "message": error_msg, - "dataset_id": dataset_id, - "error": str(e) - } diff --git a/task_queue/manager.py b/task_queue/manager.py deleted file mode 100644 index 3eb2626..0000000 --- a/task_queue/manager.py +++ /dev/null @@ -1,228 +0,0 @@ -#!/usr/bin/env python3 -""" -Queue manager for handling file processing queues. -""" - -import os -import json -import time -import logging -from typing import Dict, List, Optional, Any -from huey import Huey -from huey.api import Task -from datetime import datetime, timedelta - -# Configure logging -logger = logging.getLogger('app') - -from .config import huey -from .tasks import process_file_async, process_multiple_files_async, process_zip_file_async, cleanup_processed_files - - -class QueueManager: - """Queue manager for file processing tasks.""" - - def __init__(self): - self.huey = huey - logger.info(f"Queue manager initialized with database: {os.path.join(os.path.dirname(__file__), '..', 'projects', 'queue_data', 'huey.db')}") - - def enqueue_file( - self, - project_id: str, - file_path: str, - original_filename: str = None, - delay: int = 0 - ) -> str: - """ - Add a file to the processing queue. - - Args: - project_id: Project ID - file_path: File path - original_filename: Original filename - delay: Delay before execution in seconds - - Returns: - Task ID - """ - if delay > 0: - task = process_file_async.schedule( - args=(project_id, file_path, original_filename), - delay=timedelta(seconds=delay) - ) - else: - task = process_file_async(project_id, file_path, original_filename) - - logger.info(f"File queued for processing: {file_path}, task ID: {task.id}") - return task.id - - def enqueue_multiple_files( - self, - project_id: str, - file_paths: List[str], - original_filenames: List[str] = None, - delay: int = 0 - ) -> List[str]: - """ - Add multiple files to the processing queue. 
- - Args: - project_id: Project ID - file_paths: List of file paths - original_filenames: List of original filenames - delay: Delay before execution in seconds - - Returns: - List of task IDs - """ - if delay > 0: - task = process_multiple_files_async.schedule( - args=(project_id, file_paths, original_filenames), - delay=timedelta(seconds=delay) - ) - else: - task = process_multiple_files_async(project_id, file_paths, original_filenames) - - logger.info(f"Batch files queued for processing: {len(file_paths)} files, task ID: {task.id}") - return [task.id] - - def enqueue_zip_file( - self, - project_id: str, - zip_path: str, - extract_to: str = None, - delay: int = 0 - ) -> str: - """ - Add a zip file to the processing queue. - - Args: - project_id: Project ID - zip_path: Path to the zip file - extract_to: Extraction target directory - delay: Delay before execution in seconds - - Returns: - Task ID - """ - if delay > 0: - task = process_zip_file_async.schedule( - args=(project_id, zip_path, extract_to), - delay=timedelta(seconds=delay) - ) - else: - task = process_zip_file_async(project_id, zip_path, extract_to) - - logger.info(f"Zip file queued for processing: {zip_path}, task ID: {task.id}") - return task.id - - def get_task_status(self, task_id: str) -> Dict[str, Any]: - """ - Get task status. 
- - Args: - task_id: Task ID - - Returns: - Task status information - """ - try: - # Try getting the task result from result storage - try: - # Use Huey's built-in result lookup when available - if hasattr(self.huey, 'result') and self.huey.result: - result = self.huey.result(task_id) - if result is not None: - return { - "task_id": task_id, - "status": "complete", - "result": result - } - except Exception: - pass - - # Check whether the task is in the pending queue - try: - pending_tasks = list(self.huey.pending()) - for task in pending_tasks: - if hasattr(task, 'id') and task.id == task_id: - return { - "task_id": task_id, - "status": "pending" - } - except Exception: - pass - - # Check whether the task is in the scheduled queue - try: - scheduled_tasks = list(self.huey.scheduled()) - for task in scheduled_tasks: - if hasattr(task, 'id') and task.id == task_id: - return { - "task_id": task_id, - "status": "scheduled" - } - except Exception: - pass - - # If not found anywhere, it may not exist or may have completed with cleaned results - return { - "task_id": task_id, - "status": "unknown", - "message": "Task status is unknown; it may already be complete or may not exist" - } - - except Exception as e: - return { - "task_id": task_id, - "status": "error", - "message": f"Failed to get task status: {str(e)}" - } - - def get_queue_stats(self) -> Dict[str, Any]: - """ - Get queue statistics. 
- - Returns: - Queue statistics information - """ - try: - # Use a simplified approach for queue statistics - stats = { - "total_tasks": 0, - "pending_tasks": 0, - "running_tasks": 0, - "completed_tasks": 0, - "error_tasks": 0, - "scheduled_tasks": 0, - "recent_tasks": [], - "queue_database": os.path.join(os.path.dirname(__file__), '..', 'projects', 'queue_data', 'huey.db') - } - - # Try to get the number of pending tasks - try: - pending_tasks = list(self.huey.pending()) - stats["pending_tasks"] = len(pending_tasks) - stats["total_tasks"] += len(pending_tasks) - except Exception as e: - logger.error(f"Failed to get pending tasks: {e}") - - # Try to get the number of scheduled tasks - try: - scheduled_tasks = list(self.huey.scheduled()) - stats["scheduled_tasks"] = len(scheduled_tasks) - stats["total_tasks"] += len(scheduled_tasks) - except Exception as e: - logger.error(f"Failed to get scheduled tasks: {e}") - - return stats - - except Exception as e: - return { - "error": str(e), - "queue_database": os.path.join(os.path.dirname(__file__), '..', 'projects', 'queue_data', 'huey.db') - } - - -# Global singleton instance -queue_manager = QueueManager() diff --git a/task_queue/optimized_consumer.py b/task_queue/optimized_consumer.py deleted file mode 100755 index ab66b08..0000000 --- a/task_queue/optimized_consumer.py +++ /dev/null @@ -1,286 +0,0 @@ -#!/usr/bin/env python3 -""" -Optimized queue consumer with integrated performance monitoring. 
-""" - -import sys -import os -import time -import signal -import argparse -import multiprocessing -import logging -from pathlib import Path -from concurrent.futures import ThreadPoolExecutor -import threading - -# Configure logging -logger = logging.getLogger('app') - -# Add project root directory to Python path -project_root = Path(__file__).parent.parent -sys.path.insert(0, str(project_root)) - -from task_queue.config import huey -from task_queue.manager import queue_manager -from task_queue.integration_tasks import process_files_async, cleanup_project_async -from huey.consumer import Consumer - - -class OptimizedQueueConsumer: - """Optimized queue consumer with integrated performance monitoring.""" - - def __init__(self, worker_type: str = "threads", workers: int = 2): - self.huey = huey - self.worker_type = worker_type - self.workers = workers - self.running = False - self.consumer = None - self.processed_count = 0 - self.start_time = None - - # Performance monitoring - self.performance_stats = { - 'tasks_processed': 0, - 'tasks_failed': 0, - 'avg_processing_time': 0, - 'start_time': None, - 'last_activity': None - } - - # Register signal handlers - signal.signal(signal.SIGINT, self._signal_handler) - signal.signal(signal.SIGTERM, self._signal_handler) - - def _signal_handler(self, signum, frame): - """Signal handler for graceful shutdown.""" - logger.info(f"\nReceived signal {signum}, shutting down queue consumer...") - self.running = False - if self.consumer: - self.consumer.stop() - - def setup_optimizations(self): - """Set up performance optimizations.""" - # Set environment variables - env_vars = { - 'PYTHONUNBUFFERED': '1', - 'PYTHONDONTWRITEBYTECODE': '1', - } - - for key, value in env_vars.items(): - os.environ[key] = value - - # Optimize huey configuration - if hasattr(huey, 'immediate'): - huey.immediate = False - - # Adjust based on worker type - if self.worker_type == "threads": - # Thread pool optimization - if hasattr(huey, 'worker_type'): - 
huey.worker_type = 'threads' - - # Set thread pool size - if hasattr(huey, 'always_eager'): - huey.always_eager = False - - logger.info("Queue consumer optimization setup complete:") - logger.info(f"- Worker type: {self.worker_type}") - logger.info(f"- Worker count: {self.workers}") - - def monitor_performance(self): - """Performance monitoring thread.""" - while self.running: - time.sleep(30) # Output statistics every 30 seconds - - if self.start_time: - elapsed = time.time() - self.start_time - rate = self.performance_stats['tasks_processed'] / max(1, elapsed) - - logger.info(f"\n[Performance Stats]") - logger.info(f"- Uptime: {elapsed:.1f}s") - logger.info(f"- Tasks processed: {self.performance_stats['tasks_processed']}") - logger.info(f"- Failed tasks: {self.performance_stats['tasks_failed']}") - logger.info(f"- Average processing rate: {rate:.2f} tasks/s") - - if self.performance_stats['avg_processing_time'] > 0: - logger.info(f"- Average processing time: {self.performance_stats['avg_processing_time']:.2f}s") - - def start(self): - """Start the queue consumer.""" - logger.info("=" * 60) - logger.info("Optimized queue consumer starting") - logger.info("=" * 60) - - # Apply optimizations - self.setup_optimizations() - - logger.info(f"Database: {os.path.join(os.path.dirname(__file__), '..', 'projects', 'queue_data', 'huey.db')}") - logger.info("Press Ctrl+C to stop the consumer") - - self.running = True - self.start_time = time.time() - self.performance_stats['start_time'] = self.start_time - - # Start performance monitoring thread - monitor_thread = threading.Thread(target=self.monitor_performance, daemon=True) - monitor_thread.start() - - try: - # Create consumer - self.consumer = Consumer( - self.huey, - workers=self.workers, - worker_type=self.worker_type, - max_delay=60.0, # Maximum delay - check_delay=1.0, # Check interval - periodic=True, # Enable periodic tasks - ) - - logger.info("Queue consumer started, waiting for tasks...") - - # Start the consumer - 
self.consumer.run() - - except KeyboardInterrupt: - logger.info("\nReceived keyboard interrupt signal") - except Exception as e: - logger.error(f"Queue consumer runtime error: {e}") - import traceback - traceback.print_exc() - finally: - self.shutdown() - - def shutdown(self): - """Shut down the queue consumer.""" - logger.info("\nShutting down queue consumer...") - self.running = False - - if self.consumer: - try: - self.consumer.stop() - logger.info("Queue consumer stopped") - except Exception as e: - logger.error(f"Error stopping queue consumer: {e}") - - # Output final statistics - if self.start_time: - elapsed = time.time() - self.start_time - logger.info(f"\n[Final Stats]") - logger.info(f"- Total uptime: {elapsed:.1f}s") - logger.info(f"- Total tasks processed: {self.performance_stats['tasks_processed']}") - logger.info(f"- Total failed tasks: {self.performance_stats['tasks_failed']}") - - if self.performance_stats['tasks_processed'] > 0: - rate = self.performance_stats['tasks_processed'] / elapsed - logger.info(f"- Average processing rate: {rate:.2f} tasks/s") - - -def calculate_optimal_workers(): - """Calculate the optimal number of worker threads.""" - cpu_count = multiprocessing.cpu_count() - - # Based on CPU core count and system resources - if cpu_count <= 2: - return 2 - elif cpu_count <= 4: - return 4 - else: - return min(8, cpu_count) - - -def check_queue_status(): - """Check queue status.""" - try: - stats = queue_manager.get_queue_stats() - - logger.info("\n[Queue Status]") - if isinstance(stats, dict): - if 'total_tasks' in stats: - logger.info(f"- Total tasks: {stats['total_tasks']}") - if 'pending_tasks' in stats: - logger.info(f"- Pending tasks: {stats['pending_tasks']}") - if 'scheduled_tasks' in stats: - logger.info(f"- Scheduled tasks: {stats['scheduled_tasks']}") - - # Check database file - db_path = os.path.join(os.path.dirname(__file__), '..', 'projects', 'queue_data', 'huey.db') - if os.path.exists(db_path): - size = 
os.path.getsize(db_path) - logger.info(f"- Database size: {size} bytes") - else: - logger.info("- Database file: not found") - - except Exception as e: - logger.error(f"Failed to get queue status: {e}") - - -def main(): - """Main entry point.""" - parser = argparse.ArgumentParser(description="Optimized queue consumer") - parser.add_argument( - "--workers", - type=int, - default=calculate_optimal_workers(), - help=f"Number of worker threads (default: {calculate_optimal_workers()})" - ) - parser.add_argument( - "--worker-type", - type=str, - default="threads", - choices=["threads", "greenlets", "gevent"], - help="Worker type (default: threads)" - ) - parser.add_argument( - "--check-status", - action="store_true", - help="Check queue status and exit" - ) - parser.add_argument( - "--profile", - type=str, - default="balanced", - choices=["low_memory", "balanced", "high_performance"], - help="Performance profile" - ) - - args = parser.parse_args() - - # Apply performance profile - if args.profile == "low_memory": - os.environ['PYTHONOPTIMIZE'] = '1' - if args.workers > 2: - args.workers = 2 - logger.info(f"Low memory mode: adjusted worker count to {args.workers}") - elif args.profile == "high_performance": - if args.workers < 4: - args.workers = 4 - logger.info(f"High performance mode: adjusted worker count to {args.workers}") - - # Check queue status - if args.check_status: - check_queue_status() - return - - # Check environment - try: - import psutil - memory = psutil.virtual_memory() - logger.info("[System Info]") - logger.info(f"- CPU cores: {multiprocessing.cpu_count()}") - logger.info(f"- Available memory: {memory.available / (1024**3):.1f}GB") - logger.info(f"- Memory usage: {memory.percent:.1f}%") - except ImportError: - logger.info("[Tip] Install psutil to display system info: pip install psutil") - - # Create and start the queue consumer - consumer = OptimizedQueueConsumer( - worker_type=args.worker_type, - workers=args.workers - ) - - consumer.start() - - -if 
__name__ == "__main__": - main() diff --git a/task_queue/task_status.py b/task_queue/task_status.py deleted file mode 100644 index bd21a3f..0000000 --- a/task_queue/task_status.py +++ /dev/null @@ -1,210 +0,0 @@ -#!/usr/bin/env python3 -""" -Task status SQLite storage system. -""" - -import json -import os -import sqlite3 -import time -from typing import Dict, Optional, Any, List -from pathlib import Path - - -class TaskStatusStore: - """SQLite-based task status store.""" - - def __init__(self, db_path: str = "projects/queue_data/task_status.db"): - self.db_path = db_path - # Ensure directory exists - Path(db_path).parent.mkdir(parents=True, exist_ok=True) - self._init_database() - - def _init_database(self): - """Initialize database tables.""" - with sqlite3.connect(self.db_path) as conn: - conn.execute(''' - CREATE TABLE IF NOT EXISTS task_status ( - task_id TEXT PRIMARY KEY, - unique_id TEXT NOT NULL, - status TEXT NOT NULL, - created_at REAL NOT NULL, - updated_at REAL NOT NULL, - result TEXT, - error TEXT - ) - ''') - conn.commit() - - def set_status(self, task_id: str, unique_id: str, status: str, - result: Optional[Dict] = None, error: Optional[str] = None): - """Set task status.""" - current_time = time.time() - - with sqlite3.connect(self.db_path) as conn: - conn.execute(''' - INSERT OR REPLACE INTO task_status - (task_id, unique_id, status, created_at, updated_at, result, error) - VALUES (?, ?, ?, ?, ?, ?, ?) 
- ''', ( - task_id, unique_id, status, current_time, current_time, - json.dumps(result) if result else None, - error - )) - conn.commit() - - def get_status(self, task_id: str) -> Optional[Dict]: - """Get task status.""" - with sqlite3.connect(self.db_path) as conn: - conn.row_factory = sqlite3.Row - cursor = conn.execute( - 'SELECT * FROM task_status WHERE task_id = ?', (task_id,) - ) - row = cursor.fetchone() - - if not row: - return None - - result = dict(row) - # Parse JSON field - if result['result']: - result['result'] = json.loads(result['result']) - - return result - - def update_status(self, task_id: str, status: str, - result: Optional[Dict] = None, error: Optional[str] = None): - """Update task status.""" - with sqlite3.connect(self.db_path) as conn: - # Check if task exists - cursor = conn.execute( - 'SELECT task_id FROM task_status WHERE task_id = ?', (task_id,) - ) - if not cursor.fetchone(): - return False - - # Update status - conn.execute(''' - UPDATE task_status - SET status = ?, updated_at = ?, result = ?, error = ? - WHERE task_id = ? 
- ''', ( - status, time.time(), - json.dumps(result) if result else None, - error, task_id - )) - conn.commit() - return True - - def delete_status(self, task_id: str): - """Delete task status.""" - with sqlite3.connect(self.db_path) as conn: - cursor = conn.execute( - 'DELETE FROM task_status WHERE task_id = ?', (task_id,) - ) - conn.commit() - return cursor.rowcount > 0 - - def list_all(self) -> Dict[str, Dict]: - """List all task statuses.""" - with sqlite3.connect(self.db_path) as conn: - conn.row_factory = sqlite3.Row - cursor = conn.execute( - 'SELECT * FROM task_status ORDER BY updated_at DESC' - ) - all_tasks = {} - for row in cursor: - result = dict(row) - # Parse JSON field - if result['result']: - result['result'] = json.loads(result['result']) - all_tasks[result['task_id']] = result - return all_tasks - - def get_by_unique_id(self, unique_id: str) -> List[Dict]: - """Get all tasks for a given project ID.""" - with sqlite3.connect(self.db_path) as conn: - conn.row_factory = sqlite3.Row - cursor = conn.execute( - 'SELECT * FROM task_status WHERE unique_id = ? 
ORDER BY updated_at DESC', - (unique_id,) - ) - tasks = [] - for row in cursor: - result = dict(row) - if result['result']: - result['result'] = json.loads(result['result']) - tasks.append(result) - return tasks - - def cleanup_old_tasks(self, older_than_days: int = 7) -> int: - """Clean up old task records.""" - cutoff_time = time.time() - (older_than_days * 24 * 3600) - - with sqlite3.connect(self.db_path) as conn: - cursor = conn.execute( - 'DELETE FROM task_status WHERE updated_at < ?', - (cutoff_time,) - ) - conn.commit() - return cursor.rowcount - - def get_statistics(self) -> Dict[str, Any]: - """Get task statistics.""" - with sqlite3.connect(self.db_path) as conn: - # Total tasks - total = conn.execute('SELECT COUNT(*) FROM task_status').fetchone()[0] - - # Status breakdown - status_stats = conn.execute(''' - SELECT status, COUNT(*) as count - FROM task_status - GROUP BY status - ''').fetchall() - - # Tasks in the last 24 hours - recent = time.time() - (24 * 3600) - recent_tasks = conn.execute( - 'SELECT COUNT(*) FROM task_status WHERE updated_at > ?', - (recent,) - ).fetchone()[0] - - return { - 'total_tasks': total, - 'status_breakdown': dict(status_stats), - 'recent_24h': recent_tasks, - 'database_path': self.db_path - } - - def search_tasks(self, status: Optional[str] = None, - unique_id: Optional[str] = None, - limit: int = 100) -> List[Dict]: - """Search tasks.""" - query = 'SELECT * FROM task_status WHERE 1=1' - params = [] - - if status: - query += ' AND status = ?' - params.append(status) - - if unique_id: - query += ' AND unique_id = ?' - params.append(unique_id) - - query += ' ORDER BY updated_at DESC LIMIT ?' 
- params.append(limit) - - with sqlite3.connect(self.db_path) as conn: - conn.row_factory = sqlite3.Row - cursor = conn.execute(query, params) - tasks = [] - for row in cursor: - result = dict(row) - if result['result']: - result['result'] = json.loads(result['result']) - tasks.append(result) - return tasks - - -# Global task status store instance -task_status_store = TaskStatusStore() diff --git a/task_queue/tasks.py b/task_queue/tasks.py deleted file mode 100644 index 5fd7810..0000000 --- a/task_queue/tasks.py +++ /dev/null @@ -1,359 +0,0 @@ -#!/usr/bin/env python3 -""" -File processing tasks for the queue system. -""" - -import os -import json -import time -import shutil -import logging -from pathlib import Path -from typing import Dict, List, Optional, Any -from huey import crontab - -# Configure logging -logger = logging.getLogger('app') - -from .config import huey -from utils.file_utils import ( - extract_zip_file, - get_file_hash, - load_processed_files_log, - save_processed_files_log, - get_document_preview -) - - -@huey.task() -def process_file_async( - project_id: str, - file_path: str, - original_filename: str = None, - target_directory: str = "files" -) -> Dict[str, Any]: - """ - Asynchronously process a single file. 
- - Args: - project_id: Project ID - file_path: File path - original_filename: Original filename - target_directory: Target directory - - Returns: - Processing result dictionary - """ - try: - logger.info(f"Starting file processing: {file_path}") - - # Ensure project directory exists - project_dir = os.path.join("projects", project_id) - files_dir = os.path.join(project_dir, target_directory) - os.makedirs(files_dir, exist_ok=True) - - # Get file hash as identifier - file_hash = get_file_hash(file_path) - - # Check if file has already been processed - processed_log = load_processed_files_log(project_id) - if file_hash in processed_log: - logger.info(f"File already processed, skipping: {file_path}") - return { - "status": "skipped", - "message": "File already processed", - "file_hash": file_hash, - "project_id": project_id - } - - # Process the file - result = _process_single_file( - file_path, - files_dir, - original_filename or os.path.basename(file_path) - ) - - # Update processing log - if result["status"] == "success": - processed_log[file_hash] = { - "original_path": file_path, - "original_filename": original_filename or os.path.basename(file_path), - "processed_at": str(time.time()), - "status": "processed", - "result": result - } - save_processed_files_log(project_id, processed_log) - - result["file_hash"] = file_hash - result["project_id"] = project_id - - logger.info(f"File processing complete: {file_path}, status: {result['status']}") - return result - - except Exception as e: - error_msg = f"Error processing file: {str(e)}" - logger.error(error_msg) - return { - "status": "error", - "message": error_msg, - "file_path": file_path, - "project_id": project_id - } - - -@huey.task() -def process_multiple_files_async( - project_id: str, - file_paths: List[str], - original_filenames: List[str] = None -) -> List[Dict[str, Any]]: - """ - Asynchronously process multiple files in batch. 
- - Args: - project_id: Project ID - file_paths: List of file paths - original_filenames: List of original filenames - - Returns: - List of processing results - """ - try: - logger.info(f"Starting batch processing of {len(file_paths)} files") - - results = [] - for i, file_path in enumerate(file_paths): - original_filename = original_filenames[i] if original_filenames and i < len(original_filenames) else None - - # Create async task for each file - result = process_file_async(project_id, file_path, original_filename) - results.append(result) - - logger.info(f"Batch file processing tasks submitted, total {len(results)} files") - return results - - except Exception as e: - error_msg = f"Error during batch file processing: {str(e)}" - logger.error(error_msg) - return [{ - "status": "error", - "message": error_msg, - "project_id": project_id - }] - - -@huey.task() -def process_zip_file_async( - project_id: str, - zip_path: str, - extract_to: str = None -) -> Dict[str, Any]: - """ - Asynchronously process a zip archive file. 
- - Args: - project_id: Project ID - zip_path: Zip file path - extract_to: Extraction target directory - - Returns: - Processing result dictionary - """ - try: - logger.info(f"Starting zip file processing: {zip_path}") - - # Set extraction directory - if extract_to is None: - extract_to = os.path.join("projects", project_id, "extracted", os.path.basename(zip_path)) - - os.makedirs(extract_to, exist_ok=True) - - # Extract files - extracted_files = extract_zip_file(zip_path, extract_to) - - if not extracted_files: - return { - "status": "error", - "message": "Extraction failed or no supported files found", - "zip_path": zip_path, - "project_id": project_id - } - - # Batch process extracted files - result = process_multiple_files_async(project_id, extracted_files) - - return { - "status": "success", - "message": f"Zip file processing complete, extracted {len(extracted_files)} files", - "zip_path": zip_path, - "extract_to": extract_to, - "extracted_files": extracted_files, - "project_id": project_id, - "batch_task_result": result - } - - except Exception as e: - error_msg = f"Error processing zip file: {str(e)}" - logger.error(error_msg) - return { - "status": "error", - "message": error_msg, - "zip_path": zip_path, - "project_id": project_id - } - - -@huey.task() -def cleanup_processed_files( - project_id: str, - older_than_days: int = 30 -) -> Dict[str, Any]: - """ - Clean up old processed files. 
- - Args: - project_id: Project ID - older_than_days: Clean files older than this many days - - Returns: - Cleanup result dictionary - """ - try: - logger.info(f"Starting cleanup of files older than {older_than_days} days in project {project_id}") - - project_dir = os.path.join("projects", project_id) - if not os.path.exists(project_dir): - return { - "status": "error", - "message": "Project directory does not exist", - "project_id": project_id - } - - current_time = time.time() - cutoff_time = current_time - (older_than_days * 24 * 3600) - cleaned_files = [] - - # Walk through project directory - for root, dirs, files in os.walk(project_dir): - for file in files: - file_path = os.path.join(root, file) - file_mtime = os.path.getmtime(file_path) - - if file_mtime < cutoff_time: - try: - os.remove(file_path) - cleaned_files.append(file_path) - logger.info(f"Deleted old file: {file_path}") - except Exception as e: - logger.error(f"Failed to delete file {file_path}: {str(e)}") - - # Clean up empty directories - for root, dirs, files in os.walk(project_dir, topdown=False): - for dir in dirs: - dir_path = os.path.join(root, dir) - try: - if not os.listdir(dir_path): - os.rmdir(dir_path) - logger.info(f"Deleted empty directory: {dir_path}") - except Exception as e: - logger.error(f"Failed to delete directory {dir_path}: {str(e)}") - - return { - "status": "success", - "message": f"Cleanup complete, deleted {len(cleaned_files)} files", - "project_id": project_id, - "cleaned_files": cleaned_files, - "older_than_days": older_than_days - } - - except Exception as e: - error_msg = f"Error during file cleanup: {str(e)}" - logger.error(error_msg) - return { - "status": "error", - "message": error_msg, - "project_id": project_id - } - - -def _process_single_file( - file_path: str, - target_dir: str, - original_filename: str -) -> Dict[str, Any]: - """ - Internal method for processing a single file. 
- - Args: - file_path: Source file path - target_dir: Target directory - original_filename: Original filename - - Returns: - Processing result dictionary - """ - try: - # Check if file exists - if not os.path.exists(file_path): - return { - "status": "error", - "message": "Source file does not exist", - "file_path": file_path - } - - # Get file info - file_size = os.path.getsize(file_path) - file_ext = os.path.splitext(original_filename)[1].lower() - - # Different processing based on file type - supported_extensions = ['.txt', '.md', '.csv', '.xlsx', '.zip'] - - if file_ext not in supported_extensions: - return { - "status": "error", - "message": f"Unsupported file type: {file_ext}", - "file_path": file_path, - "supported_extensions": supported_extensions - } - - # Copy file to target directory - target_file_path = os.path.join(target_dir, original_filename) - - # If target file already exists, add timestamp - if os.path.exists(target_file_path): - name, ext = os.path.splitext(original_filename) - timestamp = int(time.time()) - target_file_path = os.path.join(target_dir, f"{name}_{timestamp}{ext}") - - shutil.copy2(file_path, target_file_path) - - # Get file preview (if it's a text file) - preview = None - if file_ext in ['.txt', '.md']: - preview = get_document_preview(target_file_path, max_lines=5) - - return { - "status": "success", - "message": "File processed successfully", - "original_path": file_path, - "target_path": target_file_path, - "file_size": file_size, - "file_extension": file_ext, - "preview": preview - } - - except Exception as e: - return { - "status": "error", - "message": f"Error processing file: {str(e)}", - "file_path": file_path - } - - -# Periodic task example: clean up files older than 30 days daily at 2 AM -@huey.periodic_task(crontab(hour=2, minute=0)) -def daily_cleanup(): - """Daily cleanup task.""" - logger.info("Running daily cleanup task") - # Add cleanup logic here - return {"status": "completed", "message": "Daily cleanup task 
completed"} diff --git a/utils/__init__.py b/utils/__init__.py index 2bfefd0..7267e8b 100644 --- a/utils/__init__.py +++ b/utils/__init__.py @@ -13,23 +13,6 @@ from .file_utils import ( save_processed_files_log ) -from .dataset_manager import ( - download_dataset_files, - generate_dataset_structure, - remove_dataset_directory, - remove_dataset_directory_by_key -) - -from .project_manager import ( - generate_project_readme, - save_project_readme, - get_project_status, - remove_project, - list_projects, - get_project_stats -) - - from .system_optimizer import ( setup_system_optimizations ) @@ -59,11 +42,6 @@ from .api_models import ( ProjectListResponse, ProjectStatsResponse, ProjectActionResponse, - QueueTaskRequest, - IncrementalTaskRequest, - QueueTaskResponse, - QueueStatusResponse, - TaskStatusResponse, create_success_response, create_error_response, create_chat_response, @@ -90,20 +68,6 @@ __all__ = [ 'load_processed_files_log', 'save_processed_files_log', - # dataset_manager - 'download_dataset_files', - 'generate_dataset_structure', - 'remove_dataset_directory', - 'remove_dataset_directory_by_key', - - # project_manager - 'generate_project_readme', - 'save_project_readme', - 'get_project_status', - 'remove_project', - 'list_projects', - 'get_project_stats', - # agent_pool 'AgentPool', 'get_agent_pool', @@ -128,10 +92,6 @@ __all__ = [ 'ProjectListResponse', 'ProjectStatsResponse', 'ProjectActionResponse', - 'QueueTaskRequest', - 'QueueTaskResponse', - 'QueueStatusResponse', - 'TaskStatusResponse', 'create_success_response', 'create_error_response', 'create_chat_response', diff --git a/utils/api_models.py b/utils/api_models.py index 7fd98aa..7ff4bce 100644 --- a/utils/api_models.py +++ b/utils/api_models.py @@ -224,133 +224,6 @@ def create_error_response(message: str, error_type: str = "error", **kwargs) -> } -class QueueTaskRequest(BaseModel): - """Queue task request model""" - dataset_id: str - files: Optional[Dict[str, List[str]]] = Field(default=None, 
description="Files organized by key groups. Each key maps to a list of file paths (supports zip files)") - upload_folder: Optional[Dict[str, str]] = Field(default=None, description="Upload folders organized by group names. Each key maps to a folder name. Example: {'group1': 'my_project1', 'group2': 'my_project2'}") - priority: Optional[int] = Field(default=0, description="Task priority (higher number = higher priority)") - delay: Optional[int] = Field(default=0, description="Delay execution by N seconds") - - model_config = ConfigDict(extra='allow') - - @field_validator('upload_folder', mode='before') - @classmethod - def validate_upload_folder(cls, v): - """Validate upload_folder dict format""" - if v is None: - return None - if isinstance(v, dict): - # Validate dict format - for key, value in v.items(): - if not isinstance(key, str): - raise ValueError(f"Key in upload_folder dict must be string, got {type(key)}") - if not isinstance(value, str): - raise ValueError(f"Value in upload_folder dict must be string (folder name), got {type(value)} for key '{key}'") - return v - else: - raise ValueError(f"upload_folder must be a dict with group names as keys and folder names as values, got {type(v)}") - - @field_validator('files', mode='before') - @classmethod - def validate_files(cls, v): - """Validate dict format with key-grouped files""" - if v is None: - return None - if isinstance(v, dict): - # Validate dict format - for key, value in v.items(): - if not isinstance(key, str): - raise ValueError(f"Key in files dict must be string, got {type(key)}") - if not isinstance(value, list): - raise ValueError(f"Value in files dict must be list, got {type(value)} for key '{key}'") - for item in value: - if not isinstance(item, str): - raise ValueError(f"File paths must be strings, got {type(item)} in key '{key}'") - return v - else: - raise ValueError(f"Files must be a dict with key groups, got {type(v)}") - - -class IncrementalTaskRequest(BaseModel): - """Incremental file 
processing request model""" - dataset_id: str = Field(..., description="Dataset ID for the project") - files_to_add: Optional[Dict[str, List[str]]] = Field(default=None, description="Files to add organized by key groups") - files_to_remove: Optional[Dict[str, List[str]]] = Field(default=None, description="Files to remove organized by key groups") - system_prompt: Optional[str] = None - mcp_settings: Optional[List[Dict]] = None - priority: Optional[int] = Field(default=0, description="Task priority (higher number = higher priority)") - delay: Optional[int] = Field(default=0, description="Delay execution by N seconds") - - model_config = ConfigDict(extra='allow') - - @field_validator('files_to_add', mode='before') - @classmethod - def validate_files_to_add(cls, v): - """Validate files_to_add dict format""" - if v is None: - return None - if isinstance(v, dict): - for key, value in v.items(): - if not isinstance(key, str): - raise ValueError(f"Key in files_to_add dict must be string, got {type(key)}") - if not isinstance(value, list): - raise ValueError(f"Value in files_to_add dict must be list, got {type(value)} for key '{key}'") - for item in value: - if not isinstance(item, str): - raise ValueError(f"File paths must be strings, got {type(item)} in key '{key}'") - return v - else: - raise ValueError(f"files_to_add must be a dict with key groups, got {type(v)}") - - @field_validator('files_to_remove', mode='before') - @classmethod - def validate_files_to_remove(cls, v): - """Validate files_to_remove dict format""" - if v is None: - return None - if isinstance(v, dict): - for key, value in v.items(): - if not isinstance(key, str): - raise ValueError(f"Key in files_to_remove dict must be string, got {type(key)}") - if not isinstance(value, list): - raise ValueError(f"Value in files_to_remove dict must be list, got {type(value)} for key '{key}'") - for item in value: - if not isinstance(item, str): - raise ValueError(f"File paths must be strings, got {type(item)} in key 
'{key}'") - return v - else: - raise ValueError(f"files_to_remove must be a dict with key groups, got {type(v)}") - - -class QueueTaskResponse(BaseModel): - """Queue task response model""" - success: bool - message: str - dataset_id: str - task_id: Optional[str] = None - task_status: Optional[str] = None - estimated_processing_time: Optional[int] = None # seconds - - -class QueueStatusResponse(BaseModel): - """Queue status response model""" - success: bool - message: str - queue_stats: Dict[str, Any] - pending_tasks: List[Dict[str, Any]] - - -class TaskStatusResponse(BaseModel): - """Task status response model""" - success: bool - message: str - task_id: str - task_status: Optional[str] = None - task_result: Optional[Dict[str, Any]] = None - error: Optional[str] = None - - def create_chat_response( messages: List[Message], model: str, diff --git a/utils/data_merger.py b/utils/data_merger.py deleted file mode 100644 index bccf38a..0000000 --- a/utils/data_merger.py +++ /dev/null @@ -1,439 +0,0 @@ -#!/usr/bin/env python3 -""" -Data merging functions for combining processed file results. 
-""" - -import os -import pickle -import logging -from typing import Dict -from utils.settings import EMBEDDING_MODEL_NAME - -# Configure logger -logger = logging.getLogger('app') - -# Try to import numpy, but handle if missing -try: - import numpy as np - NUMPY_SUPPORT = True -except ImportError: - logger.warning("NumPy not available, some embedding features may be limited") - NUMPY_SUPPORT = False - - -def merge_documents_by_group(unique_id: str, group_name: str) -> Dict: - """Merge all document.txt files in a group into a single document.""" - - processed_group_dir = os.path.join("projects", "data", unique_id, "processed", group_name) - dataset_group_dir = os.path.join("projects", "data", unique_id, "datasets", group_name) - os.makedirs(dataset_group_dir, exist_ok=True) - - merged_document_path = os.path.join(dataset_group_dir, "document.txt") - - result = { - "success": False, - "merged_document_path": merged_document_path, - "source_files": [], - "total_pages": 0, - "total_characters": 0, - "error": None - } - - try: - # Find all document.txt files in the processed directory - document_files = [] - if os.path.exists(processed_group_dir): - for item in os.listdir(processed_group_dir): - item_path = os.path.join(processed_group_dir, item) - if os.path.isdir(item_path): - document_path = os.path.join(item_path, "document.txt") - if os.path.exists(document_path) and os.path.getsize(document_path) > 0: - document_files.append((item, document_path)) - - if not document_files: - result["error"] = "No document files found to merge" - return result - - # Merge all documents with page separators - merged_content = [] - total_characters = 0 - - for filename_stem, document_path in sorted(document_files): - try: - with open(document_path, 'r', encoding='utf-8') as f: - content = f.read().strip() - - if content: - merged_content.append(f"# Page {filename_stem}") - merged_content.append(content) - total_characters += len(content) - 
result["source_files"].append(filename_stem) - - except Exception as e: - logger.error(f"Error reading document file {document_path}: {str(e)}") - continue - - if merged_content: - # Write merged document - with open(merged_document_path, 'w', encoding='utf-8') as f: - f.write('\n\n'.join(merged_content)) - - result["total_pages"] = len(document_files) - result["total_characters"] = total_characters - result["success"] = True - - else: - result["error"] = "No valid content found in document files" - - except Exception as e: - result["error"] = f"Document merging failed: {str(e)}" - logger.error(f"Error merging documents for group {group_name}: {str(e)}") - - return result - - -def merge_paginations_by_group(unique_id: str, group_name: str) -> Dict: - """Merge all pagination.txt files in a group.""" - - processed_group_dir = os.path.join("projects", "data", unique_id, "processed", group_name) - dataset_group_dir = os.path.join("projects", "data", unique_id, "datasets", group_name) - os.makedirs(dataset_group_dir, exist_ok=True) - - merged_pagination_path = os.path.join(dataset_group_dir, "pagination.txt") - - result = { - "success": False, - "merged_pagination_path": merged_pagination_path, - "source_files": [], - "total_lines": 0, - "error": None - } - - try: - # Find all pagination.txt files - pagination_files = [] - if os.path.exists(processed_group_dir): - for item in os.listdir(processed_group_dir): - item_path = os.path.join(processed_group_dir, item) - if os.path.isdir(item_path): - pagination_path = os.path.join(item_path, "pagination.txt") - if os.path.exists(pagination_path) and os.path.getsize(pagination_path) > 0: - pagination_files.append((item, pagination_path)) - - if not pagination_files: - result["error"] = "No pagination files found to merge" - return result - - # Merge all pagination files - merged_lines = [] - - for filename_stem, pagination_path in sorted(pagination_files): - try: - with open(pagination_path, 'r', encoding='utf-8') as f: - lines 
= f.readlines() - - for line in lines: - line = line.strip() - if line: - merged_lines.append(line) - - result["source_files"].append(filename_stem) - - except Exception as e: - logger.error(f"Error reading pagination file {pagination_path}: {str(e)}") - continue - - if merged_lines: - # Write merged pagination - with open(merged_pagination_path, 'w', encoding='utf-8') as f: - for line in merged_lines: - f.write(f"{line}\n") - - result["total_lines"] = len(merged_lines) - result["success"] = True - - else: - result["error"] = "No valid pagination data found" - - except Exception as e: - result["error"] = f"Pagination merging failed: {str(e)}" - logger.error(f"Error merging paginations for group {group_name}: {str(e)}") - - return result - - -def merge_embeddings_by_group(unique_id: str, group_name: str) -> Dict: - """Merge all embedding.pkl files in a group.""" - - processed_group_dir = os.path.join("projects", "data", unique_id, "processed", group_name) - dataset_group_dir = os.path.join("projects", "data", unique_id, "datasets", group_name) - os.makedirs(dataset_group_dir, exist_ok=True) - - merged_embedding_path = os.path.join(dataset_group_dir, "embedding.pkl") - - result = { - "success": False, - "merged_embedding_path": merged_embedding_path, - "source_files": [], - "total_chunks": 0, - "total_dimensions": 0, - "error": None - } - - try: - # Find all embedding.pkl files - embedding_files = [] - if os.path.exists(processed_group_dir): - for item in os.listdir(processed_group_dir): - item_path = os.path.join(processed_group_dir, item) - if os.path.isdir(item_path): - embedding_path = os.path.join(item_path, "embedding.pkl") - if os.path.exists(embedding_path) and os.path.getsize(embedding_path) > 0: - embedding_files.append((item, embedding_path)) - - if not embedding_files: - result["error"] = "No embedding files found to merge" - return result - - # Load and merge all embedding data - all_chunks = [] - all_embeddings = [] # Fix: collect all embedding vectors 
- total_chunks = 0 - dimensions = 0 - chunking_strategy = 'unknown' - chunking_params = {} - model_path = EMBEDDING_MODEL_NAME - - for filename_stem, embedding_path in sorted(embedding_files): - try: - with open(embedding_path, 'rb') as f: - embedding_data = pickle.load(f) - - if isinstance(embedding_data, dict) and 'chunks' in embedding_data: - chunks = embedding_data['chunks'] - - # Get embedding vectors (critical fix) - if 'embeddings' in embedding_data: - embeddings = embedding_data['embeddings'] - all_embeddings.append(embeddings) - - # Get model metadata from the first file - if 'model_path' in embedding_data: - model_path = embedding_data['model_path'] - if 'chunking_strategy' in embedding_data: - chunking_strategy = embedding_data['chunking_strategy'] - if 'chunking_params' in embedding_data: - chunking_params = embedding_data['chunking_params'] - - # Add source file metadata to each chunk - for chunk in chunks: - if isinstance(chunk, dict): - chunk['source_file'] = filename_stem - chunk['source_group'] = group_name - elif isinstance(chunk, str): - # If the chunk is a string, keep it unchanged - pass - - all_chunks.extend(chunks) - total_chunks += len(chunks) - - result["source_files"].append(filename_stem) - - except Exception as e: - logger.error(f"Error loading embedding file {embedding_path}: {str(e)}") - continue - - if all_chunks and all_embeddings: - # Merge all embedding vectors - try: - # Try merging tensors with torch - import torch - if all(isinstance(emb, torch.Tensor) for emb in all_embeddings): - merged_embeddings = torch.cat(all_embeddings, dim=0) - dimensions = merged_embeddings.shape[1] - else: - # If the values are not tensors, try converting them to numpy - import numpy as np - if NUMPY_SUPPORT: - np_embeddings = [] - for emb in all_embeddings: - if hasattr(emb, 'numpy'): - np_embeddings.append(emb.numpy()) - elif isinstance(emb, np.ndarray): - np_embeddings.append(emb) - else: - # If conversion fails, skip this file - 
logger.warning(f"Warning: Cannot convert embedding to numpy from file {filename_stem}") - continue - - if np_embeddings: - merged_embeddings = np.concatenate(np_embeddings, axis=0) - dimensions = merged_embeddings.shape[1] - else: - result["error"] = "No valid embedding tensors could be merged" - return result - else: - result["error"] = "NumPy not available for merging embeddings" - return result - - except ImportError: - # If torch is unavailable, try using numpy - if NUMPY_SUPPORT: - import numpy as np - np_embeddings = [] - for emb in all_embeddings: - if hasattr(emb, 'numpy'): - np_embeddings.append(emb.numpy()) - elif isinstance(emb, np.ndarray): - np_embeddings.append(emb) - else: - logger.warning(f"Warning: Cannot convert embedding to numpy from file {filename_stem}") - continue - - if np_embeddings: - merged_embeddings = np.concatenate(np_embeddings, axis=0) - dimensions = merged_embeddings.shape[1] - else: - result["error"] = "No valid embedding tensors could be merged" - return result - else: - result["error"] = "Neither torch nor numpy available for merging embeddings" - return result - except Exception as e: - result["error"] = f"Failed to merge embedding tensors: {str(e)}" - logger.error(f"Error merging embedding tensors: {str(e)}") - return result - - # Create merged embedding data structure - merged_embedding_data = { - 'chunks': all_chunks, - 'embeddings': merged_embeddings, # Critical fix: include the embeddings key - 'total_chunks': total_chunks, - 'dimensions': dimensions, - 'source_files': result["source_files"], - 'group_name': group_name, - 'merged_at': str(__import__('time').time()), - 'chunking_strategy': chunking_strategy, - 'chunking_params': chunking_params, - 'model_path': model_path - } - - # Save merged embeddings - with open(merged_embedding_path, 'wb') as f: - pickle.dump(merged_embedding_data, f) - - result["total_chunks"] = total_chunks - result["total_dimensions"] = dimensions - result["success"] = True - - else: - 
result["error"] = "No valid embedding data found" - - except Exception as e: - result["error"] = f"Embedding merging failed: {str(e)}" - logger.error(f"Error merging embeddings for group {group_name}: {str(e)}") - - return result - - -def merge_all_data_by_group(unique_id: str, group_name: str) -> Dict: - """Merge documents, paginations, and embeddings for a group.""" - - merge_results = { - "group_name": group_name, - "unique_id": unique_id, - "success": True, - "document_merge": None, - "pagination_merge": None, - "embedding_merge": None, - "errors": [] - } - - # Merge documents - document_result = merge_documents_by_group(unique_id, group_name) - merge_results["document_merge"] = document_result - - if not document_result["success"]: - merge_results["success"] = False - merge_results["errors"].append(f"Document merge failed: {document_result['error']}") - - # Merge paginations - pagination_result = merge_paginations_by_group(unique_id, group_name) - merge_results["pagination_merge"] = pagination_result - - if not pagination_result["success"]: - merge_results["success"] = False - merge_results["errors"].append(f"Pagination merge failed: {pagination_result['error']}") - - # Merge embeddings - embedding_result = merge_embeddings_by_group(unique_id, group_name) - merge_results["embedding_merge"] = embedding_result - - if not embedding_result["success"]: - merge_results["success"] = False - merge_results["errors"].append(f"Embedding merge failed: {embedding_result['error']}") - - return merge_results - - -def get_group_merge_status(unique_id: str, group_name: str) -> Dict: - """Get the status of merged data for a group.""" - - dataset_group_dir = os.path.join("projects", "data", unique_id, "datasets", group_name) - - status = { - "group_name": group_name, - "unique_id": unique_id, - "dataset_dir_exists": os.path.exists(dataset_group_dir), - "document_exists": False, - "document_size": 0, - "pagination_exists": False, - "pagination_size": 0, - "embedding_exists": 
False, - "embedding_size": 0, - "merge_complete": False - } - - if os.path.exists(dataset_group_dir): - document_path = os.path.join(dataset_group_dir, "document.txt") - pagination_path = os.path.join(dataset_group_dir, "pagination.txt") - embedding_path = os.path.join(dataset_group_dir, "embedding.pkl") - - if os.path.exists(document_path): - status["document_exists"] = True - status["document_size"] = os.path.getsize(document_path) - - if os.path.exists(pagination_path): - status["pagination_exists"] = True - status["pagination_size"] = os.path.getsize(pagination_path) - - if os.path.exists(embedding_path): - status["embedding_exists"] = True - status["embedding_size"] = os.path.getsize(embedding_path) - - # Check if all files exist and are not empty - if (status["document_exists"] and status["document_size"] > 0 and - status["pagination_exists"] and status["pagination_size"] > 0 and - status["embedding_exists"] and status["embedding_size"] > 0): - status["merge_complete"] = True - - return status - - -def cleanup_dataset_group(unique_id: str, group_name: str) -> bool: - """Clean up merged dataset files for a group.""" - - dataset_group_dir = os.path.join("projects", "data", unique_id, "datasets", group_name) - - try: - if os.path.exists(dataset_group_dir): - import shutil - shutil.rmtree(dataset_group_dir) - logger.info(f"Cleaned up dataset group: {group_name}") - return True - else: - return True # Nothing to clean up - - except Exception as e: - logger.error(f"Error cleaning up dataset group {group_name}: {str(e)}") - return False diff --git a/utils/dataset_manager.py b/utils/dataset_manager.py deleted file mode 100644 index 5526241..0000000 --- a/utils/dataset_manager.py +++ /dev/null @@ -1,297 +0,0 @@ -#!/usr/bin/env python3 -""" -Dataset management functions for organizing and processing datasets. -New implementation with per-file processing and group merging. 
-""" - -import os -import json -import logging -from typing import Dict, List - -# Configure logger -logger = logging.getLogger('app') - -# Import new modules -from utils.file_manager import ( - ensure_directories, sync_files_to_group, cleanup_orphaned_files, - get_group_files_list -) -from utils.single_file_processor import ( - process_single_file, check_file_already_processed -) -from utils.data_merger import ( - merge_all_data_by_group, cleanup_dataset_group -) - - -async def download_dataset_files(unique_id: str, files: Dict[str, List[str]], incremental_mode: bool = False) -> Dict[str, List[str]]: - """ - Process dataset files with new architecture: - 1. Sync files to group directories - 2. Process each file individually - 3. Merge results by group - 4. Clean up orphaned files (only in non-incremental mode) - - Args: - unique_id: Project ID - files: Dictionary of files to process, grouped by key - incremental_mode: If True, preserve existing files and only process new ones - """ - if not files: - return {} - - logger.info(f"Starting {'incremental' if incremental_mode else 'full'} file processing for project: {unique_id}") - - # Ensure project directories exist - ensure_directories(unique_id) - - # Step 1: Sync files to group directories - logger.info("Step 1: Syncing files to group directories...") - synced_files, failed_files = sync_files_to_group(unique_id, files, incremental_mode) - - # Step 2: Detect changes and cleanup orphaned files (only in non-incremental mode) - from utils.file_manager import detect_file_changes - changes = detect_file_changes(unique_id, files, incremental_mode) - - # Only cleanup orphaned files in non-incremental mode or when files are explicitly removed - if not incremental_mode and any(changes["removed"].values()): - logger.info("Step 2: Cleaning up orphaned files...") - removed_files = cleanup_orphaned_files(unique_id, changes) - logger.info(f"Removed orphaned files: {removed_files}") - elif incremental_mode: - logger.info("Step 2: 
Skipping cleanup in incremental mode to preserve existing files") - - # Step 3: Process individual files - logger.info("Step 3: Processing individual files...") - processed_files_by_group = {} - processing_results = {} - - for group_name, file_list in files.items(): - processed_files_by_group[group_name] = [] - processing_results[group_name] = [] - - for file_path in file_list: - filename = os.path.basename(file_path) - - # Get local file path - local_path = os.path.join("projects", "data", unique_id, "files", group_name, filename) - - # Skip if file doesn't exist (might be remote file that failed to download) - if not os.path.exists(local_path) and not file_path.startswith(('http://', 'https://')): - logger.warning(f"Skipping non-existent file: {filename}") - continue - - # Check if already processed - if check_file_already_processed(unique_id, group_name, filename): - logger.info(f"Skipping already processed file: {filename}") - processed_files_by_group[group_name].append(filename) - processing_results[group_name].append({ - "filename": filename, - "status": "existing" - }) - continue - - # Process the file - logger.info(f"Processing file: {filename} (group: {group_name})") - result = await process_single_file(unique_id, group_name, filename, file_path, local_path) - processing_results[group_name].append(result) - - if result["success"]: - processed_files_by_group[group_name].append(filename) - logger.info(f" Successfully processed {filename}") - else: - logger.error(f" Failed to process {filename}: {result['error']}") - - # Step 4: Merge results by group - logger.info("Step 4: Merging results by group...") - merge_results = {} - - for group_name in processed_files_by_group.keys(): - # Get all files in the group (including existing ones) - group_files = get_group_files_list(unique_id, group_name) - - if group_files: - logger.info(f"Merging group: {group_name} with {len(group_files)} files") - merge_result = merge_all_data_by_group(unique_id, group_name) - 
merge_results[group_name] = merge_result - - if merge_result["success"]: - logger.info(f" Successfully merged group {group_name}") - else: - logger.error(f" Failed to merge group {group_name}: {merge_result['errors']}") - - # Step 5: Save processing log - logger.info("Step 5: Saving processing log...") - await save_processing_log(unique_id, files, synced_files, processing_results, merge_results) - - logger.info(f"File processing completed for project: {unique_id}") - return processed_files_by_group - - -async def save_processing_log( - unique_id: str, - requested_files: Dict[str, List[str]], - synced_files: Dict, - processing_results: Dict, - merge_results: Dict -): - """Save comprehensive processing log.""" - - log_data = { - "unique_id": unique_id, - "timestamp": str(os.path.getmtime("projects") if os.path.exists("projects") else 0), - "requested_files": requested_files, - "synced_files": synced_files, - "processing_results": processing_results, - "merge_results": merge_results, - "summary": { - "total_groups": len(requested_files), - "total_files_requested": sum(len(files) for files in requested_files.values()), - "total_files_processed": sum( - len([r for r in results if r.get("success", False)]) - for results in processing_results.values() - ), - "total_groups_merged": len([r for r in merge_results.values() if r.get("success", False)]) - } - } - - log_file_path = os.path.join("projects", "data", unique_id, "processing_log.json") - try: - with open(log_file_path, 'w', encoding='utf-8') as f: - json.dump(log_data, f, ensure_ascii=False, indent=2) - logger.info(f"Processing log saved to: {log_file_path}") - except Exception as e: - logger.error(f"Error saving processing log: {str(e)}") - - -def generate_dataset_structure(unique_id: str) -> str: - """Generate a string representation of the dataset structure""" - project_dir = os.path.join("projects", "data", unique_id) - structure = [] - - def add_directory_contents(dir_path: str, prefix: str = ""): - try: - if 
not os.path.exists(dir_path): - structure.append(f"{prefix}└── (not found)") - return - - items = sorted(os.listdir(dir_path)) - for i, item in enumerate(items): - item_path = os.path.join(dir_path, item) - is_last = i == len(items) - 1 - current_prefix = "└── " if is_last else "├── " - structure.append(f"{prefix}{current_prefix}{item}") - - if os.path.isdir(item_path): - next_prefix = prefix + (" " if is_last else "│ ") - add_directory_contents(item_path, next_prefix) - except Exception as e: - structure.append(f"{prefix}└── Error: {str(e)}") - - # Add files directory structure - files_dir = os.path.join(project_dir, "files") - structure.append("files/") - add_directory_contents(files_dir, "") - - # Add processed directory structure - processed_dir = os.path.join(project_dir, "processed") - structure.append("\nprocessed/") - add_directory_contents(processed_dir, "") - - # Add dataset directory structure - dataset_dir = os.path.join(project_dir, "datasets") - structure.append("\ndataset/") - add_directory_contents(dataset_dir, "") - - return "\n".join(structure) - - -def get_processing_status(unique_id: str) -> Dict: - """Get comprehensive processing status for a project.""" - - project_dir = os.path.join("projects", "data", unique_id) - - if not os.path.exists(project_dir): - return { - "project_exists": False, - "unique_id": unique_id - } - - status = { - "project_exists": True, - "unique_id": unique_id, - "directories": { - "files": os.path.exists(os.path.join(project_dir, "files")), - "processed": os.path.exists(os.path.join(project_dir, "processed")), - "dataset": os.path.exists(os.path.join(project_dir, "datasets")) - }, - "groups": {}, - "processing_log_exists": os.path.exists(os.path.join(project_dir, "processing_log.json")) - } - - # Check each group's status - files_dir = os.path.join(project_dir, "files") - if os.path.exists(files_dir): - for group_name in os.listdir(files_dir): - group_path = os.path.join(files_dir, group_name) - if 
os.path.isdir(group_path): - status["groups"][group_name] = { - "files_count": len([ - f for f in os.listdir(group_path) - if os.path.isfile(os.path.join(group_path, f)) - ]), - "merge_status": "pending" - } - - # Check merge status for each group - dataset_dir = os.path.join(project_dir, "datasets") - if os.path.exists(dataset_dir): - for group_name in os.listdir(dataset_dir): - group_path = os.path.join(dataset_dir, group_name) - if os.path.isdir(group_path): - if group_name in status["groups"]: - # Check if merge is complete - document_path = os.path.join(group_path, "document.txt") - pagination_path = os.path.join(group_path, "pagination.txt") - embedding_path = os.path.join(group_path, "embedding.pkl") - - if (os.path.exists(document_path) and os.path.exists(pagination_path) and - os.path.exists(embedding_path)): - status["groups"][group_name]["merge_status"] = "completed" - else: - status["groups"][group_name]["merge_status"] = "incomplete" - else: - status["groups"][group_name] = { - "files_count": 0, - "merge_status": "completed" - } - - return status - - -def remove_dataset_directory(unique_id: str, filename_without_ext: str): - """Remove a specific dataset directory (deprecated - use new structure)""" - # This function is kept for compatibility but delegates to new structure - dataset_path = os.path.join("projects", "data", unique_id, "processed", filename_without_ext) - if os.path.exists(dataset_path): - import shutil - shutil.rmtree(dataset_path) - - -def remove_dataset_directory_by_key(unique_id: str, key: str): - """Remove dataset directory by key (group name)""" - # Remove files directory - files_group_path = os.path.join("projects", "data", unique_id, "files", key) - if os.path.exists(files_group_path): - import shutil - shutil.rmtree(files_group_path) - - # Remove processed directory - processed_group_path = os.path.join("projects", "data", unique_id, "processed", key) - if os.path.exists(processed_group_path): - import shutil - 
shutil.rmtree(processed_group_path) - - # Remove dataset directory - cleanup_dataset_group(unique_id, key) diff --git a/utils/project_manager.py b/utils/project_manager.py deleted file mode 100644 index 70a2944..0000000 --- a/utils/project_manager.py +++ /dev/null @@ -1,343 +0,0 @@ -#!/usr/bin/env python3 -""" -Project management functions for handling projects, README generation, and status tracking. -""" - -import os -import json -import logging -from typing import Dict, List, Optional -from pathlib import Path - -# Configure logger -logger = logging.getLogger('app') - -from utils.file_utils import get_document_preview, load_processed_files_log - - -def generate_directory_tree(project_dir: str, unique_id: str, max_depth: int = 3) -> str: - """Generate dataset directory tree structure for the project""" - def _build_tree(path: str, prefix: str = "", is_last: bool = True, depth: int = 0) -> List[str]: - if depth > max_depth: - return [] - - lines = [] - try: - entries = sorted(os.listdir(path)) - # Separate directories and files - dirs = [e for e in entries if os.path.isdir(os.path.join(path, e)) and not e.startswith('.')] - files = [e for e in entries if os.path.isfile(os.path.join(path, e)) and not e.startswith('.')] - - entries = dirs + files - - for i, entry in enumerate(entries): - entry_path = os.path.join(path, entry) - is_dir = os.path.isdir(entry_path) - is_last_entry = i == len(entries) - 1 - - # Choose the appropriate tree symbols - if is_last_entry: - connector = "└── " - new_prefix = prefix + " " - else: - connector = "├── " - new_prefix = prefix + "│ " - - # Add entry line - line = prefix + connector + entry - if is_dir: - line += "/" - lines.append(line) - - # Recursively add subdirectories - if is_dir and depth < max_depth: - sub_lines = _build_tree(entry_path, new_prefix, is_last_entry, depth + 1) - lines.extend(sub_lines) - - except PermissionError: - lines.append(prefix + "└── [Permission Denied]") - except Exception as e: - lines.append(prefix + 
"└── [Error: " + str(e) + "]") - - return lines - - # Start building tree from dataset directory - dataset_dir = os.path.join(project_dir, "datasets") - tree_lines = [] - - if not os.path.exists(dataset_dir): - return "└── [No dataset directory found]" - - try: - entries = sorted(os.listdir(dataset_dir)) - dirs = [e for e in entries if os.path.isdir(os.path.join(dataset_dir, e)) and not e.startswith('.')] - files = [e for e in entries if os.path.isfile(os.path.join(dataset_dir, e)) and not e.startswith('.')] - - entries = dirs + files - - if not entries: - tree_lines.append("└── [Empty dataset directory]") - else: - for i, entry in enumerate(entries): - entry_path = os.path.join(dataset_dir, entry) - is_dir = os.path.isdir(entry_path) - is_last_entry = i == len(entries) - 1 - - if is_last_entry: - connector = "└── " - prefix = " " - else: - connector = "├── " - prefix = "│ " - - line = connector + entry - if is_dir: - line += "/" - tree_lines.append(line) - - # Recursively add subdirectories - if is_dir: - sub_lines = _build_tree(entry_path, prefix, is_last_entry, 1) - tree_lines.extend(sub_lines) - - except Exception as e: - tree_lines.append(f"└── [Error generating tree: {str(e)}]") - - return "\n".join(tree_lines) - - -def generate_project_readme(unique_id: str) -> str: - """Generate README.md content for a project""" - project_dir = os.path.join("projects", "data", unique_id) - readme_content = f"""# Project: {unique_id} - -## Project Overview - -This project contains processed documents and their associated embeddings for semantic search. 
-
-## Directory Structure
-
-"""
-
-    # Generate directory tree
-    readme_content += "```\n"
-    readme_content += generate_directory_tree(project_dir, unique_id)
-    readme_content += "\n```\n\n"
-
-    readme_content += """## Dataset Structure
-
-"""
-
-    dataset_dir = os.path.join(project_dir, "datasets")
-    if not os.path.exists(dataset_dir):
-        readme_content += "No dataset files available.\n"
-    else:
-        # Get all document directories
-        doc_dirs = []
-        try:
-            for item in sorted(os.listdir(dataset_dir)):
-                item_path = os.path.join(dataset_dir, item)
-                if os.path.isdir(item_path):
-                    doc_dirs.append(item)
-        except Exception as e:
-            logger.error(f"Error listing dataset directories: {str(e)}")
-
-        if not doc_dirs:
-            readme_content += "No document directories found.\n"
-        else:
-            for doc_dir in doc_dirs:
-                doc_path = os.path.join(dataset_dir, doc_dir)
-                document_file = os.path.join(doc_path, "document.txt")
-                pagination_file = os.path.join(doc_path, "pagination.txt")
-                embeddings_file = os.path.join(doc_path, "embedding.pkl")
-
-                readme_content += f"### {doc_dir}\n\n"
-                readme_content += f"**Files:**\n"
-                readme_content += f"- `{doc_dir}/document.txt`"
-                if os.path.exists(document_file):
-                    readme_content += " ✓"
-                readme_content += "\n"
-
-                readme_content += f"- `{doc_dir}/pagination.txt`"
-                if os.path.exists(pagination_file):
-                    readme_content += " ✓"
-                readme_content += "\n"
-
-                readme_content += f"- `{doc_dir}/embedding.pkl`"
-                if os.path.exists(embeddings_file):
-                    readme_content += " ✓"
-                readme_content += "\n\n"
-
-                # Add document preview
-                if os.path.exists(document_file):
-                    readme_content += f"**Content Preview (first 10 lines):**\n\n```\n"
-                    preview = get_document_preview(document_file, 10)
-                    readme_content += preview
-                    readme_content += "\n```\n\n"
-                else:
-                    readme_content += f"**Content Preview:** Not available\n\n"
-
-    readme_content += f"""---
-*Generated on {__import__('datetime').datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*
-"""
-
-    return readme_content
-
-
-def save_project_readme(unique_id: str):
-    """Save README.md for a project"""
-    readme_content = generate_project_readme(unique_id)
-    readme_path = os.path.join("projects", "data", unique_id, "README.md")
-
-    try:
-        os.makedirs(os.path.dirname(readme_path), exist_ok=True)
-        with open(readme_path, 'w', encoding='utf-8') as f:
-            f.write(readme_content)
-        logger.info(f"Generated README.md for project {unique_id}")
-        return readme_path
-    except Exception as e:
-        logger.error(f"Error generating README for project {unique_id}: {str(e)}")
-        return None
-
-
-def get_project_status(unique_id: str) -> Dict:
-    """Get comprehensive status of a project"""
-    project_dir = os.path.join("projects", "data", unique_id)
-    project_exists = os.path.exists(project_dir)
-
-    if not project_exists:
-        return {
-            "unique_id": unique_id,
-            "project_exists": False,
-            "error": "Project not found"
-        }
-
-    # Get processed log
-    processed_log = load_processed_files_log(unique_id)
-
-    # Collect document.txt files
-    document_files = []
-    dataset_dir = os.path.join(project_dir, "datasets")
-    if os.path.exists(dataset_dir):
-        for root, dirs, files in os.walk(dataset_dir):
-            for file in files:
-                if file == "document.txt":
-                    document_files.append(os.path.join(root, file))
-
-    # Check system prompt and MCP settings
-    system_prompt_file = os.path.join(project_dir, "system_prompt.txt")
-    mcp_settings_file = os.path.join(project_dir, "mcp_settings.json")
-
-    status = {
-        "unique_id": unique_id,
-        "project_exists": True,
-        "project_path": project_dir,
-        "processed_files_count": len(processed_log),
-        "processed_files": processed_log,
-        "document_files_count": len(document_files),
-        "document_files": document_files,
-        "has_system_prompt": os.path.exists(system_prompt_file),
-        "has_mcp_settings": os.path.exists(mcp_settings_file),
-        "readme_exists": os.path.exists(os.path.join(project_dir, "README.md")),
-        "log_file_exists": os.path.exists(os.path.join(project_dir, "processed_files.json"))
-    }
-
-    # Add dataset structure
-    try:
-        from utils.dataset_manager import generate_dataset_structure
-        status["dataset_structure"] = generate_dataset_structure(unique_id)
-    except Exception as e:
-        status["dataset_structure"] = f"Error generating structure: {str(e)}"
-
-    return status
-
-
-def remove_project(unique_id: str) -> bool:
-    """Remove entire project directory"""
-    project_dir = os.path.join("projects", "data", unique_id)
-    try:
-        if os.path.exists(project_dir):
-            import shutil
-            shutil.rmtree(project_dir)
-            logger.info(f"Removed project directory: {project_dir}")
-            return True
-        else:
-            logger.warning(f"Project directory not found: {project_dir}")
-            return False
-    except Exception as e:
-        logger.error(f"Error removing project {unique_id}: {str(e)}")
-        return False
-
-
-def list_projects() -> List[str]:
-    """List all existing project IDs"""
-    projects_dir = "projects"
-    if not os.path.exists(projects_dir):
-        return []
-
-    try:
-        return [item for item in os.listdir(projects_dir)
-                if os.path.isdir(os.path.join(projects_dir, item))]
-    except Exception as e:
-        logger.error(f"Error listing projects: {str(e)}")
-        return []
-
-
-def get_project_stats(unique_id: str) -> Dict:
-    """Get statistics for a specific project"""
-    status = get_project_status(unique_id)
-
-    if not status["project_exists"]:
-        return status
-
-    stats = {
-        "unique_id": unique_id,
-        "total_processed_files": status["processed_files_count"],
-        "total_document_files": status["document_files_count"],
-        "has_system_prompt": status["has_system_prompt"],
-        "has_mcp_settings": status["has_mcp_settings"],
-        "has_readme": status["readme_exists"]
-    }
-
-    # Calculate file sizes
-    total_size = 0
-    document_sizes = []
-
-    for doc_file in status["document_files"]:
-        try:
-            size = os.path.getsize(doc_file)
-            document_sizes.append({
-                "file": doc_file,
-                "size": size,
-                "size_mb": round(size / (1024 * 1024), 2)
-            })
-            total_size += size
-        except Exception:
-            pass
-
-    stats["total_document_size"] = total_size
-    stats["total_document_size_mb"] = round(total_size / (1024 * 1024), 2)
-    stats["document_files_detail"] = document_sizes
-
-    # Check embeddings files
-    embedding_files = []
-    dataset_dir = os.path.join("projects", "data", unique_id, "datasets")
-    if os.path.exists(dataset_dir):
-        for root, dirs, files in os.walk(dataset_dir):
-            for file in files:
-                if file == "embedding.pkl":
-                    file_path = os.path.join(root, file)
-                    try:
-                        size = os.path.getsize(file_path)
-                        embedding_files.append({
-                            "file": file_path,
-                            "size": size,
-                            "size_mb": round(size / (1024 * 1024), 2)
-                        })
-                    except Exception:
-                        pass
-
-    stats["embedding_files_count"] = len(embedding_files)
-    stats["embedding_files_detail"] = embedding_files
-
-    return stats
diff --git a/utils/settings.py b/utils/settings.py
index 4d73705..b921062 100644
--- a/utils/settings.py
+++ b/utils/settings.py
@@ -77,6 +77,15 @@ CHECKPOINT_CLEANUP_INACTIVE_DAYS = int(os.getenv("CHECKPOINT_CLEANUP_INACTIVE_DA
 CHECKPOINT_CLEANUP_INTERVAL_HOURS = int(os.getenv("CHECKPOINT_CLEANUP_INTERVAL_HOURS", "24"))
 
 
+# ============================================================
+# Redis Configuration (Huey task queue backend)
+# ============================================================
+
+# Redis connection URL.
+# Format: redis://[:password]@host:port/db
+REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/1")
+
+
 # ============================================================
 # Mem0 long-term memory configuration
 # ============================================================
diff --git a/utils/single_file_processor.py b/utils/single_file_processor.py
deleted file mode 100644
index 88d92b9..0000000
--- a/utils/single_file_processor.py
+++ /dev/null
@@ -1,301 +0,0 @@
-#!/usr/bin/env python3
-"""
-Single file processing functions for handling individual files.
-"""
-
-import os
-import tempfile
-import zipfile
-import logging
-from typing import Dict, List, Tuple, Optional
-from pathlib import Path
-
-# Configure logger
-logger = logging.getLogger('app')
-
-from utils.file_utils import download_file
-
-# Try to import excel/csv processor, but handle if dependencies are missing
-try:
-    from utils.excel_csv_processor import (
-        is_excel_file, is_csv_file, process_excel_file, process_csv_file
-    )
-    EXCEL_CSV_SUPPORT = True
-except ImportError as e:
-    logger.warning(f"Excel/CSV processing not available: {e}")
-    EXCEL_CSV_SUPPORT = False
-
-    # Fallback functions
-    def is_excel_file(file_path):
-        return file_path.lower().endswith(('.xlsx', '.xls'))
-
-    def is_csv_file(file_path):
-        return file_path.lower().endswith('.csv')
-
-    def process_excel_file(file_path):
-        return "", []
-
-    def process_csv_file(file_path):
-        return "", []
-
-
-async def process_single_file(
-    unique_id: str,
-    group_name: str,
-    filename: str,
-    original_path: str,
-    local_path: str
-) -> Dict:
-    """
-    Process a single file and generate document.txt, pagination.txt, and embedding.pkl.
-
-    Returns:
-        Dict with processing results and file paths
-    """
-    # Create output directory for this file
-    filename_stem = Path(filename).stem
-    output_dir = os.path.join("projects", "data", unique_id, "processed", group_name, filename_stem)
-    os.makedirs(output_dir, exist_ok=True)
-
-    result = {
-        "success": False,
-        "filename": filename,
-        "group": group_name,
-        "output_dir": output_dir,
-        "document_path": os.path.join(output_dir, "document.txt"),
-        "pagination_path": os.path.join(output_dir, "pagination.txt"),
-        "embedding_path": os.path.join(output_dir, "embedding.pkl"),
-        "error": None,
-        "content_size": 0,
-        "pagination_lines": 0,
-        "embedding_chunks": 0
-    }
-
-    try:
-        # Download file if it's remote and not yet downloaded
-        if original_path.startswith(('http://', 'https://')):
-            if not os.path.exists(local_path):
-                logger.info(f"Downloading {original_path} -> {local_path}")
-                success = await download_file(original_path, local_path)
-                if not success:
-                    result["error"] = "Failed to download file"
-                    return result
-
-        # Extract content from file
-        content, pagination_lines = await extract_file_content(local_path, filename)
-
-        if not content or not content.strip():
-            result["error"] = "No content extracted from file"
-            return result
-
-        # Write document.txt
-        with open(result["document_path"], 'w', encoding='utf-8') as f:
-            f.write(content)
-        result["content_size"] = len(content)
-
-        # Write pagination.txt
-        if pagination_lines:
-            with open(result["pagination_path"], 'w', encoding='utf-8') as f:
-                for line in pagination_lines:
-                    if line.strip():
-                        f.write(f"{line}\n")
-            result["pagination_lines"] = len(pagination_lines)
-        else:
-            # Generate pagination from text content
-            pagination_lines = generate_pagination_from_text(result["document_path"],
-                                                             result["pagination_path"])
-            result["pagination_lines"] = len(pagination_lines)
-
-        # Generate embeddings
-        try:
-            embedding_chunks = await generate_embeddings_for_file(
-                result["document_path"], result["embedding_path"]
-            )
-            result["embedding_chunks"] = len(embedding_chunks) if embedding_chunks else 0
-            result["success"] = True
-
-        except Exception as e:
-            result["error"] = f"Embedding generation failed: {str(e)}"
-            logger.error(f"Failed to generate embeddings for {filename}: {str(e)}")
-
-    except Exception as e:
-        result["error"] = f"File processing failed: {str(e)}"
-        logger.error(f"Error processing file {filename}: {str(e)}")
-
-    return result
-
-
-async def extract_file_content(file_path: str, filename: str) -> Tuple[str, List[str]]:
-    """Extract content from various file formats."""
-
-    # Handle zip files
-    if filename.lower().endswith('.zip'):
-        return await extract_from_zip(file_path, filename)
-
-    # Handle Excel files
-    elif is_excel_file(file_path):
-        return await extract_from_excel(file_path, filename)
-
-    # Handle CSV files
-    elif is_csv_file(file_path):
-        return await extract_from_csv(file_path, filename)
-
-    # Handle text files
-    else:
-        return await extract_from_text(file_path, filename)
-
-
-async def extract_from_zip(zip_path: str, filename: str) -> Tuple[str, List[str]]:
-    """Extract content from zip file."""
-    content_parts = []
-    pagination_lines = []
-
-    try:
-        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
-            # Extract to temporary directory
-            temp_dir = tempfile.mkdtemp(prefix=f"extract_{Path(filename).stem}_")
-            zip_ref.extractall(temp_dir)
-
-            # Process extracted files
-            for root, dirs, files in os.walk(temp_dir):
-                for file in files:
-                    if file.lower().endswith(('.txt', '.md', '.xlsx', '.xls', '.csv')):
-                        file_path = os.path.join(root, file)
-
-                        try:
-                            file_content, file_pagination = await extract_file_content(file_path, file)
-
-                            if file_content:
-                                content_parts.append(f"# Page {file}")
-                                content_parts.append(file_content)
-                                pagination_lines.extend(file_pagination)
-
-                        except Exception as e:
-                            logger.error(f"Error processing extracted file {file}: {str(e)}")
-
-            # Clean up temporary directory
-            import shutil
-            shutil.rmtree(temp_dir)
-
-    except Exception as e:
-        logger.error(f"Error extracting zip file {filename}: {str(e)}")
-        return "", []
-
-    return '\n\n'.join(content_parts), pagination_lines
-
-
-async def extract_from_excel(file_path: str, filename: str) -> Tuple[str, List[str]]:
-    """Extract content from Excel file."""
-    try:
-        document_content, pagination_lines = process_excel_file(file_path)
-
-        if document_content:
-            content = f"# Page {filename}\n{document_content}"
-            return content, pagination_lines
-        else:
-            return "", []
-
-    except Exception as e:
-        logger.error(f"Error processing Excel file {filename}: {str(e)}")
-        return "", []
-
-
-async def extract_from_csv(file_path: str, filename: str) -> Tuple[str, List[str]]:
-    """Extract content from CSV file."""
-    try:
-        document_content, pagination_lines = process_csv_file(file_path)
-
-        if document_content:
-            content = f"# Page {filename}\n{document_content}"
-            return content, pagination_lines
-        else:
-            return "", []
-
-    except Exception as e:
-        logger.error(f"Error processing CSV file {filename}: {str(e)}")
-        return "", []
-
-
-async def extract_from_text(file_path: str, filename: str) -> Tuple[str, List[str]]:
-    """Extract content from text file."""
-    try:
-        with open(file_path, 'r', encoding='utf-8') as f:
-            content = f.read().strip()
-
-        if content:
-            return content, []
-        else:
-            return "", []
-
-    except Exception as e:
-        logger.error(f"Error reading text file {filename}: {str(e)}")
-        return "", []
-
-
-def generate_pagination_from_text(document_path: str, pagination_path: str) -> List[str]:
-    """Generate pagination from text document."""
-    try:
-        # Import embedding module for pagination
-        import sys
-        sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'embedding'))
-        from embedding import split_document_by_pages
-
-        pages = split_document_by_pages(str(document_path), str(pagination_path))
-
-        # Return pagination lines
-        pagination_lines = []
-        with open(pagination_path, 'r', encoding='utf-8') as f:
-            for line in f:
-                if line.strip():
-                    pagination_lines.append(line.strip())
-
-        return pagination_lines
-
-    except Exception as e:
-        logger.error(f"Error generating pagination from text: {str(e)}")
-        return []
-
-
-async def generate_embeddings_for_file(document_path: str, embedding_path: str) -> Optional[List]:
-    """Generate embeddings for a document."""
-    try:
-        # Import embedding module
-        import sys
-        sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'embedding'))
-        from embedding import embed_document
-
-        # Generate embeddings using paragraph chunking
-        embedding_data = embed_document(
-            str(document_path),
-            str(embedding_path),
-            chunking_strategy='paragraph'
-        )
-
-        if embedding_data and 'chunks' in embedding_data:
-            return embedding_data['chunks']
-        else:
-            return None
-
-    except Exception as e:
-        logger.error(f"Error generating embeddings: {str(e)}")
-        return None
-
-
-def check_file_already_processed(unique_id: str, group_name: str, filename: str) -> bool:
-    """Check if a file has already been processed."""
-    filename_stem = Path(filename).stem
-    output_dir = os.path.join("projects", "data", unique_id, "processed", group_name, filename_stem)
-
-    document_path = os.path.join(output_dir, "document.txt")
-    pagination_path = os.path.join(output_dir, "pagination.txt")
-    embedding_path = os.path.join(output_dir, "embedding.pkl")
-
-    # Check if all files exist and are not empty
-    if (os.path.exists(document_path) and os.path.exists(pagination_path) and
-        os.path.exists(embedding_path)):
-
-        if (os.path.getsize(document_path) > 0 and os.path.getsize(pagination_path) > 0 and
-            os.path.getsize(embedding_path) > 0):
-            return True
-
-    return False
\ No newline at end of file