#!/usr/bin/env python3
"""
Optimized unified startup script for the FastAPI application.

Supports performance monitoring, automatic restart, graceful shutdown, and
related features.
"""

import argparse
import importlib.util
import multiprocessing
import os
import signal
import subprocess
import sys
import threading
import time
from pathlib import Path
from typing import Any, Dict, List, Optional

# Name under which the uvicorn child is tracked in ProcessManager.processes.
_API_SERVER_NAME = "API server"

# Environment variables applied for each --profile value.
_PROFILE_ENV: Dict[str, Dict[str, str]] = {
    "low_memory": {
        'TOKENIZERS_PARALLELISM': 'false',
        'TOOL_CACHE_MAX_SIZE': '10',
        'CHECKPOINT_POOL_SIZE': '10',
    },
    "balanced": {
        'TOKENIZERS_PARALLELISM': 'true',
        'TOKENIZERS_FAST': '1',
        'TOOL_CACHE_MAX_SIZE': '20',
        'CHECKPOINT_POOL_SIZE': '15',
    },
    "high_performance": {
        'TOKENIZERS_PARALLELISM': 'true',
        'TOKENIZERS_FAST': '1',
        'TOOL_CACHE_MAX_SIZE': '30',
        'CHECKPOINT_POOL_SIZE': '20',
    },
}

# Environment variables applied regardless of the selected profile.
_COMMON_ENV: Dict[str, str] = {
    'PYTHONUNBUFFERED': '1',
    'PYTHONDONTWRITEBYTECODE': '1',
}


class ProcessManager:
    """Process manager that controls the API service."""

    def __init__(self):
        # Child processes keyed by display name.
        self.processes: Dict[str, subprocess.Popen] = {}
        self.running = False
        # Set by the signal handler so the monitoring loop wakes immediately.
        self.shutdown_event = threading.Event()

        # Register signal handlers
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Signal handler for graceful shutdown."""
        print(f"\nReceived signal {signum}; shutting down all services...")
        self.running = False
        self.shutdown_event.set()

    def start_api_server(self, args) -> Optional[subprocess.Popen]:
        """Start the FastAPI server.

        Args:
            args: Parsed options; reads ``host``, ``port``, ``api_workers``
                and ``log_level``.

        Returns:
            The started ``subprocess.Popen``, or ``None`` if the process
            could not be launched.
        """
        print("Starting FastAPI server...")

        # Build the API server command
        cmd = [
            sys.executable, "-m", "uvicorn",
            "fastapi_app:app",
            "--host", args.host,
            "--port", str(args.port),
            "--workers", str(args.api_workers),
            "--log-level", args.log_level,
            "--access-log",
        ]

        # Add uvloop support if available. Probing with find_spec avoids
        # importing uvloop into this supervisor process just to test for it.
        if importlib.util.find_spec("uvloop") is not None:
            cmd.extend(["--loop", "uvloop"])

        try:
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                universal_newlines=True,
                # An undecodable byte must not kill the reader thread;
                # otherwise the pipe fills up and the child blocks on write.
                errors="replace",
                bufsize=1,
            )
        except (OSError, ValueError) as e:
            print(f"Failed to start API server: {e}")
            return None

        # Start the output monitoring thread
        threading.Thread(
            target=self._monitor_output,
            args=(process, _API_SERVER_NAME),
            daemon=True,
        ).start()

        return process

    def _monitor_output(self, process: subprocess.Popen, name: str):
        """Monitor process output.

        Echoes every non-blank line from the child's stdout, prefixed with
        ``[name]``, until end of file, then closes the pipe.
        """
        stream = process.stdout
        if stream is None:
            return
        try:
            with stream:
                for line in iter(stream.readline, ''):
                    text = line.strip()
                    if text:
                        print(f"[{name}] {text}")
        except (OSError, ValueError) as e:
            # A read error during shutdown is expected noise; report it only
            # while the service is supposed to be running.
            if not self.shutdown_event.is_set():
                print(f"[Warning] Stopped reading output of {name}: {e}")

    def _check_process(self, name: str, process: subprocess.Popen) -> bool:
        """Check whether the process is healthy.

        Returns:
            ``True`` while the process is still running, ``False`` once it
            has exited (a warning with the exit code is printed).
        """
        if process.poll() is not None:
            print(f"[Warning] {name} process exited with code: {process.returncode}")
            return False
        return True

    def _restart_process(self, name: str, args) -> bool:
        """Restart a process.

        Only the API server can be restarted; any other name returns
        ``False``.

        Returns:
            ``True`` if a replacement process was started and registered.
        """
        print(f"[Restart] Restarting {name}...")

        if name != _API_SERVER_NAME:
            return False

        new_process = self.start_api_server(args)
        if new_process is None:
            print(f"[Error] Failed to restart {name}")
            return False

        self.processes[name] = new_process
        print(f"[Restart] {name} restarted successfully, PID: {new_process.pid}")
        return True

    def run(self, args):
        """Run all services.

        Starts the API server, then supervises it until a shutdown signal
        arrives or a restart fails / the restart limit is reached.

        Args:
            args: Parsed options; in addition to the server options this
                reads ``profile``, ``max_restarts`` and ``check_interval``.

        Returns:
            ``True`` after a requested shutdown; ``False`` if the server
            never started or supervision stopped because of a failure.
        """
        print("=" * 70)
        print("Qwen-Agent optimized unified startup script")
        print("=" * 70)

        # Set up the environment
        self.setup_environment(args)

        # Create required directories
        self.create_directories()

        # Start services
        api_process = self.start_api_server(args)
        if not api_process:
            print("Failed to start API server; exiting")
            return False

        self.processes[_API_SERVER_NAME] = api_process

        print("\n" + "=" * 70)
        print("All services started successfully!")
        print(f"API server: http://{args.host}:{args.port}")
        print(f"API PID: {api_process.pid}")
        print("Press Ctrl+C to stop all services")
        print("=" * 70 + "\n")

        self.running = True

        # Main monitoring loop
        restart_counts = {_API_SERVER_NAME: 0}
        max_restarts = args.max_restarts
        failed = False

        while self.running and not self.shutdown_event.is_set():
            try:
                # Check the status of all processes
                for name, process in list(self.processes.items()):
                    if self._check_process(name, process):
                        continue

                    if restart_counts[name] >= max_restarts:
                        print(f"[Error] {name} reached the restart limit ({max_restarts}); stopping all services")
                        failed = True
                        break

                    if not self._restart_process(name, args):
                        print(f"[Error] Failed to restart {name}; stopping all services")
                        failed = True
                        break

                    restart_counts[name] += 1
                    print(f"[Restart] {name} has been restarted {restart_counts[name]}/{max_restarts} times")

                if failed:
                    # Leave immediately instead of sleeping one more interval.
                    self.running = False
                    break

                # Wait for the next check interval
                if self.shutdown_event.wait(timeout=args.check_interval):
                    break

            except KeyboardInterrupt:
                break
            except Exception as e:
                print(f"[Error] Exception in monitoring loop: {e}")
                time.sleep(1)

        # Gracefully shut down all processes
        self.shutdown_all()
        # Report failure so main() exits non-zero when supervision gave up.
        return not failed

    def setup_environment(self, args):
        """Set environment variables.

        Applies the variables of ``args.profile`` plus the general
        optimizations to ``os.environ``.

        Raises:
            ValueError: If ``args.profile`` is not a known profile.
        """
        # Set environment variables based on the selected profile
        try:
            profile_vars = _PROFILE_ENV[args.profile]
        except KeyError:
            known = ", ".join(sorted(_PROFILE_ENV))
            raise ValueError(
                f"Unknown performance profile: {args.profile!r} (expected one of: {known})"
            ) from None

        # General optimizations
        env_vars = dict(profile_vars)
        env_vars.update(_COMMON_ENV)

        os.environ.update(env_vars)

        print(f"Applied {args.profile} performance profile")

    def create_directories(self):
        """Create the required directories."""
        directories = [
            "projects/data",
            "projects/uploads",
            "projects/robot",
        ]

        for directory in directories:
            Path(directory).mkdir(parents=True, exist_ok=True)

        print("Project directories created")

    def shutdown_all(self):
        """Gracefully shut down all processes.

        Each running child gets SIGTERM and up to 10 seconds to exit before
        it is killed. Errors for one child do not stop the others.
        """
        print("\n" + "=" * 70)
        print("Shutting down all services...")
        print("=" * 70)

        for name, process in list(self.processes.items()):
            try:
                if process.poll() is not None:
                    # Nothing to terminate; do not claim a graceful shutdown.
                    print(f"{name} already exited with code: {process.returncode}")
                    continue

                print(f"Shutting down {name} (PID: {process.pid})...")

                # Send SIGTERM
                process.terminate()

                # Wait for graceful shutdown
                try:
                    process.wait(timeout=10)
                    print(f"{name} shut down gracefully")
                except subprocess.TimeoutExpired:
                    print(f"{name} did not exit within 10 seconds; forcing termination...")
                    process.kill()
                    process.wait()
                    print(f"{name} was forcefully terminated")

            except Exception as e:
                print(f"Error while shutting down {name}: {e}")

        print("All services shut down")


def parse_args(argv: Optional[List[str]] = None):
    """Parse command-line arguments.

    Args:
        argv: Argument list to parse; ``None`` (the default) parses
            ``sys.argv[1:]``.

    Returns:
        The populated ``argparse.Namespace``.
    """
    parser = argparse.ArgumentParser(description="Optimized Qwen-Agent unified startup script")

    # API server configuration
    parser.add_argument("--host", type=str, default="0.0.0.0", help="API bind host")
    parser.add_argument("--port", type=int, default=8001, help="API bind port")
    parser.add_argument("--api-workers", type=int, default=None, help="Number of API worker processes")
    parser.add_argument("--log-level", type=str, default="info",
                        choices=["debug", "info", "warning", "error"],
                        help="Log level")

    # Performance profile
    parser.add_argument("--profile", type=str, default="low_memory",
                        choices=["low_memory", "balanced", "high_performance"],
                        help="Performance profile")

    # Monitoring and restart configuration
    parser.add_argument("--check-interval", type=int, default=5,
                        help="Process check interval in seconds")
    parser.add_argument("--max-restarts", type=int, default=3,
                        help="Maximum number of restarts")

    return parser.parse_args(argv)


def _default_worker_count(profile: str, cpu_count: int) -> int:
    """Return the recommended number of API workers for *profile*."""
    if profile == "low_memory":
        return min(1, cpu_count)
    if profile == "balanced":
        return min(2, cpu_count + 1)
    if profile == "high_performance":
        return min(4, cpu_count * 2)
    # Unknown profile: fall back to a single worker rather than passing
    # "None" on the uvicorn command line.
    return 1


def main(argv: Optional[List[str]] = None):
    """Main entry point.

    Parses the arguments, fills in the worker count when it was not given,
    runs the process manager and exits with status 0 on success, 1 otherwise.
    """
    args = parse_args(argv)

    # Compute the recommended number of worker processes
    if args.api_workers is None:
        args.api_workers = _default_worker_count(
            args.profile, multiprocessing.cpu_count()
        )

    # Create the process manager and run it
    manager = ProcessManager()
    success = manager.run(args)

    sys.exit(0 if success else 1)


if __name__ == "__main__":
    main()