qwen_agent/task_queue/optimized_consumer.py
朱潮 425f3c5bb4 chore: replace Chinese comments and log messages with English
Convert all Chinese comments, docstrings, logger/print output,
HTTPException detail messages, and API response messages to English
across the entire codebase. Functional zh/ja localized strings
(e.g. prompt templates, timezone display names, date formats) are
preserved as-is.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-30 19:45:35 +08:00

287 lines
9.3 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Optimized queue consumer with integrated performance monitoring.
"""
import sys
import os
import time
import signal
import argparse
import multiprocessing
import logging
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
import threading
# Module logger shared with the rest of the app. NOTE: this only fetches the
# 'app' logger; it does not attach handlers or set a level here.
logger = logging.getLogger('app')
# Put the directory two levels above this file on sys.path so the
# `task_queue.*` imports below resolve when this file is run as a script.
# This must run before those imports.
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from task_queue.config import huey
from task_queue.manager import queue_manager
# NOTE(review): process_files_async / cleanup_project_async are not referenced
# in this file; presumably imported so their huey tasks are registered before
# the consumer starts — confirm before removing.
from task_queue.integration_tasks import process_files_async, cleanup_project_async
from huey.consumer import Consumer
class OptimizedQueueConsumer:
    """Optimized queue consumer with integrated performance monitoring.

    Wraps a huey ``Consumer``. It applies a few runtime tweaks and runs the
    consumer in the foreground. It also keeps task statistics: processed and
    failed counts, plus the average processing time. A daemon thread logs
    these statistics periodically, and they are logged once more on shutdown.
    """

    # This script's CLI uses plural worker-type names. huey's Consumer only
    # understands 'thread', 'greenlet' and 'process'. Unknown values are
    # passed through unchanged so huey reports them itself.
    _HUEY_WORKER_TYPES = {
        "threads": "thread",
        "thread": "thread",
        "greenlets": "greenlet",
        "greenlet": "greenlet",
        "gevent": "greenlet",
    }

    def __init__(self, worker_type: str = "threads", workers: int = 2):
        self.huey = huey
        self.worker_type = worker_type
        self.workers = workers
        self.running = False
        self.consumer = None
        # Mirrors performance_stats['tasks_processed'].
        self.processed_count = 0
        self.start_time = None
        # Performance monitoring
        self.performance_stats = {
            'tasks_processed': 0,
            'tasks_failed': 0,
            'avg_processing_time': 0,
            'start_time': None,
            'last_activity': None,
        }
        # huey signal callbacks run on worker threads while the monitor
        # thread reads the stats, so all stats access happens under this lock.
        self._stats_lock = threading.Lock()
        # Maps task id -> time.monotonic() when the task started executing.
        self._task_starts = {}
        # Number of finished tasks whose duration fed avg_processing_time.
        self._timed_tasks = 0
        self._stats_hooks_registered = False
        # Set on shutdown so the monitor thread stops without a full sleep.
        self._stop_event = threading.Event()
        # Register signal handlers.
        # NOTE(review): huey's Consumer.run() may install its own SIGINT/SIGTERM
        # handlers once it starts, replacing these — confirm.
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Signal handler for graceful shutdown."""
        logger.info(f"\nReceived signal {signum}, shutting down queue consumer...")
        self.running = False
        self._stop_event.set()
        if self.consumer:
            self.consumer.stop()

    def _huey_worker_type(self):
        """Translate this script's worker-type name into huey's spelling."""
        return self._HUEY_WORKER_TYPES.get(self.worker_type, self.worker_type)

    def setup_optimizations(self):
        """Apply runtime tweaks before the consumer starts.

        Sets Python environment variables (inherited by child processes). It
        also makes sure huey does not execute tasks inline, and logs the
        worker configuration.
        """
        env_vars = {
            'PYTHONUNBUFFERED': '1',
            'PYTHONDONTWRITEBYTECODE': '1',
        }
        for key, value in env_vars.items():
            os.environ[key] = value
        # Tasks must be pulled from the queue, not executed inline.
        if hasattr(self.huey, 'immediate'):
            self.huey.immediate = False
        if self._huey_worker_type() == "thread":
            if hasattr(self.huey, 'worker_type'):
                self.huey.worker_type = 'threads'
            # Presumably the eager-mode flag of older huey releases; keep it off.
            if hasattr(self.huey, 'always_eager'):
                self.huey.always_eager = False
        logger.info("Queue consumer optimization setup complete:")
        logger.info(f"- Worker type: {self.worker_type}")
        logger.info(f"- Worker count: {self.workers}")

    def _register_stats_hooks(self):
        """Connect huey task signals to the statistics recorders.

        Returns:
            True if the hooks are registered. False if this huey instance or
            installation does not support signals; in that case the
            statistics stay at zero.
        """
        if self._stats_hooks_registered:
            return True
        connect = getattr(self.huey, 'signal', None)
        if connect is None:
            logger.warning("Huey instance does not support signals; task statistics are disabled")
            return False
        try:
            from huey.signals import SIGNAL_COMPLETE, SIGNAL_ERROR, SIGNAL_EXECUTING
        except ImportError:
            logger.warning("huey.signals is unavailable; task statistics are disabled")
            return False

        # huey calls receivers as handler(signal, task, *extra); SIGNAL_ERROR
        # passes the raised exception as an extra argument.
        @connect(SIGNAL_EXECUTING)
        def _on_executing(sig, task, *extra, **kwextra):
            self._record_task_start(task.id)

        @connect(SIGNAL_COMPLETE)
        def _on_complete(sig, task, *extra, **kwextra):
            self._record_task_end(task.id, failed=False)

        @connect(SIGNAL_ERROR)
        def _on_error(sig, task, *extra, **kwextra):
            self._record_task_end(task.id, failed=True)

        self._stats_hooks_registered = True
        return True

    def _record_task_start(self, task_id):
        """Remember when task *task_id* started executing."""
        with self._stats_lock:
            self._task_starts[task_id] = time.monotonic()
            self.performance_stats['last_activity'] = time.time()

    def _record_task_end(self, task_id, failed=False):
        """Record that task *task_id* finished, successfully or not.

        Every finished task counts towards ``tasks_processed``. Failures are
        also counted in ``tasks_failed``. ``avg_processing_time`` only
        includes tasks whose start was observed.
        """
        finished = time.monotonic()
        with self._stats_lock:
            started = self._task_starts.pop(task_id, None)
            stats = self.performance_stats
            stats['tasks_processed'] += 1
            if failed:
                stats['tasks_failed'] += 1
            if started is not None:
                self._timed_tasks += 1
                duration = finished - started
                # Incremental mean, so individual durations need not be stored.
                stats['avg_processing_time'] += (
                    (duration - stats['avg_processing_time']) / self._timed_tasks
                )
            stats['last_activity'] = time.time()
            self.processed_count = stats['tasks_processed']

    def _log_performance_stats(self):
        """Log a snapshot of the running statistics."""
        if not self.start_time:
            return
        with self._stats_lock:
            stats = dict(self.performance_stats)
        elapsed = time.time() - self.start_time
        rate = stats['tasks_processed'] / max(1, elapsed)
        logger.info("\n[Performance Stats]")
        logger.info(f"- Uptime: {elapsed:.1f}s")
        logger.info(f"- Tasks processed: {stats['tasks_processed']}")
        logger.info(f"- Failed tasks: {stats['tasks_failed']}")
        logger.info(f"- Average processing rate: {rate:.2f} tasks/s")
        if stats['avg_processing_time'] > 0:
            logger.info(f"- Average processing time: {stats['avg_processing_time']:.2f}s")

    def monitor_performance(self, interval: float = 30.0):
        """Performance monitoring thread: log statistics every *interval* seconds.

        Returns as soon as ``running`` is cleared or the stop event is set.
        """
        while self.running:
            # wait() returns True immediately once shutdown sets the event.
            if self._stop_event.wait(interval) or not self.running:
                break
            self._log_performance_stats()

    def start(self):
        """Run the queue consumer in the foreground until it stops.

        Blocks inside huey's ``Consumer.run()``. :meth:`shutdown` is always
        called afterwards, including after errors or Ctrl+C.
        """
        logger.info("=" * 60)
        logger.info("Optimized queue consumer starting")
        logger.info("=" * 60)
        # Apply optimizations
        self.setup_optimizations()
        logger.info(f"Database: {os.path.join(os.path.dirname(__file__), '..', 'projects', 'queue_data', 'huey.db')}")
        logger.info("Press Ctrl+C to stop the consumer")
        self.running = True
        self.start_time = time.time()
        self.performance_stats['start_time'] = self.start_time
        self._stop_event.clear()
        # Feed performance_stats from real task events.
        self._register_stats_hooks()
        # Start performance monitoring thread
        monitor_thread = threading.Thread(target=self.monitor_performance, daemon=True)
        monitor_thread.start()
        try:
            self.consumer = Consumer(
                self.huey,
                workers=self.workers,
                # huey expects 'thread' / 'greenlet', not the CLI's plural names.
                worker_type=self._huey_worker_type(),
                max_delay=60.0,  # Upper bound of the idle polling back-off
                initial_delay=1.0,  # Base polling interval (huey has no `check_delay`)
                periodic=True,  # Enable periodic tasks
            )
            logger.info("Queue consumer started, waiting for tasks...")
            self.consumer.run()
        except KeyboardInterrupt:
            logger.info("\nReceived keyboard interrupt signal")
        except Exception as e:
            logger.exception(f"Queue consumer runtime error: {e}")
        finally:
            self.shutdown()

    def shutdown(self):
        """Stop the consumer (if any) and log the final statistics."""
        logger.info("\nShutting down queue consumer...")
        self.running = False
        self._stop_event.set()
        if self.consumer:
            try:
                self.consumer.stop()
                logger.info("Queue consumer stopped")
            except Exception as e:
                logger.error(f"Error stopping queue consumer: {e}")
        # Output final statistics
        if self.start_time:
            with self._stats_lock:
                stats = dict(self.performance_stats)
            elapsed = time.time() - self.start_time
            logger.info("\n[Final Stats]")
            logger.info(f"- Total uptime: {elapsed:.1f}s")
            logger.info(f"- Total tasks processed: {stats['tasks_processed']}")
            logger.info(f"- Total failed tasks: {stats['tasks_failed']}")
            if stats['tasks_processed'] > 0 and elapsed > 0:
                rate = stats['tasks_processed'] / elapsed
                logger.info(f"- Average processing rate: {rate:.2f} tasks/s")
def calculate_optimal_workers():
    """Return a default worker count derived from the number of CPU cores.

    Machines with up to 2 cores get 2 workers, and 3-4 cores get 4. Larger
    machines get one worker per core, capped at 8.
    """
    cores = multiprocessing.cpu_count()
    if cores > 4:
        return min(cores, 8)
    return 2 if cores <= 2 else 4
def check_queue_status():
    """Log a short summary of the task queue and its database file.

    Only stats keys present in the ``get_queue_stats()`` result are logged.
    Any error while gathering the information is logged, not raised.
    """
    labelled_keys = (
        ('total_tasks', 'Total tasks'),
        ('pending_tasks', 'Pending tasks'),
        ('scheduled_tasks', 'Scheduled tasks'),
    )
    try:
        queue_stats = queue_manager.get_queue_stats()
        logger.info("\n[Queue Status]")
        if isinstance(queue_stats, dict):
            for key, label in labelled_keys:
                if key in queue_stats:
                    logger.info(f"- {label}: {queue_stats[key]}")
        # Report the size of the huey database file, if it exists.
        db_file = os.path.join(os.path.dirname(__file__), '..', 'projects', 'queue_data', 'huey.db')
        if not os.path.exists(db_file):
            logger.info("- Database file: not found")
        else:
            logger.info(f"- Database size: {os.path.getsize(db_file)} bytes")
    except Exception as exc:
        logger.error(f"Failed to get queue status: {exc}")
def main():
    """Parse command-line options and run the optimized queue consumer.

    With ``--check-status`` only the queue summary is logged. Otherwise the
    chosen performance profile adjusts the worker count, and the consumer
    runs in the foreground until it is stopped.
    """
    # The module logger has no handlers of its own. Without a handler
    # somewhere, every logger.info() in this script is silently dropped
    # (Python's last-resort handler only emits WARNING and above). Leave any
    # logging setup done by imported modules untouched.
    if not logger.hasHandlers():
        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s %(levelname)s %(name)s: %(message)s",
        )

    # Computed once; argparse substitutes it into the help text.
    default_workers = calculate_optimal_workers()
    parser = argparse.ArgumentParser(description="Optimized queue consumer")
    parser.add_argument(
        "--workers",
        type=int,
        default=default_workers,
        help="Number of worker threads (default: %(default)s)",
    )
    parser.add_argument(
        "--worker-type",
        type=str,
        default="threads",
        choices=["threads", "greenlets", "gevent"],
        help="Worker type (default: threads)",
    )
    parser.add_argument(
        "--check-status",
        action="store_true",
        help="Check queue status and exit",
    )
    parser.add_argument(
        "--profile",
        type=str,
        default="balanced",
        choices=["low_memory", "balanced", "high_performance"],
        help="Performance profile",
    )
    args = parser.parse_args()
    if args.workers < 1:
        parser.error("--workers must be at least 1")

    # Apply performance profile
    if args.profile == "low_memory":
        # NOTE: PYTHONOPTIMIZE is read at interpreter start-up, so setting it
        # here only affects child Python processes, not this interpreter.
        os.environ['PYTHONOPTIMIZE'] = '1'
        if args.workers > 2:
            args.workers = 2
            logger.info(f"Low memory mode: adjusted worker count to {args.workers}")
    elif args.profile == "high_performance":
        if args.workers < 4:
            args.workers = 4
            logger.info(f"High performance mode: adjusted worker count to {args.workers}")

    # Check queue status
    if args.check_status:
        check_queue_status()
        return

    # Report system info when the optional psutil package is available.
    try:
        import psutil
        memory = psutil.virtual_memory()
        logger.info("[System Info]")
        logger.info(f"- CPU cores: {multiprocessing.cpu_count()}")
        logger.info(f"- Available memory: {memory.available / (1024**3):.1f}GB")
        logger.info(f"- Memory usage: {memory.percent:.1f}%")
    except ImportError:
        logger.info("[Tip] Install psutil to display system info: pip install psutil")

    # Create and start the queue consumer
    consumer = OptimizedQueueConsumer(
        worker_type=args.worker_type,
        workers=args.workers,
    )
    consumer.start()


if __name__ == "__main__":
    main()