qwen_agent/start_unified.py
朱潮 425f3c5bb4 chore: replace Chinese comments and log messages with English
Convert all Chinese comments, docstrings, logger/print output,
HTTPException detail messages, and API response messages to English
across the entire codebase. Functional zh/ja localized strings
(e.g. prompt templates, timezone display names, date formats) are
preserved as-is.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-30 19:45:35 +08:00

355 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Optimized unified startup script combining the FastAPI application and queue consumer.
Supports performance monitoring, automatic restart, graceful shutdown, and related features.
"""
import os
import sys
import argparse
import multiprocessing
import signal
import time
import subprocess
import threading
from pathlib import Path
from typing import List, Optional, Dict, Any
class ProcessManager:
"""Process manager that controls the API service and queue consumer."""
def __init__(self):
self.processes: Dict[str, subprocess.Popen] = {}
self.running = False
self.shutdown_event = threading.Event()
# Register signal handlers
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
def _signal_handler(self, signum, frame):
"""Signal handler for graceful shutdown."""
print(f"\nReceived signal {signum}; shutting down all services...")
self.running = False
self.shutdown_event.set()
def start_api_server(self, args) -> Optional[subprocess.Popen]:
"""Start the FastAPI server."""
print("Starting FastAPI server...")
# Build the API server command
cmd = [
sys.executable, "-m", "uvicorn",
"fastapi_app:app",
"--host", args.host,
"--port", str(args.port),
"--workers", str(args.api_workers),
"--log-level", args.log_level,
"--access-log",
]
# Add uvloop support if available
try:
import uvloop
cmd.extend(["--loop", "uvloop"])
except ImportError:
pass
try:
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
bufsize=1
)
# Start the output monitoring thread
threading.Thread(
target=self._monitor_output,
args=(process, "API server"),
daemon=True
).start()
return process
except Exception as e:
print(f"Failed to start API server: {e}")
return None
def start_queue_consumer(self, args) -> Optional[subprocess.Popen]:
"""Start the queue consumer."""
print("Starting queue consumer...")
consumer_script = Path("task_queue/consumer.py")
if not consumer_script.exists():
consumer_script = consumer_script.with_suffix(".pyc")
# Build the queue consumer command
cmd = [
sys.executable,
str(consumer_script),
"--workers", str(args.queue_workers),
"--worker-type", args.worker_type
]
try:
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
bufsize=1
)
# Start the output monitoring thread
threading.Thread(
target=self._monitor_output,
args=(process, "Queue consumer"),
daemon=True
).start()
return process
except Exception as e:
print(f"Failed to start queue consumer: {e}")
return None
def _monitor_output(self, process: subprocess.Popen, name: str):
"""Monitor process output."""
try:
for line in iter(process.stdout.readline, ''):
if line.strip():
print(f"[{name}] {line.strip()}")
except Exception:
pass
def _check_process(self, name: str, process: subprocess.Popen) -> bool:
"""Check whether the process is healthy."""
if process.poll() is not None:
print(f"[Warning] {name} process exited with code: {process.returncode}")
return False
return True
def _restart_process(self, name: str, args) -> bool:
"""Restart a process."""
print(f"[Restart] Restarting {name}...")
if name == "API server":
new_process = self.start_api_server(args)
elif name == "Queue consumer":
new_process = self.start_queue_consumer(args)
else:
return False
if new_process:
self.processes[name] = new_process
print(f"[Restart] {name} restarted successfully, PID: {new_process.pid}")
return True
else:
print(f"[Error] Failed to restart {name}")
return False
def run(self, args):
"""Run all services."""
print("=" * 70)
print("Qwen-Agent optimized unified startup script")
print("=" * 70)
# Set up the environment
self.setup_environment(args)
# Create required directories
self.create_directories()
# Start services
api_process = self.start_api_server(args)
if not api_process:
print("Failed to start API server; exiting")
return False
queue_process = self.start_queue_consumer(args)
if not queue_process:
print("Failed to start queue consumer; exiting")
api_process.terminate()
return False
self.processes["API server"] = api_process
self.processes["Queue consumer"] = queue_process
print("\n" + "=" * 70)
print("All services started successfully!")
print(f"API server: http://{args.host}:{args.port}")
print(f"API PID: {api_process.pid}")
print(f"Queue consumer PID: {queue_process.pid}")
print("Press Ctrl+C to stop all services")
print("=" * 70 + "\n")
self.running = True
# Main monitoring loop
restart_counts = {"API server": 0, "Queue consumer": 0}
max_restarts = args.max_restarts
while self.running and not self.shutdown_event.is_set():
try:
# Check the status of all processes
for name, process in list(self.processes.items()):
if not self._check_process(name, process):
if restart_counts[name] < max_restarts:
if self._restart_process(name, args):
restart_counts[name] += 1
print(f"[Restart] {name} has been restarted {restart_counts[name]}/{max_restarts} times")
else:
print(f"[Error] Failed to restart {name}; stopping all services")
self.running = False
break
else:
print(f"[Error] {name} reached the restart limit ({max_restarts}); stopping all services")
self.running = False
break
# Wait for the next check interval
if self.shutdown_event.wait(timeout=args.check_interval):
break
except KeyboardInterrupt:
break
except Exception as e:
print(f"[Error] Exception in monitoring loop: {e}")
time.sleep(1)
# Gracefully shut down all processes
self.shutdown_all()
return True
def setup_environment(self, args):
"""Set environment variables."""
# Set environment variables based on the selected profile
if args.profile == "low_memory":
env_vars = {
'TOKENIZERS_PARALLELISM': 'false',
'TOOL_CACHE_MAX_SIZE': '10',
'CHECKPOINT_POOL_SIZE': '10',
}
elif args.profile == "balanced":
env_vars = {
'TOKENIZERS_PARALLELISM': 'true',
'TOKENIZERS_FAST': '1',
'TOOL_CACHE_MAX_SIZE': '20',
'CHECKPOINT_POOL_SIZE': '15',
}
elif args.profile == "high_performance":
env_vars = {
'TOKENIZERS_PARALLELISM': 'true',
'TOKENIZERS_FAST': '1',
'TOOL_CACHE_MAX_SIZE': '30',
'CHECKPOINT_POOL_SIZE': '20',
}
# General optimizations
env_vars.update({
'PYTHONUNBUFFERED': '1',
'PYTHONDONTWRITEBYTECODE': '1',
})
for key, value in env_vars.items():
os.environ[key] = value
print(f"Applied {args.profile} performance profile")
def create_directories(self):
"""Create the required directories."""
directories = [
"projects/queue_data",
"projects/data",
"projects/uploads",
"projects/robot",
]
for directory in directories:
Path(directory).mkdir(parents=True, exist_ok=True)
print("Project directories created")
def shutdown_all(self):
"""Gracefully shut down all processes."""
print("\n" + "=" * 70)
print("Shutting down all services...")
print("=" * 70)
for name, process in self.processes.items():
try:
print(f"Shutting down {name} (PID: {process.pid})...")
# Send SIGTERM
process.terminate()
# Wait for graceful shutdown
try:
process.wait(timeout=10)
print(f"{name} shut down gracefully")
except subprocess.TimeoutExpired:
print(f"{name} did not exit within 10 seconds; forcing termination...")
process.kill()
process.wait()
print(f"{name} was forcefully terminated")
except Exception as e:
print(f"Error while shutting down {name}: {e}")
print("All services shut down")
def parse_args():
"""Parse command-line arguments."""
parser = argparse.ArgumentParser(description="Optimized Qwen-Agent unified startup script")
# API server configuration
parser.add_argument("--host", type=str, default="0.0.0.0", help="API bind host")
parser.add_argument("--port", type=int, default=8001, help="API bind port")
parser.add_argument("--api-workers", type=int, default=None, help="Number of API worker processes")
parser.add_argument("--log-level", type=str, default="info",
choices=["debug", "info", "warning", "error"], help="Log level")
# Queue consumer configuration
parser.add_argument("--queue-workers", type=int, default=2, help="Number of queue consumer worker threads")
parser.add_argument("--worker-type", type=str, default="threads",
choices=["threads", "greenlets", "gevent"], help="Queue worker type")
# Performance profile
parser.add_argument("--profile", type=str, default="low_memory",
choices=["low_memory", "balanced", "high_performance"], help="Performance profile")
# Monitoring and restart configuration
parser.add_argument("--check-interval", type=int, default=5, help="Process check interval in seconds")
parser.add_argument("--max-restarts", type=int, default=3, help="Maximum number of restarts")
return parser.parse_args()
def main():
"""Main entry point."""
args = parse_args()
# Compute the recommended number of worker processes
if args.api_workers is None:
cpu_count = multiprocessing.cpu_count()
if args.profile == "low_memory":
args.api_workers = min(1, cpu_count)
elif args.profile == "balanced":
args.api_workers = min(2, cpu_count + 1)
elif args.profile == "high_performance":
args.api_workers = min(4, cpu_count * 2)
# Create the process manager and run it
manager = ProcessManager()
success = manager.run(args)
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()