Merge branch 'feature/enable_redis' into dev

# Conflicts:
#	poetry.lock
This commit is contained in:
朱潮 2026-06-08 19:43:27 +08:00
commit fabb14c66a
27 changed files with 165 additions and 4336 deletions

View File

@ -1,6 +1,7 @@
# Skill 功能
> 负责范围:技能包管理服务 - 核心实现
> 最后更新2026-05-26
> 最后更新2026-05-23
> 最后更新2026-04-20
@ -30,6 +31,10 @@ MCP UI 类 skill 已按 MCP Apps 模式改造:工具返回数据,静态 HTML
## 最近重要事项
- [2026-05-26](changelog/2026-Q2.md): skill 引入 `category` 字段——`routes/skill_manager.py` 在 `SkillItem` / `SkillValidationResult` 增加 `category`,从 `plugin.json``SKILL.md` frontmatter 解析official skill 默认 `"other"`、user skill 默认 `"custom"`;并通过 batch 给 common/developing/onprem/support 路径下大量 skill 元数据补 `category``data-dashboard` / `mcp-ui` 归类 `Interactive UI``203dcf4`, `3ada55a`, `9658588`
- [2026-05-26](changelog/2026-Q2.md): developing 分支大合并新增多个 skill`ai-ppt-generator`(百度 AI PPT、`nfc-medicine-lookup`NFC 药品检索)、`ppt-outline`PPT 大纲 / HTML 演示文稿)、`z-card-image`(配图 / 卡片图),同时 `skills/linggan/*` 系列 skill 经合并回归(`3ada55a`
- [2026-05-23](changelog/2026-Q2.md): 新增 MCP App 型 `skills/developing/ecommerce-storefront/`——含 `product-list` / `order-confirm` 两个 HTML App + 自带 `ecommerce_server.py` MCP server同时落地 `docs/mcp-app-training.md`(约 1063 行)作为 MCP App 培训材料(`9d001c8`
- [2026-05-21](changelog/2026-Q2.md): Daytona 沙箱模式下 `init_agent` 在沙箱内写入 `BASH_ENV` 文件,注入 `ASSISTANT_ID` / `USER_IDENTIFIER` / `TRACE_ID` / `ENABLE_SELF_KNOWLEDGE``config.shell_env` 的 shell 环境变量(`776acc2`
- [2026-05-12](changelog/2026-Q2.md): 跨 6→10 个 skill 变体批量精修 `retrieval-policy*.md`,统一 onprem/support/autoload 各路径下的 policy 口径(`be96f24`, `7b4f03d`
- [2026-05-11](changelog/2026-Q2.md): 新增子 agent (SubAgent) 支持——skill 包通过 `agents/*.md` 暴露子 agent`SubAgentMiddleware` 加载;附 `pmda-drug-info` skill 示例(`5b634bc`
- [2026-05-11](changelog/2026-Q2.md): `pmda-drug-info``pmda_server.py` 大改为 mock 实现(`a92096a`
@ -77,6 +82,9 @@ MCP UI 类 skill 已按 MCP Apps 模式改造:工具返回数据,静态 HTML
- ⚠️ **MCP `_meta.trace_id` 是全局 monkey-patch 注入**`agent/mcp_trace_meta.patch_mcp_client_session_trace_meta()` 在 `get_tools_from_mcp()` 入口调用一次后,会把 `mcp.ClientSession.call_tool` 永久包装;仅对工具名在 `{"rag_retrieve", "table_rag_retrieve"}` 集合内的调用注入 `_meta.trace_id`,扩展白名单要直接改 `_TRACE_META_TOOL_NAMES` 常量。
- ⚠️ **PrePrompt hook 内容位置由模板决定**:自 2026-04-23 起 hook 产出通过 `{hook_content}` 占位符注入 `prompt/system_prompt.md`,不再追加在 prompt 末尾;自定义模板必须包含 `{hook_content}` 占位符否则 hook 内容会丢失。
- ⚠️ **`init_agent` 返回值已变 3 元素**Daytona 改造后 `init_agent` 返回 `(agent, checkpointer, sandbox)`;调用方解构必须更新。
- ⚠️ **skill `category` 默认值**API 返回的 `SkillItem.category`——official skill fallback 为 `"other"`、user skill fallback 为 `"custom"`;前端做分类视图时需要同时识别这两个 sentinel不要假设官方/用户 skill 用同一套缺省值。
- ⚠️ **`category` 字段双入口**:同一 skill 可以同时在 `.claude-plugin/plugin.json``SKILL.md` frontmatter 写 `category``get_skill_metadata` 优先走 `parse_plugin_json`,若 skill 包没有 plugin.json 才回落到 `parse_skill_frontmatter`——两者写不一致时以 plugin.json 为准。
- ⚠️ **Daytona shell_env 是文件注入而非 process env**`init_agent` 通过 `cat > $REMOTE_BASH_ENV_PATH` 写入 `export VAR=...` 行,沙箱内必须由 shellbash`BASH_ENV` 加载才能生效;非 daytona 模式或不走 bash 启动的脚本拿不到这些变量。扩展注入项需直接改 `init_agent` 里的 `_shell_env` 字典。
## Skill 目录结构

View File

@ -4,6 +4,126 @@
---
## 2026-05-26: skill `category` 字段全面接入
**类型**:新功能
**背景**skill 数量越来越多common / developing / onprem / support / linggan / autoload 各路径下数十个),列表 API 需要前端能按类别分组展示,元数据层面缺少 `category` 字段。
**改动**
- `routes/skill_manager.py`
- `SkillItem` model 新增 `category: str = "other"`
- `SkillValidationResult` dataclass 新增可选 `category: Optional[str]`
- `parse_plugin_json` 解析 `plugin_config.get('category')``parse_skill_frontmatter` 解析 frontmatter 的 `metadata.get('category')`
- `get_official_skills` 中 fallback 为 `"other"``get_user_skills` 中 fallback 为 `"custom"`
- `get_skill_metadata_legacy``category` 非空时写入返回 dict保持向后兼容
- 批量给 common / developing / onprem / support 多个 skill 的 `.claude-plugin/plugin.json``SKILL.md` frontmatter 添加 `category` 字段。
- `data-dashboard``mcp-ui``category``"Data & Retrieval"` 修正为 `"Interactive UI"`(更贴切 MCP App 的渲染语义)。
**根因**N/A新功能
**影响**
- `GET /api/v1/skill/list` 返回项现在包含 `category` 字段;前端可按 category 维度做分组/筛选。
- skill 元数据约定扩展——新 skill 应在 plugin.json 或 SKILL.md frontmatter 中写明 `category`,否则会落到 `"other"` / `"custom"` 兜底。
- `plugin.json.category``SKILL.md.category` 同时存在时以前者为准(`get_skill_metadata` 优先 plugin.json
**相关文件**
- `routes/skill_manager.py`
- `skills/common/data-dashboard/.claude-plugin/plugin.json`
- `skills/common/mcp-ui/.claude-plugin/plugin.json`
- 以及一批 `skills/{common,developing,onprem,support}/*/SKILL.md``.claude-plugin/plugin.json`
**Commit/PR**`203dcf4`, `3ada55a`, `9658588`
---
## 2026-05-26: developing 分支批量新增多类 skill
**类型**:新功能
**背景**[待补充]——经 developing→staging 合并集中落地一批新 skill 与 linggan 系列 skill 回归。
**改动**
- 新增 `skills/developing/ai-ppt-generator/`:调用百度 AI 生成 PPT按 topic 自动选模板(商务/科技/教育/创意/中国风等);`category: Document Processing`。
- 新增 `skills/developing/nfc-medicine-lookup/`:通过 NFC 芯片 ID 或药品名称查询药品信息,面向老年用户的语音助手交互口径;`category: Developer Tools`。
- 新增 `skills/developing/ppt-outline/`PPT 大纲与独立 HTML 演示文稿生成dark/light/tech/minimal 四种风格);`category: Document Processing`。
- 新增 `skills/developing/z-card-image/`:生成配图、封面图、卡片图、社媒帖子分享图等;依赖 `python3` + `google-chrome`
- `skills/developing/static-hosting/SKILL.md` 由 1 行说明扩展为完整 80 行 skill同时一批已有 SKILL.md / plugin.json 补 `category`
- `skills/linggan/*` 系列 skillbaidu-search / bot-self-modifier / caiyun-weather / competitor-news-intel / contract-document-generator / financial-report-generator / market-academic-insight / ragflow-loader / sales-decision-report / seedream / static-hosting / static-site-deploy / voice-notification / weather-china经合并回归 staging。
**根因**N/A
**影响**
- developing skill 池扩张约 5 个新业务 skilllinggan 系列重新出现在 staging。
- 新 skill 多为 SKILL.md 型业务 skill符合"workflow + 模板"的纯 markdown 模式;其中 `ai-ppt-generator`、`z-card-image` 依赖外部 `BAIDU_API_KEY``google-chrome` 二进制。
**相关文件**
- `skills/developing/ai-ppt-generator/SKILL.md`
- `skills/developing/nfc-medicine-lookup/SKILL.md`
- `skills/developing/ppt-outline/SKILL.md`
- `skills/developing/z-card-image/SKILL.md`
- `skills/developing/static-hosting/SKILL.md`
- `skills/linggan/**`(回归)
**Commit/PR**`3ada55a`
---
## 2026-05-23: 新增 ecommerce-storefront skillMCP App 型)+ MCP App 培训文档
**类型**:新功能
**背景**MCP App 模式host 加载静态 HTML + postMessage 传数据)已经在 `mcp-ui`、`data-dashboard` 上跑通,需要一个面向电商场景的样例 skill演示产品浏览 / 选购 / 下单确认这类多步交互的 App 渲染;同时沉淀一份 MCP App 开发指南。
**改动**
- 新增 `skills/developing/ecommerce-storefront/`
- `apps/product-list.html`288 行)与 `apps/order-confirm.html`233 行)两个静态 App。
- `ecommerce_server.py`213 行)作为自带 MCP server`ecommerce_tools.json` 定义工具 schema。
- `hooks/ecommerce_guide.md` + `hooks/pre_prompt.py` 注入 skill 使用指引到 system prompt。
- `mcp_common.py`252 行)复用 MCP 通用工具基类。
- `.claude-plugin/plugin.json` 配置 PrePrompt hook 与 stdio MCP server`category: Developer Tools`。
- 新增 `docs/mcp-app-training.md`(约 1063 行MCP App 模式的开发培训材料。
**根因**N/A
**影响**
- developing skill 池新增一个 MCP App 型 skill体例对齐 `mcp-ui` / `data-dashboard`
- MCP App 开发者有完整培训材料可参考。
**相关文件**
- `skills/developing/ecommerce-storefront/**`
- `docs/mcp-app-training.md`
**Commit/PR**`9d001c8`
---
## 2026-05-21: Daytona 沙箱注入 shell_env 到 BASH_ENV
**类型**:新功能
**背景**Daytona 沙箱内的 skill 脚本需要能读取 `ASSISTANT_ID` / `USER_IDENTIFIER` / `TRACE_ID` 等运行时上下文,但宿主 process env 无法直接透传到沙箱里。
**改动**
- `agent/deep_assistant.py` `init_agent`:当 `sandbox is not None and sandbox_type == "daytona"` 时,组装 `_shell_env` 字典(`ASSISTANT_ID` / `USER_IDENTIFIER` / `TRACE_ID` / `ENABLE_SELF_KNOWLEDGE` 加上 `config.shell_env`),构造 `cd {REMOTE_WORKSPACE_ROOT}\n` + `export VAR="..."` 行,通过 `sandbox.execute("cat > $REMOTE_BASH_ENV_PATH << 'ENVEOF' ... ENVEOF")` 写入沙箱内。
- `utils/daytona_sync.py` 提供常量 `REMOTE_BASH_ENV_PATH` / `REMOTE_WORKSPACE_ROOT`
- `AgentConfig` 增加 `shell_env: Optional[Dict[str, str]]`(调用方可追加自定义 env
**根因**N/A
**影响**
- 沙箱内通过 bash 启动的 skill 脚本可以 `os.environ.get("ASSISTANT_ID")` 等读到运行时上下文。
- 仅 daytona 沙箱模式生效;本地或非 bash 启动的进程不会收到 `BASH_ENV` 注入的变量。
- 扩展注入项(新增固定环境变量)需要直接改 `init_agent` 里的 `_shell_env` 字典。
**相关文件**
- `agent/deep_assistant.py`
- `utils/daytona_sync.py`
**Commit/PR**`776acc2`
---
## 2026-05-12: 批量精修 retrieval policy 文案
**类型**:内容调整

View File

@ -1,168 +0,0 @@
#!/usr/bin/env python3
"""
SQLite task status database management tool
"""
import sqlite3
import json
import time
from task_queue.task_status import task_status_store
def view_database():
"""View database contents"""
print("SQLite task status database contents")
print("=" * 40)
print(f"Database path: {task_status_store.db_path}")
# Connect to the database
conn = sqlite3.connect(task_status_store.db_path)
cursor = conn.cursor()
# View table schema
print(f"\nTable schema:")
cursor.execute("PRAGMA table_info(task_status)")
columns = cursor.fetchall()
for col in columns:
print(f" {col[1]} ({col[2]})")
# View all records
print(f"\nAll records:")
cursor.execute("SELECT * FROM task_status ORDER BY updated_at DESC")
rows = cursor.fetchall()
if not rows:
print(" (empty database)")
else:
print(f" Total {len(rows)} records:")
for i, row in enumerate(rows):
task_id, unique_id, status, created_at, updated_at, result, error = row
created_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(created_at))
updated_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(updated_at))
print(f" {i+1}. {task_id}")
print(f" Project ID: {unique_id}")
print(f" Status: {status}")
print(f" Created: {created_str}")
print(f" Updated: {updated_str}")
if result:
try:
result_data = json.loads(result)
print(f" Result: {result_data.get('message', 'N/A')}")
except:
print(f" Result: {result[:50]}...")
if error:
print(f" Error: {error}")
print()
conn.close()
def run_query(sql_query: str):
"""Run a custom query"""
print(f"Running query: {sql_query}")
try:
conn = sqlite3.connect(task_status_store.db_path)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute(sql_query)
rows = cursor.fetchall()
if not rows:
print(" (no results)")
else:
print(f" {len(rows)} results:")
for row in rows:
print(f" {dict(row)}")
conn.close()
except Exception as e:
print(f"Query failed: {e}")
def interactive_shell():
"""Interactive database management"""
print("\n🖥️ Interactive database management")
print("Type 'help' to view available commands, or 'quit' to exit")
while True:
try:
command = input("\n> ").strip()
if command.lower() in ['quit', 'exit', 'q']:
break
elif command.lower() == 'help':
print("""
Available commands:
view - View all records
stats - View statistics
pending - View pending tasks
completed - View completed tasks
failed - View failed tasks
sql <query> - Run an SQL query
cleanup <days> - Clean up records older than N days
count - Count total tasks
help - Show help
quit/exit/q - Exit
""")
elif command.lower() == 'view':
view_database()
elif command.lower() == 'stats':
stats = task_status_store.get_statistics()
print(f"Statistics:")
print(f" Total tasks: {stats['total_tasks']}")
print(f" Status breakdown: {stats['status_breakdown']}")
print(f" Last 24 hours: {stats['recent_24h']}")
elif command.lower() == 'pending':
tasks = task_status_store.search_tasks(status="pending")
print(f"Pending tasks ({len(tasks)}):")
for task in tasks:
print(f" - {task['task_id']}: {task['unique_id']}")
elif command.lower() == 'completed':
tasks = task_status_store.search_tasks(status="completed")
print(f"Completed tasks ({len(tasks)}):")
for task in tasks:
print(f" - {task['task_id']}: {task['unique_id']}")
elif command.lower() == 'failed':
tasks = task_status_store.search_tasks(status="failed")
print(f"Failed tasks ({len(tasks)}):")
for task in tasks:
print(f" - {task['task_id']}: {task['unique_id']}")
elif command.lower().startswith('sql '):
sql_query = command[4:]
run_query(sql_query)
elif command.lower().startswith('cleanup '):
try:
days = int(command[8:])
count = task_status_store.cleanup_old_tasks(days)
print(f"Cleaned up {count} records older than {days} days")
except ValueError:
print("Please enter a valid number of days")
elif command.lower() == 'count':
all_tasks = task_status_store.list_all()
print(f"Total tasks: {len(all_tasks)}")
else:
print("Unknown command. Type 'help' for help")
except KeyboardInterrupt:
print("\nGoodbye!")
break
except Exception as e:
print(f"Execution error: {e}")
def main():
"""Main function"""
import sys
if len(sys.argv) > 1:
if sys.argv[1] == 'view':
view_database()
elif sys.argv[1] == 'interactive':
interactive_shell()
else:
print("Usage: python db_manager.py [view|interactive]")
else:
view_database()
interactive_shell()
if __name__ == "__main__":
main()

34
poetry.lock generated
View File

@ -1790,21 +1790,6 @@ files = [
{file = "httpx_sse-0.4.3.tar.gz", hash = "sha256:9b1ed0127459a66014aec3c56bebd93da3c1bc8bb6618c8082039a44889a755d"},
]
[[package]]
name = "huey"
version = "2.5.3"
description = "huey, a little task queue"
optional = false
python-versions = "*"
groups = ["main"]
files = [
{file = "huey-2.5.3.tar.gz", hash = "sha256:089fc72b97fd26a513f15b09925c56fad6abe4a699a1f0e902170b37e85163c7"},
]
[package.extras]
backends = ["redis (>=3.0.0)"]
redis = ["redis (>=3.0.0)"]
[[package]]
name = "huggingface-hub"
version = "0.35.3"
@ -4825,6 +4810,23 @@ urllib3 = ">=1.26.14,<3"
fastembed = ["fastembed (>=0.7,<0.8)"]
fastembed-gpu = ["fastembed-gpu (>=0.7,<0.8)"]
[[package]]
name = "redis"
version = "6.4.0"
description = "Python client for Redis database and key-value store"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "redis-6.4.0-py3-none-any.whl", hash = "sha256:f0544fa9604264e9464cdf4814e7d4830f74b165d52f2a330a760a88dd248b7f"},
{file = "redis-6.4.0.tar.gz", hash = "sha256:b01bc7282b8444e28ec36b261df5375183bb47a07eb9c603f284e89cbc5ef010"},
]
[package.extras]
hiredis = ["hiredis (>=3.2.0)"]
jwt = ["pyjwt (>=2.9.0)"]
ocsp = ["cryptography (>=36.0.1)", "pyopenssl (>=20.0.1)", "requests (>=2.31.0)"]
[[package]]
name = "referencing"
version = "0.37.0"
@ -7132,4 +7134,4 @@ cffi = ["cffi (>=1.17,<2.0) ; platform_python_implementation != \"PyPy\" and pyt
[metadata]
lock-version = "2.1"
python-versions = ">=3.12,<3.15"
content-hash = "dc130664802ad1344adc341931036a343f9892934a41bbc15c48663d0146696b"
content-hash = "f5b01d5a1e60672741f2c5e8cc6e2ec55534963a9a3791fd1cdf67d3c2fbd70b"

View File

@ -19,7 +19,7 @@ dependencies = [
"numpy<2",
"aiohttp",
"aiofiles",
"huey (>=2.5.3,<3.0.0)",
"redis (>=4.0,<7.0)",
"pandas>=1.5.0",
"openpyxl>=3.0.0",
"xlrd>=2.0.0",

View File

@ -58,7 +58,6 @@ httpcore==1.0.9 ; python_version >= "3.12" and python_version < "3.15"
httptools==0.7.1 ; python_version >= "3.12" and python_version < "3.15" and platform_system != "Windows"
httpx-sse==0.4.3 ; python_version >= "3.12" and python_version < "3.15"
httpx==0.28.1 ; python_version >= "3.12" and python_version < "3.15"
huey==2.5.3 ; python_version >= "3.12" and python_version < "3.15"
huggingface-hub==0.35.3 ; python_version >= "3.12" and python_version < "3.15"
hyperframe==6.1.0 ; python_version >= "3.12" and python_version < "3.15"
idna==3.11 ; python_version >= "3.12" and python_version < "3.15"
@ -161,6 +160,7 @@ pywin32==311 ; python_version >= "3.12" and python_version < "3.15" and (sys_pla
pyyaml==6.0.3 ; python_version >= "3.12" and python_version < "3.15"
qdrant-client==1.12.1 ; python_version >= "3.13" and python_version < "3.15"
qdrant-client==1.16.2 ; python_version == "3.12"
redis==6.4.0 ; python_version >= "3.12" and python_version < "3.15"
referencing==0.37.0 ; python_version >= "3.12" and python_version < "3.15"
regex==2025.9.18 ; python_version >= "3.12" and python_version < "3.15"
requests-toolbelt==1.0.0 ; python_version >= "3.12" and python_version < "3.15"

View File

@ -1,273 +1,18 @@
import os
import uuid
import shutil
import zipfile
from datetime import datetime
from typing import Optional, List
from fastapi import APIRouter, HTTPException, Header, UploadFile, File, Form
from pydantic import BaseModel
from typing import Optional
from fastapi import APIRouter, HTTPException, UploadFile, File, Form
import logging
logger = logging.getLogger('app')
from utils import (
DatasetRequest, QueueTaskRequest, IncrementalTaskRequest, QueueTaskResponse,
load_processed_files_log, remove_file_or_directory, remove_dataset_directory_by_key
)
from utils.fastapi_utils import get_versioned_filename
from task_queue.manager import queue_manager
from task_queue.integration_tasks import process_files_async, process_files_incremental_async, cleanup_project_async
from task_queue.task_status import task_status_store
router = APIRouter()
@router.post("/api/v1/files/process/async")
async def process_files_async_endpoint(request: QueueTaskRequest, authorization: Optional[str] = Header(None)):
"""
Queue-based API for asynchronous file processing.
Same functionality as /api/v1/files/process, but processed asynchronously through the queue.
Args:
request: QueueTaskRequest containing dataset_id, files, system_prompt, mcp_settings, and queue options
authorization: Authorization header containing API key (Bearer <API_KEY>)
Returns:
QueueTaskResponse: Processing result with task ID for tracking
"""
try:
dataset_id = request.dataset_id
if not dataset_id:
raise HTTPException(status_code=400, detail="dataset_id is required")
# Estimate processing time (based on file count)
estimated_time = 0
if request.upload_folder:
# For upload_folder, file count cannot be estimated in advance, so use the default time
estimated_time = 120 # Default: 2 minutes
elif request.files:
total_files = sum(len(file_list) for file_list in request.files.values())
estimated_time = max(30, total_files * 10) # Estimated 10 seconds per file, minimum 30 seconds
# Create task status record
import uuid
task_id = str(uuid.uuid4())
task_status_store.set_status(
task_id=task_id,
unique_id=dataset_id,
status="pending"
)
# Submit async task
task = process_files_async(
dataset_id=dataset_id,
files=request.files,
upload_folder=request.upload_folder,
task_id=task_id
)
# Build a more detailed message
message = f"File processing task has been submitted to the queue, project ID: {dataset_id}"
if request.upload_folder:
group_count = len(request.upload_folder)
message += f", files will be scanned automatically from {group_count} uploaded folders"
elif request.files:
total_files = sum(len(file_list) for file_list in request.files.values())
message += f", including {total_files} files"
return QueueTaskResponse(
success=True,
message=message,
dataset_id=dataset_id,
task_id=task_id, # Use our own task_id
task_status="pending",
estimated_processing_time=estimated_time
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error submitting async file processing task: {str(e)}")
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
@router.post("/api/v1/files/process/incremental")
async def process_files_incremental_endpoint(request: IncrementalTaskRequest, authorization: Optional[str] = Header(None)):
"""
Queue-based API for incremental file processing, supporting file additions and deletions.
Args:
request: IncrementalTaskRequest containing dataset_id, files_to_add, files_to_remove, system_prompt, mcp_settings, and queue options
authorization: Authorization header containing API key (Bearer <API_KEY>)
Returns:
QueueTaskResponse: Processing result with task ID for tracking
"""
try:
dataset_id = request.dataset_id
if not dataset_id:
raise HTTPException(status_code=400, detail="dataset_id is required")
# Validate that there is at least one add or delete operation
if not request.files_to_add and not request.files_to_remove:
raise HTTPException(status_code=400, detail="At least one of files_to_add or files_to_remove must be provided")
# Estimate processing time (based on file count)
estimated_time = 0
total_add_files = sum(len(file_list) for file_list in (request.files_to_add or {}).values())
total_remove_files = sum(len(file_list) for file_list in (request.files_to_remove or {}).values())
total_files = total_add_files + total_remove_files
estimated_time = max(30, total_files * 10) # Estimated 10 seconds per file, minimum 30 seconds
# Create task status record
import uuid
task_id = str(uuid.uuid4())
task_status_store.set_status(
task_id=task_id,
unique_id=dataset_id,
status="pending"
)
# Submit incremental async task
task = process_files_incremental_async(
dataset_id=dataset_id,
files_to_add=request.files_to_add,
files_to_remove=request.files_to_remove,
system_prompt=request.system_prompt,
mcp_settings=request.mcp_settings,
task_id=task_id
)
return QueueTaskResponse(
success=True,
message=f"Incremental file processing task has been submitted to the queue - added {total_add_files} files, removed {total_remove_files} files, project ID: {dataset_id}",
dataset_id=dataset_id,
task_id=task_id,
task_status="pending",
estimated_processing_time=estimated_time
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error submitting incremental file processing task: {str(e)}")
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
@router.get("/api/v1/files/{dataset_id}/status")
async def get_files_processing_status(dataset_id: str):
"""Get the file processing status for the project."""
try:
# Load processed files log
processed_log = load_processed_files_log(dataset_id)
# Get project directory info
project_dir = os.path.join("projects", "data", dataset_id)
project_exists = os.path.exists(project_dir)
# Collect document.txt files
document_files = []
if project_exists:
for root, dirs, files in os.walk(project_dir):
for file in files:
if file == "document.txt":
document_files.append(os.path.join(root, file))
return {
"dataset_id": dataset_id,
"project_exists": project_exists,
"processed_files_count": len(processed_log),
"processed_files": processed_log,
"document_files_count": len(document_files),
"document_files": document_files,
"log_file_exists": os.path.exists(os.path.join("projects", "data", dataset_id, "processed_files.json"))
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to retrieve file processing status: {str(e)}")
@router.post("/api/v1/files/{dataset_id}/reset")
async def reset_files_processing(dataset_id: str):
"""Reset the project's file processing status by deleting the processing log and all files."""
try:
project_dir = os.path.join("projects", "data", dataset_id)
log_file = os.path.join("projects", "data", dataset_id, "processed_files.json")
# Load processed log to know what files to remove
processed_log = load_processed_files_log(dataset_id)
removed_files = []
# Remove all processed files and their dataset directories
for file_hash, file_info in processed_log.items():
# Remove local file in files directory
if 'local_path' in file_info:
if remove_file_or_directory(file_info['local_path']):
removed_files.append(file_info['local_path'])
# Handle new key-based structure first
if 'key' in file_info:
# Remove dataset directory by key
key = file_info['key']
if remove_dataset_directory_by_key(dataset_id, key):
removed_files.append(f"dataset/{key}")
elif 'filename' in file_info:
# Fallback to old filename-based structure
filename_without_ext = os.path.splitext(file_info['filename'])[0]
dataset_dir = os.path.join("projects", "data", dataset_id, "datasets", filename_without_ext)
if remove_file_or_directory(dataset_dir):
removed_files.append(dataset_dir)
# Also remove any specific dataset path if exists (fallback)
if 'dataset_path' in file_info:
if remove_file_or_directory(file_info['dataset_path']):
removed_files.append(file_info['dataset_path'])
# Remove the log file
if remove_file_or_directory(log_file):
removed_files.append(log_file)
# Remove the entire files directory
files_dir = os.path.join(project_dir, "files")
if remove_file_or_directory(files_dir):
removed_files.append(files_dir)
# Also remove the entire dataset directory (clean up any remaining files)
dataset_dir = os.path.join(project_dir, "datasets")
if remove_file_or_directory(dataset_dir):
removed_files.append(dataset_dir)
# Remove README.md if exists
readme_file = os.path.join(project_dir, "README.md")
if remove_file_or_directory(readme_file):
removed_files.append(readme_file)
return {
"message": f"File processing status reset successfully: {dataset_id}",
"removed_files_count": len(removed_files),
"removed_files": removed_files
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to reset file processing status: {str(e)}")
@router.post("/api/v1/files/{dataset_id}/cleanup/async")
async def cleanup_project_async_endpoint(dataset_id: str, remove_all: bool = False):
"""Asynchronously clean up project files."""
try:
task = cleanup_project_async(dataset_id=dataset_id, remove_all=remove_all)
return {
"success": True,
"message": f"Project cleanup task has been submitted to the queue, project ID: {dataset_id}",
"dataset_id": dataset_id,
"task_id": task.id,
"action": "remove_all" if remove_all else "cleanup_logs"
}
except Exception as e:
logger.error(f"Error submitting cleanup task: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to submit cleanup task: {str(e)}")
@router.post("/api/v1/upload")
async def upload_file(file: UploadFile = File(...), folder: Optional[str] = Form(None)):
"""
@ -348,121 +93,3 @@ async def upload_file(file: UploadFile = File(...), folder: Optional[str] = Form
except Exception as e:
logger.error(f"Error uploading file: {str(e)}")
raise HTTPException(status_code=500, detail=f"File upload failed: {str(e)}")
# Task management routes that are related to file processing
@router.get("/api/v1/task/{task_id}/status")
async def get_task_status(task_id: str):
"""Get task status - simple and reliable."""
try:
status_data = task_status_store.get_status(task_id)
if not status_data:
return {
"success": False,
"message": "Task does not exist or has expired",
"task_id": task_id,
"status": "not_found"
}
return {
"success": True,
"message": "Task status retrieved successfully",
"task_id": task_id,
"status": status_data["status"],
"unique_id": status_data["unique_id"],
"created_at": status_data["created_at"],
"updated_at": status_data["updated_at"],
"result": status_data.get("result"),
"error": status_data.get("error")
}
except Exception as e:
logger.error(f"Error getting task status: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to retrieve task status: {str(e)}")
@router.delete("/api/v1/task/{task_id}")
async def delete_task(task_id: str):
"""Delete task record."""
try:
success = task_status_store.delete_status(task_id)
if success:
return {
"success": True,
"message": f"Task record deleted: {task_id}",
"task_id": task_id
}
else:
return {
"success": False,
"message": f"Task record does not exist: {task_id}",
"task_id": task_id
}
except Exception as e:
logger.error(f"Error deleting task: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to delete task record: {str(e)}")
@router.get("/api/v1/tasks")
async def list_tasks(status: Optional[str] = None, dataset_id: Optional[str] = None, limit: int = 100):
"""List tasks with optional filters."""
try:
if status or dataset_id:
# Use search function
tasks = task_status_store.search_tasks(status=status, unique_id=dataset_id, limit=limit)
else:
# Get all tasks
all_tasks = task_status_store.list_all()
tasks = list(all_tasks.values())[:limit]
return {
"success": True,
"message": "Task list retrieved successfully",
"total_tasks": len(tasks),
"tasks": tasks,
"filters": {
"status": status,
"dataset_id": dataset_id,
"limit": limit
}
}
except Exception as e:
logger.error(f"Error listing tasks: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to retrieve task list: {str(e)}")
@router.get("/api/v1/tasks/statistics")
async def get_task_statistics():
"""Get task statistics."""
try:
stats = task_status_store.get_statistics()
return {
"success": True,
"message": "Statistics retrieved successfully",
"statistics": stats
}
except Exception as e:
logger.error(f"Error getting statistics: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to retrieve statistics: {str(e)}")
@router.post("/api/v1/tasks/cleanup")
async def cleanup_tasks(older_than_days: int = 7):
"""Clean up old task records."""
try:
deleted_count = task_status_store.cleanup_old_tasks(older_than_days=older_than_days)
return {
"success": True,
"message": f"Cleaned up {deleted_count} old task records",
"deleted_count": deleted_count,
"older_than_days": older_than_days
}
except Exception as e:
logger.error(f"Error cleaning up tasks: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to clean up task records: {str(e)}")

View File

@ -6,8 +6,6 @@ import logging
logger = logging.getLogger('app')
from task_queue.task_status import task_status_store
router = APIRouter()
@ -155,22 +153,3 @@ async def list_datasets():
except Exception as e:
logger.error(f"Error listing datasets: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to retrieve dataset list: {str(e)}")
@router.get("/api/v1/projects/{dataset_id}/tasks")
async def get_project_tasks(dataset_id: str):
"""Get all tasks for the specified project."""
try:
tasks = task_status_store.get_by_unique_id(dataset_id)
return {
"success": True,
"message": "Project tasks retrieved successfully",
"dataset_id": dataset_id,
"total_tasks": len(tasks),
"tasks": tasks
}
except Exception as e:
logger.error(f"Error getting project tasks: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to retrieve project tasks: {str(e)}")

View File

@ -1,5 +1,5 @@
#!/bin/bash
# Optimized startup script - integrates the FastAPI application and queue consumer
# Optimized startup script for the FastAPI application
set -e
@ -7,7 +7,6 @@ set -e
DEFAULT_HOST="0.0.0.0"
DEFAULT_PORT="8001"
DEFAULT_API_WORKERS="4"
DEFAULT_QUEUE_WORKERS="2"
DEFAULT_PROFILE="balanced"
DEFAULT_LOG_LEVEL="info"
DEFAULT_MAX_RESTARTS="3"
@ -17,7 +16,6 @@ DEFAULT_CHECK_INTERVAL="5"
HOST=${HOST:-$DEFAULT_HOST}
PORT=${PORT:-$DEFAULT_PORT}
API_WORKERS=${API_WORKERS:-$DEFAULT_API_WORKERS}
QUEUE_WORKERS=${QUEUE_WORKERS:-$DEFAULT_QUEUE_WORKERS}
PROFILE=${PROFILE:-$DEFAULT_PROFILE}
LOG_LEVEL=${LOG_LEVEL:-$DEFAULT_LOG_LEVEL}
MAX_RESTARTS=${MAX_RESTARTS:-$DEFAULT_MAX_RESTARTS}
@ -47,7 +45,6 @@ print_config() {
print_color $GREEN "Startup configuration:"
echo "- API server: http://$HOST:$PORT"
echo "- API worker processes: $API_WORKERS"
echo "- Queue worker threads: $QUEUE_WORKERS"
echo "- Performance profile: $PROFILE"
echo "- Log level: $LOG_LEVEL"
echo "- Maximum restarts: $MAX_RESTARTS"
@ -87,7 +84,6 @@ create_directories() {
print_color $YELLOW "Creating project directories..."
directories=(
"projects/queue_data"
"projects/data"
"projects/uploads"
"projects/robot"
@ -161,16 +157,6 @@ start_services() {
API_PID=$!
echo "API server PID: $API_PID"
# Start the queue consumer
print_color $BLUE "Starting queue consumer..."
python3 task_queue/consumer.py \
--workers=$QUEUE_WORKERS \
--worker-type=threads \
> queue_consumer.log 2>&1 &
CONSUMER_PID=$!
echo "Queue consumer PID: $CONSUMER_PID"
echo
print_color $GREEN "All services started successfully!"
print_color $GREEN "API server: http://$HOST:$PORT"
@ -179,7 +165,7 @@ start_services() {
}
monitor_services() {
local restart_counts=(0 0) # API, Consumer
local restart_counts=(0) # API
while true; do
# Check the API server
@ -205,26 +191,6 @@ monitor_services() {
fi
fi
# Check the queue consumer
if ! kill -0 $CONSUMER_PID 2>/dev/null; then
print_color $RED "Queue consumer stopped unexpectedly"
if [ ${restart_counts[1]} -lt $MAX_RESTARTS ]; then
print_color $YELLOW "Restarting queue consumer (${restart_counts[1]} + 1/$MAX_RESTARTS)..."
python3 task_queue/consumer.py \
--workers=$QUEUE_WORKERS \
--worker-type=threads \
>> queue_consumer.log 2>&1 &
CONSUMER_PID=$!
restart_counts[1]=$((restart_counts[1] + 1))
print_color $GREEN "Queue consumer restarted successfully, PID: $CONSUMER_PID"
else
print_color $RED "Queue consumer restart limit reached, stopping all services"
break
fi
fi
# Wait for the next check interval
sleep $CHECK_INTERVAL
done
@ -253,25 +219,6 @@ cleanup() {
fi
fi
# Stop the queue consumer
if [ ! -z "$CONSUMER_PID" ] && kill -0 $CONSUMER_PID 2>/dev/null; then
print_color $BLUE "Stopping queue consumer (PID: $CONSUMER_PID)..."
kill $CONSUMER_PID 2>/dev/null || true
# Wait for graceful shutdown
local count=0
while kill -0 $CONSUMER_PID 2>/dev/null && [ $count -lt 10 ]; do
sleep 1
count=$((count + 1))
done
# Force terminate if it is still running
if kill -0 $CONSUMER_PID 2>/dev/null; then
print_color $RED "Force stopping queue consumer..."
kill -9 $CONSUMER_PID 2>/dev/null || true
fi
fi
print_color $GREEN "All services have been stopped"
exit 0
}
@ -288,7 +235,6 @@ main() {
echo " HOST API bind host address (default: $DEFAULT_HOST)"
echo " PORT API bind port (default: $DEFAULT_PORT)"
echo " API_WORKERS Number of API worker processes (default: $DEFAULT_API_WORKERS)"
echo " QUEUE_WORKERS Number of queue worker threads (default: $DEFAULT_QUEUE_WORKERS)"
echo " PROFILE Performance profile: low_memory, balanced, high_performance (default: $DEFAULT_PROFILE)"
echo " LOG_LEVEL Log level: debug, info, warning, error (default: $DEFAULT_LOG_LEVEL)"
echo " MAX_RESTARTS Maximum restart count (default: $DEFAULT_MAX_RESTARTS)"
@ -296,7 +242,7 @@ main() {
echo
echo "Examples:"
echo " PROFILE=high_performance API_WORKERS=8 $0"
echo " PORT=8080 QUEUE_WORKERS=4 $0"
echo " PORT=8080 API_WORKERS=4 $0"
exit 0
fi

View File

@ -1,6 +1,6 @@
#!/usr/bin/env python3
"""
Optimized unified startup script combining the FastAPI application and queue consumer.
Optimized unified startup script for the FastAPI application.
Supports performance monitoring, automatic restart, graceful shutdown, and related features.
"""
@ -17,7 +17,7 @@ from typing import List, Optional, Dict, Any
class ProcessManager:
"""Process manager that controls the API service and queue consumer."""
"""Process manager that controls the API service."""
def __init__(self):
self.processes: Dict[str, subprocess.Popen] = {}
@ -78,44 +78,6 @@ class ProcessManager:
print(f"Failed to start API server: {e}")
return None
def start_queue_consumer(self, args) -> Optional[subprocess.Popen]:
"""Start the queue consumer."""
print("Starting queue consumer...")
consumer_script = Path("task_queue/consumer.py")
if not consumer_script.exists():
consumer_script = consumer_script.with_suffix(".pyc")
# Build the queue consumer command
cmd = [
sys.executable,
str(consumer_script),
"--workers", str(args.queue_workers),
"--worker-type", args.worker_type
]
try:
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
bufsize=1
)
# Start the output monitoring thread
threading.Thread(
target=self._monitor_output,
args=(process, "Queue consumer"),
daemon=True
).start()
return process
except Exception as e:
print(f"Failed to start queue consumer: {e}")
return None
def _monitor_output(self, process: subprocess.Popen, name: str):
"""Monitor process output."""
try:
@ -138,8 +100,6 @@ class ProcessManager:
if name == "API server":
new_process = self.start_api_server(args)
elif name == "Queue consumer":
new_process = self.start_queue_consumer(args)
else:
return False
@ -169,27 +129,19 @@ class ProcessManager:
print("Failed to start API server; exiting")
return False
queue_process = self.start_queue_consumer(args)
if not queue_process:
print("Failed to start queue consumer; exiting")
api_process.terminate()
return False
self.processes["API server"] = api_process
self.processes["Queue consumer"] = queue_process
print("\n" + "=" * 70)
print("All services started successfully!")
print(f"API server: http://{args.host}:{args.port}")
print(f"API PID: {api_process.pid}")
print(f"Queue consumer PID: {queue_process.pid}")
print("Press Ctrl+C to stop all services")
print("=" * 70 + "\n")
self.running = True
# Main monitoring loop
restart_counts = {"API server": 0, "Queue consumer": 0}
restart_counts = {"API server": 0}
max_restarts = args.max_restarts
while self.running and not self.shutdown_event.is_set():
@ -262,7 +214,6 @@ class ProcessManager:
def create_directories(self):
"""Create the required directories."""
directories = [
"projects/queue_data",
"projects/data",
"projects/uploads",
"projects/robot",
@ -313,11 +264,6 @@ def parse_args():
parser.add_argument("--log-level", type=str, default="info",
choices=["debug", "info", "warning", "error"], help="Log level")
# Queue consumer configuration
parser.add_argument("--queue-workers", type=int, default=2, help="Number of queue consumer worker threads")
parser.add_argument("--worker-type", type=str, default="threads",
choices=["threads", "greenlets", "gevent"], help="Queue worker type")
# Performance profile
parser.add_argument("--profile", type=str, default="low_memory",
choices=["low_memory", "balanced", "high_performance"], help="Performance profile")

View File

@ -1,154 +0,0 @@
# 队列系统使用说明
## 概述
本项目集成了基于 huey 和 SqliteHuey 的异步队列系统,用于处理文件的异步处理任务。
## 安装依赖
```bash
pip install huey
```
## 目录结构
```
queue/
├── __init__.py # 包初始化文件
├── config.py # 队列配置SqliteHuey配置
├── tasks.py # 文件处理任务定义
├── manager.py # 队列管理器
├── consumer.py # 队列消费者(工作进程)
├── example.py # 使用示例
└── README.md # 说明文档
```
## 核心功能
### 1. 队列配置 (config.py)
- 使用 SqliteHuey 作为消息队列
- 数据库文件存储在 `queue_data/huey.db`
- 支持任务重试和错误存储
### 2. 文件处理任务 (tasks.py)
- `process_file_async`: 异步处理单个文件
- `process_multiple_files_async`: 批量异步处理文件
- `process_zip_file_async`: 异步处理zip压缩文件
- `cleanup_processed_files`: 清理旧的文件
### 3. 队列管理器 (manager.py)
- 任务提交和管理
- 队列状态监控
- 任务结果查询
- 任务记录清理
## 使用方法
### 1. 启动队列消费者
```bash
# 启动默认配置的消费者
python queue/consumer.py
# 指定工作线程数
python queue/consumer.py --workers 4
# 查看队列统计信息
python queue/consumer.py --stats
# 检查队列状态
python queue/consumer.py --check
# 清空队列
python queue/consumer.py --flush
```
### 2. 在代码中使用队列
```python
from queue.manager import queue_manager
# 处理单个文件
task_id = queue_manager.enqueue_file(
project_id="my_project",
file_path="/path/to/file.txt",
original_filename="myfile.txt"
)
# 批量处理文件
task_ids = queue_manager.enqueue_multiple_files(
project_id="my_project",
file_paths=["/path/file1.txt", "/path/file2.txt"],
original_filenames=["file1.txt", "file2.txt"]
)
# 处理zip文件
task_id = queue_manager.enqueue_zip_file(
project_id="my_project",
zip_path="/path/to/archive.zip"
)
# 查看任务状态
status = queue_manager.get_task_status(task_id)
print(status)
# 获取队列统计信息
stats = queue_manager.get_queue_stats()
print(stats)
```
### 3. 运行示例
```bash
python queue/example.py
```
## 配置说明
### 队列配置参数 (config.py)
- `filename`: SQLite数据库文件路径
- `always_eager`: 是否立即执行任务开发时可设为True
- `utc`: 是否使用UTC时间
- `compression_level`: 压缩级别
- `store_errors`: 是否存储错误信息
- `max_retries`: 最大重试次数
- `retry_delay`: 重试延迟
### 消费者参数 (consumer.py)
- `--workers`: 工作线程数默认2
- `--worker-type`: 工作类型threads/greenlets/processes
- `--stats`: 显示统计信息
- `--check`: 检查队列状态
- `--flush`: 清空队列
## 任务状态
- `pending`: 等待处理
- `running`: 正在处理
- `complete/finished`: 处理完成
- `error`: 处理失败
- `scheduled`: 定时任务
## 最佳实践
1. **生产环境建议**:
- 设置合适的工作线程数建议CPU核心数的1-2倍
- 定期清理旧的任务记录
- 监控队列状态和任务执行情况
2. **开发环境建议**:
- 可以设置 `always_eager=True` 立即执行任务进行调试
- 使用 `--check` 参数查看队列状态
- 运行示例代码了解功能
3. **错误处理**:
- 任务失败后会自动重试最多3次
- 错误信息会存储在数据库中
- 可以通过 `get_task_status()` 查看错误详情
## 故障排除
1. **数据库锁定**: 确保只有一个消费者实例在运行
2. **任务卡住**: 检查文件路径和权限
3. **内存不足**: 调整工作线程数或使用进程模式
4. **磁盘空间**: 定期清理旧文件和任务记录

View File

@ -1,23 +0,0 @@
#!/usr/bin/env python3
"""
Queue package initialization.
"""
from .config import huey
from .manager import QueueManager, queue_manager
from .tasks import (
process_file_async,
process_multiple_files_async,
process_zip_file_async,
cleanup_processed_files
)
__all__ = [
"huey",
"QueueManager",
"queue_manager",
"process_file_async",
"process_multiple_files_async",
"process_zip_file_async",
"cleanup_processed_files"
]

View File

@ -1,31 +0,0 @@
#!/usr/bin/env python3
"""
Queue configuration using SqliteHuey for asynchronous file processing.
"""
import os
import logging
from huey import SqliteHuey
from datetime import timedelta
# Configure logging
logger = logging.getLogger('app')
# Ensure projects/queue_data directory exists
queue_data_dir = os.path.join(os.path.dirname(__file__), '..', 'projects', 'queue_data')
os.makedirs(queue_data_dir, exist_ok=True)
# Initialize SqliteHuey
huey = SqliteHuey(
filename=os.path.join(queue_data_dir, 'huey.db'),
name='file_processor', # Queue name
always_eager=False, # Set to False to enable async processing
utc=True, # Use UTC time
)
# Set default task configuration
huey.store_errors = True # Store error information
huey.max_retries = 3 # Maximum retry count
huey.retry_delay = timedelta(seconds=60) # Retry delay
logger.info(f"SqliteHuey queue initialized, database path: {os.path.join(queue_data_dir, 'huey.db')}")

View File

@ -1,171 +0,0 @@
#!/usr/bin/env python3
"""
Queue consumer for processing file tasks.
"""
import sys
import os
import time
import signal
import argparse
from pathlib import Path
# Add project root directory to Python path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from task_queue.config import huey
from task_queue.manager import queue_manager
from task_queue.integration_tasks import process_files_async, cleanup_project_async
from huey.consumer import Consumer
class QueueConsumer:
"""Queue consumer for processing async tasks."""
def __init__(self, worker_type: str = "threads", workers: int = 2):
self.huey = huey
self.worker_type = worker_type
self.workers = workers
self.running = False
self.consumer = None
# Register signal handlers
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
def _signal_handler(self, signum, frame):
"""Signal handler for graceful shutdown."""
print(f"\nReceived signal {signum}, shutting down queue consumer...")
self.running = False
def start(self):
"""Start the queue consumer."""
print(f"Starting queue consumer...")
print(f"Worker threads: {self.workers}")
print(f"Worker type: {self.worker_type}")
print(f"Database: {os.path.join(os.path.dirname(__file__), '..', 'projects', 'queue_data', 'huey.db')}")
print("Press Ctrl+C to stop the consumer")
self.running = True
try:
# Create Huey consumer
self.consumer = Consumer(self.huey, workers=self.workers, worker_type=self.worker_type.rstrip('s'))
# Display queue statistics
stats = queue_manager.get_queue_stats()
print(f"Current queue status: {stats}")
# Start consumer run loop
print("Consumer starting task processing...")
self.consumer.run()
except KeyboardInterrupt:
print("\nReceived interrupt signal, shutting down...")
except Exception as e:
print(f"Queue consumer runtime error: {str(e)}")
finally:
self.stop()
def stop(self):
"""Stop the queue consumer."""
print("Stopping queue consumer...")
try:
if self.consumer:
# Stop the consumer
self.consumer.stop()
self.consumer = None
print("Queue consumer stopped")
except Exception as e:
print(f"Error stopping queue consumer: {str(e)}")
def process_scheduled_tasks(self):
"""Process scheduled tasks."""
print("Processing scheduled tasks...")
# Additional scheduled task processing logic can be added here
def main():
"""Main entry point."""
parser = argparse.ArgumentParser(description="File processing queue consumer")
parser.add_argument(
"--workers",
type=int,
default=2,
help="Number of worker threads (default: 2)"
)
parser.add_argument(
"--worker-type",
choices=["threads", "greenlets", "processes"],
default="threads",
help="Worker thread type (default: threads)"
)
parser.add_argument(
"--stats",
action="store_true",
help="Display queue statistics and exit"
)
parser.add_argument(
"--flush",
action="store_true",
help="Flush the queue and exit"
)
parser.add_argument(
"--check",
action="store_true",
help="Check queue status and exit"
)
args = parser.parse_args()
# Initialize consumer
consumer = QueueConsumer(
worker_type=args.worker_type,
workers=args.workers
)
# Handle different command-line options
if args.stats:
print("=== Queue Statistics ===")
stats = queue_manager.get_queue_stats()
print(f"Total tasks: {stats.get('total_tasks', 0)}")
print(f"Pending tasks: {stats.get('pending_tasks', 0)}")
print(f"Running tasks: {stats.get('running_tasks', 0)}")
print(f"Completed tasks: {stats.get('completed_tasks', 0)}")
print(f"Error tasks: {stats.get('error_tasks', 0)}")
print(f"Scheduled tasks: {stats.get('scheduled_tasks', 0)}")
print(f"Database: {stats.get('queue_database', 'N/A')}")
return
if args.flush:
print("=== Flushing Queue ===")
try:
# Flush all tasks
consumer.huey.flush()
print("Queue flushed")
except Exception as e:
print(f"Failed to flush queue: {str(e)}")
return
if args.check:
print("=== Checking Queue Status ===")
stats = queue_manager.get_queue_stats()
print(f"Queue status: OK" if "error" not in stats else f"Queue status: ERROR - {stats['error']}")
pending_tasks = queue_manager.list_pending_tasks(limit=10)
if pending_tasks:
print(f"\nPending tasks (showing up to 10):")
for task in pending_tasks:
print(f" Task ID: {task['task_id']}, Status: {task['status']}, Created: {task['created_time']}")
else:
print("No pending tasks")
return
# Start consumer
print("=== Starting File Processing Queue Consumer ===")
consumer.start()
if __name__ == "__main__":
main()

View File

@ -1,132 +0,0 @@
#!/usr/bin/env python3
"""
Example usage of the queue system.
"""
import sys
import time
from pathlib import Path
# Add project root directory to Python path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from task_queue.manager import queue_manager
from task_queue.tasks import process_file_async, process_multiple_files_async
def example_single_file():
"""Example: Process a single file."""
print("=== Example: Process a single file ===")
project_id = "test_project"
file_path = "public/test_document.txt"
# Enqueue file for processing
task_id = queue_manager.enqueue_file(
project_id=project_id,
file_path=file_path,
original_filename="example_document.txt"
)
print(f"Task submitted, task ID: {task_id}")
# Check task status
time.sleep(2)
status = queue_manager.get_task_status(task_id)
print(f"Task status: {status}")
def example_multiple_files():
"""Example: Batch process files."""
print("\n=== Example: Batch process files ===")
project_id = "test_project_batch"
file_paths = [
"public/test_document.txt",
"public/goods.xlsx" # Assuming this file exists
]
original_filenames = [
"batch_document_1.txt",
"batch_goods.xlsx"
]
# Enqueue multiple files for processing
task_ids = queue_manager.enqueue_multiple_files(
project_id=project_id,
file_paths=file_paths,
original_filenames=original_filenames
)
print(f"Batch tasks submitted, task IDs: {task_ids}")
def example_zip_file():
"""Example: Process a zip file."""
print("\n=== Example: Process a zip file ===")
project_id = "test_project_zip"
zip_path = "public/all_hp_product_spec_book2506.zip"
# Enqueue zip file for processing
task_id = queue_manager.enqueue_zip_file(
project_id=project_id,
zip_path=zip_path
)
print(f"Zip task submitted, task ID: {task_id}")
def example_queue_stats():
"""Example: Get queue statistics."""
print("\n=== Example: Queue statistics ===")
stats = queue_manager.get_queue_stats()
print("Queue statistics:")
for key, value in stats.items():
if key != "recent_tasks":
print(f" {key}: {value}")
def example_cleanup():
"""Example: Cleanup tasks."""
print("\n=== Example: Cleanup tasks ===")
project_id = "test_project"
# Enqueue cleanup task (delayed 10 seconds)
task_id = queue_manager.enqueue_cleanup_task(
project_id=project_id,
older_than_days=1, # Clean files older than 1 day
delay=10
)
print(f"Cleanup task submitted, task ID: {task_id}")
def main():
"""Main entry point."""
print("Queue System Usage Examples")
print("=" * 50)
try:
# Run examples
example_single_file()
example_multiple_files()
example_zip_file()
example_queue_stats()
example_cleanup()
print("\n" + "=" * 50)
print("Examples completed!")
print("\nTo check task execution, run:")
print("python queue/consumer.py --check")
print("\nTo start the queue consumer, run:")
print("python queue/consumer.py")
except Exception as e:
print(f"Error running examples: {str(e)}")
if __name__ == "__main__":
main()

View File

@ -1,499 +0,0 @@
#!/usr/bin/env python3
"""
Queue tasks for file processing integration.
"""
import os
import json
import time
import hashlib
import shutil
from typing import Dict, List, Optional, Any
from task_queue.config import huey
from task_queue.manager import queue_manager
from task_queue.task_status import task_status_store
from utils import download_dataset_files, save_processed_files_log, load_processed_files_log
from utils.dataset_manager import remove_dataset_directory_by_key
def scan_upload_folder(upload_dir: str) -> List[str]:
"""
Scan all supported file formats in the upload folder.
Args:
upload_dir: Upload folder path
Returns:
List[str]: List of supported file paths
"""
supported_extensions = {
# Text files
'.txt', '.md', '.rtf',
# Document files
'.doc', '.docx', '.pdf', '.odt',
# Spreadsheet files
'.xls', '.xlsx', '.csv', '.ods',
# Presentation files
'.ppt', '.pptx', '.odp',
# E-books
'.epub', '.mobi',
# Web files
'.html', '.htm',
# Config files
'.json', '.xml', '.yaml', '.yml',
# Code files
'.py', '.js', '.java', '.cpp', '.c', '.go', '.rs',
# Archive files
'.zip', '.rar', '.7z', '.tar', '.gz'
}
scanned_files = []
if not os.path.exists(upload_dir):
return scanned_files
for root, dirs, files in os.walk(upload_dir):
for file in files:
# Skip hidden files and system files
if file.startswith('.') or file.startswith('~'):
continue
file_path = os.path.join(root, file)
file_extension = os.path.splitext(file)[1].lower()
# Check if file extension is supported
if file_extension in supported_extensions:
scanned_files.append(file_path)
else:
# For files without extension, try to process them (may be text files)
if not file_extension:
try:
# Try reading the file header to determine if it's a text file
with open(file_path, 'r', encoding='utf-8') as f:
f.read(1024) # Read the first 1KB
scanned_files.append(file_path)
except (UnicodeDecodeError, PermissionError):
# Not a text file or unreadable, skip
pass
return scanned_files
@huey.task()
def process_files_async(
dataset_id: str,
files: Optional[Dict[str, List[str]]] = None,
upload_folder: Optional[Dict[str, str]] = None,
task_id: Optional[str] = None
) -> Dict[str, Any]:
"""
Asynchronously process file tasks - compatible with existing files/process API.
Args:
dataset_id: Unique project ID
files: Dictionary of file paths grouped by key
upload_folder: Upload folder dictionary organized by group name, e.g. {'group1': 'my_project1', 'group2': 'my_project2'}
task_id: Task ID (for status tracking)
Returns:
Processing result dictionary
"""
try:
print(f"Starting async file processing task, project ID: {dataset_id}")
# If task_id is provided, set initial status
if task_id:
task_status_store.set_status(
task_id=task_id,
unique_id=dataset_id,
status="running"
)
# Ensure project directory exists
project_dir = os.path.join("projects", "data", dataset_id)
if not os.path.exists(project_dir):
os.makedirs(project_dir, exist_ok=True)
# Process files: use key-grouped format
processed_files_by_key = {}
# If upload_folder is provided, scan files in those folders
if upload_folder and not files:
scanned_files_by_group = {}
total_scanned_files = 0
for group_name, folder_name in upload_folder.items():
# Security check: prevent path traversal attacks
safe_folder_name = os.path.basename(folder_name)
upload_dir = os.path.join("projects", "uploads", safe_folder_name)
if os.path.exists(upload_dir):
scanned_files = scan_upload_folder(upload_dir)
if scanned_files:
scanned_files_by_group[group_name] = scanned_files
total_scanned_files += len(scanned_files)
print(f"Scanned {len(scanned_files)} files from upload folder '{safe_folder_name}' (group: {group_name})")
else:
print(f"No supported files found in upload folder '{safe_folder_name}' (group: {group_name})")
else:
print(f"Upload folder does not exist: {upload_dir} (group: {group_name})")
if scanned_files_by_group:
files = scanned_files_by_group
print(f"Total scanned {total_scanned_files} files from {len(scanned_files_by_group)} groups")
else:
print("No supported files found in any upload folder")
if files:
# Use files from the request (grouped by key)
# Since this is an async task, call synchronously
import asyncio
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
processed_files_by_key = loop.run_until_complete(download_dataset_files(dataset_id, files))
total_files = sum(len(files_list) for files_list in processed_files_by_key.values())
print(f"Async processed {total_files} dataset files across {len(processed_files_by_key)} keys, project ID: {dataset_id}")
else:
print(f"No files provided in request, project ID: {dataset_id}")
# Collect all document.txt files in the project directory
document_files = []
for root, dirs, files_list in os.walk(project_dir):
for file in files_list:
if file == "document.txt":
document_files.append(os.path.join(root, file))
# Generate project README.md file
try:
from utils.project_manager import save_project_readme
save_project_readme(dataset_id)
print(f"README.md generated, project ID: {dataset_id}")
except Exception as e:
print(f"Failed to generate README.md, project ID: {dataset_id}, error: {str(e)}")
# Does not affect main processing flow, continue
# Build result file list
result_files = []
for key in processed_files_by_key.keys():
# Add corresponding dataset document.txt path
document_path = os.path.join("projects", "data", dataset_id, "datasets", key, "document.txt")
if os.path.exists(document_path):
result_files.append(document_path)
# Also add document.txt files that exist but are not in processed_files_by_key
existing_document_paths = set(result_files) # Avoid duplicates
for doc_file in document_files:
if doc_file not in existing_document_paths:
result_files.append(doc_file)
result = {
"status": "success",
"message": f"Successfully processed {len(result_files)} document files across {len(processed_files_by_key)} keys",
"dataset_id": dataset_id,
"processed_files": result_files,
"processed_files_by_key": processed_files_by_key,
"document_files": document_files,
"total_files_processed": sum(len(files_list) for files_list in processed_files_by_key.values()),
"processing_time": time.time()
}
# Update task status to completed
if task_id:
task_status_store.update_status(
task_id=task_id,
status="completed",
result=result
)
print(f"Async file processing task completed: {dataset_id}")
return result
except Exception as e:
error_msg = f"Error during async file processing: {str(e)}"
print(error_msg)
# Update task status to error
if task_id:
task_status_store.update_status(
task_id=task_id,
status="failed",
error=error_msg
)
return {
"status": "error",
"message": error_msg,
"dataset_id": dataset_id,
"error": str(e)
}
@huey.task()
def process_files_incremental_async(
dataset_id: str,
files_to_add: Optional[Dict[str, List[str]]] = None,
files_to_remove: Optional[Dict[str, List[str]]] = None,
system_prompt: Optional[str] = None,
mcp_settings: Optional[List[Dict]] = None,
task_id: Optional[str] = None
) -> Dict[str, Any]:
"""
Incremental file processing task - supports adding and removing files.
Args:
dataset_id: Unique project ID
files_to_add: Dictionary of file paths to add, grouped by key
files_to_remove: Dictionary of file paths to remove, grouped by key
system_prompt: System prompt
mcp_settings: MCP settings
task_id: Task ID (for status tracking)
Returns:
Processing result dictionary
"""
try:
print(f"Starting incremental file processing task, project ID: {dataset_id}")
# If task_id is provided, set initial status
if task_id:
task_status_store.set_status(
task_id=task_id,
unique_id=dataset_id,
status="running"
)
# Ensure project directory exists
project_dir = os.path.join("projects", "data", dataset_id)
if not os.path.exists(project_dir):
os.makedirs(project_dir, exist_ok=True)
# Load existing processing log
processed_log = load_processed_files_log(dataset_id)
print(f"Loaded existing processing log with {len(processed_log)} file records")
removed_files = []
added_files = []
# 1. Process removals
if files_to_remove:
print(f"Starting removal processing across {len(files_to_remove)} key groups")
for key, file_list in files_to_remove.items():
if not file_list: # If file list is empty, remove the entire key group
print(f"Removing entire key group: {key}")
if remove_dataset_directory_by_key(dataset_id, key):
removed_files.append(f"dataset/{key}")
# Remove all records for this key from the processing log
keys_to_remove = [file_hash for file_hash, file_info in processed_log.items()
if file_info.get('key') == key]
for file_hash in keys_to_remove:
del processed_log[file_hash]
removed_files.append(f"log_entry:{file_hash}")
else:
# Remove specific files
for file_path in file_list:
print(f"Removing specific file: {key}/{file_path}")
# Actually delete the file
filename = os.path.basename(file_path)
# Delete original file
source_file = os.path.join("projects", "data", dataset_id, "files", key, filename)
if os.path.exists(source_file):
os.remove(source_file)
removed_files.append(f"file:{key}/{filename}")
# Delete processed file directory
processed_dir = os.path.join("projects", "data", dataset_id, "processed", key, filename)
if os.path.exists(processed_dir):
shutil.rmtree(processed_dir)
removed_files.append(f"processed:{key}/{filename}")
# Compute file hash to find in log
file_hash = hashlib.md5(file_path.encode('utf-8')).hexdigest()
# Remove from processing log
if file_hash in processed_log:
del processed_log[file_hash]
removed_files.append(f"log_entry:{file_hash}")
# 2. Process additions
processed_files_by_key = {}
if files_to_add:
print(f"Starting addition processing across {len(files_to_add)} key groups")
# Use async processing to download files
import asyncio
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
processed_files_by_key = loop.run_until_complete(download_dataset_files(dataset_id, files_to_add, incremental_mode=True))
total_added_files = sum(len(files_list) for files_list in processed_files_by_key.values())
print(f"Async processed {total_added_files} dataset files across {len(processed_files_by_key)} keys, project ID: {dataset_id}")
# Record added files
for key, files_list in processed_files_by_key.items():
for file_path in files_list:
added_files.append(f"{key}/{file_path}")
else:
print(f"No files to add provided in request, project ID: {dataset_id}")
# Save updated processing log
save_processed_files_log(dataset_id, processed_log)
print(f"Updated processing log, now contains {len(processed_log)} file records")
# Save system_prompt and mcp_settings to project directory (if provided)
if system_prompt:
system_prompt_file = os.path.join(project_dir, "system_prompt.md")
with open(system_prompt_file, 'w', encoding='utf-8') as f:
f.write(system_prompt)
print(f"Saved system_prompt, project ID: {dataset_id}")
if mcp_settings:
mcp_settings_file = os.path.join(project_dir, "mcp_settings.json")
with open(mcp_settings_file, 'w', encoding='utf-8') as f:
json.dump(mcp_settings, f, ensure_ascii=False, indent=2)
print(f"Saved mcp_settings, project ID: {dataset_id}")
# Generate project README.md file
try:
from utils.project_manager import save_project_readme
save_project_readme(dataset_id)
print(f"README.md generated, project ID: {dataset_id}")
except Exception as e:
print(f"Failed to generate README.md, project ID: {dataset_id}, error: {str(e)}")
# Does not affect main processing flow, continue
# Collect all document.txt files in the project directory
document_files = []
for root, dirs, files_list in os.walk(project_dir):
for file in files_list:
if file == "document.txt":
document_files.append(os.path.join(root, file))
# Build result file list
result_files = []
for key in processed_files_by_key.keys():
# Add corresponding dataset document.txt path
document_path = os.path.join("projects", "data", dataset_id, "datasets", key, "document.txt")
if os.path.exists(document_path):
result_files.append(document_path)
# Also add document.txt files that exist but are not in processed_files_by_key
existing_document_paths = set(result_files) # Avoid duplicates
for doc_file in document_files:
if doc_file not in existing_document_paths:
result_files.append(doc_file)
result = {
"status": "success",
"message": f"Incremental processing complete - added {len(added_files)} files, removed {len(removed_files)} files, {len(result_files)} document files remaining",
"dataset_id": dataset_id,
"removed_files": removed_files,
"added_files": added_files,
"processed_files": result_files,
"processed_files_by_key": processed_files_by_key,
"document_files": document_files,
"total_files_added": sum(len(files_list) for files_list in processed_files_by_key.values()),
"total_files_removed": len(removed_files),
"final_files_count": len(result_files),
"processing_time": time.time()
}
# Update task status to completed
if task_id:
task_status_store.update_status(
task_id=task_id,
status="completed",
result=result
)
print(f"Incremental file processing task completed: {dataset_id}")
return result
except Exception as e:
error_msg = f"Error during incremental file processing: {str(e)}"
print(error_msg)
# Update task status to error
if task_id:
task_status_store.update_status(
task_id=task_id,
status="failed",
error=error_msg
)
return {
"status": "error",
"message": error_msg,
"dataset_id": dataset_id,
"error": str(e)
}
@huey.task()
def cleanup_project_async(
dataset_id: str,
remove_all: bool = False
) -> Dict[str, Any]:
"""
Asynchronously clean up project files.
Args:
dataset_id: Unique project ID
remove_all: Whether to remove the entire project directory
Returns:
Cleanup result dictionary
"""
try:
print(f"Starting async project cleanup, project ID: {dataset_id}")
project_dir = os.path.join("projects", "data", dataset_id)
removed_items = []
if remove_all and os.path.exists(project_dir):
import shutil
shutil.rmtree(project_dir)
removed_items.append(project_dir)
result = {
"status": "success",
"message": f"Deleted entire project directory: {project_dir}",
"dataset_id": dataset_id,
"removed_items": removed_items,
"action": "remove_all"
}
else:
# Only clean processing log
log_file = os.path.join(project_dir, "processed_files.json")
if os.path.exists(log_file):
os.remove(log_file)
removed_items.append(log_file)
result = {
"status": "success",
"message": f"Cleaned project processing log, project ID: {dataset_id}",
"dataset_id": dataset_id,
"removed_items": removed_items,
"action": "cleanup_logs"
}
print(f"Async cleanup task completed: {dataset_id}")
return result
except Exception as e:
error_msg = f"Error during async project cleanup: {str(e)}"
print(error_msg)
return {
"status": "error",
"message": error_msg,
"dataset_id": dataset_id,
"error": str(e)
}

View File

@ -1,228 +0,0 @@
#!/usr/bin/env python3
"""
Queue manager for handling file processing queues.
"""
import os
import json
import time
import logging
from typing import Dict, List, Optional, Any
from huey import Huey
from huey.api import Task
from datetime import datetime, timedelta
# Configure logging
logger = logging.getLogger('app')
from .config import huey
from .tasks import process_file_async, process_multiple_files_async, process_zip_file_async, cleanup_processed_files
class QueueManager:
"""Queue manager for file processing tasks."""
def __init__(self):
self.huey = huey
logger.info(f"Queue manager initialized with database: {os.path.join(os.path.dirname(__file__), '..', 'projects', 'queue_data', 'huey.db')}")
def enqueue_file(
self,
project_id: str,
file_path: str,
original_filename: str = None,
delay: int = 0
) -> str:
"""
Add a file to the processing queue.
Args:
project_id: Project ID
file_path: File path
original_filename: Original filename
delay: Delay before execution in seconds
Returns:
Task ID
"""
if delay > 0:
task = process_file_async.schedule(
args=(project_id, file_path, original_filename),
delay=timedelta(seconds=delay)
)
else:
task = process_file_async(project_id, file_path, original_filename)
logger.info(f"File queued for processing: {file_path}, task ID: {task.id}")
return task.id
def enqueue_multiple_files(
self,
project_id: str,
file_paths: List[str],
original_filenames: List[str] = None,
delay: int = 0
) -> List[str]:
"""
Add multiple files to the processing queue.
Args:
project_id: Project ID
file_paths: List of file paths
original_filenames: List of original filenames
delay: Delay before execution in seconds
Returns:
List of task IDs
"""
if delay > 0:
task = process_multiple_files_async.schedule(
args=(project_id, file_paths, original_filenames),
delay=timedelta(seconds=delay)
)
else:
task = process_multiple_files_async(project_id, file_paths, original_filenames)
logger.info(f"Batch files queued for processing: {len(file_paths)} files, task ID: {task.id}")
return [task.id]
def enqueue_zip_file(
self,
project_id: str,
zip_path: str,
extract_to: str = None,
delay: int = 0
) -> str:
"""
Add a zip file to the processing queue.
Args:
project_id: Project ID
zip_path: Path to the zip file
extract_to: Extraction target directory
delay: Delay before execution in seconds
Returns:
Task ID
"""
if delay > 0:
task = process_zip_file_async.schedule(
args=(project_id, zip_path, extract_to),
delay=timedelta(seconds=delay)
)
else:
task = process_zip_file_async(project_id, zip_path, extract_to)
logger.info(f"Zip file queued for processing: {zip_path}, task ID: {task.id}")
return task.id
def get_task_status(self, task_id: str) -> Dict[str, Any]:
"""
Get task status.
Args:
task_id: Task ID
Returns:
Task status information
"""
try:
# Try getting the task result from result storage
try:
# Use Huey's built-in result lookup when available
if hasattr(self.huey, 'result') and self.huey.result:
result = self.huey.result(task_id)
if result is not None:
return {
"task_id": task_id,
"status": "complete",
"result": result
}
except Exception:
pass
# Check whether the task is in the pending queue
try:
pending_tasks = list(self.huey.pending())
for task in pending_tasks:
if hasattr(task, 'id') and task.id == task_id:
return {
"task_id": task_id,
"status": "pending"
}
except Exception:
pass
# Check whether the task is in the scheduled queue
try:
scheduled_tasks = list(self.huey.scheduled())
for task in scheduled_tasks:
if hasattr(task, 'id') and task.id == task_id:
return {
"task_id": task_id,
"status": "scheduled"
}
except Exception:
pass
# If not found anywhere, it may not exist or may have completed with cleaned results
return {
"task_id": task_id,
"status": "unknown",
"message": "Task status is unknown; it may already be complete or may not exist"
}
except Exception as e:
return {
"task_id": task_id,
"status": "error",
"message": f"Failed to get task status: {str(e)}"
}
def get_queue_stats(self) -> Dict[str, Any]:
"""
Get queue statistics.
Returns:
Queue statistics information
"""
try:
# Use a simplified approach for queue statistics
stats = {
"total_tasks": 0,
"pending_tasks": 0,
"running_tasks": 0,
"completed_tasks": 0,
"error_tasks": 0,
"scheduled_tasks": 0,
"recent_tasks": [],
"queue_database": os.path.join(os.path.dirname(__file__), '..', 'projects', 'queue_data', 'huey.db')
}
# Try to get the number of pending tasks
try:
pending_tasks = list(self.huey.pending())
stats["pending_tasks"] = len(pending_tasks)
stats["total_tasks"] += len(pending_tasks)
except Exception as e:
logger.error(f"Failed to get pending tasks: {e}")
# Try to get the number of scheduled tasks
try:
scheduled_tasks = list(self.huey.scheduled())
stats["scheduled_tasks"] = len(scheduled_tasks)
stats["total_tasks"] += len(scheduled_tasks)
except Exception as e:
logger.error(f"Failed to get scheduled tasks: {e}")
return stats
except Exception as e:
return {
"error": str(e),
"queue_database": os.path.join(os.path.dirname(__file__), '..', 'projects', 'queue_data', 'huey.db')
}
# Global singleton instance
queue_manager = QueueManager()

View File

@ -1,286 +0,0 @@
#!/usr/bin/env python3
"""
Optimized queue consumer with integrated performance monitoring.
"""
import sys
import os
import time
import signal
import argparse
import multiprocessing
import logging
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
import threading
# Configure logging
logger = logging.getLogger('app')
# Add project root directory to Python path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from task_queue.config import huey
from task_queue.manager import queue_manager
from task_queue.integration_tasks import process_files_async, cleanup_project_async
from huey.consumer import Consumer
class OptimizedQueueConsumer:
"""Optimized queue consumer with integrated performance monitoring."""
def __init__(self, worker_type: str = "threads", workers: int = 2):
self.huey = huey
self.worker_type = worker_type
self.workers = workers
self.running = False
self.consumer = None
self.processed_count = 0
self.start_time = None
# Performance monitoring
self.performance_stats = {
'tasks_processed': 0,
'tasks_failed': 0,
'avg_processing_time': 0,
'start_time': None,
'last_activity': None
}
# Register signal handlers
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
def _signal_handler(self, signum, frame):
"""Signal handler for graceful shutdown."""
logger.info(f"\nReceived signal {signum}, shutting down queue consumer...")
self.running = False
if self.consumer:
self.consumer.stop()
def setup_optimizations(self):
"""Set up performance optimizations."""
# Set environment variables
env_vars = {
'PYTHONUNBUFFERED': '1',
'PYTHONDONTWRITEBYTECODE': '1',
}
for key, value in env_vars.items():
os.environ[key] = value
# Optimize huey configuration
if hasattr(huey, 'immediate'):
huey.immediate = False
# Adjust based on worker type
if self.worker_type == "threads":
# Thread pool optimization
if hasattr(huey, 'worker_type'):
huey.worker_type = 'threads'
# Set thread pool size
if hasattr(huey, 'always_eager'):
huey.always_eager = False
logger.info("Queue consumer optimization setup complete:")
logger.info(f"- Worker type: {self.worker_type}")
logger.info(f"- Worker count: {self.workers}")
def monitor_performance(self):
"""Performance monitoring thread."""
while self.running:
time.sleep(30) # Output statistics every 30 seconds
if self.start_time:
elapsed = time.time() - self.start_time
rate = self.performance_stats['tasks_processed'] / max(1, elapsed)
logger.info(f"\n[Performance Stats]")
logger.info(f"- Uptime: {elapsed:.1f}s")
logger.info(f"- Tasks processed: {self.performance_stats['tasks_processed']}")
logger.info(f"- Failed tasks: {self.performance_stats['tasks_failed']}")
logger.info(f"- Average processing rate: {rate:.2f} tasks/s")
if self.performance_stats['avg_processing_time'] > 0:
logger.info(f"- Average processing time: {self.performance_stats['avg_processing_time']:.2f}s")
def start(self):
"""Start the queue consumer."""
logger.info("=" * 60)
logger.info("Optimized queue consumer starting")
logger.info("=" * 60)
# Apply optimizations
self.setup_optimizations()
logger.info(f"Database: {os.path.join(os.path.dirname(__file__), '..', 'projects', 'queue_data', 'huey.db')}")
logger.info("Press Ctrl+C to stop the consumer")
self.running = True
self.start_time = time.time()
self.performance_stats['start_time'] = self.start_time
# Start performance monitoring thread
monitor_thread = threading.Thread(target=self.monitor_performance, daemon=True)
monitor_thread.start()
try:
# Create consumer
self.consumer = Consumer(
self.huey,
workers=self.workers,
worker_type=self.worker_type,
max_delay=60.0, # Maximum delay
check_delay=1.0, # Check interval
periodic=True, # Enable periodic tasks
)
logger.info("Queue consumer started, waiting for tasks...")
# Start the consumer
self.consumer.run()
except KeyboardInterrupt:
logger.info("\nReceived keyboard interrupt signal")
except Exception as e:
logger.error(f"Queue consumer runtime error: {e}")
import traceback
traceback.print_exc()
finally:
self.shutdown()
def shutdown(self):
"""Shut down the queue consumer."""
logger.info("\nShutting down queue consumer...")
self.running = False
if self.consumer:
try:
self.consumer.stop()
logger.info("Queue consumer stopped")
except Exception as e:
logger.error(f"Error stopping queue consumer: {e}")
# Output final statistics
if self.start_time:
elapsed = time.time() - self.start_time
logger.info(f"\n[Final Stats]")
logger.info(f"- Total uptime: {elapsed:.1f}s")
logger.info(f"- Total tasks processed: {self.performance_stats['tasks_processed']}")
logger.info(f"- Total failed tasks: {self.performance_stats['tasks_failed']}")
if self.performance_stats['tasks_processed'] > 0:
rate = self.performance_stats['tasks_processed'] / elapsed
logger.info(f"- Average processing rate: {rate:.2f} tasks/s")
def calculate_optimal_workers():
"""Calculate the optimal number of worker threads."""
cpu_count = multiprocessing.cpu_count()
# Based on CPU core count and system resources
if cpu_count <= 2:
return 2
elif cpu_count <= 4:
return 4
else:
return min(8, cpu_count)
def check_queue_status():
"""Check queue status."""
try:
stats = queue_manager.get_queue_stats()
logger.info("\n[Queue Status]")
if isinstance(stats, dict):
if 'total_tasks' in stats:
logger.info(f"- Total tasks: {stats['total_tasks']}")
if 'pending_tasks' in stats:
logger.info(f"- Pending tasks: {stats['pending_tasks']}")
if 'scheduled_tasks' in stats:
logger.info(f"- Scheduled tasks: {stats['scheduled_tasks']}")
# Check database file
db_path = os.path.join(os.path.dirname(__file__), '..', 'projects', 'queue_data', 'huey.db')
if os.path.exists(db_path):
size = os.path.getsize(db_path)
logger.info(f"- Database size: {size} bytes")
else:
logger.info("- Database file: not found")
except Exception as e:
logger.error(f"Failed to get queue status: {e}")
def main():
"""Main entry point."""
parser = argparse.ArgumentParser(description="Optimized queue consumer")
parser.add_argument(
"--workers",
type=int,
default=calculate_optimal_workers(),
help=f"Number of worker threads (default: {calculate_optimal_workers()})"
)
parser.add_argument(
"--worker-type",
type=str,
default="threads",
choices=["threads", "greenlets", "gevent"],
help="Worker type (default: threads)"
)
parser.add_argument(
"--check-status",
action="store_true",
help="Check queue status and exit"
)
parser.add_argument(
"--profile",
type=str,
default="balanced",
choices=["low_memory", "balanced", "high_performance"],
help="Performance profile"
)
args = parser.parse_args()
# Apply performance profile
if args.profile == "low_memory":
os.environ['PYTHONOPTIMIZE'] = '1'
if args.workers > 2:
args.workers = 2
logger.info(f"Low memory mode: adjusted worker count to {args.workers}")
elif args.profile == "high_performance":
if args.workers < 4:
args.workers = 4
logger.info(f"High performance mode: adjusted worker count to {args.workers}")
# Check queue status
if args.check_status:
check_queue_status()
return
# Check environment
try:
import psutil
memory = psutil.virtual_memory()
logger.info("[System Info]")
logger.info(f"- CPU cores: {multiprocessing.cpu_count()}")
logger.info(f"- Available memory: {memory.available / (1024**3):.1f}GB")
logger.info(f"- Memory usage: {memory.percent:.1f}%")
except ImportError:
logger.info("[Tip] Install psutil to display system info: pip install psutil")
# Create and start the queue consumer
consumer = OptimizedQueueConsumer(
worker_type=args.worker_type,
workers=args.workers
)
consumer.start()
if __name__ == "__main__":
main()

View File

@ -1,210 +0,0 @@
#!/usr/bin/env python3
"""
Task status SQLite storage system.
"""
import json
import os
import sqlite3
import time
from typing import Dict, Optional, Any, List
from pathlib import Path
class TaskStatusStore:
"""SQLite-based task status store."""
def __init__(self, db_path: str = "projects/queue_data/task_status.db"):
self.db_path = db_path
# Ensure directory exists
Path(db_path).parent.mkdir(parents=True, exist_ok=True)
self._init_database()
def _init_database(self):
"""Initialize database tables."""
with sqlite3.connect(self.db_path) as conn:
conn.execute('''
CREATE TABLE IF NOT EXISTS task_status (
task_id TEXT PRIMARY KEY,
unique_id TEXT NOT NULL,
status TEXT NOT NULL,
created_at REAL NOT NULL,
updated_at REAL NOT NULL,
result TEXT,
error TEXT
)
''')
conn.commit()
def set_status(self, task_id: str, unique_id: str, status: str,
result: Optional[Dict] = None, error: Optional[str] = None):
"""Set task status."""
current_time = time.time()
with sqlite3.connect(self.db_path) as conn:
conn.execute('''
INSERT OR REPLACE INTO task_status
(task_id, unique_id, status, created_at, updated_at, result, error)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (
task_id, unique_id, status, current_time, current_time,
json.dumps(result) if result else None,
error
))
conn.commit()
def get_status(self, task_id: str) -> Optional[Dict]:
"""Get task status."""
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.execute(
'SELECT * FROM task_status WHERE task_id = ?', (task_id,)
)
row = cursor.fetchone()
if not row:
return None
result = dict(row)
# Parse JSON field
if result['result']:
result['result'] = json.loads(result['result'])
return result
def update_status(self, task_id: str, status: str,
result: Optional[Dict] = None, error: Optional[str] = None):
"""Update task status."""
with sqlite3.connect(self.db_path) as conn:
# Check if task exists
cursor = conn.execute(
'SELECT task_id FROM task_status WHERE task_id = ?', (task_id,)
)
if not cursor.fetchone():
return False
# Update status
conn.execute('''
UPDATE task_status
SET status = ?, updated_at = ?, result = ?, error = ?
WHERE task_id = ?
''', (
status, time.time(),
json.dumps(result) if result else None,
error, task_id
))
conn.commit()
return True
def delete_status(self, task_id: str):
"""Delete task status."""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute(
'DELETE FROM task_status WHERE task_id = ?', (task_id,)
)
conn.commit()
return cursor.rowcount > 0
def list_all(self) -> Dict[str, Dict]:
"""List all task statuses."""
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.execute(
'SELECT * FROM task_status ORDER BY updated_at DESC'
)
all_tasks = {}
for row in cursor:
result = dict(row)
# Parse JSON field
if result['result']:
result['result'] = json.loads(result['result'])
all_tasks[result['task_id']] = result
return all_tasks
def get_by_unique_id(self, unique_id: str) -> List[Dict]:
"""Get all tasks for a given project ID."""
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.execute(
'SELECT * FROM task_status WHERE unique_id = ? ORDER BY updated_at DESC',
(unique_id,)
)
tasks = []
for row in cursor:
result = dict(row)
if result['result']:
result['result'] = json.loads(result['result'])
tasks.append(result)
return tasks
def cleanup_old_tasks(self, older_than_days: int = 7) -> int:
"""Clean up old task records."""
cutoff_time = time.time() - (older_than_days * 24 * 3600)
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute(
'DELETE FROM task_status WHERE updated_at < ?',
(cutoff_time,)
)
conn.commit()
return cursor.rowcount
def get_statistics(self) -> Dict[str, Any]:
"""Get task statistics."""
with sqlite3.connect(self.db_path) as conn:
# Total tasks
total = conn.execute('SELECT COUNT(*) FROM task_status').fetchone()[0]
# Status breakdown
status_stats = conn.execute('''
SELECT status, COUNT(*) as count
FROM task_status
GROUP BY status
''').fetchall()
# Tasks in the last 24 hours
recent = time.time() - (24 * 3600)
recent_tasks = conn.execute(
'SELECT COUNT(*) FROM task_status WHERE updated_at > ?',
(recent,)
).fetchone()[0]
return {
'total_tasks': total,
'status_breakdown': dict(status_stats),
'recent_24h': recent_tasks,
'database_path': self.db_path
}
def search_tasks(self, status: Optional[str] = None,
unique_id: Optional[str] = None,
limit: int = 100) -> List[Dict]:
"""Search tasks."""
query = 'SELECT * FROM task_status WHERE 1=1'
params = []
if status:
query += ' AND status = ?'
params.append(status)
if unique_id:
query += ' AND unique_id = ?'
params.append(unique_id)
query += ' ORDER BY updated_at DESC LIMIT ?'
params.append(limit)
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.execute(query, params)
tasks = []
for row in cursor:
result = dict(row)
if result['result']:
result['result'] = json.loads(result['result'])
tasks.append(result)
return tasks
# Global task status store instance
task_status_store = TaskStatusStore()

View File

@ -1,359 +0,0 @@
#!/usr/bin/env python3
"""
File processing tasks for the queue system.
"""
import os
import json
import time
import shutil
import logging
from pathlib import Path
from typing import Dict, List, Optional, Any
from huey import crontab
# Configure logging
logger = logging.getLogger('app')
from .config import huey
from utils.file_utils import (
extract_zip_file,
get_file_hash,
load_processed_files_log,
save_processed_files_log,
get_document_preview
)
@huey.task()
def process_file_async(
project_id: str,
file_path: str,
original_filename: str = None,
target_directory: str = "files"
) -> Dict[str, Any]:
"""
Asynchronously process a single file.
Args:
project_id: Project ID
file_path: File path
original_filename: Original filename
target_directory: Target directory
Returns:
Processing result dictionary
"""
try:
logger.info(f"Starting file processing: {file_path}")
# Ensure project directory exists
project_dir = os.path.join("projects", project_id)
files_dir = os.path.join(project_dir, target_directory)
os.makedirs(files_dir, exist_ok=True)
# Get file hash as identifier
file_hash = get_file_hash(file_path)
# Check if file has already been processed
processed_log = load_processed_files_log(project_id)
if file_hash in processed_log:
logger.info(f"File already processed, skipping: {file_path}")
return {
"status": "skipped",
"message": "File already processed",
"file_hash": file_hash,
"project_id": project_id
}
# Process the file
result = _process_single_file(
file_path,
files_dir,
original_filename or os.path.basename(file_path)
)
# Update processing log
if result["status"] == "success":
processed_log[file_hash] = {
"original_path": file_path,
"original_filename": original_filename or os.path.basename(file_path),
"processed_at": str(time.time()),
"status": "processed",
"result": result
}
save_processed_files_log(project_id, processed_log)
result["file_hash"] = file_hash
result["project_id"] = project_id
logger.info(f"File processing complete: {file_path}, status: {result['status']}")
return result
except Exception as e:
error_msg = f"Error processing file: {str(e)}"
logger.error(error_msg)
return {
"status": "error",
"message": error_msg,
"file_path": file_path,
"project_id": project_id
}
@huey.task()
def process_multiple_files_async(
project_id: str,
file_paths: List[str],
original_filenames: List[str] = None
) -> List[Dict[str, Any]]:
"""
Asynchronously process multiple files in batch.
Args:
project_id: Project ID
file_paths: List of file paths
original_filenames: List of original filenames
Returns:
List of processing results
"""
try:
logger.info(f"Starting batch processing of {len(file_paths)} files")
results = []
for i, file_path in enumerate(file_paths):
original_filename = original_filenames[i] if original_filenames and i < len(original_filenames) else None
# Create async task for each file
result = process_file_async(project_id, file_path, original_filename)
results.append(result)
logger.info(f"Batch file processing tasks submitted, total {len(results)} files")
return results
except Exception as e:
error_msg = f"Error during batch file processing: {str(e)}"
logger.error(error_msg)
return [{
"status": "error",
"message": error_msg,
"project_id": project_id
}]
@huey.task()
def process_zip_file_async(
project_id: str,
zip_path: str,
extract_to: str = None
) -> Dict[str, Any]:
"""
Asynchronously process a zip archive file.
Args:
project_id: Project ID
zip_path: Zip file path
extract_to: Extraction target directory
Returns:
Processing result dictionary
"""
try:
logger.info(f"Starting zip file processing: {zip_path}")
# Set extraction directory
if extract_to is None:
extract_to = os.path.join("projects", project_id, "extracted", os.path.basename(zip_path))
os.makedirs(extract_to, exist_ok=True)
# Extract files
extracted_files = extract_zip_file(zip_path, extract_to)
if not extracted_files:
return {
"status": "error",
"message": "Extraction failed or no supported files found",
"zip_path": zip_path,
"project_id": project_id
}
# Batch process extracted files
result = process_multiple_files_async(project_id, extracted_files)
return {
"status": "success",
"message": f"Zip file processing complete, extracted {len(extracted_files)} files",
"zip_path": zip_path,
"extract_to": extract_to,
"extracted_files": extracted_files,
"project_id": project_id,
"batch_task_result": result
}
except Exception as e:
error_msg = f"Error processing zip file: {str(e)}"
logger.error(error_msg)
return {
"status": "error",
"message": error_msg,
"zip_path": zip_path,
"project_id": project_id
}
@huey.task()
def cleanup_processed_files(
project_id: str,
older_than_days: int = 30
) -> Dict[str, Any]:
"""
Clean up old processed files.
Args:
project_id: Project ID
older_than_days: Clean files older than this many days
Returns:
Cleanup result dictionary
"""
try:
logger.info(f"Starting cleanup of files older than {older_than_days} days in project {project_id}")
project_dir = os.path.join("projects", project_id)
if not os.path.exists(project_dir):
return {
"status": "error",
"message": "Project directory does not exist",
"project_id": project_id
}
current_time = time.time()
cutoff_time = current_time - (older_than_days * 24 * 3600)
cleaned_files = []
# Walk through project directory
for root, dirs, files in os.walk(project_dir):
for file in files:
file_path = os.path.join(root, file)
file_mtime = os.path.getmtime(file_path)
if file_mtime < cutoff_time:
try:
os.remove(file_path)
cleaned_files.append(file_path)
logger.info(f"Deleted old file: {file_path}")
except Exception as e:
logger.error(f"Failed to delete file {file_path}: {str(e)}")
# Clean up empty directories
for root, dirs, files in os.walk(project_dir, topdown=False):
for dir in dirs:
dir_path = os.path.join(root, dir)
try:
if not os.listdir(dir_path):
os.rmdir(dir_path)
logger.info(f"Deleted empty directory: {dir_path}")
except Exception as e:
logger.error(f"Failed to delete directory {dir_path}: {str(e)}")
return {
"status": "success",
"message": f"Cleanup complete, deleted {len(cleaned_files)} files",
"project_id": project_id,
"cleaned_files": cleaned_files,
"older_than_days": older_than_days
}
except Exception as e:
error_msg = f"Error during file cleanup: {str(e)}"
logger.error(error_msg)
return {
"status": "error",
"message": error_msg,
"project_id": project_id
}
def _process_single_file(
file_path: str,
target_dir: str,
original_filename: str
) -> Dict[str, Any]:
"""
Internal method for processing a single file.
Args:
file_path: Source file path
target_dir: Target directory
original_filename: Original filename
Returns:
Processing result dictionary
"""
try:
# Check if file exists
if not os.path.exists(file_path):
return {
"status": "error",
"message": "Source file does not exist",
"file_path": file_path
}
# Get file info
file_size = os.path.getsize(file_path)
file_ext = os.path.splitext(original_filename)[1].lower()
# Different processing based on file type
supported_extensions = ['.txt', '.md', '.csv', '.xlsx', '.zip']
if file_ext not in supported_extensions:
return {
"status": "error",
"message": f"Unsupported file type: {file_ext}",
"file_path": file_path,
"supported_extensions": supported_extensions
}
# Copy file to target directory
target_file_path = os.path.join(target_dir, original_filename)
# If target file already exists, add timestamp
if os.path.exists(target_file_path):
name, ext = os.path.splitext(original_filename)
timestamp = int(time.time())
target_file_path = os.path.join(target_dir, f"{name}_{timestamp}{ext}")
shutil.copy2(file_path, target_file_path)
# Get file preview (if it's a text file)
preview = None
if file_ext in ['.txt', '.md']:
preview = get_document_preview(target_file_path, max_lines=5)
return {
"status": "success",
"message": "File processed successfully",
"original_path": file_path,
"target_path": target_file_path,
"file_size": file_size,
"file_extension": file_ext,
"preview": preview
}
except Exception as e:
return {
"status": "error",
"message": f"Error processing file: {str(e)}",
"file_path": file_path
}
# Periodic task example: clean up files older than 30 days daily at 2 AM
@huey.periodic_task(crontab(hour=2, minute=0))
def daily_cleanup():
"""Daily cleanup task."""
logger.info("Running daily cleanup task")
# Add cleanup logic here
return {"status": "completed", "message": "Daily cleanup task completed"}

View File

@ -13,23 +13,6 @@ from .file_utils import (
save_processed_files_log
)
from .dataset_manager import (
download_dataset_files,
generate_dataset_structure,
remove_dataset_directory,
remove_dataset_directory_by_key
)
from .project_manager import (
generate_project_readme,
save_project_readme,
get_project_status,
remove_project,
list_projects,
get_project_stats
)
from .system_optimizer import (
setup_system_optimizations
)
@ -59,11 +42,6 @@ from .api_models import (
ProjectListResponse,
ProjectStatsResponse,
ProjectActionResponse,
QueueTaskRequest,
IncrementalTaskRequest,
QueueTaskResponse,
QueueStatusResponse,
TaskStatusResponse,
create_success_response,
create_error_response,
create_chat_response,
@ -90,20 +68,6 @@ __all__ = [
'load_processed_files_log',
'save_processed_files_log',
# dataset_manager
'download_dataset_files',
'generate_dataset_structure',
'remove_dataset_directory',
'remove_dataset_directory_by_key',
# project_manager
'generate_project_readme',
'save_project_readme',
'get_project_status',
'remove_project',
'list_projects',
'get_project_stats',
# agent_pool
'AgentPool',
'get_agent_pool',
@ -128,10 +92,6 @@ __all__ = [
'ProjectListResponse',
'ProjectStatsResponse',
'ProjectActionResponse',
'QueueTaskRequest',
'QueueTaskResponse',
'QueueStatusResponse',
'TaskStatusResponse',
'create_success_response',
'create_error_response',
'create_chat_response',

View File

@ -224,133 +224,6 @@ def create_error_response(message: str, error_type: str = "error", **kwargs) ->
}
class QueueTaskRequest(BaseModel):
"""Queue task request model"""
dataset_id: str
files: Optional[Dict[str, List[str]]] = Field(default=None, description="Files organized by key groups. Each key maps to a list of file paths (supports zip files)")
upload_folder: Optional[Dict[str, str]] = Field(default=None, description="Upload folders organized by group names. Each key maps to a folder name. Example: {'group1': 'my_project1', 'group2': 'my_project2'}")
priority: Optional[int] = Field(default=0, description="Task priority (higher number = higher priority)")
delay: Optional[int] = Field(default=0, description="Delay execution by N seconds")
model_config = ConfigDict(extra='allow')
@field_validator('upload_folder', mode='before')
@classmethod
def validate_upload_folder(cls, v):
"""Validate upload_folder dict format"""
if v is None:
return None
if isinstance(v, dict):
# Validate dict format
for key, value in v.items():
if not isinstance(key, str):
raise ValueError(f"Key in upload_folder dict must be string, got {type(key)}")
if not isinstance(value, str):
raise ValueError(f"Value in upload_folder dict must be string (folder name), got {type(value)} for key '{key}'")
return v
else:
raise ValueError(f"upload_folder must be a dict with group names as keys and folder names as values, got {type(v)}")
@field_validator('files', mode='before')
@classmethod
def validate_files(cls, v):
"""Validate dict format with key-grouped files"""
if v is None:
return None
if isinstance(v, dict):
# Validate dict format
for key, value in v.items():
if not isinstance(key, str):
raise ValueError(f"Key in files dict must be string, got {type(key)}")
if not isinstance(value, list):
raise ValueError(f"Value in files dict must be list, got {type(value)} for key '{key}'")
for item in value:
if not isinstance(item, str):
raise ValueError(f"File paths must be strings, got {type(item)} in key '{key}'")
return v
else:
raise ValueError(f"Files must be a dict with key groups, got {type(v)}")
class IncrementalTaskRequest(BaseModel):
"""Incremental file processing request model"""
dataset_id: str = Field(..., description="Dataset ID for the project")
files_to_add: Optional[Dict[str, List[str]]] = Field(default=None, description="Files to add organized by key groups")
files_to_remove: Optional[Dict[str, List[str]]] = Field(default=None, description="Files to remove organized by key groups")
system_prompt: Optional[str] = None
mcp_settings: Optional[List[Dict]] = None
priority: Optional[int] = Field(default=0, description="Task priority (higher number = higher priority)")
delay: Optional[int] = Field(default=0, description="Delay execution by N seconds")
model_config = ConfigDict(extra='allow')
@field_validator('files_to_add', mode='before')
@classmethod
def validate_files_to_add(cls, v):
"""Validate files_to_add dict format"""
if v is None:
return None
if isinstance(v, dict):
for key, value in v.items():
if not isinstance(key, str):
raise ValueError(f"Key in files_to_add dict must be string, got {type(key)}")
if not isinstance(value, list):
raise ValueError(f"Value in files_to_add dict must be list, got {type(value)} for key '{key}'")
for item in value:
if not isinstance(item, str):
raise ValueError(f"File paths must be strings, got {type(item)} in key '{key}'")
return v
else:
raise ValueError(f"files_to_add must be a dict with key groups, got {type(v)}")
@field_validator('files_to_remove', mode='before')
@classmethod
def validate_files_to_remove(cls, v):
"""Validate files_to_remove dict format"""
if v is None:
return None
if isinstance(v, dict):
for key, value in v.items():
if not isinstance(key, str):
raise ValueError(f"Key in files_to_remove dict must be string, got {type(key)}")
if not isinstance(value, list):
raise ValueError(f"Value in files_to_remove dict must be list, got {type(value)} for key '{key}'")
for item in value:
if not isinstance(item, str):
raise ValueError(f"File paths must be strings, got {type(item)} in key '{key}'")
return v
else:
raise ValueError(f"files_to_remove must be a dict with key groups, got {type(v)}")
class QueueTaskResponse(BaseModel):
"""Queue task response model"""
success: bool
message: str
dataset_id: str
task_id: Optional[str] = None
task_status: Optional[str] = None
estimated_processing_time: Optional[int] = None # seconds
class QueueStatusResponse(BaseModel):
"""Queue status response model"""
success: bool
message: str
queue_stats: Dict[str, Any]
pending_tasks: List[Dict[str, Any]]
class TaskStatusResponse(BaseModel):
"""Task status response model"""
success: bool
message: str
task_id: str
task_status: Optional[str] = None
task_result: Optional[Dict[str, Any]] = None
error: Optional[str] = None
def create_chat_response(
messages: List[Message],
model: str,

View File

@ -1,439 +0,0 @@
#!/usr/bin/env python3
"""
Data merging functions for combining processed file results.
"""
import os
import pickle
import logging
from typing import Dict
from utils.settings import EMBEDDING_MODEL_NAME
# Configure logger
logger = logging.getLogger('app')
# Try to import numpy, but handle if missing
try:
import numpy as np
NUMPY_SUPPORT = True
except ImportError:
logger.warning("NumPy not available, some embedding features may be limited")
NUMPY_SUPPORT = False
def merge_documents_by_group(unique_id: str, group_name: str) -> Dict:
"""Merge all document.txt files in a group into a single document."""
processed_group_dir = os.path.join("projects", "data", unique_id, "processed", group_name)
dataset_group_dir = os.path.join("projects", "data", unique_id, "datasets", group_name)
os.makedirs(dataset_group_dir, exist_ok=True)
merged_document_path = os.path.join(dataset_group_dir, "document.txt")
result = {
"success": False,
"merged_document_path": merged_document_path,
"source_files": [],
"total_pages": 0,
"total_characters": 0,
"error": None
}
try:
# Find all document.txt files in the processed directory
document_files = []
if os.path.exists(processed_group_dir):
for item in os.listdir(processed_group_dir):
item_path = os.path.join(processed_group_dir, item)
if os.path.isdir(item_path):
document_path = os.path.join(item_path, "document.txt")
if os.path.exists(document_path) and os.path.getsize(document_path) > 0:
document_files.append((item, document_path))
if not document_files:
result["error"] = "No document files found to merge"
return result
# Merge all documents with page separators
merged_content = []
total_characters = 0
for filename_stem, document_path in sorted(document_files):
try:
with open(document_path, 'r', encoding='utf-8') as f:
content = f.read().strip()
if content:
merged_content.append(f"# Page {filename_stem}")
merged_content.append(content)
total_characters += len(content)
result["source_files"].append(filename_stem)
except Exception as e:
logger.error(f"Error reading document file {document_path}: {str(e)}")
continue
if merged_content:
# Write merged document
with open(merged_document_path, 'w', encoding='utf-8') as f:
f.write('\n\n'.join(merged_content))
result["total_pages"] = len(document_files)
result["total_characters"] = total_characters
result["success"] = True
else:
result["error"] = "No valid content found in document files"
except Exception as e:
result["error"] = f"Document merging failed: {str(e)}"
logger.error(f"Error merging documents for group {group_name}: {str(e)}")
return result
def merge_paginations_by_group(unique_id: str, group_name: str) -> Dict:
"""Merge all pagination.txt files in a group."""
processed_group_dir = os.path.join("projects", "data", unique_id, "processed", group_name)
dataset_group_dir = os.path.join("projects", "data", unique_id, "datasets", group_name)
os.makedirs(dataset_group_dir, exist_ok=True)
merged_pagination_path = os.path.join(dataset_group_dir, "pagination.txt")
result = {
"success": False,
"merged_pagination_path": merged_pagination_path,
"source_files": [],
"total_lines": 0,
"error": None
}
try:
# Find all pagination.txt files
pagination_files = []
if os.path.exists(processed_group_dir):
for item in os.listdir(processed_group_dir):
item_path = os.path.join(processed_group_dir, item)
if os.path.isdir(item_path):
pagination_path = os.path.join(item_path, "pagination.txt")
if os.path.exists(pagination_path) and os.path.getsize(pagination_path) > 0:
pagination_files.append((item, pagination_path))
if not pagination_files:
result["error"] = "No pagination files found to merge"
return result
# Merge all pagination files
merged_lines = []
for filename_stem, pagination_path in sorted(pagination_files):
try:
with open(pagination_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
for line in lines:
line = line.strip()
if line:
merged_lines.append(line)
result["source_files"].append(filename_stem)
except Exception as e:
logger.error(f"Error reading pagination file {pagination_path}: {str(e)}")
continue
if merged_lines:
# Write merged pagination
with open(merged_pagination_path, 'w', encoding='utf-8') as f:
for line in merged_lines:
f.write(f"{line}\n")
result["total_lines"] = len(merged_lines)
result["success"] = True
else:
result["error"] = "No valid pagination data found"
except Exception as e:
result["error"] = f"Pagination merging failed: {str(e)}"
logger.error(f"Error merging paginations for group {group_name}: {str(e)}")
return result
def merge_embeddings_by_group(unique_id: str, group_name: str) -> Dict:
"""Merge all embedding.pkl files in a group."""
processed_group_dir = os.path.join("projects", "data", unique_id, "processed", group_name)
dataset_group_dir = os.path.join("projects", "data", unique_id, "datasets", group_name)
os.makedirs(dataset_group_dir, exist_ok=True)
merged_embedding_path = os.path.join(dataset_group_dir, "embedding.pkl")
result = {
"success": False,
"merged_embedding_path": merged_embedding_path,
"source_files": [],
"total_chunks": 0,
"total_dimensions": 0,
"error": None
}
try:
# Find all embedding.pkl files
embedding_files = []
if os.path.exists(processed_group_dir):
for item in os.listdir(processed_group_dir):
item_path = os.path.join(processed_group_dir, item)
if os.path.isdir(item_path):
embedding_path = os.path.join(item_path, "embedding.pkl")
if os.path.exists(embedding_path) and os.path.getsize(embedding_path) > 0:
embedding_files.append((item, embedding_path))
if not embedding_files:
result["error"] = "No embedding files found to merge"
return result
# Load and merge all embedding data
all_chunks = []
all_embeddings = [] # Fix: collect all embedding vectors
total_chunks = 0
dimensions = 0
chunking_strategy = 'unknown'
chunking_params = {}
model_path = EMBEDDING_MODEL_NAME
for filename_stem, embedding_path in sorted(embedding_files):
try:
with open(embedding_path, 'rb') as f:
embedding_data = pickle.load(f)
if isinstance(embedding_data, dict) and 'chunks' in embedding_data:
chunks = embedding_data['chunks']
# Get embedding vectors (critical fix)
if 'embeddings' in embedding_data:
embeddings = embedding_data['embeddings']
all_embeddings.append(embeddings)
# Get model metadata from the first file
if 'model_path' in embedding_data:
model_path = embedding_data['model_path']
if 'chunking_strategy' in embedding_data:
chunking_strategy = embedding_data['chunking_strategy']
if 'chunking_params' in embedding_data:
chunking_params = embedding_data['chunking_params']
# Add source file metadata to each chunk
for chunk in chunks:
if isinstance(chunk, dict):
chunk['source_file'] = filename_stem
chunk['source_group'] = group_name
elif isinstance(chunk, str):
# If the chunk is a string, keep it unchanged
pass
all_chunks.extend(chunks)
total_chunks += len(chunks)
result["source_files"].append(filename_stem)
except Exception as e:
logger.error(f"Error loading embedding file {embedding_path}: {str(e)}")
continue
if all_chunks and all_embeddings:
# Merge all embedding vectors
try:
# Try merging tensors with torch
import torch
if all(isinstance(emb, torch.Tensor) for emb in all_embeddings):
merged_embeddings = torch.cat(all_embeddings, dim=0)
dimensions = merged_embeddings.shape[1]
else:
# If the values are not tensors, try converting them to numpy
import numpy as np
if NUMPY_SUPPORT:
np_embeddings = []
for emb in all_embeddings:
if hasattr(emb, 'numpy'):
np_embeddings.append(emb.numpy())
elif isinstance(emb, np.ndarray):
np_embeddings.append(emb)
else:
# If conversion fails, skip this file
logger.warning(f"Warning: Cannot convert embedding to numpy from file {filename_stem}")
continue
if np_embeddings:
merged_embeddings = np.concatenate(np_embeddings, axis=0)
dimensions = merged_embeddings.shape[1]
else:
result["error"] = "No valid embedding tensors could be merged"
return result
else:
result["error"] = "NumPy not available for merging embeddings"
return result
except ImportError:
# If torch is unavailable, try using numpy
if NUMPY_SUPPORT:
import numpy as np
np_embeddings = []
for emb in all_embeddings:
if hasattr(emb, 'numpy'):
np_embeddings.append(emb.numpy())
elif isinstance(emb, np.ndarray):
np_embeddings.append(emb)
else:
logger.warning(f"Warning: Cannot convert embedding to numpy from file {filename_stem}")
continue
if np_embeddings:
merged_embeddings = np.concatenate(np_embeddings, axis=0)
dimensions = merged_embeddings.shape[1]
else:
result["error"] = "No valid embedding tensors could be merged"
return result
else:
result["error"] = "Neither torch nor numpy available for merging embeddings"
return result
except Exception as e:
result["error"] = f"Failed to merge embedding tensors: {str(e)}"
logger.error(f"Error merging embedding tensors: {str(e)}")
return result
# Create merged embedding data structure
merged_embedding_data = {
'chunks': all_chunks,
'embeddings': merged_embeddings, # Critical fix: include the embeddings key
'total_chunks': total_chunks,
'dimensions': dimensions,
'source_files': result["source_files"],
'group_name': group_name,
'merged_at': str(__import__('time').time()),
'chunking_strategy': chunking_strategy,
'chunking_params': chunking_params,
'model_path': model_path
}
# Save merged embeddings
with open(merged_embedding_path, 'wb') as f:
pickle.dump(merged_embedding_data, f)
result["total_chunks"] = total_chunks
result["total_dimensions"] = dimensions
result["success"] = True
else:
result["error"] = "No valid embedding data found"
except Exception as e:
result["error"] = f"Embedding merging failed: {str(e)}"
logger.error(f"Error merging embeddings for group {group_name}: {str(e)}")
return result
def merge_all_data_by_group(unique_id: str, group_name: str) -> Dict:
"""Merge documents, paginations, and embeddings for a group."""
merge_results = {
"group_name": group_name,
"unique_id": unique_id,
"success": True,
"document_merge": None,
"pagination_merge": None,
"embedding_merge": None,
"errors": []
}
# Merge documents
document_result = merge_documents_by_group(unique_id, group_name)
merge_results["document_merge"] = document_result
if not document_result["success"]:
merge_results["success"] = False
merge_results["errors"].append(f"Document merge failed: {document_result['error']}")
# Merge paginations
pagination_result = merge_paginations_by_group(unique_id, group_name)
merge_results["pagination_merge"] = pagination_result
if not pagination_result["success"]:
merge_results["success"] = False
merge_results["errors"].append(f"Pagination merge failed: {pagination_result['error']}")
# Merge embeddings
embedding_result = merge_embeddings_by_group(unique_id, group_name)
merge_results["embedding_merge"] = embedding_result
if not embedding_result["success"]:
merge_results["success"] = False
merge_results["errors"].append(f"Embedding merge failed: {embedding_result['error']}")
return merge_results
def get_group_merge_status(unique_id: str, group_name: str) -> Dict:
"""Get the status of merged data for a group."""
dataset_group_dir = os.path.join("projects", "data", unique_id, "datasets", group_name)
status = {
"group_name": group_name,
"unique_id": unique_id,
"dataset_dir_exists": os.path.exists(dataset_group_dir),
"document_exists": False,
"document_size": 0,
"pagination_exists": False,
"pagination_size": 0,
"embedding_exists": False,
"embedding_size": 0,
"merge_complete": False
}
if os.path.exists(dataset_group_dir):
document_path = os.path.join(dataset_group_dir, "document.txt")
pagination_path = os.path.join(dataset_group_dir, "pagination.txt")
embedding_path = os.path.join(dataset_group_dir, "embedding.pkl")
if os.path.exists(document_path):
status["document_exists"] = True
status["document_size"] = os.path.getsize(document_path)
if os.path.exists(pagination_path):
status["pagination_exists"] = True
status["pagination_size"] = os.path.getsize(pagination_path)
if os.path.exists(embedding_path):
status["embedding_exists"] = True
status["embedding_size"] = os.path.getsize(embedding_path)
# Check if all files exist and are not empty
if (status["document_exists"] and status["document_size"] > 0 and
status["pagination_exists"] and status["pagination_size"] > 0 and
status["embedding_exists"] and status["embedding_size"] > 0):
status["merge_complete"] = True
return status
def cleanup_dataset_group(unique_id: str, group_name: str) -> bool:
"""Clean up merged dataset files for a group."""
dataset_group_dir = os.path.join("projects", "data", unique_id, "datasets", group_name)
try:
if os.path.exists(dataset_group_dir):
import shutil
shutil.rmtree(dataset_group_dir)
logger.info(f"Cleaned up dataset group: {group_name}")
return True
else:
return True # Nothing to clean up
except Exception as e:
logger.error(f"Error cleaning up dataset group {group_name}: {str(e)}")
return False

View File

@ -1,297 +0,0 @@
#!/usr/bin/env python3
"""
Dataset management functions for organizing and processing datasets.
New implementation with per-file processing and group merging.
"""
import os
import json
import logging
from typing import Dict, List
# Configure logger
logger = logging.getLogger('app')
# Import new modules
from utils.file_manager import (
ensure_directories, sync_files_to_group, cleanup_orphaned_files,
get_group_files_list
)
from utils.single_file_processor import (
process_single_file, check_file_already_processed
)
from utils.data_merger import (
merge_all_data_by_group, cleanup_dataset_group
)
async def download_dataset_files(unique_id: str, files: Dict[str, List[str]], incremental_mode: bool = False) -> Dict[str, List[str]]:
"""
Process dataset files with new architecture:
1. Sync files to group directories
2. Process each file individually
3. Merge results by group
4. Clean up orphaned files (only in non-incremental mode)
Args:
unique_id: Project ID
files: Dictionary of files to process, grouped by key
incremental_mode: If True, preserve existing files and only process new ones
"""
if not files:
return {}
logger.info(f"Starting {'incremental' if incremental_mode else 'full'} file processing for project: {unique_id}")
# Ensure project directories exist
ensure_directories(unique_id)
# Step 1: Sync files to group directories
logger.info("Step 1: Syncing files to group directories...")
synced_files, failed_files = sync_files_to_group(unique_id, files, incremental_mode)
# Step 2: Detect changes and cleanup orphaned files (only in non-incremental mode)
from utils.file_manager import detect_file_changes
changes = detect_file_changes(unique_id, files, incremental_mode)
# Only cleanup orphaned files in non-incremental mode or when files are explicitly removed
if not incremental_mode and any(changes["removed"].values()):
logger.info("Step 2: Cleaning up orphaned files...")
removed_files = cleanup_orphaned_files(unique_id, changes)
logger.info(f"Removed orphaned files: {removed_files}")
elif incremental_mode:
logger.info("Step 2: Skipping cleanup in incremental mode to preserve existing files")
# Step 3: Process individual files
logger.info("Step 3: Processing individual files...")
processed_files_by_group = {}
processing_results = {}
for group_name, file_list in files.items():
processed_files_by_group[group_name] = []
processing_results[group_name] = []
for file_path in file_list:
filename = os.path.basename(file_path)
# Get local file path
local_path = os.path.join("projects", "data", unique_id, "files", group_name, filename)
# Skip if file doesn't exist (might be remote file that failed to download)
if not os.path.exists(local_path) and not file_path.startswith(('http://', 'https://')):
logger.warning(f"Skipping non-existent file: {filename}")
continue
# Check if already processed
if check_file_already_processed(unique_id, group_name, filename):
logger.info(f"Skipping already processed file: {filename}")
processed_files_by_group[group_name].append(filename)
processing_results[group_name].append({
"filename": filename,
"status": "existing"
})
continue
# Process the file
logger.info(f"Processing file: {filename} (group: {group_name})")
result = await process_single_file(unique_id, group_name, filename, file_path, local_path)
processing_results[group_name].append(result)
if result["success"]:
processed_files_by_group[group_name].append(filename)
logger.info(f" Successfully processed {filename}")
else:
logger.error(f" Failed to process {filename}: {result['error']}")
# Step 4: Merge results by group
logger.info("Step 4: Merging results by group...")
merge_results = {}
for group_name in processed_files_by_group.keys():
# Get all files in the group (including existing ones)
group_files = get_group_files_list(unique_id, group_name)
if group_files:
logger.info(f"Merging group: {group_name} with {len(group_files)} files")
merge_result = merge_all_data_by_group(unique_id, group_name)
merge_results[group_name] = merge_result
if merge_result["success"]:
logger.info(f" Successfully merged group {group_name}")
else:
logger.error(f" Failed to merge group {group_name}: {merge_result['errors']}")
# Step 5: Save processing log
logger.info("Step 5: Saving processing log...")
await save_processing_log(unique_id, files, synced_files, processing_results, merge_results)
logger.info(f"File processing completed for project: {unique_id}")
return processed_files_by_group
async def save_processing_log(
unique_id: str,
requested_files: Dict[str, List[str]],
synced_files: Dict,
processing_results: Dict,
merge_results: Dict
):
"""Save comprehensive processing log."""
log_data = {
"unique_id": unique_id,
"timestamp": str(os.path.getmtime("projects") if os.path.exists("projects") else 0),
"requested_files": requested_files,
"synced_files": synced_files,
"processing_results": processing_results,
"merge_results": merge_results,
"summary": {
"total_groups": len(requested_files),
"total_files_requested": sum(len(files) for files in requested_files.values()),
"total_files_processed": sum(
len([r for r in results if r.get("success", False)])
for results in processing_results.values()
),
"total_groups_merged": len([r for r in merge_results.values() if r.get("success", False)])
}
}
log_file_path = os.path.join("projects", "data", unique_id, "processing_log.json")
try:
with open(log_file_path, 'w', encoding='utf-8') as f:
json.dump(log_data, f, ensure_ascii=False, indent=2)
logger.info(f"Processing log saved to: {log_file_path}")
except Exception as e:
logger.error(f"Error saving processing log: {str(e)}")
def generate_dataset_structure(unique_id: str) -> str:
"""Generate a string representation of the dataset structure"""
project_dir = os.path.join("projects", "data", unique_id)
structure = []
def add_directory_contents(dir_path: str, prefix: str = ""):
try:
if not os.path.exists(dir_path):
structure.append(f"{prefix}└── (not found)")
return
items = sorted(os.listdir(dir_path))
for i, item in enumerate(items):
item_path = os.path.join(dir_path, item)
is_last = i == len(items) - 1
current_prefix = "└── " if is_last else "├── "
structure.append(f"{prefix}{current_prefix}{item}")
if os.path.isdir(item_path):
next_prefix = prefix + (" " if is_last else "")
add_directory_contents(item_path, next_prefix)
except Exception as e:
structure.append(f"{prefix}└── Error: {str(e)}")
# Add files directory structure
files_dir = os.path.join(project_dir, "files")
structure.append("files/")
add_directory_contents(files_dir, "")
# Add processed directory structure
processed_dir = os.path.join(project_dir, "processed")
structure.append("\nprocessed/")
add_directory_contents(processed_dir, "")
# Add dataset directory structure
dataset_dir = os.path.join(project_dir, "datasets")
structure.append("\ndataset/")
add_directory_contents(dataset_dir, "")
return "\n".join(structure)
def get_processing_status(unique_id: str) -> Dict:
"""Get comprehensive processing status for a project."""
project_dir = os.path.join("projects", "data", unique_id)
if not os.path.exists(project_dir):
return {
"project_exists": False,
"unique_id": unique_id
}
status = {
"project_exists": True,
"unique_id": unique_id,
"directories": {
"files": os.path.exists(os.path.join(project_dir, "files")),
"processed": os.path.exists(os.path.join(project_dir, "processed")),
"dataset": os.path.exists(os.path.join(project_dir, "datasets"))
},
"groups": {},
"processing_log_exists": os.path.exists(os.path.join(project_dir, "processing_log.json"))
}
# Check each group's status
files_dir = os.path.join(project_dir, "files")
if os.path.exists(files_dir):
for group_name in os.listdir(files_dir):
group_path = os.path.join(files_dir, group_name)
if os.path.isdir(group_path):
status["groups"][group_name] = {
"files_count": len([
f for f in os.listdir(group_path)
if os.path.isfile(os.path.join(group_path, f))
]),
"merge_status": "pending"
}
# Check merge status for each group
dataset_dir = os.path.join(project_dir, "datasets")
if os.path.exists(dataset_dir):
for group_name in os.listdir(dataset_dir):
group_path = os.path.join(dataset_dir, group_name)
if os.path.isdir(group_path):
if group_name in status["groups"]:
# Check if merge is complete
document_path = os.path.join(group_path, "document.txt")
pagination_path = os.path.join(group_path, "pagination.txt")
embedding_path = os.path.join(group_path, "embedding.pkl")
if (os.path.exists(document_path) and os.path.exists(pagination_path) and
os.path.exists(embedding_path)):
status["groups"][group_name]["merge_status"] = "completed"
else:
status["groups"][group_name]["merge_status"] = "incomplete"
else:
status["groups"][group_name] = {
"files_count": 0,
"merge_status": "completed"
}
return status
def remove_dataset_directory(unique_id: str, filename_without_ext: str):
"""Remove a specific dataset directory (deprecated - use new structure)"""
# This function is kept for compatibility but delegates to new structure
dataset_path = os.path.join("projects", "data", unique_id, "processed", filename_without_ext)
if os.path.exists(dataset_path):
import shutil
shutil.rmtree(dataset_path)
def remove_dataset_directory_by_key(unique_id: str, key: str):
"""Remove dataset directory by key (group name)"""
# Remove files directory
files_group_path = os.path.join("projects", "data", unique_id, "files", key)
if os.path.exists(files_group_path):
import shutil
shutil.rmtree(files_group_path)
# Remove processed directory
processed_group_path = os.path.join("projects", "data", unique_id, "processed", key)
if os.path.exists(processed_group_path):
import shutil
shutil.rmtree(processed_group_path)
# Remove dataset directory
cleanup_dataset_group(unique_id, key)

View File

@ -1,343 +0,0 @@
#!/usr/bin/env python3
"""
Project management functions for handling projects, README generation, and status tracking.
"""
import os
import json
import logging
from typing import Dict, List, Optional
from pathlib import Path
# Configure logger
logger = logging.getLogger('app')
from utils.file_utils import get_document_preview, load_processed_files_log
def generate_directory_tree(project_dir: str, unique_id: str, max_depth: int = 3) -> str:
"""Generate dataset directory tree structure for the project"""
def _build_tree(path: str, prefix: str = "", is_last: bool = True, depth: int = 0) -> List[str]:
if depth > max_depth:
return []
lines = []
try:
entries = sorted(os.listdir(path))
# Separate directories and files
dirs = [e for e in entries if os.path.isdir(os.path.join(path, e)) and not e.startswith('.')]
files = [e for e in entries if os.path.isfile(os.path.join(path, e)) and not e.startswith('.')]
entries = dirs + files
for i, entry in enumerate(entries):
entry_path = os.path.join(path, entry)
is_dir = os.path.isdir(entry_path)
is_last_entry = i == len(entries) - 1
# Choose the appropriate tree symbols
if is_last_entry:
connector = "└── "
new_prefix = prefix + " "
else:
connector = "├── "
new_prefix = prefix + ""
# Add entry line
line = prefix + connector + entry
if is_dir:
line += "/"
lines.append(line)
# Recursively add subdirectories
if is_dir and depth < max_depth:
sub_lines = _build_tree(entry_path, new_prefix, is_last_entry, depth + 1)
lines.extend(sub_lines)
except PermissionError:
lines.append(prefix + "└── [Permission Denied]")
except Exception as e:
lines.append(prefix + "└── [Error: " + str(e) + "]")
return lines
# Start building tree from dataset directory
dataset_dir = os.path.join(project_dir, "datasets")
tree_lines = []
if not os.path.exists(dataset_dir):
return "└── [No dataset directory found]"
try:
entries = sorted(os.listdir(dataset_dir))
dirs = [e for e in entries if os.path.isdir(os.path.join(dataset_dir, e)) and not e.startswith('.')]
files = [e for e in entries if os.path.isfile(os.path.join(dataset_dir, e)) and not e.startswith('.')]
entries = dirs + files
if not entries:
tree_lines.append("└── [Empty dataset directory]")
else:
for i, entry in enumerate(entries):
entry_path = os.path.join(dataset_dir, entry)
is_dir = os.path.isdir(entry_path)
is_last_entry = i == len(entries) - 1
if is_last_entry:
connector = "└── "
prefix = " "
else:
connector = "├── "
prefix = ""
line = connector + entry
if is_dir:
line += "/"
tree_lines.append(line)
# Recursively add subdirectories
if is_dir:
sub_lines = _build_tree(entry_path, prefix, is_last_entry, 1)
tree_lines.extend(sub_lines)
except Exception as e:
tree_lines.append(f"└── [Error generating tree: {str(e)}]")
return "\n".join(tree_lines)
def generate_project_readme(unique_id: str) -> str:
"""Generate README.md content for a project"""
project_dir = os.path.join("projects", "data", unique_id)
readme_content = f"""# Project: {unique_id}
## Project Overview
This project contains processed documents and their associated embeddings for semantic search.
## Directory Structure
"""
# Generate directory tree
readme_content += "```\n"
readme_content += generate_directory_tree(project_dir, unique_id)
readme_content += "\n```\n\n"
readme_content += """## Dataset Structure
"""
dataset_dir = os.path.join(project_dir, "datasets")
if not os.path.exists(dataset_dir):
readme_content += "No dataset files available.\n"
else:
# Get all document directories
doc_dirs = []
try:
for item in sorted(os.listdir(dataset_dir)):
item_path = os.path.join(dataset_dir, item)
if os.path.isdir(item_path):
doc_dirs.append(item)
except Exception as e:
logger.error(f"Error listing dataset directories: {str(e)}")
if not doc_dirs:
readme_content += "No document directories found.\n"
else:
for doc_dir in doc_dirs:
doc_path = os.path.join(dataset_dir, doc_dir)
document_file = os.path.join(doc_path, "document.txt")
pagination_file = os.path.join(doc_path, "pagination.txt")
embeddings_file = os.path.join(doc_path, "embedding.pkl")
readme_content += f"### {doc_dir}\n\n"
readme_content += f"**Files:**\n"
readme_content += f"- `{doc_dir}/document.txt`"
if os.path.exists(document_file):
readme_content += ""
readme_content += "\n"
readme_content += f"- `{doc_dir}/pagination.txt`"
if os.path.exists(pagination_file):
readme_content += ""
readme_content += "\n"
readme_content += f"- `{doc_dir}/embedding.pkl`"
if os.path.exists(embeddings_file):
readme_content += ""
readme_content += "\n\n"
# Add document preview
if os.path.exists(document_file):
readme_content += f"**Content Preview (first 10 lines):**\n\n```\n"
preview = get_document_preview(document_file, 10)
readme_content += preview
readme_content += "\n```\n\n"
else:
readme_content += f"**Content Preview:** Not available\n\n"
readme_content += f"""---
*Generated on {__import__('datetime').datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*
"""
return readme_content
def save_project_readme(unique_id: str):
"""Save README.md for a project"""
readme_content = generate_project_readme(unique_id)
readme_path = os.path.join("projects", "data", unique_id, "README.md")
try:
os.makedirs(os.path.dirname(readme_path), exist_ok=True)
with open(readme_path, 'w', encoding='utf-8') as f:
f.write(readme_content)
logger.info(f"Generated README.md for project {unique_id}")
return readme_path
except Exception as e:
logger.error(f"Error generating README for project {unique_id}: {str(e)}")
return None
def get_project_status(unique_id: str) -> Dict:
"""Get comprehensive status of a project"""
project_dir = os.path.join("projects", "data", unique_id)
project_exists = os.path.exists(project_dir)
if not project_exists:
return {
"unique_id": unique_id,
"project_exists": False,
"error": "Project not found"
}
# Get processed log
processed_log = load_processed_files_log(unique_id)
# Collect document.txt files
document_files = []
dataset_dir = os.path.join(project_dir, "datasets")
if os.path.exists(dataset_dir):
for root, dirs, files in os.walk(dataset_dir):
for file in files:
if file == "document.txt":
document_files.append(os.path.join(root, file))
# Check system prompt and MCP settings
system_prompt_file = os.path.join(project_dir, "system_prompt.txt")
mcp_settings_file = os.path.join(project_dir, "mcp_settings.json")
status = {
"unique_id": unique_id,
"project_exists": True,
"project_path": project_dir,
"processed_files_count": len(processed_log),
"processed_files": processed_log,
"document_files_count": len(document_files),
"document_files": document_files,
"has_system_prompt": os.path.exists(system_prompt_file),
"has_mcp_settings": os.path.exists(mcp_settings_file),
"readme_exists": os.path.exists(os.path.join(project_dir, "README.md")),
"log_file_exists": os.path.exists(os.path.join(project_dir, "processed_files.json"))
}
# Add dataset structure
try:
from utils.dataset_manager import generate_dataset_structure
status["dataset_structure"] = generate_dataset_structure(unique_id)
except Exception as e:
status["dataset_structure"] = f"Error generating structure: {str(e)}"
return status
def remove_project(unique_id: str) -> bool:
"""Remove entire project directory"""
project_dir = os.path.join("projects", "data", unique_id)
try:
if os.path.exists(project_dir):
import shutil
shutil.rmtree(project_dir)
logger.info(f"Removed project directory: {project_dir}")
return True
else:
logger.warning(f"Project directory not found: {project_dir}")
return False
except Exception as e:
logger.error(f"Error removing project {unique_id}: {str(e)}")
return False
def list_projects() -> List[str]:
"""List all existing project IDs"""
projects_dir = "projects"
if not os.path.exists(projects_dir):
return []
try:
return [item for item in os.listdir(projects_dir)
if os.path.isdir(os.path.join(projects_dir, item))]
except Exception as e:
logger.error(f"Error listing projects: {str(e)}")
return []
def get_project_stats(unique_id: str) -> Dict:
"""Get statistics for a specific project"""
status = get_project_status(unique_id)
if not status["project_exists"]:
return status
stats = {
"unique_id": unique_id,
"total_processed_files": status["processed_files_count"],
"total_document_files": status["document_files_count"],
"has_system_prompt": status["has_system_prompt"],
"has_mcp_settings": status["has_mcp_settings"],
"has_readme": status["readme_exists"]
}
# Calculate file sizes
total_size = 0
document_sizes = []
for doc_file in status["document_files"]:
try:
size = os.path.getsize(doc_file)
document_sizes.append({
"file": doc_file,
"size": size,
"size_mb": round(size / (1024 * 1024), 2)
})
total_size += size
except Exception:
pass
stats["total_document_size"] = total_size
stats["total_document_size_mb"] = round(total_size / (1024 * 1024), 2)
stats["document_files_detail"] = document_sizes
# Check embeddings files
embedding_files = []
dataset_dir = os.path.join("projects", "data", unique_id, "datasets")
if os.path.exists(dataset_dir):
for root, dirs, files in os.walk(dataset_dir):
for file in files:
if file == "embedding.pkl":
file_path = os.path.join(root, file)
try:
size = os.path.getsize(file_path)
embedding_files.append({
"file": file_path,
"size": size,
"size_mb": round(size / (1024 * 1024), 2)
})
except Exception:
pass
stats["embedding_files_count"] = len(embedding_files)
stats["embedding_files_detail"] = embedding_files
return stats

View File

@ -77,6 +77,15 @@ CHECKPOINT_CLEANUP_INACTIVE_DAYS = int(os.getenv("CHECKPOINT_CLEANUP_INACTIVE_DA
CHECKPOINT_CLEANUP_INTERVAL_HOURS = int(os.getenv("CHECKPOINT_CLEANUP_INTERVAL_HOURS", "24"))
# ============================================================
# Redis Configuration (Huey task queue backend)
# ============================================================
# Redis connection URL.
# Format: redis://[:password]@host:port/db
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/1")
# ============================================================
# Mem0 long-term memory configuration
# ============================================================

View File

@ -1,301 +0,0 @@
#!/usr/bin/env python3
"""
Single file processing functions for handling individual files.
"""
import os
import tempfile
import zipfile
import logging
from typing import Dict, List, Tuple, Optional
from pathlib import Path
# Configure logger
logger = logging.getLogger('app')
from utils.file_utils import download_file
# Try to import excel/csv processor, but handle if dependencies are missing
try:
from utils.excel_csv_processor import (
is_excel_file, is_csv_file, process_excel_file, process_csv_file
)
EXCEL_CSV_SUPPORT = True
except ImportError as e:
logger.warning(f"Excel/CSV processing not available: {e}")
EXCEL_CSV_SUPPORT = False
# Fallback functions
def is_excel_file(file_path):
return file_path.lower().endswith(('.xlsx', '.xls'))
def is_csv_file(file_path):
return file_path.lower().endswith('.csv')
def process_excel_file(file_path):
return "", []
def process_csv_file(file_path):
return "", []
async def process_single_file(
unique_id: str,
group_name: str,
filename: str,
original_path: str,
local_path: str
) -> Dict:
"""
Process a single file and generate document.txt, pagination.txt, and embedding.pkl.
Returns:
Dict with processing results and file paths
"""
# Create output directory for this file
filename_stem = Path(filename).stem
output_dir = os.path.join("projects", "data", unique_id, "processed", group_name, filename_stem)
os.makedirs(output_dir, exist_ok=True)
result = {
"success": False,
"filename": filename,
"group": group_name,
"output_dir": output_dir,
"document_path": os.path.join(output_dir, "document.txt"),
"pagination_path": os.path.join(output_dir, "pagination.txt"),
"embedding_path": os.path.join(output_dir, "embedding.pkl"),
"error": None,
"content_size": 0,
"pagination_lines": 0,
"embedding_chunks": 0
}
try:
# Download file if it's remote and not yet downloaded
if original_path.startswith(('http://', 'https://')):
if not os.path.exists(local_path):
logger.info(f"Downloading {original_path} -> {local_path}")
success = await download_file(original_path, local_path)
if not success:
result["error"] = "Failed to download file"
return result
# Extract content from file
content, pagination_lines = await extract_file_content(local_path, filename)
if not content or not content.strip():
result["error"] = "No content extracted from file"
return result
# Write document.txt
with open(result["document_path"], 'w', encoding='utf-8') as f:
f.write(content)
result["content_size"] = len(content)
# Write pagination.txt
if pagination_lines:
with open(result["pagination_path"], 'w', encoding='utf-8') as f:
for line in pagination_lines:
if line.strip():
f.write(f"{line}\n")
result["pagination_lines"] = len(pagination_lines)
else:
# Generate pagination from text content
pagination_lines = generate_pagination_from_text(result["document_path"],
result["pagination_path"])
result["pagination_lines"] = len(pagination_lines)
# Generate embeddings
try:
embedding_chunks = await generate_embeddings_for_file(
result["document_path"], result["embedding_path"]
)
result["embedding_chunks"] = len(embedding_chunks) if embedding_chunks else 0
result["success"] = True
except Exception as e:
result["error"] = f"Embedding generation failed: {str(e)}"
logger.error(f"Failed to generate embeddings for {filename}: {str(e)}")
except Exception as e:
result["error"] = f"File processing failed: {str(e)}"
logger.error(f"Error processing file {filename}: {str(e)}")
return result
async def extract_file_content(file_path: str, filename: str) -> Tuple[str, List[str]]:
"""Extract content from various file formats."""
# Handle zip files
if filename.lower().endswith('.zip'):
return await extract_from_zip(file_path, filename)
# Handle Excel files
elif is_excel_file(file_path):
return await extract_from_excel(file_path, filename)
# Handle CSV files
elif is_csv_file(file_path):
return await extract_from_csv(file_path, filename)
# Handle text files
else:
return await extract_from_text(file_path, filename)
async def extract_from_zip(zip_path: str, filename: str) -> Tuple[str, List[str]]:
"""Extract content from zip file."""
content_parts = []
pagination_lines = []
try:
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
# Extract to temporary directory
temp_dir = tempfile.mkdtemp(prefix=f"extract_{Path(filename).stem}_")
zip_ref.extractall(temp_dir)
# Process extracted files
for root, dirs, files in os.walk(temp_dir):
for file in files:
if file.lower().endswith(('.txt', '.md', '.xlsx', '.xls', '.csv')):
file_path = os.path.join(root, file)
try:
file_content, file_pagination = await extract_file_content(file_path, file)
if file_content:
content_parts.append(f"# Page {file}")
content_parts.append(file_content)
pagination_lines.extend(file_pagination)
except Exception as e:
logger.error(f"Error processing extracted file {file}: {str(e)}")
# Clean up temporary directory
import shutil
shutil.rmtree(temp_dir)
except Exception as e:
logger.error(f"Error extracting zip file {filename}: {str(e)}")
return "", []
return '\n\n'.join(content_parts), pagination_lines
async def extract_from_excel(file_path: str, filename: str) -> Tuple[str, List[str]]:
"""Extract content from Excel file."""
try:
document_content, pagination_lines = process_excel_file(file_path)
if document_content:
content = f"# Page {filename}\n{document_content}"
return content, pagination_lines
else:
return "", []
except Exception as e:
logger.error(f"Error processing Excel file {filename}: {str(e)}")
return "", []
async def extract_from_csv(file_path: str, filename: str) -> Tuple[str, List[str]]:
"""Extract content from CSV file."""
try:
document_content, pagination_lines = process_csv_file(file_path)
if document_content:
content = f"# Page {filename}\n{document_content}"
return content, pagination_lines
else:
return "", []
except Exception as e:
logger.error(f"Error processing CSV file {filename}: {str(e)}")
return "", []
async def extract_from_text(file_path: str, filename: str) -> Tuple[str, List[str]]:
"""Extract content from text file."""
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read().strip()
if content:
return content, []
else:
return "", []
except Exception as e:
logger.error(f"Error reading text file {filename}: {str(e)}")
return "", []
def generate_pagination_from_text(document_path: str, pagination_path: str) -> List[str]:
"""Generate pagination from text document."""
try:
# Import embedding module for pagination
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'embedding'))
from embedding import split_document_by_pages
pages = split_document_by_pages(str(document_path), str(pagination_path))
# Return pagination lines
pagination_lines = []
with open(pagination_path, 'r', encoding='utf-8') as f:
for line in f:
if line.strip():
pagination_lines.append(line.strip())
return pagination_lines
except Exception as e:
logger.error(f"Error generating pagination from text: {str(e)}")
return []
async def generate_embeddings_for_file(document_path: str, embedding_path: str) -> Optional[List]:
"""Generate embeddings for a document."""
try:
# Import embedding module
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'embedding'))
from embedding import embed_document
# Generate embeddings using paragraph chunking
embedding_data = embed_document(
str(document_path),
str(embedding_path),
chunking_strategy='paragraph'
)
if embedding_data and 'chunks' in embedding_data:
return embedding_data['chunks']
else:
return None
except Exception as e:
logger.error(f"Error generating embeddings: {str(e)}")
return None
def check_file_already_processed(unique_id: str, group_name: str, filename: str) -> bool:
"""Check if a file has already been processed."""
filename_stem = Path(filename).stem
output_dir = os.path.join("projects", "data", unique_id, "processed", group_name, filename_stem)
document_path = os.path.join(output_dir, "document.txt")
pagination_path = os.path.join(output_dir, "pagination.txt")
embedding_path = os.path.join(output_dir, "embedding.pkl")
# Check if all files exist and are not empty
if (os.path.exists(document_path) and os.path.exists(pagination_path) and
os.path.exists(embedding_path)):
if (os.path.getsize(document_path) > 0 and os.path.getsize(pagination_path) > 0 and
os.path.getsize(embedding_path) > 0):
return True
return False