#!/usr/bin/env python3 """ 系统配置优化 - 调整系统参数以提高并发性能 """ import os import asyncio import multiprocessing import threading import time import resource import logging from typing import Dict, Any, Optional from concurrent.futures import ThreadPoolExecutor # 配置日志 logger = logging.getLogger('app') class SystemOptimizer: """系统优化器 调整系统参数以提高并发性能 """ def __init__(self): self.original_settings = {} self.optimized = False def optimize_system_settings(self): """优化系统设置""" if self.optimized: return # 保存原始设置 self._backup_original_settings() # 1. 优化文件描述符限制 try: soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) new_limit = min(65536, hard) # 增加到65536或硬件限制 resource.setrlimit(resource.RLIMIT_NOFILE, (new_limit, hard)) self.original_settings['RLIMIT_NOFILE'] = (soft, hard) logger.info(f"文件描述符限制从 {soft} 增加到 {new_limit}") except (ValueError, OSError) as e: logger.error(f"无法设置文件描述符限制: {e}") # 2. 优化线程栈大小 try: soft, hard = resource.getrlimit(resource.RLIMIT_STACK) new_stack = min(8 * 1024 * 1024, hard) # 8MB栈大小 resource.setrlimit(resource.RLIMIT_STACK, (new_stack, hard)) self.original_settings['RLIMIT_STACK'] = (soft, hard) logger.info(f"线程栈大小设置为 {new_stack // (1024*1024)}MB") except (ValueError, OSError) as e: logger.error(f"无法设置线程栈大小: {e}") # 3. 环境变量优化 env_vars = { # Python优化 'PYTHONUNBUFFERED': '1', # 禁用输出缓冲 'PYTHONDONTWRITEBYTECODE': '1', # 不写.pyc文件 # Tokenizer优化 - 允许适度的并行度 'TOKENIZERS_PARALLELISM': 'true', # 改为true以提高并发 'TOKENIZERS_FAST': '1', # 启用快速tokenizer # OpenMP优化 'OMP_NUM_THREADS': str(min(8, multiprocessing.cpu_count())), # 限制OpenMP线程 'OMP_WAIT_POLICY': 'PASSIVE', # 被动等待策略 # 内存优化 'MALLOC_TRIM_THRESHOLD_': '100000', # 内存整理阈值 # 网络优化 'TCP_NODELAY': '1', # 禁用Nagle算法 # Hugging Face优化 'TRANSFORMERS_CACHE': '/tmp/transformers_cache', # 使用tmpfs加速缓存 'HF_OFFLINE': '0', # 在线模式 # CUDA优化(如果使用GPU) 'CUDA_LAUNCH_BLOCKING': '0', # 异步CUDA启动 # asyncio优化 'UVLOOP_ENABLED': '1', # 启用uvloop(如果可用) } for key, value in env_vars.items(): if key not in os.environ: os.environ[key] = value logger.info(f"设置环境变量: {key}={value}") self.optimized = True logger.info("系统优化完成") def _backup_original_settings(self): """备份原始设置""" try: self.original_settings['RLIMIT_NOFILE'] = resource.getrlimit(resource.RLIMIT_NOFILE) self.original_settings['RLIMIT_STACK'] = resource.getrlimit(resource.RLIMIT_STACK) except: pass # 备份重要环境变量 env_keys = [ 'TOKENIZERS_PARALLELISM', 'PYTHONUNBUFFERED', 'PYTHONDONTWRITEBYTECODE', 'OMP_NUM_THREADS' ] for key in env_keys: if key in os.environ: self.original_settings[key] = os.environ[key] def restore_original_settings(self): """恢复原始设置""" if not self.original_settings: return logger.info("恢复原始系统设置...") # 恢复资源限制 if 'RLIMIT_NOFILE' in self.original_settings: try: resource.setrlimit(resource.RLIMIT_NOFILE, self.original_settings['RLIMIT_NOFILE']) logger.info(f"恢复文件描述符限制") except: pass if 'RLIMIT_STACK' in self.original_settings: try: resource.setrlimit(resource.RLIMIT_STACK, self.original_settings['RLIMIT_STACK']) logger.info(f"恢复线程栈大小") except: pass # 恢复环境变量 for key, value in self.original_settings.items(): if key.startswith('TOKENIZERS_') or key in ['PYTHONUNBUFFERED', 'PYTHONDONTWRITEBYTECODE']: if key in os.environ: del os.environ[key] logger.info(f"移除环境变量: {key}") elif key in ['OMP_NUM_THREADS'] and value is not None: os.environ[key] = value logger.info(f"恢复环境变量: {key}={value}") self.optimized = False logger.info("系统设置已恢复") class AsyncioOptimizer: """asyncio优化器""" @staticmethod def setup_event_loop_policy(): """设置优化的事件循环策略""" try: # 尝试使用uvloop(如果可用) import uvloop asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) logger.info("使用uvloop事件循环策略") except ImportError: logger.info("使用默认事件循环策略") # 设置线程池大小 cpu_count = multiprocessing.cpu_count() thread_pool_size = min(32, cpu_count * 4) # 每CPU核心4个线程,最多32个 # 注意:不能在这里设置默认线程池执行器,因为还没有运行事件循环 # 这个设置会在应用启动时进行 logger.info(f"建议线程池大小: {thread_pool_size}") @staticmethod def optimize_gunicorn_settings() -> Dict[str, Any]: """获取优化的Gunicorn设置""" cpu_count = multiprocessing.cpu_count() return { # Worker配置 'workers': min(8, cpu_count + 1), # 工作进程数 'worker_class': 'uvicorn.workers.UvicornWorker', # 使用Uvicorn worker 'worker_connections': 2000, # 每个worker的连接数 'max_requests': 5000, # 最大请求数后重启worker 'max_requests_jitter': 500, # 随机抖动 'preload_app': True, # 预加载应用 # 超时设置 'timeout': 120, # 工作超时 'keepalive': 5, # Keep-Alive超时 'graceful_timeout': 30, # 优雅关闭超时 # 性能优化 'worker_tmp_dir': '/dev/shm', # 使用内存文件系统 # 日志设置 'accesslog': '-', # 标准输出 'errorlog': '-', # 标准错误输出 'loglevel': 'info', } def setup_system_optimizations(): """设置系统优化""" # 1. 系统级优化 system_optimizer = SystemOptimizer() system_optimizer.optimize_system_settings() # 2. asyncio优化 asyncio_optimizer = AsyncioOptimizer() asyncio_optimizer.setup_event_loop_policy() return system_optimizer def create_performance_monitor() -> Dict[str, Any]: """创建性能监控配置""" return { 'monitor_interval': 60, # 监控间隔(秒) 'metrics': { 'memory_usage': True, 'cpu_usage': True, 'disk_io': True, 'network_io': True, 'active_connections': True, 'request_latency': True, 'cache_hit_rate': True, 'error_rate': True, }, 'alerts': { 'memory_threshold': 0.9, # 90%内存使用率告警 'cpu_threshold': 0.8, # 80%CPU使用率告警 'disk_threshold': 0.9, # 90%磁盘使用率告警 'error_threshold': 0.05, # 5%错误率告警 'latency_threshold': 5.0, # 5秒延迟告警 } } def get_optimized_worker_config() -> Dict[str, Any]: """获取优化的worker配置""" cpu_count = multiprocessing.cpu_count() memory_gb = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES') / (1024.0 ** 3) # 基于资源限制的配置 max_workers = min( 16, # 最大worker数 max(2, cpu_count), # 至少2个worker,最多CPU核心数 int(memory_gb / 2) # 基于内存的worker限制(每worker 2GB) ) return { 'max_workers': max_workers, 'worker_connections': 1000, # 每个worker的连接数 'connection_pool_size': 100, # 连接池大小 'buffer_size': 8192, # 缓冲区大小 'timeout': 120, # 超时时间 'keepalive_timeout': 30, # Keep-Alive超时 } # 预定义的优化配置 OPTIMIZATION_CONFIGS = { 'low_memory': { 'max_workers': 2, 'worker_connections': 500, 'buffer_size': 4096, 'cache_size': 500, }, 'balanced': { 'max_workers': 4, 'worker_connections': 1000, 'buffer_size': 8192, 'cache_size': 1000, }, 'high_performance': { 'max_workers': 8, 'worker_connections': 2000, 'buffer_size': 16384, 'cache_size': 2000, } } def apply_optimization_profile(profile_name: str) -> Dict[str, Any]: """应用优化配置文件""" if profile_name not in OPTIMIZATION_CONFIGS: raise ValueError(f"未知的优化配置: {profile_name}") config = OPTIMIZATION_CONFIGS[profile_name].copy() # 添加系统特定配置 config.update({ 'profile_name': profile_name, 'applied_at': time.time(), 'cpu_count': multiprocessing.cpu_count(), 'memory_gb': os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES') / (1024.0 ** 3) }) return config # 全局系统优化器实例 _global_system_optimizer: Optional[SystemOptimizer] = None def get_global_system_optimizer() -> SystemOptimizer: """获取全局系统优化器""" global _global_system_optimizer if _global_system_optimizer is None: _global_system_optimizer = SystemOptimizer() return _global_system_optimizer