#!/usr/bin/env python3 """ Queue consumer for processing file tasks. """ import sys import os import time import signal import argparse from pathlib import Path # 添加项目根目录到Python路径 project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root)) from task_queue.config import huey from task_queue.manager import queue_manager from task_queue.integration_tasks import process_files_async, cleanup_project_async from huey.consumer import Consumer class QueueConsumer: """队列消费者,用于处理异步任务""" def __init__(self, worker_type: str = "threads", workers: int = 2): self.huey = huey self.worker_type = worker_type self.workers = workers self.running = False self.consumer = None # 注册信号处理 signal.signal(signal.SIGINT, self._signal_handler) signal.signal(signal.SIGTERM, self._signal_handler) def _signal_handler(self, signum, frame): """信号处理器,用于优雅关闭""" print(f"\n收到信号 {signum},正在关闭队列消费者...") self.running = False def start(self): """启动队列消费者""" print(f"启动队列消费者...") print(f"工作线程数: {self.workers}") print(f"工作类型: {self.worker_type}") print(f"数据库: {os.path.join(os.path.dirname(__file__), '..', 'queue_data', 'huey.db')}") print("按 Ctrl+C 停止消费者") self.running = True try: # 创建Huey消费者 self.consumer = Consumer(self.huey, workers=self.workers, worker_type=self.worker_type.rstrip('s')) # 显示队列统计信息 stats = queue_manager.get_queue_stats() print(f"当前队列状态: {stats}") # 启动消费者运行 print("消费者开始处理任务...") self.consumer.run() except KeyboardInterrupt: print("\n收到中断信号,正在关闭...") except Exception as e: print(f"队列消费者运行时发生错误: {str(e)}") finally: self.stop() def stop(self): """停止队列消费者""" print("正在停止队列消费者...") try: if self.consumer: # 停止消费者 self.consumer.stop() self.consumer = None print("队列消费者已停止") except Exception as e: print(f"停止队列消费者时发生错误: {str(e)}") def process_scheduled_tasks(self): """处理定时任务""" print("处理定时任务...") # 这里可以添加额外的定时任务处理逻辑 def main(): """主函数""" parser = argparse.ArgumentParser(description="文件处理队列消费者") parser.add_argument( "--workers", type=int, default=2, help="工作线程数 (默认: 2)" ) parser.add_argument( "--worker-type", choices=["threads", "greenlets", "processes"], default="threads", help="工作线程类型 (默认: threads)" ) parser.add_argument( "--stats", action="store_true", help="显示队列统计信息并退出" ) parser.add_argument( "--flush", action="store_true", help="清空队列并退出" ) parser.add_argument( "--check", action="store_true", help="检查队列状态并退出" ) args = parser.parse_args() # 初始化消费者 consumer = QueueConsumer( worker_type=args.worker_type, workers=args.workers ) # 处理不同的命令行选项 if args.stats: print("=== 队列统计信息 ===") stats = queue_manager.get_queue_stats() print(f"总任务数: {stats.get('total_tasks', 0)}") print(f"待处理任务: {stats.get('pending_tasks', 0)}") print(f"运行中任务: {stats.get('running_tasks', 0)}") print(f"已完成任务: {stats.get('completed_tasks', 0)}") print(f"错误任务: {stats.get('error_tasks', 0)}") print(f"定时任务: {stats.get('scheduled_tasks', 0)}") print(f"数据库: {stats.get('queue_database', 'N/A')}") return if args.flush: print("=== 清空队列 ===") try: # 清空所有任务 consumer.huey.flush() print("队列已清空") except Exception as e: print(f"清空队列失败: {str(e)}") return if args.check: print("=== 检查队列状态 ===") stats = queue_manager.get_queue_stats() print(f"队列状态: 正常" if "error" not in stats else f"队列状态: 错误 - {stats['error']}") pending_tasks = queue_manager.list_pending_tasks(limit=10) if pending_tasks: print(f"\n待处理任务 (最多显示10个):") for task in pending_tasks: print(f" 任务ID: {task['task_id']}, 状态: {task['status']}, 创建时间: {task['created_time']}") else: print("当前没有待处理任务") return # 启动消费者 print("=== 启动文件处理队列消费者 ===") consumer.start() if __name__ == "__main__": main()