class ConfigFileCache:
    """Cache of configuration file contents, revalidated on every access.

    Each entry remembers the file's modification time (in nanoseconds) and
    size at the moment it was read. A cached value is served only while both
    still match the file on disk; otherwise the file is read again.
    """

    def __init__(self):
        # {file_path: (content, (mtime_ns, size))}
        self._cache: Dict[str, Tuple[Any, Tuple[int, int]]] = {}
        # Serializes cache lookups/updates between coroutines.
        self._lock = asyncio.Lock()

    async def get_text_file(self, file_path: str) -> Optional[str]:
        """Return the text content of a file, using the cache when valid.

        Args:
            file_path: Path of the file to read (decoded as UTF-8).

        Returns:
            The file content, or None if the file does not exist or
            cannot be read.
        """
        async with self._lock:
            # A single stat() replaces the exists()/getmtime() pair, so a
            # file that disappears in between yields None instead of raising.
            try:
                stat_result = os.stat(file_path)
            except (OSError, ValueError):
                return None

            # Size is checked together with mtime so that a rewrite landing
            # on the same timestamp is still detected when the length differs.
            signature = (stat_result.st_mtime_ns, stat_result.st_size)

            cached = self._cache.get(file_path)
            if cached is not None and cached[1] == signature:
                logger.debug(f"使用缓存文件: {file_path}")
                return cached[0]

            # Cache miss or stale entry: read the file and refresh the cache.
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
            except Exception as e:
                logger.error(f"读取文本文件失败 {file_path}: {e}")
                return None

            self._cache[file_path] = (content, signature)
            logger.debug(f"缓存文件: {file_path}")
            return content

    async def get_json_file(self, file_path: str) -> Optional[Any]:
        """Return the parsed JSON content of a file, using the text cache.

        The text is parsed on every call, so callers receive a fresh object
        and cannot mutate a shared cached value.

        Args:
            file_path: Path of the JSON file.

        Returns:
            The decoded JSON value (dict, list, ...), or None if the file is
            missing, unreadable, empty, or not valid JSON.
        """
        content = await self.get_text_file(file_path)
        if not content:
            return None
        try:
            return json.loads(content)
        except json.JSONDecodeError as e:
            logger.error(f"JSON解析失败 {file_path}: {e}")
            return None

    def clear_cache(self, file_path: Optional[str] = None):
        """Drop cached entries.

        Args:
            file_path: Path whose entry should be dropped. When None (the
                default) every entry is dropped. An empty string is treated
                as a path, not as "clear everything".
        """
        if file_path is not None:
            self._cache.pop(file_path, None)
            logger.debug(f"清理文件缓存: {file_path}")
        else:
            cleared_count = len(self._cache)
            self._cache.clear()
            logger.debug(f"清理所有缓存,共{cleared_count}个文件")

    def get_cache_stats(self) -> Dict:
        """Return cache statistics.

        Returns:
            A dict with the number of cached files ("cached_files") and the
            list of cached paths ("cached_paths").
        """
        return {
            "cached_files": len(self._cache),
            "cached_paths": list(self._cache.keys())
        }


# Global cache instance shared by the loader functions.
config_cache = ConfigFileCache()
FileLoadedAgentManager: system_prompt: 可选的系统提示词,优先级高于项目配置 mcp_settings: 可选的MCP设置,优先级高于项目配置 robot_type: 机器人类型,取值 agent/catalog_agent + user_identifier: 用户标识符 Returns: Assistant: 配置好的助手实例 """ import os - # 实现参数优先级逻辑:传入参数 > 项目配置 > 默认配置 - final_system_prompt = load_system_prompt(project_dir, language, system_prompt, robot_type, bot_id, user_identifier) - final_mcp_settings = load_mcp_settings(project_dir, mcp_settings, bot_id, robot_type) + # 使用异步加载配置文件(带缓存) + final_system_prompt = await load_system_prompt_async( + project_dir, language, system_prompt, robot_type, bot_id, user_identifier + ) + final_mcp_settings = await load_mcp_settings_async( + project_dir, mcp_settings, bot_id, robot_type + ) cache_key = self._get_cache_key(bot_id, model_name, api_key, model_server, generate_cfg, final_system_prompt, final_mcp_settings) - # 检查是否已存在该助手实例 - if cache_key in self.agents: - self._update_access_time(cache_key) - agent = self.agents[cache_key] + # 使用异步锁防止并发创建相同的agent + creation_lock = self._creation_locks.setdefault(cache_key, asyncio.Lock()) + async with creation_lock: + # 再次检查是否已存在该助手实例(获取锁后可能有其他请求已创建) + if cache_key in self.agents: + self._update_access_time(cache_key) + agent = self.agents[cache_key] + + # 动态更新 LLM 配置和系统设置(如果参数有变化) + update_agent_llm(agent, model_name, api_key, model_server, generate_cfg) + + logger.info(f"复用现有的助手实例缓存: {cache_key} (bot_id: {bot_id})") + return agent - # 动态更新 LLM 配置和系统设置(如果参数有变化) - update_agent_llm(agent, model_name, api_key, model_server, generate_cfg) + # 清理过期实例 + self._cleanup_old_agents() - logger.info(f"复用现有的助手实例缓存: {cache_key} (bot_id: {bot_id}") + # 创建新的助手实例,预加载文件 + logger.info(f"创建新的助手实例缓存: {cache_key}, bot_id: {bot_id}") + current_time = time.time() + + agent = init_modified_agent_service_with_files( + model_name=model_name, + api_key=api_key, + model_server=model_server, + generate_cfg=generate_cfg, + system_prompt=final_system_prompt, + mcp=final_mcp_settings + ) + + # 缓存实例 + self.agents[cache_key] = agent + 
self.unique_ids[cache_key] = bot_id + self.access_times[cache_key] = current_time + self.creation_times[cache_key] = current_time + + # 清理创建锁 + self._creation_locks.pop(cache_key, None) + + logger.info(f"助手实例缓存创建完成: {cache_key}") return agent - - # 清理过期实例 - self._cleanup_old_agents() - - # 创建新的助手实例,预加载文件 - logger.info(f"创建新的助手实例缓存: {cache_key}, bot_id: {bot_id}") - current_time = time.time() - - agent = init_modified_agent_service_with_files( - model_name=model_name, - api_key=api_key, - model_server=model_server, - generate_cfg=generate_cfg, - system_prompt=final_system_prompt, - mcp=final_mcp_settings - ) - - # 缓存实例 - self.agents[cache_key] = agent - self.unique_ids[cache_key] = bot_id - self.access_times[cache_key] = current_time - self.creation_times[cache_key] = current_time - - logger.info(f"助手实例缓存创建完成: {cache_key}") - return agent def get_cache_stats(self) -> Dict: """获取缓存统计信息""" @@ -237,6 +250,7 @@ class FileLoadedAgentManager: del self.unique_ids[cache_key] del self.access_times[cache_key] del self.creation_times[cache_key] + self._creation_locks.pop(cache_key, None) # 清理创建锁 removed_count += 1 logger.info(f"已移除助手实例缓存: {cache_key} (unique_id: {unique_id})") except KeyError: diff --git a/utils/prompt_loader.py b/utils/prompt_loader.py index b9c0e93..78801e6 100644 --- a/utils/prompt_loader.py +++ b/utils/prompt_loader.py @@ -4,6 +4,7 @@ System prompt and MCP settings loader utilities """ import os import json +import asyncio from typing import List, Dict, Optional, Any @@ -35,7 +36,22 @@ def safe_replace(text: str, placeholder: str, value: Any) -> str: return text.replace(placeholder, replacement) -def load_system_prompt(project_dir: str, language: str = None, system_prompt: str=None, robot_type: str = "agent", bot_id: str="", user_identifier: str = "") -> str: +async def load_system_prompt_async(project_dir: str, language: str = None, system_prompt: str=None, robot_type: str = "agent", bot_id: str="", user_identifier: str = "") -> str: + """异步版本的系统prompt加载 
+ + Args: + project_dir: 项目目录路径,可以为None + language: 语言代码,如 'zh', 'en', 'jp' 等 + system_prompt: 可选的系统提示词,优先级高于项目配置 + robot_type: 机器人类型,取值 agent/catalog_agent + bot_id: 机器人ID + user_identifier: 用户标识符 + + Returns: + str: 加载到的系统提示词内容 + """ + from .config_cache import config_cache + # 获取语言显示名称 language_display_map = { 'zh': '中文', @@ -54,38 +70,29 @@ def load_system_prompt(project_dir: str, language: str = None, system_prompt: st return prompt or "" elif robot_type == "agent" or robot_type == "catalog_agent": """ - 优先使用项目目录的system_prompt_catalog_agent.md,没有才使用默认的system_prompt_default.md - - Args: - project_dir: 项目目录路径,可以为None - language: 语言代码,如 'zh', 'en', 'jp' 等(此参数将被忽略) - system_prompt: 可选的系统提示词,优先级高于项目配置 - robot_type: 机器人类型,取值 AGENT/CATALOG_AGENT - - Returns: - str: 加载到的系统提示词内容,如果都未找到则返回空字符串 + 优先使用项目目录的README.md,没有才使用默认的system_prompt_{robot_type}.md """ system_prompt_default = None try: + # 使用缓存读取默认prompt文件 default_prompt_file = os.path.join("prompt", f"system_prompt_{robot_type}.md") - with open(default_prompt_file, 'r', encoding='utf-8') as f: - system_prompt_default = f.read() - print(f"Using default system prompt for catalog_agent from prompt folder") + system_prompt_default = await config_cache.get_text_file(default_prompt_file) + if system_prompt_default: + print(f"Using cached default system prompt for {robot_type} from prompt folder") except Exception as e: - print(f"Failed to load default system prompt for catalog_agent: {str(e)}") + print(f"Failed to load default system prompt for {robot_type}: {str(e)}") system_prompt_default = None readme = "" # 只有当 project_dir 不为 None 时才尝试读取 README.md if project_dir is not None: readme_path = os.path.join(project_dir, "README.md") - if os.path.exists(readme_path): - with open(readme_path, "r", encoding="utf-8") as f: - readme = f.read().strip() - system_prompt_default = safe_replace(system_prompt_default, "{readme}", str(readme)) + readme = await config_cache.get_text_file(readme_path) or "" + if system_prompt_default: + 
def load_system_prompt(project_dir: str, language: str = None, system_prompt: str=None, robot_type: str = "agent", bot_id: str="", user_identifier: str = "") -> str:
    """Synchronous version of system prompt loading.

    Delegates to load_system_prompt_async so both entry points share one
    implementation; kept for backward compatibility with synchronous callers.

    Args:
        project_dir: Project directory path, may be None.
        language: Language code such as 'zh', 'en', 'jp'.
        system_prompt: Optional system prompt passed by the caller.
        robot_type: Robot type, e.g. agent/catalog_agent.
        bot_id: Bot identifier.
        user_identifier: User identifier.

    Returns:
        str: The loaded system prompt content.
    """
    import concurrent.futures

    def _run() -> str:
        # asyncio.run() creates a private event loop and closes only that
        # loop, so the thread's own loop is never closed behind the caller's
        # back and repeated calls keep working.
        return asyncio.run(
            load_system_prompt_async(project_dir, language, system_prompt, robot_type, bot_id, user_identifier)
        )

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No event loop is running in this thread: run the coroutine here.
        return _run()

    # A loop is already running in this thread, where run_until_complete()
    # would raise. Run the coroutine on a helper thread with its own loop
    # and block until it finishes.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(_run).result()
def load_mcp_settings(project_dir: str, mcp_settings: list=None, bot_id: str="", robot_type: str = "agent") -> List[Dict]:
    """Synchronous version of MCP settings loading.

    Delegates to load_mcp_settings_async so both entry points share one
    implementation; kept for backward compatibility with synchronous callers.

    Args:
        project_dir: Project directory path.
        mcp_settings: Optional MCP settings passed by the caller.
        bot_id: Bot identifier.
        robot_type: Robot type, e.g. agent/catalog_agent.

    Returns:
        List[Dict]: The MCP settings produced by load_mcp_settings_async.
    """
    import concurrent.futures

    def _run() -> List[Dict]:
        # asyncio.run() creates a private event loop and closes only that
        # loop, so the thread's own loop is never closed behind the caller's
        # back and repeated calls keep working.
        return asyncio.run(
            load_mcp_settings_async(project_dir, mcp_settings, bot_id, robot_type)
        )

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No event loop is running in this thread: run the coroutine here.
        return _run()

    # A loop is already running in this thread, where run_until_complete()
    # would raise. Run the coroutine on a helper thread with its own loop
    # and block until it finishes.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(_run).result()