catalog-agent/task_queue/consumer.py
2025-10-18 09:20:59 +08:00

171 lines
5.3 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Queue consumer for processing file tasks.
"""
import sys
import os
import time
import signal
import argparse
from pathlib import Path
# 添加项目根目录到Python路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from task_queue.config import huey
from task_queue.manager import queue_manager
from task_queue.integration_tasks import process_files_async, cleanup_project_async
from huey.consumer import Consumer
class QueueConsumer:
"""队列消费者,用于处理异步任务"""
def __init__(self, worker_type: str = "threads", workers: int = 2):
self.huey = huey
self.worker_type = worker_type
self.workers = workers
self.running = False
self.consumer = None
# 注册信号处理
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
def _signal_handler(self, signum, frame):
"""信号处理器,用于优雅关闭"""
print(f"\n收到信号 {signum},正在关闭队列消费者...")
self.running = False
def start(self):
"""启动队列消费者"""
print(f"启动队列消费者...")
print(f"工作线程数: {self.workers}")
print(f"工作类型: {self.worker_type}")
print(f"数据库: {os.path.join(os.path.dirname(__file__), '..', 'queue_data', 'huey.db')}")
print("按 Ctrl+C 停止消费者")
self.running = True
try:
# 创建Huey消费者
self.consumer = Consumer(self.huey, workers=self.workers, worker_type=self.worker_type.rstrip('s'))
# 显示队列统计信息
stats = queue_manager.get_queue_stats()
print(f"当前队列状态: {stats}")
# 启动消费者运行
print("消费者开始处理任务...")
self.consumer.run()
except KeyboardInterrupt:
print("\n收到中断信号,正在关闭...")
except Exception as e:
print(f"队列消费者运行时发生错误: {str(e)}")
finally:
self.stop()
def stop(self):
"""停止队列消费者"""
print("正在停止队列消费者...")
try:
if self.consumer:
# 停止消费者
self.consumer.stop()
self.consumer = None
print("队列消费者已停止")
except Exception as e:
print(f"停止队列消费者时发生错误: {str(e)}")
def process_scheduled_tasks(self):
"""处理定时任务"""
print("处理定时任务...")
# 这里可以添加额外的定时任务处理逻辑
def main():
"""主函数"""
parser = argparse.ArgumentParser(description="文件处理队列消费者")
parser.add_argument(
"--workers",
type=int,
default=2,
help="工作线程数 (默认: 2)"
)
parser.add_argument(
"--worker-type",
choices=["threads", "greenlets", "processes"],
default="threads",
help="工作线程类型 (默认: threads)"
)
parser.add_argument(
"--stats",
action="store_true",
help="显示队列统计信息并退出"
)
parser.add_argument(
"--flush",
action="store_true",
help="清空队列并退出"
)
parser.add_argument(
"--check",
action="store_true",
help="检查队列状态并退出"
)
args = parser.parse_args()
# 初始化消费者
consumer = QueueConsumer(
worker_type=args.worker_type,
workers=args.workers
)
# 处理不同的命令行选项
if args.stats:
print("=== 队列统计信息 ===")
stats = queue_manager.get_queue_stats()
print(f"总任务数: {stats.get('total_tasks', 0)}")
print(f"待处理任务: {stats.get('pending_tasks', 0)}")
print(f"运行中任务: {stats.get('running_tasks', 0)}")
print(f"已完成任务: {stats.get('completed_tasks', 0)}")
print(f"错误任务: {stats.get('error_tasks', 0)}")
print(f"定时任务: {stats.get('scheduled_tasks', 0)}")
print(f"数据库: {stats.get('queue_database', 'N/A')}")
return
if args.flush:
print("=== 清空队列 ===")
try:
# 清空所有任务
consumer.huey.flush()
print("队列已清空")
except Exception as e:
print(f"清空队列失败: {str(e)}")
return
if args.check:
print("=== 检查队列状态 ===")
stats = queue_manager.get_queue_stats()
print(f"队列状态: 正常" if "error" not in stats else f"队列状态: 错误 - {stats['error']}")
pending_tasks = queue_manager.list_pending_tasks(limit=10)
if pending_tasks:
print(f"\n待处理任务 (最多显示10个):")
for task in pending_tasks:
print(f" 任务ID: {task['task_id']}, 状态: {task['status']}, 创建时间: {task['created_time']}")
else:
print("当前没有待处理任务")
return
# 启动消费者
print("=== 启动文件处理队列消费者 ===")
consumer.start()
if __name__ == "__main__":
main()