#!/usr/bin/env python3
"""
Optimized queue consumer with integrated performance tuning.

Wraps a huey ``Consumer`` with a few start-up tweaks, a periodic statistics
printer and a small command-line interface.
"""

import sys
import os
import time
import signal
import argparse
import multiprocessing
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
import threading

# Add the project root to the Python path so ``task_queue`` can be imported.
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))

from task_queue.config import huey
from task_queue.manager import queue_manager
# NOTE(review): these task names are not used below; presumably the import is
# what registers the tasks with huey -- confirm before removing.
from task_queue.integration_tasks import process_files_async, cleanup_project_async
from huey.consumer import Consumer


# Maps the worker-type spellings accepted by this script to the names huey's
# Consumer understands ("thread", "greenlet", "process").
_HUEY_WORKER_TYPES = {
    "threads": "thread",
    "greenlets": "greenlet",
    "gevent": "greenlet",
}


def _queue_db_path() -> str:
    """Return the path of the huey database file relative to this script."""
    return os.path.join(
        os.path.dirname(__file__), '..', 'projects', 'queue_data', 'huey.db'
    )


class OptimizedQueueConsumer:
    """Queue consumer wrapper that applies tuning and prints statistics."""

    def __init__(self, worker_type: str = "threads", workers: int = 2):
        """Store the configuration and install SIGINT/SIGTERM handlers.

        Args:
            worker_type: Worker model; "threads", "greenlets" or "gevent"
                (huey's own names are passed through unchanged).
            workers: Number of workers handed to the huey consumer.
        """
        self.huey = huey
        self.worker_type = worker_type
        self.workers = workers
        self.running = False
        self.consumer = None
        self.processed_count = 0
        self.start_time = None

        # Performance counters.
        # NOTE(review): nothing in this module increments these counters, so
        # the printed task statistics stay at zero unless something else
        # updates them.
        self.performance_stats = {
            'tasks_processed': 0,
            'tasks_failed': 0,
            'avg_processing_time': 0,
            'start_time': None,
            'last_activity': None
        }

        # Register signal handlers for a graceful shutdown.
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Signal handler: mark the consumer as stopped and stop huey."""
        print(f"\n收到信号 {signum},正在关闭队列消费者...")
        self.running = False
        if self.consumer:
            self.consumer.stop()

    def setup_optimizations(self):
        """Apply environment and huey settings before the consumer starts."""
        # NOTE(review): CPython reads these variables at interpreter start-up,
        # so setting them here presumably only affects child processes.
        env_vars = {
            'PYTHONUNBUFFERED': '1',
            'PYTHONDONTWRITEBYTECODE': '1',
        }

        for key, value in env_vars.items():
            os.environ[key] = value

        # Make sure tasks are enqueued rather than executed inline.
        if hasattr(huey, 'immediate'):
            huey.immediate = False

        # Adjust according to the worker type.
        if self.worker_type == "threads":
            if hasattr(huey, 'worker_type'):
                huey.worker_type = 'threads'

        if hasattr(huey, 'always_eager'):
            huey.always_eager = False

        print(f"队列消费者优化设置完成:")
        print(f"- 工作类型: {self.worker_type}")
        print(f"- 工作线程数: {self.workers}")

    def monitor_performance(self):
        """Print statistics every 30 seconds while the consumer is running."""
        while self.running:
            time.sleep(30)
            # Shutdown may have been requested during the sleep; do not print
            # a stale report after the final statistics.
            if not self.running:
                break

            if self.start_time:
                elapsed = time.time() - self.start_time
                stats = self.performance_stats
                rate = stats['tasks_processed'] / max(1, elapsed)

                print(f"\n[性能统计]")
                print(f"- 运行时间: {elapsed:.1f}秒")
                print(f"- 已处理任务: {stats['tasks_processed']}")
                print(f"- 失败任务: {stats['tasks_failed']}")
                print(f"- 平均处理速率: {rate:.2f} 任务/秒")

                if stats['avg_processing_time'] > 0:
                    print(f"- 平均处理时间: {stats['avg_processing_time']:.2f}秒")

    def start(self):
        """Run the consumer until interrupted, then call ``shutdown``."""
        print("=" * 60)
        print("优化的队列消费者启动")
        print("=" * 60)

        self.setup_optimizations()

        print(f"数据库: {_queue_db_path()}")
        print("按 Ctrl+C 停止消费者")
        print()

        self.running = True
        self.start_time = time.time()
        self.performance_stats['start_time'] = self.start_time

        # Daemon thread, so it never keeps the process alive on exit.
        monitor_thread = threading.Thread(target=self.monitor_performance, daemon=True)
        monitor_thread.start()

        try:
            self.consumer = Consumer(
                self.huey,
                workers=self.workers,
                # huey expects "thread"/"greenlet"/"process"; translate the
                # plural spellings this script exposes.
                worker_type=_HUEY_WORKER_TYPES.get(self.worker_type, self.worker_type),
                max_delay=60.0,      # upper bound for the polling back-off
                # The original passed ``check_delay``, which huey's Consumer
                # does not accept (TypeError). ``initial_delay`` is huey's
                # queue polling interval.
                initial_delay=1.0,
                periodic=True,       # enable periodic tasks
            )

            print("队列消费者已启动,等待任务...")

            # Blocks until the consumer is stopped.
            self.consumer.run()

        except KeyboardInterrupt:
            print("\n收到键盘中断信号")
        except Exception as e:
            # Top-level boundary: report the failure and fall through to
            # the shutdown path.
            print(f"队列消费者运行错误: {e}")
            import traceback
            traceback.print_exc()
        finally:
            self.shutdown()

    def shutdown(self):
        """Stop the consumer and print the final statistics."""
        print("\n正在关闭队列消费者...")

        self.running = False

        if self.consumer:
            try:
                self.consumer.stop()
                print("队列消费者已停止")
            except Exception as e:
                print(f"停止队列消费者时出错: {e}")

        if self.start_time:
            elapsed = time.time() - self.start_time
            processed = self.performance_stats['tasks_processed']
            print(f"\n[最终统计]")
            print(f"- 总运行时间: {elapsed:.1f}秒")
            print(f"- 总处理任务: {processed}")
            print(f"- 总失败任务: {self.performance_stats['tasks_failed']}")

            # Guard against a zero elapsed time on coarse clocks.
            if processed > 0 and elapsed > 0:
                rate = processed / elapsed
                print(f"- 平均处理速率: {rate:.2f} 任务/秒")


def calculate_optimal_workers():
    """Return a default worker count derived from the CPU count.

    Returns 2 for up to two cores, 4 for up to four cores, and otherwise the
    core count capped at 8.
    """
    cpu_count = multiprocessing.cpu_count()

    if cpu_count <= 2:
        return 2
    elif cpu_count <= 4:
        return 4
    else:
        return min(8, cpu_count)


def check_queue_status():
    """Print queue statistics and the size of the huey database file.

    Any exception is reported on stdout instead of being propagated, so a
    status check never crashes the script.
    """
    try:
        stats = queue_manager.get_queue_stats()
        print("\n[队列状态]")

        if isinstance(stats, dict):
            if 'total_tasks' in stats:
                print(f"- 总任务数: {stats['total_tasks']}")
            if 'pending_tasks' in stats:
                print(f"- 待处理任务: {stats['pending_tasks']}")
            if 'scheduled_tasks' in stats:
                print(f"- 定时任务: {stats['scheduled_tasks']}")

        # Report on the database file.
        db_path = _queue_db_path()
        if os.path.exists(db_path):
            size = os.path.getsize(db_path)
            print(f"- 数据库大小: {size} 字节")
        else:
            print("- 数据库文件: 不存在")

    except Exception as e:
        print(f"获取队列状态失败: {e}")


def main():
    """Parse the command line, apply the chosen profile and run the consumer."""
    default_workers = calculate_optimal_workers()

    parser = argparse.ArgumentParser(description="优化的队列消费者")
    parser.add_argument(
        "--workers",
        type=int,
        default=default_workers,
        help=f"工作线程数 (默认: {default_workers})"
    )
    parser.add_argument(
        "--worker-type",
        type=str,
        default="threads",
        choices=["threads", "greenlets", "gevent"],
        help="工作类型 (默认: threads)"
    )
    parser.add_argument(
        "--check-status",
        action="store_true",
        help="检查队列状态后退出"
    )
    parser.add_argument(
        "--profile",
        type=str,
        default="balanced",
        choices=["low_memory", "balanced", "high_performance"],
        help="性能配置文件"
    )

    args = parser.parse_args()

    # A consumer needs at least one worker.
    if args.workers < 1:
        parser.error("--workers 必须为正整数")

    # Apply the performance profile.
    if args.profile == "low_memory":
        # NOTE(review): PYTHONOPTIMIZE is read at interpreter start-up, so
        # this presumably only affects child processes.
        os.environ['PYTHONOPTIMIZE'] = '1'
        if args.workers > 2:
            args.workers = 2
        print(f"低内存模式: 调整工作线程数为 {args.workers}")
    elif args.profile == "high_performance":
        if args.workers < 4:
            args.workers = 4
        print(f"高性能模式: 调整工作线程数为 {args.workers}")

    # Status-only mode: report and exit without starting a consumer.
    if args.check_status:
        check_queue_status()
        return

    # Show system information when psutil is available (optional dependency).
    try:
        import psutil
        memory = psutil.virtual_memory()
        print(f"[系统信息]")
        print(f"- CPU核心数: {multiprocessing.cpu_count()}")
        print(f"- 可用内存: {memory.available / (1024**3):.1f}GB")
        print(f"- 内存使用率: {memory.percent:.1f}%")
    except ImportError:
        print("[提示] 安装 psutil 可显示系统信息: pip install psutil")

    # Create and start the queue consumer.
    consumer = OptimizedQueueConsumer(
        worker_type=args.worker_type,
        workers=args.workers
    )

    consumer.start()


if __name__ == "__main__":
    main()