352 lines
12 KiB
Python
Executable File
352 lines
12 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
"""
|
||
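优化的统一启动脚本 - 整合 FastAPI 应用和队列消费者
支持性能监控、自动重启、优雅关闭等功能
(Unified launcher for the FastAPI app and the queue consumer, with process
monitoring, automatic restart and graceful shutdown.)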
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import argparse
|
||
import multiprocessing
|
||
import signal
|
||
import time
|
||
import subprocess
|
||
import threading
|
||
from pathlib import Path
|
||
from typing import List, Optional, Dict, Any
|
||
|
||
|
||
class ProcessManager:
    """Supervise the FastAPI server and the queue consumer as child processes.

    Each child is started with stdout and stderr merged into one pipe. A
    daemon thread forwards that pipe to this process's stdout, prefixed with
    the child's name. ``run`` polls the children and restarts any that exit,
    up to ``args.max_restarts`` times each. It shuts everything down on
    SIGINT/SIGTERM or on a fatal error.
    """

    # Keys of ``self.processes``; also the prefixes used in forwarded output.
    API_NAME = "API服务器"
    QUEUE_NAME = "队列消费者"

    # Seconds the children get (in total) to exit after SIGTERM before SIGKILL.
    SHUTDOWN_TIMEOUT = 10

    # Environment overrides for each ``--profile`` value.
    _PROFILE_ENV: Dict[str, Dict[str, str]] = {
        "low_memory": {
            'TOKENIZERS_PARALLELISM': 'false',
            'TOOL_CACHE_MAX_SIZE': '10',
            'CHECKPOINT_POOL_SIZE': '10',
        },
        "balanced": {
            'TOKENIZERS_PARALLELISM': 'true',
            'TOKENIZERS_FAST': '1',
            'TOOL_CACHE_MAX_SIZE': '20',
            'CHECKPOINT_POOL_SIZE': '15',
        },
        "high_performance": {
            'TOKENIZERS_PARALLELISM': 'true',
            'TOKENIZERS_FAST': '1',
            'TOOL_CACHE_MAX_SIZE': '30',
            'CHECKPOINT_POOL_SIZE': '20',
        },
    }
    # Overrides applied on top of every profile.
    _COMMON_ENV: Dict[str, str] = {
        'PYTHONUNBUFFERED': '1',
        'PYTHONDONTWRITEBYTECODE': '1',
    }

    # Directories (relative to the current working directory) created at startup.
    _DIRECTORIES = (
        "projects/queue_data",
        "projects/data",
        "projects/uploads",
        "projects/robot",
    )

    def __init__(self):
        """Initialise empty state and install SIGINT/SIGTERM handlers.

        ``signal.signal`` only works in the main thread, so construct the
        manager there.
        """
        self.processes: Dict[str, subprocess.Popen] = {}
        self.running = False
        self.shutdown_event = threading.Event()

        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Request a graceful shutdown of the monitor loop in ``run``."""
        print(f"\n收到信号 {signum},正在关闭所有服务...")
        self.running = False
        self.shutdown_event.set()

    def _spawn(self, cmd: List[str], name: str) -> subprocess.Popen:
        """Start *cmd* and forward its merged output under the label *name*.

        Raises whatever ``subprocess.Popen`` raises if the command cannot start.
        """
        process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
            # Undecodable bytes must not kill the reader thread. A reader that
            # stops leaves the pipe undrained, and the child then blocks.
            errors="replace",
            bufsize=1,
        )
        threading.Thread(
            target=self._monitor_output,
            args=(process, name),
            daemon=True,
        ).start()
        return process

    def start_api_server(self, args: argparse.Namespace) -> Optional[subprocess.Popen]:
        """Launch ``uvicorn fastapi_app:app`` in a child interpreter.

        Reads ``args.host``, ``args.port``, ``args.api_workers`` and
        ``args.log_level``. Adds ``--loop uvloop`` when uvloop is importable
        here.

        Returns:
            The started process, or None if it could not be spawned.
        """
        print("正在启动 FastAPI 服务器...")

        cmd = [
            sys.executable, "-m", "uvicorn",
            "fastapi_app:app",
            "--host", args.host,
            "--port", str(args.port),
            "--workers", str(args.api_workers),
            "--log-level", args.log_level,
            "--access-log",
        ]

        # The child uses the same interpreter (sys.executable), so uvloop being
        # importable here presumably means uvicorn can use it too.
        try:
            import uvloop  # noqa: F401
        except ImportError:
            pass
        else:
            cmd.extend(["--loop", "uvloop"])

        try:
            return self._spawn(cmd, self.API_NAME)
        except Exception as e:
            print(f"启动 API 服务器失败: {e}")
            return None

    def start_queue_consumer(self, args: argparse.Namespace) -> Optional[subprocess.Popen]:
        """Launch ``task_queue/consumer.py`` in a child interpreter.

        Reads ``args.queue_workers`` and ``args.worker_type``. The script path
        is relative to the current working directory.

        Returns:
            The started process, or None if it could not be spawned.
        """
        print("正在启动队列消费者...")

        cmd = [
            sys.executable,
            "task_queue/consumer.py",
            "--workers", str(args.queue_workers),
            "--worker-type", args.worker_type,
        ]

        try:
            return self._spawn(cmd, self.QUEUE_NAME)
        except Exception as e:
            print(f"启动队列消费者失败: {e}")
            return None

    def _monitor_output(self, process: subprocess.Popen, name: str):
        """Print each non-blank output line of *process* as ``[name] line``.

        Runs until the child closes its output, then closes the pipe so that
        restarted children do not leak file descriptors.
        """
        stream = process.stdout
        if stream is None:
            return
        try:
            for line in stream:
                # rstrip (not strip) keeps the indentation of child tracebacks.
                text = line.rstrip()
                if text:
                    print(f"[{name}] {text}")
        except (OSError, ValueError):
            # The pipe may be closed underneath us during shutdown;
            # forwarding output is best-effort.
            pass
        finally:
            stream.close()

    def _check_process(self, name: str, process: subprocess.Popen) -> bool:
        """Return True while *process* is running; warn and return False once it exits."""
        if process.poll() is not None:
            print(f"[警告] {name} 进程已退出,退出码: {process.returncode}")
            return False
        return True

    def _restart_process(self, name: str, args: argparse.Namespace) -> bool:
        """Start a fresh instance of the service *name* and record it.

        Returns:
            True on success. False if *name* is unknown or the start failed.
        """
        print(f"[重启] 正在重启 {name}...")

        starters = {
            self.API_NAME: self.start_api_server,
            self.QUEUE_NAME: self.start_queue_consumer,
        }
        starter = starters.get(name)
        if starter is None:
            return False

        new_process = starter(args)
        if new_process is None:
            print(f"[错误] {name} 重启失败")
            return False

        self.processes[name] = new_process
        print(f"[重启] {name} 重启成功,PID: {new_process.pid}")
        return True

    def _terminate_and_reap(self, process: subprocess.Popen):
        """SIGTERM *process*, escalating to SIGKILL after the grace period, and reap it."""
        process.terminate()
        try:
            process.wait(timeout=self.SHUTDOWN_TIMEOUT)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()

    def run(self, args: argparse.Namespace) -> bool:
        """Start both services and supervise them until shutdown.

        Returns:
            False if either service could not be started initially. Otherwise
            True once the monitor loop ends (signal, restart limit, or failed
            restart) and all children have been stopped.
        """
        print("=" * 70)
        print("Qwen-Agent 优化统一启动脚本")
        print("=" * 70)

        self.setup_environment(args)
        self.create_directories()

        api_process = self.start_api_server(args)
        if not api_process:
            print("无法启动 API 服务器,退出")
            return False

        queue_process = self.start_queue_consumer(args)
        if not queue_process:
            print("无法启动队列消费者,退出")
            # Reap the API server too, so it is not left behind as a zombie.
            self._terminate_and_reap(api_process)
            return False

        self.processes[self.API_NAME] = api_process
        self.processes[self.QUEUE_NAME] = queue_process

        print("\n" + "=" * 70)
        print("所有服务启动成功!")
        print(f"API服务器: http://{args.host}:{args.port}")
        print(f"API PID: {api_process.pid}")
        print(f"队列消费者 PID: {queue_process.pid}")
        print("按 Ctrl+C 停止所有服务")
        print("=" * 70 + "\n")

        self.running = True

        restart_counts = {self.API_NAME: 0, self.QUEUE_NAME: 0}
        max_restarts = args.max_restarts

        while self.running and not self.shutdown_event.is_set():
            try:
                for name, process in list(self.processes.items()):
                    # A Ctrl+C in the terminal also hits the children. If the
                    # signal lands mid-scan, do not restart children that died
                    # from it.
                    if self.shutdown_event.is_set():
                        break
                    if self._check_process(name, process):
                        continue
                    if restart_counts[name] >= max_restarts:
                        print(f"[错误] {name} 重启次数已达上限 ({max_restarts}),停止所有服务")
                        self.running = False
                        break
                    if not self._restart_process(name, args):
                        print(f"[错误] {name} 重启失败,停止所有服务")
                        self.running = False
                        break
                    restart_counts[name] += 1
                    print(f"[重启] {name} 已重启 {restart_counts[name]}/{max_restarts} 次")

                # Exit right away after a fatal error rather than sleeping
                # through one more check interval first.
                if not self.running:
                    break
                if self.shutdown_event.wait(timeout=args.check_interval):
                    break
            except KeyboardInterrupt:
                break
            except Exception as e:
                print(f"[错误] 监控循环异常: {e}")
                time.sleep(1)

        self.shutdown_all()
        return True

    def setup_environment(self, args: argparse.Namespace):
        """Export the environment variables for ``args.profile`` into ``os.environ``.

        The child processes inherit ``os.environ`` because they are spawned
        without an explicit ``env``.

        Raises:
            ValueError: if ``args.profile`` is not a known profile.
        """
        try:
            profile_env = self._PROFILE_ENV[args.profile]
        except KeyError:
            raise ValueError(f"未知的性能配置: {args.profile}") from None

        # Merge into a new dict so the class-level tables are never mutated.
        os.environ.update({**profile_env, **self._COMMON_ENV})
        print(f"已应用 {args.profile} 性能配置")

    def create_directories(self):
        """Create the project data directories (relative to the CWD) if missing."""
        for directory in self._DIRECTORIES:
            Path(directory).mkdir(parents=True, exist_ok=True)
        print("项目目录创建完成")

    def shutdown_all(self):
        """Gracefully stop every managed child process.

        SIGTERM is sent to all children first, so they shut down in parallel.
        They then share one grace period of ``SHUTDOWN_TIMEOUT`` seconds.
        Any child still running after that is killed.
        """
        print("\n" + "=" * 70)
        print("正在关闭所有服务...")
        print("=" * 70)

        # Phase 1: ask every child to stop at once.
        for name, process in self.processes.items():
            try:
                print(f"正在关闭 {name} (PID: {process.pid})...")
                process.terminate()
            except Exception as e:
                print(f"关闭 {name} 时出错: {e}")

        # Phase 2: wait against a single shared deadline, then escalate.
        deadline = time.monotonic() + self.SHUTDOWN_TIMEOUT
        for name, process in self.processes.items():
            try:
                try:
                    process.wait(timeout=max(0.0, deadline - time.monotonic()))
                    print(f"{name} 已优雅关闭")
                except subprocess.TimeoutExpired:
                    print(f"{name} 未能在{self.SHUTDOWN_TIMEOUT}秒内关闭,强制终止...")
                    process.kill()
                    process.wait()
                    print(f"{name} 已强制终止")
            except Exception as e:
                print(f"关闭 {name} 时出错: {e}")

        print("所有服务已关闭")
|
||
|
||
|
||
|
||
def parse_args():
    """Build the launcher's command-line parser and return the parsed namespace."""
    parser = argparse.ArgumentParser(description="优化的Qwen-Agent统一启动脚本")

    # (flag, add_argument keyword options), registered in this exact order.
    option_specs = (
        # API server
        ("--host", dict(type=str, default="0.0.0.0", help="API绑定主机地址")),
        ("--port", dict(type=int, default=8001, help="API绑定端口")),
        ("--api-workers", dict(type=int, default=None, help="API工作进程数量")),
        ("--log-level", dict(type=str, default="info",
                             choices=["debug", "info", "warning", "error"], help="日志级别")),
        # Queue consumer
        ("--queue-workers", dict(type=int, default=2, help="队列消费者工作线程数")),
        ("--worker-type", dict(type=str, default="threads",
                               choices=["threads", "greenlets", "gevent"], help="队列工作类型")),
        # Performance profile
        ("--profile", dict(type=str, default="low_memory",
                           choices=["low_memory", "balanced", "high_performance"], help="性能配置文件")),
        # Monitoring and restart policy
        ("--check-interval", dict(type=int, default=5, help="进程检查间隔(秒)")),
        ("--max-restarts", dict(type=int, default=3, help="最大重启次数")),
    )

    for flag, options in option_specs:
        parser.add_argument(flag, **options)

    return parser.parse_args()
|
||
|
||
|
||
def main():
    """Parse the CLI, pick a default API worker count, run all services and exit."""
    args = parse_args()

    # Only derive a worker count when the user did not pass --api-workers.
    if args.api_workers is None:
        cores = multiprocessing.cpu_count()
        workers_by_profile = {
            "low_memory": min(1, cores),
            "balanced": min(2, cores + 1),
            "high_performance": min(4, cores * 2),
        }
        if args.profile in workers_by_profile:
            args.api_workers = workers_by_profile[args.profile]

    manager = ProcessManager()
    exit_code = 0 if manager.run(args) else 1
    sys.exit(exit_code)
|
||
|
||
|
||
if __name__ == "__main__":
    # Start the launcher only when this file is executed directly, not on import.
    main()
|