qwen_agent/task_queue/optimized_consumer.py
2025-11-27 21:50:03 +08:00

286 lines
9.3 KiB
Python
Executable File

#!/usr/bin/env python3
"""
优化的队列消费者 - 集成性能优化功能
"""
import sys
import os
import time
import signal
import argparse
import multiprocessing
import logging
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
import threading
# Module-level logger shared with the rest of the application under the name "app".
logger = logging.getLogger("app")

# This file lives in <root>/task_queue/; put <root> at the front of sys.path so
# the `task_queue.*` imports that follow resolve when the file runs as a script.
project_root = Path(__file__).parent.parent
sys.path.insert(0, os.fspath(project_root))
from task_queue.config import huey
from task_queue.manager import queue_manager
from task_queue.integration_tasks import process_files_async, cleanup_project_async
from huey.consumer import Consumer
class OptimizedQueueConsumer:
    """Huey queue consumer wrapper with startup tweaks and statistics logging.

    Wraps ``huey.consumer.Consumer``. It translates this script's worker-type
    names into values huey accepts. It also records task throughput through
    huey's task signals, so the periodic and final statistics reflect real work.
    """

    # User-facing worker names (as offered by the CLI) -> huey worker types.
    # huey only accepts 'thread', 'greenlet' or 'process' and raises ValueError
    # for anything else, so the plural CLI names must be translated.
    _WORKER_TYPE_ALIASES = {
        "thread": "thread",
        "threads": "thread",
        "greenlet": "greenlet",
        "greenlets": "greenlet",
        # NOTE(review): greenlet workers only pay off when gevent monkey-patching
        # happens before other imports; this script does not do that.
        "gevent": "greenlet",
        "process": "process",
        "processes": "process",
    }

    # Seconds between periodic statistics reports.
    STATS_INTERVAL = 30.0

    def __init__(self, worker_type: str = "threads", workers: int = 2, huey_instance=None):
        """Create the consumer wrapper; nothing is started until ``start()``.

        Args:
            worker_type: Worker model name, e.g. "threads", "greenlets", "gevent"
                (translated to huey's names when the consumer is created).
            workers: Number of huey workers to run.
            huey_instance: Huey application to consume; defaults to the
                module-level ``huey`` imported from ``task_queue.config``.
        """
        self.huey = huey_instance if huey_instance is not None else huey
        self.worker_type = worker_type
        self.workers = workers
        self.running = False
        self.consumer = None
        self.processed_count = 0
        self.start_time = None
        # Performance counters, updated from huey task signals.
        self.performance_stats = {
            'tasks_processed': 0,
            'tasks_failed': 0,
            'avg_processing_time': 0,
            'start_time': None,
            'last_activity': None
        }
        # Signal receivers may run concurrently in thread workers.
        self._stats_lock = threading.Lock()
        # Set on shutdown so the monitor thread wakes and exits promptly.
        self._stop_event = threading.Event()
        # task id -> monotonic timestamp at which it started executing.
        self._task_started_at = {}
        # Number of tasks that contributed to avg_processing_time.
        self._timed_tasks = 0
        self._signals_connected = False

    @classmethod
    def _resolve_worker_type(cls, worker_type):
        """Translate a CLI worker name to huey's value; unknown names pass through.

        Passing unknown names through unchanged lets huey's own validation
        report them.
        """
        key = worker_type.lower() if isinstance(worker_type, str) else worker_type
        return cls._WORKER_TYPE_ALIASES.get(key, worker_type)

    def _signal_handler(self, signum, frame):
        """SIGINT/SIGTERM handler: request a graceful stop."""
        logger.info(f"\n收到信号 {signum},正在关闭队列消费者...")
        self.running = False
        self._stop_event.set()
        if self.consumer:
            self.consumer.stop()

    def _register_task_signals(self):
        """Connect stat-collecting receivers to huey task signals (once).

        This is a no-op when the huey object has no ``connect_signal`` or
        huey.signals is unavailable. With process workers the receivers run in
        child processes, so these counters would not be updated here.
        """
        if self._signals_connected or not hasattr(self.huey, 'connect_signal'):
            return
        try:
            from huey.signals import SIGNAL_COMPLETE, SIGNAL_ERROR, SIGNAL_EXECUTING
        except ImportError:
            return
        self.huey.connect_signal(self._on_task_executing, SIGNAL_EXECUTING)
        self.huey.connect_signal(self._on_task_complete, SIGNAL_COMPLETE)
        self.huey.connect_signal(self._on_task_error, SIGNAL_ERROR)
        self._signals_connected = True

    def _on_task_executing(self, signal_name, task, *args, **kwargs):
        """Remember when a task started so its duration can be measured."""
        with self._stats_lock:
            self._task_started_at[getattr(task, 'id', None)] = time.monotonic()
            self.performance_stats['last_activity'] = time.time()

    def _on_task_complete(self, signal_name, task, *args, **kwargs):
        """Count a successfully completed task."""
        self._record_task_end(task, failed=False)

    def _on_task_error(self, signal_name, task, *args, **kwargs):
        """Count a task execution that raised (each failed attempt counts)."""
        self._record_task_end(task, failed=True)

    def _record_task_end(self, task, failed):
        """Update counters and the running average processing time."""
        finished = time.monotonic()
        with self._stats_lock:
            stats = self.performance_stats
            if failed:
                stats['tasks_failed'] += 1
            else:
                stats['tasks_processed'] += 1
                self.processed_count = stats['tasks_processed']
            started = self._task_started_at.pop(getattr(task, 'id', None), None)
            if started is not None:
                self._timed_tasks += 1
                # Incremental mean: avoids storing every duration.
                stats['avg_processing_time'] += (
                    (finished - started) - stats['avg_processing_time']
                ) / self._timed_tasks
            stats['last_activity'] = time.time()

    def setup_optimizations(self):
        """Apply best-effort environment and huey settings before starting.

        NOTE: PYTHONUNBUFFERED / PYTHONDONTWRITEBYTECODE are read by the
        interpreter at startup, so setting them here only affects child
        processes launched afterwards, not the running process.
        """
        os.environ.update({
            'PYTHONUNBUFFERED': '1',
            'PYTHONDONTWRITEBYTECODE': '1',
        })
        # Make sure tasks are queued rather than executed inline. `immediate` is
        # the huey 2.x flag; `always_eager` is the older 1.x name.
        for attr in ('immediate', 'always_eager'):
            if hasattr(self.huey, attr):
                setattr(self.huey, attr, False)
        logger.info(f"队列消费者优化设置完成:")
        logger.info(f"- 工作类型: {self.worker_type}")
        logger.info(f"- 工作线程数: {self.workers}")

    def monitor_performance(self, interval=None):
        """Log throughput statistics every ``interval`` seconds until stopped.

        Args:
            interval: Seconds between reports; defaults to ``STATS_INTERVAL``.
        """
        interval = self.STATS_INTERVAL if interval is None else interval
        # Event.wait returns True as soon as shutdown sets the event, so the
        # thread exits promptly instead of sleeping out the full interval.
        while not self._stop_event.wait(interval):
            if not self.running:
                break
            if not self.start_time:
                continue
            elapsed = time.time() - self.start_time
            with self._stats_lock:
                stats = dict(self.performance_stats)
            rate = stats['tasks_processed'] / max(1, elapsed)
            logger.info(f"\n[性能统计]")
            logger.info(f"- 运行时间: {elapsed:.1f}")
            logger.info(f"- 已处理任务: {stats['tasks_processed']}")
            logger.info(f"- 失败任务: {stats['tasks_failed']}")
            logger.info(f"- 平均处理速率: {rate:.2f} 任务/秒")
            if stats['avg_processing_time'] > 0:
                logger.info(f"- 平均处理时间: {stats['avg_processing_time']:.2f}")

    def start(self):
        """Configure, then run the huey consumer until it stops; always shuts down."""
        logger.info("=" * 60)
        logger.info("优化的队列消费者启动")
        logger.info("=" * 60)
        self.setup_optimizations()
        logger.info(f"数据库: {os.path.join(os.path.dirname(__file__), '..', 'projects', 'queue_data', 'huey.db')}")
        logger.info("按 Ctrl+C 停止消费者")

        self.running = True
        self._stop_event.clear()
        self.start_time = time.time()
        self.performance_stats['start_time'] = self.start_time

        # Installed here (not in __init__) so constructing the object has no
        # process-global side effects. NOTE(review): huey's Consumer may install
        # its own handlers once it runs; these cover the setup window — confirm.
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)
        self._register_task_signals()

        monitor_thread = threading.Thread(target=self.monitor_performance, daemon=True)
        monitor_thread.start()
        try:
            # huey's Consumer has no `check_delay` argument (passing it raised
            # TypeError), and it expects singular worker-type names.
            self.consumer = Consumer(
                self.huey,
                workers=self.workers,
                worker_type=self._resolve_worker_type(self.worker_type),
                max_delay=60.0,  # upper bound of the idle polling back-off
                periodic=True,  # enable periodic tasks
            )
            # A stop signal may have arrived during setup; honour it.
            if self._stop_event.is_set():
                return
            logger.info("队列消费者已启动,等待任务...")
            self.consumer.run()
        except KeyboardInterrupt:
            logger.info("\n收到键盘中断信号")
        except Exception as e:
            logger.exception(f"队列消费者运行错误: {e}")
        finally:
            self.shutdown()

    def shutdown(self):
        """Stop the consumer and monitor thread, then log final statistics."""
        logger.info("\n正在关闭队列消费者...")
        self.running = False
        self._stop_event.set()
        if self.consumer:
            try:
                self.consumer.stop()
                logger.info("队列消费者已停止")
            except Exception as e:
                logger.error(f"停止队列消费者时出错: {e}")
        if self.start_time:
            elapsed = time.time() - self.start_time
            with self._stats_lock:
                stats = dict(self.performance_stats)
            logger.info(f"\n[最终统计]")
            logger.info(f"- 总运行时间: {elapsed:.1f}")
            logger.info(f"- 总处理任务: {stats['tasks_processed']}")
            logger.info(f"- 总失败任务: {stats['tasks_failed']}")
            if stats['tasks_processed'] > 0:
                # Guard against a zero elapsed time on very fast shutdowns.
                rate = stats['tasks_processed'] / max(elapsed, 1e-9)
                logger.info(f"- 平均处理速率: {rate:.2f} 任务/秒")
def calculate_optimal_workers():
"""计算最优的工作线程数"""
cpu_count = multiprocessing.cpu_count()
# 基于CPU核心数和系统资源
if cpu_count <= 2:
return 2
elif cpu_count <= 4:
return 4
else:
return min(8, cpu_count)
def check_queue_status():
    """Log queue statistics and the size of the huey database file.

    The statistics lookup and the database-file check are independent, so a
    failure in one no longer hides the other.
    """
    try:
        stats = queue_manager.get_queue_stats()
    except Exception as e:
        logger.error(f"获取队列状态失败: {e}")
        stats = None

    logger.info("\n[队列状态]")
    if isinstance(stats, dict):
        # (stats key, display label) pairs, logged only when present.
        for key, label in (
            ("total_tasks", "总任务数"),
            ("pending_tasks", "待处理任务"),
            ("scheduled_tasks", "定时任务"),
        ):
            if key in stats:
                logger.info(f"- {label}: {stats[key]}")

    db_path = os.path.join(os.path.dirname(__file__), '..', 'projects', 'queue_data', 'huey.db')
    # A single getsize call avoids the exists()/getsize() race.
    try:
        size = os.path.getsize(db_path)
    except FileNotFoundError:
        logger.info("- 数据库文件: 不存在")
    except OSError as e:
        logger.error(f"获取队列状态失败: {e}")
    else:
        logger.info(f"- 数据库大小: {size} 字节")
def main():
    """Command-line entry point: parse options, apply a profile, run the consumer.

    With ``--check-status`` it only logs the queue status and returns.
    """
    # This module only obtains the 'app' logger and never attaches a handler.
    # Without one, logging's last-resort handler drops INFO records and every
    # status line would be silent. Configure a basic handler only when nothing
    # upstream (presumably task_queue.config or the host app) already has.
    if not logger.hasHandlers():
        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s %(levelname)s %(name)s: %(message)s",
        )

    # Computed once; used for both the default value and the help text.
    default_workers = calculate_optimal_workers()

    parser = argparse.ArgumentParser(description="优化的队列消费者")
    parser.add_argument(
        "--workers",
        type=int,
        default=default_workers,
        help=f"工作线程数 (默认: {default_workers})"
    )
    parser.add_argument(
        "--worker-type",
        type=str,
        default="threads",
        choices=["threads", "greenlets", "gevent"],
        help="工作类型 (默认: threads)"
    )
    parser.add_argument(
        "--check-status",
        action="store_true",
        help="检查队列状态后退出"
    )
    parser.add_argument(
        "--profile",
        type=str,
        default="balanced",
        choices=["low_memory", "balanced", "high_performance"],
        help="性能配置文件"
    )
    args = parser.parse_args()
    if args.workers < 1:
        parser.error("--workers 必须大于等于 1")

    # Apply the performance profile.
    if args.profile == "low_memory":
        # NOTE: PYTHONOPTIMIZE is read at interpreter startup; setting it here
        # only affects child processes started afterwards.
        os.environ['PYTHONOPTIMIZE'] = '1'
        if args.workers > 2:
            args.workers = 2
            logger.info(f"低内存模式: 调整工作线程数为 {args.workers}")
    elif args.profile == "high_performance":
        if args.workers < 4:
            args.workers = 4
            logger.info(f"高性能模式: 调整工作线程数为 {args.workers}")

    if args.check_status:
        check_queue_status()
        return

    # Optional system information; psutil is not a hard dependency.
    try:
        import psutil
        memory = psutil.virtual_memory()
        logger.info(f"[系统信息]")
        logger.info(f"- CPU核心数: {multiprocessing.cpu_count()}")
        logger.info(f"- 可用内存: {memory.available / (1024**3):.1f}GB")
        logger.info(f"- 内存使用率: {memory.percent:.1f}%")
    except ImportError:
        logger.info("[提示] 安装 psutil 可显示系统信息: pip install psutil")

    consumer = OptimizedQueueConsumer(
        worker_type=args.worker_type,
        workers=args.workers
    )
    consumer.start()


if __name__ == "__main__":
    main()