#!/usr/bin/env python3 """ Queue consumer for processing file tasks. """ import sys import os import time import signal import argparse from pathlib import Path # Add project root directory to Python path project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root)) from task_queue.config import huey from task_queue.manager import queue_manager from task_queue.integration_tasks import process_files_async, cleanup_project_async from huey.consumer import Consumer class QueueConsumer: """Queue consumer for processing async tasks.""" def __init__(self, worker_type: str = "threads", workers: int = 2): self.huey = huey self.worker_type = worker_type self.workers = workers self.running = False self.consumer = None # Register signal handlers signal.signal(signal.SIGINT, self._signal_handler) signal.signal(signal.SIGTERM, self._signal_handler) def _signal_handler(self, signum, frame): """Signal handler for graceful shutdown.""" print(f"\nReceived signal {signum}, shutting down queue consumer...") self.running = False def start(self): """Start the queue consumer.""" print(f"Starting queue consumer...") print(f"Worker threads: {self.workers}") print(f"Worker type: {self.worker_type}") print(f"Database: {os.path.join(os.path.dirname(__file__), '..', 'projects', 'queue_data', 'huey.db')}") print("Press Ctrl+C to stop the consumer") self.running = True try: # Create Huey consumer self.consumer = Consumer(self.huey, workers=self.workers, worker_type=self.worker_type.rstrip('s')) # Display queue statistics stats = queue_manager.get_queue_stats() print(f"Current queue status: {stats}") # Start consumer run loop print("Consumer starting task processing...") self.consumer.run() except KeyboardInterrupt: print("\nReceived interrupt signal, shutting down...") except Exception as e: print(f"Queue consumer runtime error: {str(e)}") finally: self.stop() def stop(self): """Stop the queue consumer.""" print("Stopping queue consumer...") try: if self.consumer: # Stop the consumer self.consumer.stop() self.consumer = None print("Queue consumer stopped") except Exception as e: print(f"Error stopping queue consumer: {str(e)}") def process_scheduled_tasks(self): """Process scheduled tasks.""" print("Processing scheduled tasks...") # Additional scheduled task processing logic can be added here def main(): """Main entry point.""" parser = argparse.ArgumentParser(description="File processing queue consumer") parser.add_argument( "--workers", type=int, default=2, help="Number of worker threads (default: 2)" ) parser.add_argument( "--worker-type", choices=["threads", "greenlets", "processes"], default="threads", help="Worker thread type (default: threads)" ) parser.add_argument( "--stats", action="store_true", help="Display queue statistics and exit" ) parser.add_argument( "--flush", action="store_true", help="Flush the queue and exit" ) parser.add_argument( "--check", action="store_true", help="Check queue status and exit" ) args = parser.parse_args() # Initialize consumer consumer = QueueConsumer( worker_type=args.worker_type, workers=args.workers ) # Handle different command-line options if args.stats: print("=== Queue Statistics ===") stats = queue_manager.get_queue_stats() print(f"Total tasks: {stats.get('total_tasks', 0)}") print(f"Pending tasks: {stats.get('pending_tasks', 0)}") print(f"Running tasks: {stats.get('running_tasks', 0)}") print(f"Completed tasks: {stats.get('completed_tasks', 0)}") print(f"Error tasks: {stats.get('error_tasks', 0)}") print(f"Scheduled tasks: {stats.get('scheduled_tasks', 0)}") print(f"Database: {stats.get('queue_database', 'N/A')}") return if args.flush: print("=== Flushing Queue ===") try: # Flush all tasks consumer.huey.flush() print("Queue flushed") except Exception as e: print(f"Failed to flush queue: {str(e)}") return if args.check: print("=== Checking Queue Status ===") stats = queue_manager.get_queue_stats() print(f"Queue status: OK" if "error" not in stats else f"Queue status: ERROR - {stats['error']}") pending_tasks = queue_manager.list_pending_tasks(limit=10) if pending_tasks: print(f"\nPending tasks (showing up to 10):") for task in pending_tasks: print(f" Task ID: {task['task_id']}, Status: {task['status']}, Created: {task['created_time']}") else: print("No pending tasks") return # Start consumer print("=== Starting File Processing Queue Consumer ===") consumer.start() if __name__ == "__main__": main()