qwen_agent/start_unified.py
2025-12-24 11:05:10 +08:00

352 lines
12 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
优化的统一启动脚本 - 整合 FastAPI 应用和队列消费者
支持性能监控、自动重启、优雅关闭等功能
"""
import os
import sys
import argparse
import multiprocessing
import signal
import time
import subprocess
import threading
from pathlib import Path
from typing import List, Optional, Dict, Any
class ProcessManager:
    """Supervise the FastAPI server and the queue consumer as child processes.

    Starts both services, echoes their merged stdout/stderr with a ``[name]``
    prefix, restarts a service that exits (at most ``args.max_restarts`` times
    per service) and shuts everything down on SIGINT/SIGTERM or when a service
    cannot be kept alive.
    """

    # Environment variables exported for each --profile choice.
    _PROFILE_ENV: Dict[str, Dict[str, str]] = {
        "low_memory": {
            'TOKENIZERS_PARALLELISM': 'false',
            'TOOL_CACHE_MAX_SIZE': '10',
            'CHECKPOINT_POOL_SIZE': '10',
        },
        "balanced": {
            'TOKENIZERS_PARALLELISM': 'true',
            'TOKENIZERS_FAST': '1',
            'TOOL_CACHE_MAX_SIZE': '20',
            'CHECKPOINT_POOL_SIZE': '15',
        },
        "high_performance": {
            'TOKENIZERS_PARALLELISM': 'true',
            'TOKENIZERS_FAST': '1',
            'TOOL_CACHE_MAX_SIZE': '30',
            'CHECKPOINT_POOL_SIZE': '20',
        },
    }
    # Applied on top of every profile (after the profile-specific values).
    _COMMON_ENV: Dict[str, str] = {
        'PYTHONUNBUFFERED': '1',
        'PYTHONDONTWRITEBYTECODE': '1',
    }

    def __init__(self):
        self.processes: Dict[str, subprocess.Popen] = {}
        self.running = False
        self.shutdown_event = threading.Event()
        # Route Ctrl+C and `kill` to a graceful shutdown of all children.
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Request a graceful shutdown; the supervise loop notices the event."""
        print(f"\n收到信号 {signum},正在关闭所有服务...")
        self.running = False
        self.shutdown_event.set()

    def _spawn(self, cmd: List[str], name: str) -> subprocess.Popen:
        """Start *cmd* with stderr merged into stdout and echo its output.

        A daemon thread running ``_monitor_output`` relays the child's output
        prefixed with ``[name]``. Exceptions from ``subprocess.Popen`` (e.g.
        ``OSError``) propagate to the caller.
        """
        process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
            # Undecodable bytes must not kill the reader thread: if nothing
            # drains the pipe any more, the child may block once it fills up.
            errors="replace",
            bufsize=1,
        )
        threading.Thread(
            target=self._monitor_output,
            args=(process, name),
            daemon=True,
        ).start()
        return process

    def start_api_server(self, args) -> Optional[subprocess.Popen]:
        """Launch uvicorn serving ``fastapi_app:app``.

        ``fastapi_app`` is resolved relative to the current working directory,
        presumably the project root (TODO confirm deployment layout).

        Returns:
            The started process, or None if it could not be spawned.
        """
        print("正在启动 FastAPI 服务器...")
        cmd = [
            sys.executable, "-m", "uvicorn",
            "fastapi_app:app",
            "--host", args.host,
            "--port", str(args.port),
            "--workers", str(args.api_workers),
            "--log-level", args.log_level,
            "--access-log",
        ]
        # Use the uvloop event loop only when it is installed.
        try:
            import uvloop  # noqa: F401
            cmd.extend(["--loop", "uvloop"])
        except ImportError:
            pass
        try:
            return self._spawn(cmd, "API服务器")
        except Exception as e:
            print(f"启动 API 服务器失败: {e}")
            return None

    def start_queue_consumer(self, args) -> Optional[subprocess.Popen]:
        """Launch ``task_queue/consumer.py`` (path relative to the working directory).

        Returns:
            The started process, or None if it could not be spawned.
        """
        print("正在启动队列消费者...")
        cmd = [
            sys.executable,
            "task_queue/consumer.py",
            "--workers", str(args.queue_workers),
            "--worker-type", args.worker_type,
        ]
        try:
            return self._spawn(cmd, "队列消费者")
        except Exception as e:
            print(f"启动队列消费者失败: {e}")
            return None

    def _monitor_output(self, process: subprocess.Popen, name: str):
        """Print every non-blank output line of *process* prefixed with ``[name]``.

        Runs until the child's stdout reaches EOF (normally when it exits).
        """
        stream = process.stdout
        try:
            for line in iter(stream.readline, ''):
                if line.strip():
                    # rstrip only: keep leading indentation such as traceback frames.
                    print(f"[{name}] {line.rstrip()}")
        except Exception as e:
            # Best-effort relay, but report why output stopped instead of hiding it.
            print(f"[警告] {name} 输出监控中断: {e}")
        else:
            # EOF: release the pipe's file descriptor.
            stream.close()

    def _check_process(self, name: str, process: subprocess.Popen) -> bool:
        """Return True while *process* is running; report its exit code otherwise."""
        if process.poll() is not None:
            print(f"[警告] {name} 进程已退出,退出码: {process.returncode}")
            return False
        return True

    def _restart_process(self, name: str, args) -> bool:
        """Start a fresh instance of service *name* and record it in ``self.processes``.

        Returns:
            True on success; False if the start failed or *name* is unknown.
        """
        print(f"[重启] 正在重启 {name}...")
        if name == "API服务器":
            new_process = self.start_api_server(args)
        elif name == "队列消费者":
            new_process = self.start_queue_consumer(args)
        else:
            return False
        if new_process:
            self.processes[name] = new_process
            print(f"[重启] {name} 重启成功PID: {new_process.pid}")
            return True
        print(f"[错误] {name} 重启失败")
        return False

    def run(self, args):
        """Start both services, supervise them until shutdown, then stop them.

        Returns:
            False if either service fails to start, True after a normal shutdown.
        """
        print("=" * 70)
        print("Qwen-Agent 优化统一启动脚本")
        print("=" * 70)

        self.setup_environment(args)
        self.create_directories()

        api_process = self.start_api_server(args)
        if not api_process:
            print("无法启动 API 服务器,退出")
            return False

        queue_process = self.start_queue_consumer(args)
        if not queue_process:
            print("无法启动队列消费者,退出")
            # Wait for the API server to exit (killing it if needed) rather
            # than only signalling it and returning immediately.
            self._stop_process("API服务器", api_process)
            return False

        self.processes["API服务器"] = api_process
        self.processes["队列消费者"] = queue_process

        print("\n" + "=" * 70)
        print("所有服务启动成功!")
        print(f"API服务器: http://{args.host}:{args.port}")
        print(f"API PID: {api_process.pid}")
        print(f"队列消费者 PID: {queue_process.pid}")
        print("按 Ctrl+C 停止所有服务")
        print("=" * 70 + "\n")

        self.running = True
        self._supervise(args)
        self.shutdown_all()
        return True

    def _supervise(self, args):
        """Check the children every ``args.check_interval`` seconds and restart dead ones.

        Returns when a shutdown is requested or a service cannot be restarted
        (restart failed, or ``args.max_restarts`` restarts already used).
        """
        restart_counts = {name: 0 for name in self.processes}
        max_restarts = args.max_restarts
        while self.running and not self.shutdown_event.is_set():
            try:
                for name, process in list(self.processes.items()):
                    if self._check_process(name, process):
                        continue
                    if restart_counts[name] >= max_restarts:
                        print(f"[错误] {name} 重启次数已达上限 ({max_restarts}),停止所有服务")
                        self.running = False
                        break
                    if not self._restart_process(name, args):
                        print(f"[错误] {name} 重启失败,停止所有服务")
                        self.running = False
                        break
                    restart_counts[name] += 1
                    print(f"[重启] {name} 已重启 {restart_counts[name]}/{max_restarts}")
                if not self.running:
                    # Fatal failure above: shut down now, not after another interval.
                    break
                # Returns True as soon as the shutdown event is set.
                if self.shutdown_event.wait(timeout=args.check_interval):
                    break
            except KeyboardInterrupt:
                break
            except Exception as e:
                print(f"[错误] 监控循环异常: {e}")
                time.sleep(1)

    def setup_environment(self, args):
        """Export the selected profile's variables plus the common ones into ``os.environ``.

        Children started afterwards inherit them (``Popen`` is called without ``env``).

        Raises:
            ValueError: if ``args.profile`` is not a known profile.
        """
        try:
            profile_env = self._PROFILE_ENV[args.profile]
        except KeyError:
            raise ValueError(f"未知的性能配置: {args.profile}") from None
        env_vars = {**profile_env, **self._COMMON_ENV}
        os.environ.update(env_vars)
        print(f"已应用 {args.profile} 性能配置")

    def create_directories(self):
        """Create the project data directories (relative to the working directory)."""
        directories = [
            "projects/queue_data",
            "projects/data",
            "projects/uploads",
            "projects/robot",
        ]
        for directory in directories:
            Path(directory).mkdir(parents=True, exist_ok=True)
        print("项目目录创建完成")

    def _stop_process(self, name: str, process: subprocess.Popen):
        """Terminate *process*, wait up to 10 s, then kill it; never raises."""
        try:
            print(f"正在关闭 {name} (PID: {process.pid})...")
            process.terminate()
            try:
                process.wait(timeout=10)
                print(f"{name} 已优雅关闭")
            except subprocess.TimeoutExpired:
                print(f"{name} 未能在10秒内关闭强制终止...")
                process.kill()
                process.wait()
                print(f"{name} 已强制终止")
        except Exception as e:
            print(f"关闭 {name} 时出错: {e}")

    def shutdown_all(self):
        """Stop every supervised process in turn."""
        print("\n" + "=" * 70)
        print("正在关闭所有服务...")
        print("=" * 70)
        for name, process in self.processes.items():
            self._stop_process(name, process)
        print("所有服务已关闭")
def parse_args():
    """Build the launcher's command-line interface and parse ``sys.argv``.

    Returns:
        argparse.Namespace with the API server options (host, port,
        api_workers, log_level), queue consumer options (queue_workers,
        worker_type), the performance profile, and the supervisor settings
        (check_interval, max_restarts).
    """
    option_table = (
        # API server
        ("--host", dict(type=str, default="0.0.0.0", help="API绑定主机地址")),
        ("--port", dict(type=int, default=8001, help="API绑定端口")),
        ("--api-workers", dict(type=int, default=None, help="API工作进程数量")),
        ("--log-level", dict(type=str, default="info",
                             choices=["debug", "info", "warning", "error"], help="日志级别")),
        # Queue consumer
        ("--queue-workers", dict(type=int, default=2, help="队列消费者工作线程数")),
        ("--worker-type", dict(type=str, default="threads",
                               choices=["threads", "greenlets", "gevent"], help="队列工作类型")),
        # Performance preset
        ("--profile", dict(type=str, default="low_memory",
                           choices=["low_memory", "balanced", "high_performance"], help="性能配置文件")),
        # Supervisor / restart policy
        ("--check-interval", dict(type=int, default=5, help="进程检查间隔(秒)")),
        ("--max-restarts", dict(type=int, default=3, help="最大重启次数")),
    )
    cli = argparse.ArgumentParser(description="优化的Qwen-Agent统一启动脚本")
    for flag, options in option_table:
        cli.add_argument(flag, **options)
    return cli.parse_args()
def main():
    """Parse the CLI, pick a default API worker count, run the launcher, and exit.

    The process exit status is 0 when ``ProcessManager.run`` returns a truthy
    value and 1 otherwise.
    """
    args = parse_args()

    if args.api_workers is None:
        cores = multiprocessing.cpu_count()
        # Default uvicorn --workers per profile, bounded by a CPU-derived cap.
        # NOTE: since cores >= 1, low_memory always yields 1 and balanced always 2.
        per_profile_workers = {
            "low_memory": min(1, cores),
            "balanced": min(2, cores + 1),
            "high_performance": min(4, cores * 2),
        }
        # An unknown profile leaves api_workers as None.
        args.api_workers = per_profile_workers.get(args.profile)

    launcher = ProcessManager()
    ok = launcher.run(args)
    sys.exit(0 if ok else 1)
if __name__ == "__main__":
    # Launch the services only when executed as a script, not when imported.
    main()