agent加锁

This commit is contained in:
朱潮 2025-11-16 10:47:08 +08:00
parent f62dbf0484
commit 1a19d6d3db
4 changed files with 230 additions and 65 deletions

View File

@ -37,6 +37,12 @@ from .file_loaded_agent_manager import (
init_global_agent_manager init_global_agent_manager
) )
# Import config cache module
from .config_cache import (
config_cache,
ConfigFileCache
)
from .agent_pool import ( from .agent_pool import (
AgentPool, AgentPool,
get_agent_pool, get_agent_pool,

100
utils/config_cache.py Normal file
View File

@ -0,0 +1,100 @@
#!/usr/bin/env python3
"""
配置文件缓存模块 - 提供异步文件读取缓存功能
用于优化并发请求时的文件I/O性能
"""
import asyncio
import os
import json
from typing import Dict, Tuple, Optional, Any
from utils.logger import logger
class ConfigFileCache:
"""配置文件缓存类
提供基于文件修改时间的缓存机制避免重复读取未修改的文件
"""
def __init__(self):
self._cache: Dict[str, Tuple[Any, float]] = {} # {file_path: (content, mtime)}
self._lock = asyncio.Lock()
async def get_text_file(self, file_path: str) -> Optional[str]:
"""获取文本文件内容,带缓存机制
Args:
file_path: 文件路径
Returns:
文件内容字符串如果文件不存在或读取失败返回None
"""
async with self._lock:
if not os.path.exists(file_path):
return None
current_mtime = os.path.getmtime(file_path)
# 检查缓存是否有效
if file_path in self._cache:
cached_content, cached_mtime = self._cache[file_path]
if current_mtime == cached_mtime:
logger.debug(f"使用缓存文件: {file_path}")
return cached_content
# 读取文件并更新缓存
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
self._cache[file_path] = (content, current_mtime)
logger.debug(f"缓存文件: {file_path}")
return content
except Exception as e:
logger.error(f"读取文本文件失败 {file_path}: {e}")
return None
async def get_json_file(self, file_path: str) -> Optional[Dict]:
"""获取JSON文件内容带缓存机制
Args:
file_path: JSON文件路径
Returns:
解析后的字典如果文件不存在读取失败或JSON解析失败返回None
"""
content = await self.get_text_file(file_path)
if content:
try:
return json.loads(content)
except json.JSONDecodeError as e:
logger.error(f"JSON解析失败 {file_path}: {e}")
return None
def clear_cache(self, file_path: str = None):
"""清理缓存
Args:
file_path: 要清理的文件路径如果为None则清理所有缓存
"""
if file_path:
self._cache.pop(file_path, None)
logger.debug(f"清理文件缓存: {file_path}")
else:
cleared_count = len(self._cache)
self._cache.clear()
logger.debug(f"清理所有缓存,共{cleared_count}个文件")
def get_cache_stats(self) -> Dict:
"""获取缓存统计信息
Returns:
包含缓存统计信息的字典
"""
return {
"cached_files": len(self._cache),
"cached_paths": list(self._cache.keys())
}
# 全局缓存实例
config_cache = ConfigFileCache()

View File

@ -17,13 +17,14 @@
import hashlib import hashlib
import time import time
import json import json
import asyncio
from typing import Dict, List, Optional from typing import Dict, List, Optional
from qwen_agent.agents import Assistant from qwen_agent.agents import Assistant
from qwen_agent.log import logger from qwen_agent.log import logger
from modified_assistant import init_modified_agent_service_with_files, update_agent_llm from modified_assistant import init_modified_agent_service_with_files, update_agent_llm
from .prompt_loader import load_system_prompt, load_mcp_settings from .prompt_loader import load_system_prompt_async, load_mcp_settings_async
class FileLoadedAgentManager: class FileLoadedAgentManager:
@ -38,6 +39,7 @@ class FileLoadedAgentManager:
self.access_times: Dict[str, float] = {} # LRU 访问时间管理 self.access_times: Dict[str, float] = {} # LRU 访问时间管理
self.creation_times: Dict[str, float] = {} # 创建时间记录 self.creation_times: Dict[str, float] = {} # 创建时间记录
self.max_cached_agents = max_cached_agents self.max_cached_agents = max_cached_agents
self._creation_locks: Dict[str, asyncio.Lock] = {} # 防止并发创建相同agent的锁
def _get_cache_key(self, bot_id: str, model_name: str = None, api_key: str = None, def _get_cache_key(self, bot_id: str, model_name: str = None, api_key: str = None,
model_server: str = None, generate_cfg: Dict = None, model_server: str = None, generate_cfg: Dict = None,
@ -116,7 +118,7 @@ class FileLoadedAgentManager:
Args: Args:
bot_id: 项目的唯一标识符 bot_id: 项目的唯一标识符
project_dir: 项目目录路径用于读取system_prompt.md和mcp_settings.json可以为None project_dir: 项目目录路径用于读取README.md可以为None
model_name: 模型名称 model_name: 模型名称
api_key: API 密钥 api_key: API 密钥
model_server: 模型服务器地址 model_server: 模型服务器地址
@ -125,20 +127,28 @@ class FileLoadedAgentManager:
system_prompt: 可选的系统提示词优先级高于项目配置 system_prompt: 可选的系统提示词优先级高于项目配置
mcp_settings: 可选的MCP设置优先级高于项目配置 mcp_settings: 可选的MCP设置优先级高于项目配置
robot_type: 机器人类型取值 agent/catalog_agent robot_type: 机器人类型取值 agent/catalog_agent
user_identifier: 用户标识符
Returns: Returns:
Assistant: 配置好的助手实例 Assistant: 配置好的助手实例
""" """
import os import os
# 实现参数优先级逻辑:传入参数 > 项目配置 > 默认配置 # 使用异步加载配置文件(带缓存)
final_system_prompt = load_system_prompt(project_dir, language, system_prompt, robot_type, bot_id, user_identifier) final_system_prompt = await load_system_prompt_async(
final_mcp_settings = load_mcp_settings(project_dir, mcp_settings, bot_id, robot_type) project_dir, language, system_prompt, robot_type, bot_id, user_identifier
)
final_mcp_settings = await load_mcp_settings_async(
project_dir, mcp_settings, bot_id, robot_type
)
cache_key = self._get_cache_key(bot_id, model_name, api_key, model_server, cache_key = self._get_cache_key(bot_id, model_name, api_key, model_server,
generate_cfg, final_system_prompt, final_mcp_settings) generate_cfg, final_system_prompt, final_mcp_settings)
# 检查是否已存在该助手实例 # 使用异步锁防止并发创建相同的agent
creation_lock = self._creation_locks.setdefault(cache_key, asyncio.Lock())
async with creation_lock:
# 再次检查是否已存在该助手实例(获取锁后可能有其他请求已创建)
if cache_key in self.agents: if cache_key in self.agents:
self._update_access_time(cache_key) self._update_access_time(cache_key)
agent = self.agents[cache_key] agent = self.agents[cache_key]
@ -146,7 +156,7 @@ class FileLoadedAgentManager:
# 动态更新 LLM 配置和系统设置(如果参数有变化) # 动态更新 LLM 配置和系统设置(如果参数有变化)
update_agent_llm(agent, model_name, api_key, model_server, generate_cfg) update_agent_llm(agent, model_name, api_key, model_server, generate_cfg)
logger.info(f"复用现有的助手实例缓存: {cache_key} (bot_id: {bot_id}") logger.info(f"复用现有的助手实例缓存: {cache_key} (bot_id: {bot_id})")
return agent return agent
# 清理过期实例 # 清理过期实例
@ -171,6 +181,9 @@ class FileLoadedAgentManager:
self.access_times[cache_key] = current_time self.access_times[cache_key] = current_time
self.creation_times[cache_key] = current_time self.creation_times[cache_key] = current_time
# 清理创建锁
self._creation_locks.pop(cache_key, None)
logger.info(f"助手实例缓存创建完成: {cache_key}") logger.info(f"助手实例缓存创建完成: {cache_key}")
return agent return agent
@ -237,6 +250,7 @@ class FileLoadedAgentManager:
del self.unique_ids[cache_key] del self.unique_ids[cache_key]
del self.access_times[cache_key] del self.access_times[cache_key]
del self.creation_times[cache_key] del self.creation_times[cache_key]
self._creation_locks.pop(cache_key, None) # 清理创建锁
removed_count += 1 removed_count += 1
logger.info(f"已移除助手实例缓存: {cache_key} (unique_id: {unique_id})") logger.info(f"已移除助手实例缓存: {cache_key} (unique_id: {unique_id})")
except KeyError: except KeyError:

View File

@ -4,6 +4,7 @@ System prompt and MCP settings loader utilities
""" """
import os import os
import json import json
import asyncio
from typing import List, Dict, Optional, Any from typing import List, Dict, Optional, Any
@ -35,7 +36,22 @@ def safe_replace(text: str, placeholder: str, value: Any) -> str:
return text.replace(placeholder, replacement) return text.replace(placeholder, replacement)
def load_system_prompt(project_dir: str, language: str = None, system_prompt: str=None, robot_type: str = "agent", bot_id: str="", user_identifier: str = "") -> str: async def load_system_prompt_async(project_dir: str, language: str = None, system_prompt: str=None, robot_type: str = "agent", bot_id: str="", user_identifier: str = "") -> str:
"""异步版本的系统prompt加载
Args:
project_dir: 项目目录路径可以为None
language: 语言代码 'zh', 'en', 'jp'
system_prompt: 可选的系统提示词优先级高于项目配置
robot_type: 机器人类型取值 agent/catalog_agent
bot_id: 机器人ID
user_identifier: 用户标识符
Returns:
str: 加载到的系统提示词内容
"""
from .config_cache import config_cache
# 获取语言显示名称 # 获取语言显示名称
language_display_map = { language_display_map = {
'zh': '中文', 'zh': '中文',
@ -54,38 +70,29 @@ def load_system_prompt(project_dir: str, language: str = None, system_prompt: st
return prompt or "" return prompt or ""
elif robot_type == "agent" or robot_type == "catalog_agent": elif robot_type == "agent" or robot_type == "catalog_agent":
""" """
优先使用项目目录的system_prompt_catalog_agent.md没有才使用默认的system_prompt_default.md 优先使用项目目录的README.md没有才使用默认的system_prompt_{robot_type}.md
Args:
project_dir: 项目目录路径可以为None
language: 语言代码 'zh', 'en', 'jp' 此参数将被忽略
system_prompt: 可选的系统提示词优先级高于项目配置
robot_type: 机器人类型取值 AGENT/CATALOG_AGENT
Returns:
str: 加载到的系统提示词内容如果都未找到则返回空字符串
""" """
system_prompt_default = None system_prompt_default = None
try: try:
# 使用缓存读取默认prompt文件
default_prompt_file = os.path.join("prompt", f"system_prompt_{robot_type}.md") default_prompt_file = os.path.join("prompt", f"system_prompt_{robot_type}.md")
with open(default_prompt_file, 'r', encoding='utf-8') as f: system_prompt_default = await config_cache.get_text_file(default_prompt_file)
system_prompt_default = f.read() if system_prompt_default:
print(f"Using default system prompt for catalog_agent from prompt folder") print(f"Using cached default system prompt for {robot_type} from prompt folder")
except Exception as e: except Exception as e:
print(f"Failed to load default system prompt for catalog_agent: {str(e)}") print(f"Failed to load default system prompt for {robot_type}: {str(e)}")
system_prompt_default = None system_prompt_default = None
readme = "" readme = ""
# 只有当 project_dir 不为 None 时才尝试读取 README.md # 只有当 project_dir 不为 None 时才尝试读取 README.md
if project_dir is not None: if project_dir is not None:
readme_path = os.path.join(project_dir, "README.md") readme_path = os.path.join(project_dir, "README.md")
if os.path.exists(readme_path): readme = await config_cache.get_text_file(readme_path) or ""
with open(readme_path, "r", encoding="utf-8") as f: if system_prompt_default:
readme = f.read().strip()
system_prompt_default = safe_replace(system_prompt_default, "{readme}", str(readme)) system_prompt_default = safe_replace(system_prompt_default, "{readme}", str(readme))
prompt = system_prompt_default prompt = system_prompt_default or ""
prompt = safe_replace(prompt, "{language}", language_display) prompt = safe_replace(prompt, "{language}", language_display)
prompt = safe_replace(prompt, "{extra_prompt}", system_prompt or "") prompt = safe_replace(prompt, "{extra_prompt}", system_prompt or "")
prompt = safe_replace(prompt, '{bot_id}', bot_id) prompt = safe_replace(prompt, '{bot_id}', bot_id)
@ -99,6 +106,25 @@ def load_system_prompt(project_dir: str, language: str = None, system_prompt: st
return prompt or "" return prompt or ""
def load_system_prompt(project_dir: str, language: str = None, system_prompt: str=None, robot_type: str = "agent", bot_id: str="", user_identifier: str = "") -> str:
"""同步版本的系统prompt加载内部调用异步版本以保持向后兼容"""
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(
load_system_prompt_async(project_dir, language, system_prompt, robot_type, bot_id, user_identifier)
)
finally:
if loop.is_running():
pass
else:
loop.close()
def replace_mcp_placeholders(mcp_settings: List[Dict], dataset_dir: str, bot_id: str) -> List[Dict]: def replace_mcp_placeholders(mcp_settings: List[Dict], dataset_dir: str, bot_id: str) -> List[Dict]:
""" """
@ -128,10 +154,8 @@ def replace_mcp_placeholders(mcp_settings: List[Dict], dataset_dir: str, bot_id:
return replace_placeholders_in_obj(mcp_settings) return replace_placeholders_in_obj(mcp_settings)
def load_mcp_settings(project_dir: str, mcp_settings: list=None, bot_id: str="", robot_type: str = "agent") -> List[Dict]: async def load_mcp_settings_async(project_dir: str, mcp_settings: list=None, bot_id: str="", robot_type: str = "agent") -> List[Dict]:
"""异步版本的MCP设置加载
"""
始终读取默认MCP设置然后与传入的mcp_settings合并合并方式为合并[0].mcpServers对象
Args: Args:
project_dir: 项目目录路径 project_dir: 项目目录路径
@ -146,14 +170,16 @@ def load_mcp_settings(project_dir: str, mcp_settings: list=None, bot_id: str="",
支持在 mcp_settings.json args 中使用 {dataset_dir} 占位符 支持在 mcp_settings.json args 中使用 {dataset_dir} 占位符
会在 init_modified_agent_service_with_files 中被替换为实际的路径 会在 init_modified_agent_service_with_files 中被替换为实际的路径
""" """
from .config_cache import config_cache
# 1. 首先读取默认MCP设置 # 1. 首先读取默认MCP设置
default_mcp_settings = [] default_mcp_settings = []
try: try:
# 使用缓存读取默认MCP设置文件
default_mcp_file = os.path.join("mcp", f"mcp_settings_{robot_type}.json") default_mcp_file = os.path.join("mcp", f"mcp_settings_{robot_type}.json")
if os.path.exists(default_mcp_file): default_mcp_settings = await config_cache.get_json_file(default_mcp_file) or []
with open(default_mcp_file, 'r', encoding='utf-8') as f: if default_mcp_settings:
default_mcp_settings = json.load(f) print(f"Using cached default mcp_settings_{robot_type} from mcp folder")
print(f"Loaded default mcp_settings_{robot_type} from mcp folder")
else: else:
print(f"No default mcp_settings_{robot_type} found, using empty default settings") print(f"No default mcp_settings_{robot_type} found, using empty default settings")
except Exception as e: except Exception as e:
@ -217,3 +243,22 @@ def load_mcp_settings(project_dir: str, mcp_settings: list=None, bot_id: str="",
merged_settings = replace_mcp_placeholders(merged_settings, dataset_dir, bot_id) merged_settings = replace_mcp_placeholders(merged_settings, dataset_dir, bot_id)
return merged_settings return merged_settings
def load_mcp_settings(project_dir: str, mcp_settings: list=None, bot_id: str="", robot_type: str = "agent") -> List[Dict]:
"""同步版本的MCP设置加载内部调用异步版本以保持向后兼容"""
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(
load_mcp_settings_async(project_dir, mcp_settings, bot_id, robot_type)
)
finally:
if loop.is_running():
pass
else:
loop.close()