286 lines
9.3 KiB
Python
Executable File
286 lines
9.3 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
优化的队列消费者 - 集成性能优化功能
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import time
|
|
import signal
|
|
import argparse
|
|
import multiprocessing
|
|
import logging
|
|
from pathlib import Path
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
import threading
|
|
|
|
# Module logger for this script. No handlers or levels are configured in this
# block; output depends on logging being configured somewhere else.
logger = logging.getLogger('app')

# Put the project root (the parent of this script's directory) first on the
# import path so the ``task_queue`` package can be imported when this file is
# run directly. ``resolve()`` makes the result independent of the current
# working directory (relative ``__file__``) and of symlinks to the script.
project_root = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(project_root))
from task_queue.config import huey
|
|
from task_queue.manager import queue_manager
|
|
from task_queue.integration_tasks import process_files_async, cleanup_project_async
|
|
from huey.consumer import Consumer
|
|
|
|
|
|
class OptimizedQueueConsumer:
    """Huey queue consumer with runtime tuning and performance reporting.

    Wraps ``huey.consumer.Consumer``. The wrapper:

    * applies a few runtime settings,
    * starts a background thread that logs statistics every 30 seconds,
    * logs final statistics on shutdown.

    Task outcome counters are fed by huey's task signals (executing /
    complete / error), so the reported numbers reflect real work.
    """

    # CLI worker-type names -> worker types accepted by huey's Consumer.
    # huey expects the singular names ("thread", "greenlet", "process"); the
    # plural CLI spellings would make Consumer reject the worker type. Names
    # not listed here are passed through unchanged, so huey's own names work.
    _HUEY_WORKER_TYPES = {
        "threads": "thread",
        "greenlets": "greenlet",
        "gevent": "greenlet",
    }

    def __init__(self, worker_type: str = "threads", workers: int = 2):
        """Store the configuration and install SIGINT/SIGTERM handlers.

        Args:
            worker_type: CLI worker-type name ("threads", "greenlets",
                "gevent") or a huey worker-type name.
            workers: Number of huey workers to run.

        Must be called from the main thread, because ``signal.signal`` raises
        ``ValueError`` in any other thread.
        """
        self.huey = huey
        self.worker_type = worker_type
        self.workers = workers
        self.running = False
        self.consumer = None
        self.processed_count = 0
        self.start_time = None

        # Performance counters. The signal receivers update them from huey
        # worker threads, so every write goes through _stats_lock.
        self.performance_stats = {
            'tasks_processed': 0,
            'tasks_failed': 0,
            'avg_processing_time': 0,
            'start_time': None,
            'last_activity': None,
        }
        self._stats_lock = threading.Lock()
        # task id -> start timestamp of the attempt currently executing.
        self._task_started = {}
        # Number of attempts included in avg_processing_time (running mean).
        self._timed_tasks = 0
        self._signals_connected = False

        # Graceful shutdown on Ctrl+C / SIGTERM.
        # NOTE(review): huey's Consumer installs its own signal handlers when
        # it starts, which would replace these while it runs; confirm for the
        # huey version in use.
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Handle SIGINT/SIGTERM by stopping the monitor loop and the consumer."""
        logger.info(f"\n收到信号 {signum},正在关闭队列消费者...")
        self.running = False
        if self.consumer:
            self.consumer.stop()

    @classmethod
    def _huey_worker_type(cls, worker_type):
        """Translate a CLI worker-type name to the name huey's Consumer accepts."""
        return cls._HUEY_WORKER_TYPES.get(worker_type, worker_type)

    def _record_task_start(self, task_id, now):
        """Remember when attempt ``task_id`` started (timestamp ``now``)."""
        with self._stats_lock:
            self._task_started[task_id] = now
            self.performance_stats['last_activity'] = now

    def _record_task_end(self, task_id, failed, now):
        """Count a finished attempt and fold its duration into the running mean.

        Args:
            task_id: Id of the task whose attempt finished.
            failed: True if the attempt raised, False if it completed.
            now: Finish timestamp (same clock as ``_record_task_start``).
        """
        with self._stats_lock:
            stats = self.performance_stats
            if failed:
                stats['tasks_failed'] += 1
            else:
                stats['tasks_processed'] += 1
                self.processed_count = stats['tasks_processed']

            started = self._task_started.pop(task_id, None)
            # Without a recorded start (e.g. signals connected mid-task) the
            # attempt is counted but not timed.
            if started is not None:
                self._timed_tasks += 1
                duration = max(0.0, now - started)
                mean = stats['avg_processing_time']
                stats['avg_processing_time'] = mean + (duration - mean) / self._timed_tasks
            stats['last_activity'] = now

    def _stats_snapshot(self):
        """Return a consistent copy of ``performance_stats``."""
        with self._stats_lock:
            return dict(self.performance_stats)

    def _connect_stat_signals(self):
        """Connect huey task signals to the counters; idempotent and best-effort."""
        if self._signals_connected:
            return
        try:
            from huey.signals import SIGNAL_COMPLETE, SIGNAL_ERROR, SIGNAL_EXECUTING
            register = self.huey.signal
        except (ImportError, AttributeError):
            logger.warning("当前 huey 版本不支持任务信号,性能统计将不可用")
            return

        # huey calls receivers as receiver(signal, task, *extra); for
        # SIGNAL_ERROR the exception is passed as an extra positional argument.
        def on_executing(signal_name, task, *args, **kwargs):
            self._record_task_start(task.id, time.time())

        def on_complete(signal_name, task, *args, **kwargs):
            self._record_task_end(task.id, False, time.time())

        def on_error(signal_name, task, *args, **kwargs):
            self._record_task_end(task.id, True, time.time())

        register(SIGNAL_EXECUTING)(on_executing)
        register(SIGNAL_COMPLETE)(on_complete)
        register(SIGNAL_ERROR)(on_error)
        self._signals_connected = True

    def setup_optimizations(self):
        """Apply runtime settings before the consumer starts."""
        # NOTE: these variables are read at interpreter startup, so setting
        # them here only affects child processes started later.
        env_vars = {
            'PYTHONUNBUFFERED': '1',
            'PYTHONDONTWRITEBYTECODE': '1',
        }
        os.environ.update(env_vars)

        # Make sure tasks go through the queue instead of running inline.
        if hasattr(self.huey, 'immediate'):
            self.huey.immediate = False

        # Only takes effect if the huey object exposes a worker_type attribute.
        if self.worker_type == "threads" and hasattr(self.huey, 'worker_type'):
            self.huey.worker_type = 'threads'

        # Presumably an older huey name for immediate mode; disable it too.
        if hasattr(self.huey, 'always_eager'):
            self.huey.always_eager = False

        logger.info("队列消费者优化设置完成:")
        logger.info(f"- 工作类型: {self.worker_type}")
        logger.info(f"- 工作线程数: {self.workers}")

    def monitor_performance(self):
        """Monitor-thread loop: log throughput statistics every 30 s while running."""
        while self.running:
            time.sleep(30)

            if not self.start_time:
                continue

            elapsed = time.time() - self.start_time
            stats = self._stats_snapshot()
            rate = stats['tasks_processed'] / max(1, elapsed)

            logger.info("\n[性能统计]")
            logger.info(f"- 运行时间: {elapsed:.1f}秒")
            logger.info(f"- 已处理任务: {stats['tasks_processed']}")
            logger.info(f"- 失败任务: {stats['tasks_failed']}")
            logger.info(f"- 平均处理速率: {rate:.2f} 任务/秒")

            if stats['avg_processing_time'] > 0:
                logger.info(f"- 平均处理时间: {stats['avg_processing_time']:.2f}秒")

    def start(self):
        """Configure and run the huey consumer; blocks until it stops.

        ``shutdown()`` is always called at the end, whether ``run()`` returns
        normally, is interrupted with Ctrl+C, or raises.
        """
        logger.info("=" * 60)
        logger.info("优化的队列消费者启动")
        logger.info("=" * 60)

        self.setup_optimizations()

        logger.info(f"数据库: {os.path.join(os.path.dirname(__file__), '..', 'projects', 'queue_data', 'huey.db')}")
        logger.info("按 Ctrl+C 停止消费者")

        self.running = True
        self.start_time = time.time()
        self.performance_stats['start_time'] = self.start_time

        monitor_thread = threading.Thread(target=self.monitor_performance, daemon=True)
        monitor_thread.start()

        try:
            self._connect_stat_signals()

            self.consumer = Consumer(
                self.huey,
                workers=self.workers,
                # huey only accepts its own worker-type names ("thread", ...).
                worker_type=self._huey_worker_type(self.worker_type),
                max_delay=60.0,      # upper bound of the idle polling back-off
                initial_delay=1.0,   # base polling (check) interval
                periodic=True,       # run periodic tasks
            )

            logger.info("队列消费者已启动,等待任务...")
            self.consumer.run()

        except KeyboardInterrupt:
            logger.info("\n收到键盘中断信号")
        except Exception as e:
            # Route the traceback through logging instead of raw stderr.
            logger.exception(f"队列消费者运行错误: {e}")
        finally:
            self.shutdown()

    def shutdown(self):
        """Stop the consumer (if created) and log the final statistics."""
        logger.info("\n正在关闭队列消费者...")
        self.running = False

        if self.consumer:
            try:
                self.consumer.stop()
                logger.info("队列消费者已停止")
            except Exception as e:
                logger.error(f"停止队列消费者时出错: {e}")

        if self.start_time:
            elapsed = time.time() - self.start_time
            stats = self._stats_snapshot()
            logger.info("\n[最终统计]")
            logger.info(f"- 总运行时间: {elapsed:.1f}秒")
            logger.info(f"- 总处理任务: {stats['tasks_processed']}")
            logger.info(f"- 总失败任务: {stats['tasks_failed']}")

            # elapsed > 0 guards against a zero-length run on coarse clocks.
            if stats['tasks_processed'] > 0 and elapsed > 0:
                rate = stats['tasks_processed'] / elapsed
                logger.info(f"- 平均处理速率: {rate:.2f} 任务/秒")
def calculate_optimal_workers(cpu_count=None):
    """Return a default worker count derived from the number of CPU cores.

    Args:
        cpu_count: Core count to base the decision on. ``None`` (the default)
            queries ``multiprocessing.cpu_count()``. If the platform cannot
            report it (``NotImplementedError``), two cores are assumed.

    Returns:
        2 for up to 2 cores, 4 for 3-4 cores, otherwise the core count capped
        at 8.
    """
    if cpu_count is None:
        try:
            cpu_count = multiprocessing.cpu_count()
        except NotImplementedError:
            # cpu_count() raises this when the count cannot be determined.
            cpu_count = 2

    if cpu_count <= 2:
        return 2
    if cpu_count <= 4:
        return 4
    return min(8, cpu_count)
def check_queue_status():
    """Log queue statistics and the size of the huey database file.

    Any exception raised by the queue manager or the file-system checks is
    logged as an error instead of being propagated.
    """
    # (stats key, label) pairs, logged in this order when the key is present.
    labelled_keys = (
        ('total_tasks', '总任务数'),
        ('pending_tasks', '待处理任务'),
        ('scheduled_tasks', '定时任务'),
    )

    try:
        queue_stats = queue_manager.get_queue_stats()

        logger.info("\n[队列状态]")
        if isinstance(queue_stats, dict):
            for key, label in labelled_keys:
                if key in queue_stats:
                    logger.info(f"- {label}: {queue_stats[key]}")

        huey_db = os.path.join(os.path.dirname(__file__), '..', 'projects', 'queue_data', 'huey.db')
        if not os.path.exists(huey_db):
            logger.info("- 数据库文件: 不存在")
        else:
            logger.info(f"- 数据库大小: {os.path.getsize(huey_db)} 字节")

    except Exception as e:
        logger.error(f"获取队列状态失败: {e}")
def _build_arg_parser(default_workers):
    """Build the command-line parser; ``default_workers`` is the --workers default."""
    parser = argparse.ArgumentParser(description="优化的队列消费者")
    parser.add_argument(
        "--workers",
        type=int,
        default=default_workers,
        help=f"工作线程数 (默认: {default_workers})"
    )
    parser.add_argument(
        "--worker-type",
        type=str,
        default="threads",
        choices=["threads", "greenlets", "gevent"],
        help="工作类型 (默认: threads)"
    )
    parser.add_argument(
        "--check-status",
        action="store_true",
        help="检查队列状态后退出"
    )
    parser.add_argument(
        "--profile",
        type=str,
        default="balanced",
        choices=["low_memory", "balanced", "high_performance"],
        help="性能配置文件"
    )
    return parser


def _apply_profile(args):
    """Adjust ``args.workers`` in place (and the environment) for ``args.profile``."""
    if args.profile == "low_memory":
        # NOTE: PYTHONOPTIMIZE is read at interpreter startup, so this only
        # affects child processes, not the running interpreter.
        os.environ['PYTHONOPTIMIZE'] = '1'
        if args.workers > 2:
            args.workers = 2
            logger.info(f"低内存模式: 调整工作线程数为 {args.workers}")
    elif args.profile == "high_performance":
        if args.workers < 4:
            args.workers = 4
            logger.info(f"高性能模式: 调整工作线程数为 {args.workers}")


def _log_system_info():
    """Log CPU and memory information when the optional psutil package is installed."""
    try:
        import psutil
    except ImportError:
        logger.info("[提示] 安装 psutil 可显示系统信息: pip install psutil")
        return

    memory = psutil.virtual_memory()
    logger.info("[系统信息]")
    logger.info(f"- CPU核心数: {multiprocessing.cpu_count()}")
    logger.info(f"- 可用内存: {memory.available / (1024**3):.1f}GB")
    logger.info(f"- 内存使用率: {memory.percent:.1f}%")


def main():
    """Command-line entry point: parse options, then check status or run the consumer."""
    # With no logging configuration, the INFO messages this script uses as its
    # UI would be dropped (the root logger defaults to WARNING). Install a
    # plain console handler only if neither the root nor the 'app' logger has
    # handlers yet, so any existing project logging setup is left untouched.
    if not logging.getLogger().handlers and not logger.handlers:
        logging.basicConfig(level=logging.INFO, format="%(message)s")

    default_workers = calculate_optimal_workers()
    parser = _build_arg_parser(default_workers)
    args = parser.parse_args()

    _apply_profile(args)

    # Validated after the profile adjustment, which may raise the count.
    # With fewer than one worker, no task would ever be executed.
    if args.workers < 1:
        parser.error("--workers 必须是正整数")

    if args.check_status:
        check_queue_status()
        return

    _log_system_info()

    consumer = OptimizedQueueConsumer(
        worker_type=args.worker_type,
        workers=args.workers
    )
    consumer.start()


if __name__ == "__main__":
    main()