The local file-parsing pipeline (upload -> Huey async parse -> generate projects/data/.../document.txt) is no longer needed: RAG retrieval runs against the backend vector store and does not read the local parse output, so removing this has zero impact on existing bot Q&A. - Delete task_queue/ (Huey queue, consumer, tasks, task status store) - Delete parsing utils: dataset_manager, single_file_processor, data_merger, project_manager - Delete db_manager.py (only managed task_status.db) - routes/files.py: keep only POST /api/v1/upload; drop all parse/queue/task endpoints - routes/projects.py: drop /tasks endpoint and task_status import - utils/__init__.py & api_models.py: remove exports/models for deleted modules and queue task models - start_unified.py & start_all_optimized.sh: no longer launch the queue consumer - Drop huey dependency (keep redis) Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
301 lines
10 KiB
Python
Executable File
301 lines
10 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Optimized unified startup script for the FastAPI application.
|
|
Supports performance monitoring, automatic restart, graceful shutdown, and related features.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import argparse
|
|
import multiprocessing
|
|
import signal
|
|
import time
|
|
import subprocess
|
|
import threading
|
|
from pathlib import Path
|
|
from typing import List, Optional, Dict, Any
|
|
|
|
|
|
class ProcessManager:
    """Supervise the FastAPI (uvicorn) child process.

    The manager starts the API server as a subprocess, relays its output to
    this process's stdout, restarts it when it exits unexpectedly (up to a
    configurable limit) and terminates it when SIGINT/SIGTERM is received.
    """

    # Name under which the API server is tracked in ``self.processes``.
    _API_SERVER = "API server"

    # Seconds to wait after SIGTERM before a child is killed.
    _TERMINATE_TIMEOUT_SECONDS = 10

    # Environment variables applied per performance profile.
    _PROFILE_ENV: Dict[str, Dict[str, str]] = {
        "low_memory": {
            'TOKENIZERS_PARALLELISM': 'false',
            'TOOL_CACHE_MAX_SIZE': '10',
            'CHECKPOINT_POOL_SIZE': '10',
        },
        "balanced": {
            'TOKENIZERS_PARALLELISM': 'true',
            'TOKENIZERS_FAST': '1',
            'TOOL_CACHE_MAX_SIZE': '20',
            'CHECKPOINT_POOL_SIZE': '15',
        },
        "high_performance": {
            'TOKENIZERS_PARALLELISM': 'true',
            'TOKENIZERS_FAST': '1',
            'TOOL_CACHE_MAX_SIZE': '30',
            'CHECKPOINT_POOL_SIZE': '20',
        },
    }

    # Environment variables applied regardless of the profile.
    _COMMON_ENV: Dict[str, str] = {
        'PYTHONUNBUFFERED': '1',
        'PYTHONDONTWRITEBYTECODE': '1',
    }

    def __init__(self):
        """Initialise state and install SIGINT/SIGTERM handlers."""
        self.processes: Dict[str, subprocess.Popen] = {}
        self.running = False
        self.shutdown_event = threading.Event()

        # Register signal handlers
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Request a graceful shutdown; the monitoring loop does the work."""
        print(f"\nReceived signal {signum}; shutting down all services...")
        self.running = False
        self.shutdown_event.set()

    def start_api_server(self, args) -> Optional[subprocess.Popen]:
        """Start the FastAPI server under uvicorn.

        Args:
            args: Namespace providing ``host``, ``port``, ``api_workers`` and
                ``log_level``.

        Returns:
            The started ``Popen`` object, or ``None`` if the process (or its
            output-monitoring thread) could not be started.
        """
        print("Starting FastAPI server...")

        # Build the API server command
        cmd = [
            sys.executable, "-m", "uvicorn",
            "fastapi_app:app",
            "--host", args.host,
            "--port", str(args.port),
            "--log-level", args.log_level,
            "--access-log",
        ]

        # Leave the worker count to uvicorn when none was resolved, rather
        # than putting the literal string "None" on the command line.
        if args.api_workers is not None:
            cmd.extend(["--workers", str(args.api_workers)])

        # Add uvloop support if available
        try:
            import uvloop  # noqa: F401  (imported only to probe availability)
        except ImportError:
            pass
        else:
            cmd.extend(["--loop", "uvloop"])

        try:
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                universal_newlines=True,
                # Undecodable bytes in the child's output must not kill the
                # monitor thread (which would leave the pipe unread).
                errors="replace",
                bufsize=1,
            )

            # Start the output monitoring thread
            threading.Thread(
                target=self._monitor_output,
                args=(process, self._API_SERVER),
                daemon=True,
            ).start()

            return process

        except Exception as e:
            print(f"Failed to start API server: {e}")
            return None

    def _monitor_output(self, process: subprocess.Popen, name: str):
        """Relay the child's output line by line, prefixed with ``name``.

        Reads ``process.stdout`` until end-of-file and then closes it so the
        pipe is not leaked when the process is replaced by a restart.
        """
        stream = process.stdout
        if stream is None:
            return

        try:
            for line in iter(stream.readline, ''):
                text = line.strip()
                if text:
                    print(f"[{name}] {text}")
        except (OSError, ValueError) as e:
            # ValueError covers reads on a closed stream.
            print(f"[Warning] Stopped reading output of {name}: {e}")
        finally:
            stream.close()

    def _check_process(self, name: str, process: subprocess.Popen) -> bool:
        """Return True while ``process`` is still running."""
        if process.poll() is None:
            return True

        print(f"[Warning] {name} process exited with code: {process.returncode}")
        return False

    def _restart_process(self, name: str, args) -> bool:
        """Start a replacement for the service called ``name``.

        Returns:
            True if a new process was started and registered, False if the
            name is unknown or the start failed.
        """
        print(f"[Restart] Restarting {name}...")

        if name != self._API_SERVER:
            return False

        new_process = self.start_api_server(args)
        if not new_process:
            print(f"[Error] Failed to restart {name}")
            return False

        self.processes[name] = new_process
        print(f"[Restart] {name} restarted successfully, PID: {new_process.pid}")
        return True

    def run(self, args) -> bool:
        """Start the API server and supervise it until shutdown.

        Args:
            args: Namespace providing the server options plus ``profile``,
                ``check_interval`` and ``max_restarts``.

        Returns:
            True if the services ran and stopped on request; False if the API
            server could not be started, could not be restarted, or reached
            the restart limit.
        """
        banner = "=" * 70
        print(banner)
        print("Qwen-Agent optimized unified startup script")
        print(banner)

        # Set up the environment
        self.setup_environment(args)

        # Create required directories
        self.create_directories()

        # Start services
        api_process = self.start_api_server(args)
        if not api_process:
            print("Failed to start API server; exiting")
            return False

        self.processes[self._API_SERVER] = api_process

        print("\n" + banner)
        print("All services started successfully!")
        print(f"API server: http://{args.host}:{args.port}")
        print(f"API PID: {api_process.pid}")
        print("Press Ctrl+C to stop all services")
        print(banner + "\n")

        self.running = True

        healthy = self._supervise(args)

        # Gracefully shut down all processes
        self.shutdown_all()
        return healthy

    def _supervise(self, args) -> bool:
        """Run the monitoring loop; return False if it stopped due to a failure."""
        restart_counts = {name: 0 for name in self.processes}
        max_restarts = args.max_restarts

        while self.running and not self.shutdown_event.is_set():
            try:
                # Check the status of all processes
                for name, process in list(self.processes.items()):
                    if self._check_process(name, process):
                        continue

                    if restart_counts[name] >= max_restarts:
                        print(f"[Error] {name} reached the restart limit ({max_restarts}); stopping all services")
                        self.running = False
                        # Return at once instead of waiting another interval.
                        return False

                    if not self._restart_process(name, args):
                        print(f"[Error] Failed to restart {name}; stopping all services")
                        self.running = False
                        return False

                    restart_counts[name] += 1
                    print(f"[Restart] {name} has been restarted {restart_counts[name]}/{max_restarts} times")

                # Wait for the next check interval
                if self.shutdown_event.wait(timeout=args.check_interval):
                    break

            except KeyboardInterrupt:
                break
            except Exception as e:
                print(f"[Error] Exception in monitoring loop: {e}")
                # Pause before retrying, but wake immediately on shutdown.
                self.shutdown_event.wait(timeout=1)

        return True

    def setup_environment(self, args):
        """Export the environment variables for ``args.profile``.

        Raises:
            ValueError: If ``args.profile`` is not a known profile.
        """
        # Set environment variables based on the selected profile
        try:
            profile_env = self._PROFILE_ENV[args.profile]
        except KeyError:
            known = ", ".join(sorted(self._PROFILE_ENV))
            raise ValueError(
                f"Unknown performance profile {args.profile!r}; expected one of: {known}"
            ) from None

        # General optimizations are layered on top of the profile settings.
        os.environ.update({**profile_env, **self._COMMON_ENV})

        print(f"Applied {args.profile} performance profile")

    def create_directories(self):
        """Create the required directories (relative to the working directory)."""
        directories = [
            "projects/data",
            "projects/uploads",
            "projects/robot",
        ]

        for directory in directories:
            Path(directory).mkdir(parents=True, exist_ok=True)

        print("Project directories created")

    def shutdown_all(self):
        """Terminate every tracked process, killing those that do not exit in time."""
        banner = "=" * 70
        print("\n" + banner)
        print("Shutting down all services...")
        print(banner)

        for name, process in list(self.processes.items()):
            try:
                if process.poll() is not None:
                    # Nothing to terminate; avoid reporting a crash as graceful.
                    print(f"{name} (PID: {process.pid}) had already exited with code: {process.returncode}")
                    continue

                print(f"Shutting down {name} (PID: {process.pid})...")

                # Send SIGTERM
                process.terminate()

                # Wait for graceful shutdown
                try:
                    process.wait(timeout=self._TERMINATE_TIMEOUT_SECONDS)
                    print(f"{name} shut down gracefully")
                except subprocess.TimeoutExpired:
                    print(f"{name} did not exit within {self._TERMINATE_TIMEOUT_SECONDS} seconds; forcing termination...")
                    process.kill()
                    process.wait()
                    print(f"{name} was forcefully terminated")

            except Exception as e:
                print(f"Error while shutting down {name}: {e}")

        print("All services shut down")
|
|
|
|
|
|
def parse_args(argv: Optional[List[str]] = None):
    """Parse command-line arguments.

    Args:
        argv: Argument list to parse. ``None`` (the default) parses
            ``sys.argv[1:]``, which is what the script entry point uses.

    Returns:
        The populated ``argparse.Namespace``. ``api_workers`` is ``None``
        unless ``--api-workers`` was given.

    Raises:
        SystemExit: On invalid arguments (standard argparse behaviour).
    """

    def int_at_least(minimum: int):
        """Build an argparse ``type`` accepting integers >= ``minimum``."""

        def convert(raw: str) -> int:
            value = int(raw)  # a ValueError here is reported by argparse
            if value < minimum:
                raise argparse.ArgumentTypeError(
                    f"must be an integer >= {minimum}, got {value}"
                )
            return value

        # argparse uses the callable's name in "invalid ... value" messages.
        convert.__name__ = "int"
        return convert

    parser = argparse.ArgumentParser(description="Optimized Qwen-Agent unified startup script")

    # API server configuration
    parser.add_argument("--host", type=str, default="0.0.0.0", help="API bind host")
    parser.add_argument("--port", type=int, default=8001, help="API bind port")
    parser.add_argument("--api-workers", type=int_at_least(1), default=None,
                        help="Number of API worker processes")
    parser.add_argument("--log-level", type=str, default="info",
                        choices=["debug", "info", "warning", "error"], help="Log level")

    # Performance profile
    parser.add_argument("--profile", type=str, default="low_memory",
                        choices=["low_memory", "balanced", "high_performance"], help="Performance profile")

    # Monitoring and restart configuration
    # An interval of 0 would make the monitoring loop's timed wait return
    # immediately and spin, so at least one second is required.
    parser.add_argument("--check-interval", type=int_at_least(1), default=5,
                        help="Process check interval in seconds")
    parser.add_argument("--max-restarts", type=int_at_least(0), default=3,
                        help="Maximum number of restarts")

    return parser.parse_args(argv)
|
|
|
|
|
|
def default_api_workers(profile: str, cpu_count: int) -> int:
    """Return the recommended uvicorn worker count for ``profile``.

    Args:
        profile: One of ``"low_memory"``, ``"balanced"`` or
            ``"high_performance"``.
        cpu_count: Number of CPUs reported for this machine.

    Returns:
        A worker count of at least 1. An unrecognised profile yields 1 so the
        caller never ends up without a worker count.
    """
    if profile == "low_memory":
        recommended = min(1, cpu_count)
    elif profile == "balanced":
        recommended = min(2, cpu_count + 1)
    elif profile == "high_performance":
        recommended = min(4, cpu_count * 2)
    else:
        recommended = 1

    return max(1, recommended)


def main():
    """Main entry point: parse arguments, run the manager, set the exit code."""
    args = parse_args()

    # Compute the recommended number of worker processes
    if args.api_workers is None:
        try:
            cpu_count = multiprocessing.cpu_count()
        except NotImplementedError:
            # The CPU count cannot be determined on this platform.
            cpu_count = 1
        args.api_workers = default_api_workers(args.profile, cpu_count)

    # Create the process manager and run it
    manager = ProcessManager()
    success = manager.run(args)

    sys.exit(0 if success else 1)


if __name__ == "__main__":
    main()
|