#!/usr/bin/env python3
"""
Optimized unified launcher: runs the FastAPI application and the queue consumer
as child processes, with automatic restart on unexpected exit and graceful
shutdown on SIGINT/SIGTERM.
"""
import os
import sys
import argparse
import multiprocessing
import signal
import time
import subprocess
import threading
from pathlib import Path
from typing import List, Optional, Dict, Any


# Environment variables applied for each --profile choice.
_PROFILE_ENV: Dict[str, Dict[str, str]] = {
    "low_memory": {
        'TOKENIZERS_PARALLELISM': 'false',
        'AGENT_CACHE_MAX_SIZE': '20',
    },
    "balanced": {
        'TOKENIZERS_PARALLELISM': 'true',
        'TOKENIZERS_FAST': '1',
        'AGENT_CACHE_MAX_SIZE': '50',
    },
    "high_performance": {
        'TOKENIZERS_PARALLELISM': 'true',
        'TOKENIZERS_FAST': '1',
        'AGENT_CACHE_MAX_SIZE': '100',
    },
}

# Applied on top of every profile (these win on key collisions).
_COMMON_ENV: Dict[str, str] = {
    'PYTHONUNBUFFERED': '1',
    'PYTHONDONTWRITEBYTECODE': '1',
}


def recommended_api_workers(profile: str, cpu_count: int) -> int:
    """Return the default uvicorn worker count for *profile*.

    Args:
        profile: One of "low_memory", "balanced", "high_performance".
        cpu_count: Number of CPUs on the host.

    Raises:
        ValueError: if *profile* is not a known profile.
    """
    if profile == "low_memory":
        return min(2, cpu_count)
    if profile == "balanced":
        return min(4, cpu_count + 1)
    if profile == "high_performance":
        return min(8, cpu_count * 2)
    raise ValueError(f"unknown profile: {profile!r}")


class ProcessManager:
    """Manage the API server and queue consumer child processes."""

    # Display names; also used as keys in self.processes and restart counters.
    API_NAME = "API服务器"
    QUEUE_NAME = "队列消费者"

    def __init__(self):
        self.processes: Dict[str, subprocess.Popen] = {}
        self.running = False
        self.shutdown_event = threading.Event()

        # Route Ctrl+C and SIGTERM into a graceful shutdown of all children.
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Request shutdown; the monitor loop in run() notices the event."""
        print(f"\n收到信号 {signum},正在关闭所有服务...")
        self.running = False
        self.shutdown_event.set()

    def _spawn(self, cmd: List[str], name: str) -> subprocess.Popen:
        """Start *cmd* with stderr merged into stdout and relay its output.

        A daemon thread prefixes each output line with ``[name]``.

        Raises:
            Whatever ``subprocess.Popen`` raises (e.g. ``OSError``).
        """
        process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
            # Decode leniently. If one undecodable byte killed the reader
            # thread, the pipe would fill up and the child would block on write.
            errors="replace",
            bufsize=1,
        )
        threading.Thread(
            target=self._monitor_output,
            args=(process, name),
            daemon=True,
        ).start()
        return process

    def start_api_server(self, args) -> Optional[subprocess.Popen]:
        """Start the FastAPI server under uvicorn; return None on failure."""
        print("正在启动 FastAPI 服务器...")

        cmd = [
            sys.executable, "-m", "uvicorn",
            "fastapi_app:app",
            "--host", args.host,
            "--port", str(args.port),
            "--workers", str(args.api_workers),
            "--log-level", args.log_level,
            "--access-log",
        ]

        # Use the uvloop event loop when it is importable.
        try:
            import uvloop  # noqa: F401  (availability probe only)
            cmd.extend(["--loop", "uvloop"])
        except ImportError:
            pass

        try:
            return self._spawn(cmd, self.API_NAME)
        except Exception as e:
            print(f"启动 API 服务器失败: {e}")
            return None

    def start_queue_consumer(self, args) -> Optional[subprocess.Popen]:
        """Start the queue consumer script; return None on failure."""
        print("正在启动队列消费者...")

        cmd = [
            sys.executable, "task_queue/consumer.py",
            "--workers", str(args.queue_workers),
            "--worker-type", args.worker_type,
        ]

        try:
            return self._spawn(cmd, self.QUEUE_NAME)
        except Exception as e:
            print(f"启动队列消费者失败: {e}")
            return None

    def _monitor_output(self, process: subprocess.Popen, name: str):
        """Echo non-blank output lines of *process*, prefixed with ``[name]``.

        Runs until the child closes its stdout. The pipe is then closed here,
        so restarted children do not leak file descriptors.
        """
        stream = process.stdout
        try:
            for line in stream:
                text = line.strip()
                if text:
                    print(f"[{name}] {text}")
        except Exception as e:
            # Relaying is best-effort, but report why it stopped.
            print(f"[{name}] 输出监控中断: {e}")
        finally:
            try:
                stream.close()
            except Exception:
                pass

    def _check_process(self, name: str, process: subprocess.Popen) -> bool:
        """Return True if *process* is still running; otherwise warn and return False."""
        if process.poll() is not None:
            print(f"[警告] {name} 进程已退出,退出码: {process.returncode}")
            return False
        return True

    def _restart_process(self, name: str, args) -> bool:
        """Start a fresh instance of the service *name*.

        The new process is recorded in self.processes. Returns False if *name*
        is unknown or the start failed.
        """
        print(f"[重启] 正在重启 {name}...")

        starters = {
            self.API_NAME: self.start_api_server,
            self.QUEUE_NAME: self.start_queue_consumer,
        }
        starter = starters.get(name)
        if starter is None:
            return False

        new_process = starter(args)
        if not new_process:
            print(f"[错误] {name} 重启失败")
            return False

        self.processes[name] = new_process
        print(f"[重启] {name} 重启成功,PID: {new_process.pid}")
        return True

    def _stop_process(self, name: str, process: subprocess.Popen, timeout: int = 10):
        """Send SIGTERM to *process* and reap it.

        If it has not exited within *timeout* seconds, escalate to SIGKILL.
        Errors are reported, not raised.
        """
        try:
            print(f"正在关闭 {name} (PID: {process.pid})...")
            process.terminate()
            try:
                process.wait(timeout=timeout)
                print(f"{name} 已优雅关闭")
            except subprocess.TimeoutExpired:
                print(f"{name} 未能在{timeout}秒内关闭,强制终止...")
                process.kill()
                process.wait()
                print(f"{name} 已强制终止")
        except Exception as e:
            print(f"关闭 {name} 时出错: {e}")

    def run(self, args) -> bool:
        """Start both services and supervise them until shutdown.

        Returns:
            True on a requested shutdown (signal). False if a service could
            not be started or the supervisor gave up restarting one.
        """
        print("=" * 70)
        print("Qwen-Agent 优化统一启动脚本")
        print("=" * 70)

        self.setup_environment(args)
        self.create_directories()

        api_process = self.start_api_server(args)
        if not api_process:
            print("无法启动 API 服务器,退出")
            return False

        queue_process = self.start_queue_consumer(args)
        if not queue_process:
            print("无法启动队列消费者,退出")
            # Reap the already-running API server instead of orphaning it.
            self._stop_process(self.API_NAME, api_process)
            return False

        self.processes[self.API_NAME] = api_process
        self.processes[self.QUEUE_NAME] = queue_process

        print("\n" + "=" * 70)
        print("所有服务启动成功!")
        print(f"API服务器: http://{args.host}:{args.port}")
        print(f"API PID: {api_process.pid}")
        print(f"队列消费者 PID: {queue_process.pid}")
        print("按 Ctrl+C 停止所有服务")
        print("=" * 70 + "\n")

        self.running = True

        restart_counts = {name: 0 for name in self.processes}
        max_restarts = args.max_restarts
        failed = False

        while self.running and not self.shutdown_event.is_set():
            try:
                for name, process in list(self.processes.items()):
                    if self._check_process(name, process):
                        continue
                    # Ctrl+C reaches the children too. A child that died from
                    # our own shutdown must not be restarted.
                    if self.shutdown_event.is_set():
                        break
                    if restart_counts[name] >= max_restarts:
                        print(f"[错误] {name} 重启次数已达上限 ({max_restarts}),停止所有服务")
                        failed = True
                        break
                    if not self._restart_process(name, args):
                        print(f"[错误] {name} 重启失败,停止所有服务")
                        failed = True
                        break
                    restart_counts[name] += 1
                    print(f"[重启] {name} 已重启 {restart_counts[name]}/{max_restarts} 次")

                # Leave immediately rather than sleeping another interval.
                if failed or self.shutdown_event.is_set():
                    break

                if self.shutdown_event.wait(timeout=args.check_interval):
                    break

            except KeyboardInterrupt:
                break
            except Exception as e:
                print(f"[错误] 监控循环异常: {e}")
                time.sleep(1)

        self.running = False
        self.shutdown_all()
        return not failed

    def setup_environment(self, args):
        """Export the environment variables for ``args.profile`` into os.environ.

        Raises:
            ValueError: if ``args.profile`` is not a known profile.
        """
        try:
            profile_env = _PROFILE_ENV[args.profile]
        except KeyError:
            raise ValueError(f"未知的性能配置: {args.profile}") from None

        os.environ.update({**profile_env, **_COMMON_ENV})
        print(f"已应用 {args.profile} 性能配置")

    def create_directories(self):
        """Create the project data directories (relative to the CWD) if missing."""
        for directory in (
            "projects/queue_data",
            "projects/data",
            "projects/uploads",
            "projects/robot",
        ):
            Path(directory).mkdir(parents=True, exist_ok=True)
        print("项目目录创建完成")

    def shutdown_all(self):
        """Stop every managed process: SIGTERM first, then SIGKILL after 10 s."""
        print("\n" + "=" * 70)
        print("正在关闭所有服务...")
        print("=" * 70)

        for name, process in self.processes.items():
            self._stop_process(name, process)

        print("所有服务已关闭")


def parse_args(argv: Optional[List[str]] = None):
    """Parse command-line options.

    Args:
        argv: Argument list to parse. Defaults to ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(description="优化的Qwen-Agent统一启动脚本")

    # API server
    parser.add_argument("--host", type=str, default="0.0.0.0", help="API绑定主机地址")
    parser.add_argument("--port", type=int, default=8001, help="API绑定端口")
    parser.add_argument("--api-workers", type=int, default=None, help="API工作进程数量")
    parser.add_argument("--log-level", type=str, default="info",
                        choices=["debug", "info", "warning", "error"], help="日志级别")

    # Queue consumer
    parser.add_argument("--queue-workers", type=int, default=2, help="队列消费者工作线程数")
    parser.add_argument("--worker-type", type=str, default="threads",
                        choices=["threads", "greenlets", "gevent"], help="队列工作类型")

    # Performance profile
    parser.add_argument("--profile", type=str, default="balanced",
                        choices=["low_memory", "balanced", "high_performance"], help="性能配置文件")

    # Supervision and restart
    parser.add_argument("--check-interval", type=int, default=5, help="进程检查间隔(秒)")
    parser.add_argument("--max-restarts", type=int, default=3, help="最大重启次数")

    args = parser.parse_args(argv)
    # A non-positive interval would turn the monitor loop into a busy spin.
    if args.check_interval <= 0:
        parser.error("--check-interval 必须为正整数")
    return args


def main():
    """Entry point: parse options, fill in defaults, run the supervisor, exit."""
    args = parse_args()

    if args.api_workers is None:
        args.api_workers = recommended_api_workers(args.profile, multiprocessing.cpu_count())

    manager = ProcessManager()
    success = manager.run(args)
    sys.exit(0 if success else 1)


if __name__ == "__main__":
    main()