From e6b28818bf5fbda745df5bec6f3aaf103bc974e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E6=BD=AE?= Date: Mon, 8 Jun 2026 19:00:55 +0800 Subject: [PATCH] add REDIS_URL --- poetry.lock | 39 +++++++++++++++----------------- pyproject.toml | 1 + requirements.txt | 2 +- task_queue/config.py | 25 ++++++++++---------- task_queue/consumer.py | 5 ++-- task_queue/manager.py | 6 ++--- task_queue/optimized_consumer.py | 11 ++------- utils/settings.py | 9 ++++++++ 8 files changed, 49 insertions(+), 49 deletions(-) diff --git a/poetry.lock b/poetry.lock index b9bba52..96d1a71 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2765,26 +2765,6 @@ cli = ["python-dotenv (>=1.0.0)", "typer (>=0.16.0)"] rich = ["rich (>=13.9.4)"] ws = ["websockets (>=15.0.1)"] -[[package]] -name = "mcp-ui-server" -version = "1.0.0" -description = "mcp-ui Server SDK for Python" -optional = false -python-versions = ">=3.10" -groups = ["main"] -files = [ - {file = "mcp_ui_server-1.0.0-py3-none-any.whl", hash = "sha256:85f53b2e4300fbd175f1fbb7c40f2566b1f4a4ad03a1f33647867c82a3159dcc"}, - {file = "mcp_ui_server-1.0.0.tar.gz", hash = "sha256:5ab8f17b93bf794966af7c35e9a575e4f21a9ba2bab3d316cfc107a15f88a3c9"}, -] - -[package.dependencies] -mcp = ">=1.0.0" -pydantic = ">=2.0.0" -typing-extensions = ">=4.0.0" - -[package.extras] -dev = ["pyright (>=1.1.0)", "pytest (>=7.0.0)", "ruff (>=0.1.0)"] - [[package]] name = "mdit-py-plugins" version = "0.5.0" @@ -4845,6 +4825,23 @@ urllib3 = ">=1.26.14,<3" fastembed = ["fastembed (>=0.7,<0.8)"] fastembed-gpu = ["fastembed-gpu (>=0.7,<0.8)"] +[[package]] +name = "redis" +version = "6.4.0" +description = "Python client for Redis database and key-value store" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "redis-6.4.0-py3-none-any.whl", hash = "sha256:f0544fa9604264e9464cdf4814e7d4830f74b165d52f2a330a760a88dd248b7f"}, + {file = "redis-6.4.0.tar.gz", hash = "sha256:b01bc7282b8444e28ec36b261df5375183bb47a07eb9c603f284e89cbc5ef010"}, +] + +[package.extras] +hiredis = ["hiredis (>=3.2.0)"] +jwt = ["pyjwt (>=2.9.0)"] +ocsp = ["cryptography (>=36.0.1)", "pyopenssl (>=20.0.1)", "requests (>=2.31.0)"] + [[package]] name = "referencing" version = "0.37.0" @@ -7152,4 +7149,4 @@ cffi = ["cffi (>=1.17,<2.0) ; platform_python_implementation != \"PyPy\" and pyt [metadata] lock-version = "2.1" python-versions = ">=3.12,<3.15" -content-hash = "9c949ca9f49b62502571dadab242919fad5e90621f187998e6abdcbcbd448fe4" +content-hash = "54aa16923236b17d28e18694c093d29786e9857bb5d294f435664a552ba6e483" diff --git a/pyproject.toml b/pyproject.toml index 3b30d87..a38d20c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,7 @@ dependencies = [ "aiohttp", "aiofiles", "huey (>=2.5.3,<3.0.0)", + "redis (>=4.0,<7.0)", "pandas>=1.5.0", "openpyxl>=3.0.0", "xlrd>=2.0.0", diff --git a/requirements.txt b/requirements.txt index 01994d9..06f1a60 100644 --- a/requirements.txt +++ b/requirements.txt @@ -95,7 +95,6 @@ linkify-it-py==2.1.0 ; python_version >= "3.12" and python_version < "3.15" markdown-it-py==4.0.0 ; python_version >= "3.12" and python_version < "3.15" markdownify==1.2.2 ; python_version >= "3.12" and python_version < "3.15" markupsafe==3.0.3 ; python_version >= "3.12" and python_version < "3.15" -mcp-ui-server==1.0.0 ; python_version >= "3.12" and python_version < "3.15" mcp==1.12.4 ; python_version >= "3.12" and python_version < "3.15" mdit-py-plugins==0.5.0 ; python_version >= "3.12" and python_version < "3.15" mdurl==0.1.2 ; python_version >= "3.12" and python_version < "3.15" @@ -162,6 +161,7 @@ pywin32==311 ; python_version >= "3.12" and python_version < "3.15" and (sys_pla pyyaml==6.0.3 ; python_version >= "3.12" and python_version < "3.15" qdrant-client==1.12.1 ; python_version >= "3.13" and python_version < "3.15" qdrant-client==1.16.2 ; python_version == "3.12" +redis==6.4.0 ; python_version >= "3.12" and python_version < "3.15" referencing==0.37.0 ; python_version >= "3.12" and python_version < "3.15" regex==2025.9.18 ; python_version >= "3.12" and python_version < "3.15" requests-toolbelt==1.0.0 ; python_version >= "3.12" and python_version < "3.15" diff --git a/task_queue/config.py b/task_queue/config.py index d1cbfec..a35f728 100644 --- a/task_queue/config.py +++ b/task_queue/config.py @@ -1,31 +1,30 @@ #!/usr/bin/env python3 """ -Queue configuration using SqliteHuey for asynchronous file processing. +Queue configuration using RedisHuey for asynchronous file processing. """ -import os import logging -from huey import SqliteHuey +from huey import RedisHuey from datetime import timedelta +from utils.settings import REDIS_URL + # Configure logging logger = logging.getLogger('app') -# Ensure projects/queue_data directory exists -queue_data_dir = os.path.join(os.path.dirname(__file__), '..', 'projects', 'queue_data') -os.makedirs(queue_data_dir, exist_ok=True) - -# Initialize SqliteHuey -huey = SqliteHuey( - filename=os.path.join(queue_data_dir, 'huey.db'), - name='file_processor', # Queue name +# Initialize RedisHuey +# Redis is an external backend shared between the web (enqueue) and consumer +# (dequeue) processes, avoiding the local SQLite file corruption that the +# previous SqliteHuey backend suffered from under concurrent multi-process access. +huey = RedisHuey( + name='file_processor', # Queue name, used as the Redis key prefix + url=REDIS_URL, always_eager=False, # Set to False to enable async processing utc=True, # Use UTC time ) # Set default task configuration -huey.store_errors = True # Store error information huey.max_retries = 3 # Maximum retry count huey.retry_delay = timedelta(seconds=60) # Retry delay -logger.info(f"SqliteHuey queue initialized, database path: {os.path.join(queue_data_dir, 'huey.db')}") \ No newline at end of file +logger.info("RedisHuey queue initialized") diff --git a/task_queue/consumer.py b/task_queue/consumer.py index a13a7b8..db68656 100755 --- a/task_queue/consumer.py +++ b/task_queue/consumer.py @@ -18,6 +18,7 @@ from task_queue.config import huey from task_queue.manager import queue_manager from task_queue.integration_tasks import process_files_async, cleanup_project_async from huey.consumer import Consumer +from utils.settings import REDIS_URL class QueueConsumer: @@ -44,7 +45,7 @@ class QueueConsumer: print(f"Starting queue consumer...") print(f"Worker threads: {self.workers}") print(f"Worker type: {self.worker_type}") - print(f"Database: {os.path.join(os.path.dirname(__file__), '..', 'projects', 'queue_data', 'huey.db')}") + print(f"Backend: Redis ({REDIS_URL})") print("Press Ctrl+C to stop the consumer") self.running = True @@ -135,7 +136,7 @@ def main(): print(f"Completed tasks: {stats.get('completed_tasks', 0)}") print(f"Error tasks: {stats.get('error_tasks', 0)}") print(f"Scheduled tasks: {stats.get('scheduled_tasks', 0)}") - print(f"Database: {stats.get('queue_database', 'N/A')}") + print(f"Backend: {stats.get('queue_backend', 'N/A')}") return if args.flush: diff --git a/task_queue/manager.py b/task_queue/manager.py index 3eb2626..5ab82c5 100644 --- a/task_queue/manager.py +++ b/task_queue/manager.py @@ -24,7 +24,7 @@ class QueueManager: def __init__(self): self.huey = huey - logger.info(f"Queue manager initialized with database: {os.path.join(os.path.dirname(__file__), '..', 'projects', 'queue_data', 'huey.db')}") + logger.info("Queue manager initialized with Redis backend") def enqueue_file( self, @@ -196,7 +196,7 @@ class QueueManager: "error_tasks": 0, "scheduled_tasks": 0, "recent_tasks": [], - "queue_database": os.path.join(os.path.dirname(__file__), '..', 'projects', 'queue_data', 'huey.db') + "queue_backend": "redis" } # Try to get the number of pending tasks @@ -220,7 +220,7 @@ class QueueManager: except Exception as e: return { "error": str(e), - "queue_database": os.path.join(os.path.dirname(__file__), '..', 'projects', 'queue_data', 'huey.db') + "queue_backend": "redis" } diff --git a/task_queue/optimized_consumer.py b/task_queue/optimized_consumer.py index ab66b08..30a64e0 100755 --- a/task_queue/optimized_consumer.py +++ b/task_queue/optimized_consumer.py @@ -25,6 +25,7 @@ from task_queue.config import huey from task_queue.manager import queue_manager from task_queue.integration_tasks import process_files_async, cleanup_project_async from huey.consumer import Consumer +from utils.settings import REDIS_URL class OptimizedQueueConsumer: @@ -115,7 +116,7 @@ class OptimizedQueueConsumer: # Apply optimizations self.setup_optimizations() - logger.info(f"Database: {os.path.join(os.path.dirname(__file__), '..', 'projects', 'queue_data', 'huey.db')}") + logger.info(f"Backend: Redis ({REDIS_URL})") logger.info("Press Ctrl+C to stop the consumer") self.running = True @@ -203,14 +204,6 @@ def check_queue_status(): if 'scheduled_tasks' in stats: logger.info(f"- Scheduled tasks: {stats['scheduled_tasks']}") - # Check database file - db_path = os.path.join(os.path.dirname(__file__), '..', 'projects', 'queue_data', 'huey.db') - if os.path.exists(db_path): - size = os.path.getsize(db_path) - logger.info(f"- Database size: {size} bytes") - else: - logger.info("- Database file: not found") - except Exception as e: logger.error(f"Failed to get queue status: {e}") diff --git a/utils/settings.py b/utils/settings.py index 4d73705..b921062 100644 --- a/utils/settings.py +++ b/utils/settings.py @@ -77,6 +77,15 @@ CHECKPOINT_CLEANUP_INACTIVE_DAYS = int(os.getenv("CHECKPOINT_CLEANUP_INACTIVE_DA CHECKPOINT_CLEANUP_INTERVAL_HOURS = int(os.getenv("CHECKPOINT_CLEANUP_INTERVAL_HOURS", "24")) +# ============================================================ +# Redis Configuration (Huey task queue backend) +# ============================================================ + +# Redis connection URL. +# Format: redis://[:password]@host:port/db +REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/1") + + # ============================================================ # Mem0 long-term memory configuration # ============================================================