qwen_agent/agent/file_loaded_agent_manager.py
2025-11-27 21:50:03 +08:00

286 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Copyright 2023
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""文件预加载助手管理器 - 管理基于unique_id的助手实例缓存"""
import hashlib
import time
import json
import asyncio
from typing import Dict, List, Optional
from qwen_agent.agents import Assistant
import logging
logger = logging.getLogger('app')
from agent.modified_assistant import init_modified_agent_service_with_files, update_agent_llm
from agent.prompt_loader import load_system_prompt_async, load_mcp_settings_async
class FileLoadedAgentManager:
    """Manager for file-preloaded assistant instances.

    Assistant instances are cached under a key derived from every argument
    that affects how they are built (bot id, model settings, resolved system
    prompt and MCP settings). Repeated requests therefore reuse an existing
    instance instead of re-creating it and re-parsing its files. The cache
    holds at most ``max_cached_agents`` entries and evicts the least recently
    accessed ones first.
    """

    def __init__(self, max_cached_agents: int = 20):
        """Create an empty cache.

        Args:
            max_cached_agents: Upper bound on cached assistant instances. A
                value <= 0 disables caching: each new instance is evicted
                right after it is created.
        """
        self.agents: Dict[str, Assistant] = {}  # {cache_key: assistant_instance}
        self.unique_ids: Dict[str, str] = {}  # {cache_key: bot_id}
        self.access_times: Dict[str, float] = {}  # {cache_key: last access time}, drives LRU
        self.creation_times: Dict[str, float] = {}  # {cache_key: creation time}
        self.max_cached_agents = max_cached_agents
        # Per-key locks that stop concurrent requests from building the same
        # agent twice. get_or_create_agent drops each lock once it is done
        # with it, so this dict does not grow with every key ever seen.
        self._creation_locks: Dict[str, asyncio.Lock] = {}

    def _get_cache_key(self, bot_id: str, model_name: Optional[str] = None,
                       api_key: Optional[str] = None, model_server: Optional[str] = None,
                       generate_cfg: Optional[Dict] = None,
                       system_prompt: Optional[str] = None,
                       mcp_settings: Optional[List[Dict]] = None) -> str:
        """Derive a cache key from every parameter that affects the agent.

        Args:
            bot_id: Bot/project identifier.
            model_name: Model name.
            api_key: API key.
            model_server: Model server address.
            generate_cfg: Generation config (must be JSON-serialisable).
            system_prompt: Resolved system prompt.
            mcp_settings: Resolved MCP settings list (must be JSON-serialisable).

        Returns:
            str: The first 16 hex characters of the MD5 digest of a canonical
            (key-sorted) JSON encoding of the arguments. ``None`` values are
            normalised to empty values.
        """
        cache_data = {
            'bot_id': bot_id,
            'model_name': model_name or '',
            'api_key': api_key or '',
            'model_server': model_server or '',
            # Nested structures are serialised with sorted keys so that
            # logically equal dicts produce the same key.
            'generate_cfg': json.dumps(generate_cfg or {}, sort_keys=True),
            'system_prompt': system_prompt or '',
            'mcp_settings': json.dumps(mcp_settings or [], sort_keys=True)
        }
        cache_str = json.dumps(cache_data, sort_keys=True)
        # MD5 is only a fingerprint here. Because the key is a digest, the raw
        # api_key never appears in logs or stats.
        return hashlib.md5(cache_str.encode('utf-8')).hexdigest()[:16]

    def _update_access_time(self, cache_key: str):
        """Mark ``cache_key`` as used now (for LRU ordering)."""
        self.access_times[cache_key] = time.time()

    def _remove_entry(self, cache_key: str) -> bool:
        """Drop all bookkeeping stored for ``cache_key``.

        Returns:
            bool: True if an assistant instance was cached under the key.
        """
        existed = cache_key in self.agents
        self.agents.pop(cache_key, None)
        self.unique_ids.pop(cache_key, None)
        self.access_times.pop(cache_key, None)
        self.creation_times.pop(cache_key, None)
        return existed

    def _cleanup_old_agents(self):
        """Evict least-recently-used instances until at most ``max_cached_agents`` remain."""
        excess = len(self.agents) - self.max_cached_agents
        if excess <= 0:
            return
        # Oldest access first; an entry without an access time counts as oldest.
        lru_order = sorted(self.agents, key=lambda k: self.access_times.get(k, 0.0))
        removed_count = 0
        for cache_key in lru_order[:excess]:
            if self._remove_entry(cache_key):
                removed_count += 1
                logger.info(f"清理过期的助手实例缓存: {cache_key}")
        if removed_count > 0:
            logger.info(f"已清理 {removed_count} 个过期的助手实例缓存")

    async def get_or_create_agent(self,
                                  bot_id: str,
                                  project_dir: Optional[str],
                                  model_name: str = "qwen3-next",
                                  api_key: Optional[str] = None,
                                  model_server: Optional[str] = None,
                                  generate_cfg: Optional[Dict] = None,
                                  language: Optional[str] = None,
                                  system_prompt: Optional[str] = None,
                                  mcp_settings: Optional[List[Dict]] = None,
                                  robot_type: Optional[str] = "general_agent",
                                  user_identifier: Optional[str] = None) -> "Assistant":
        """Return a cached assistant for these settings, creating it if needed.

        The system prompt and MCP settings are first resolved through
        ``load_system_prompt_async`` / ``load_mcp_settings_async``. The
        resolved values form part of the cache key.

        Args:
            bot_id: Project identifier; also recorded as the entry's unique_id.
            project_dir: Project directory forwarded to the loaders; may be None.
            model_name: Model name.
            api_key: API key.
            model_server: Model server address.
            generate_cfg: Generation config.
            language: Language code forwarded to the system-prompt loader.
            system_prompt: Optional system prompt forwarded to the loader
                (documented as taking precedence over project configuration).
            mcp_settings: Optional MCP settings forwarded to the loader
                (documented as taking precedence over project configuration).
            robot_type: Robot type forwarded to both loaders (default
                ``"general_agent"``; older docs listed ``agent`` /
                ``catalog_agent`` — confirm the accepted set).
            user_identifier: User identifier forwarded to the prompt loader.

        Returns:
            Assistant: The configured assistant instance.
        """
        final_system_prompt = await load_system_prompt_async(
            project_dir, language, system_prompt, robot_type, bot_id, user_identifier
        )
        final_mcp_settings = await load_mcp_settings_async(
            project_dir, mcp_settings, bot_id, robot_type
        )
        cache_key = self._get_cache_key(bot_id, model_name, api_key, model_server,
                                        generate_cfg, final_system_prompt, final_mcp_settings)

        creation_lock = self._creation_locks.setdefault(cache_key, asyncio.Lock())
        async with creation_lock:
            try:
                # Re-check under the lock: another request may already have
                # created this agent while we were waiting.
                if cache_key in self.agents:
                    self._update_access_time(cache_key)
                    agent = self.agents[cache_key]
                    # Re-apply the LLM settings to the cached instance.
                    update_agent_llm(agent, model_name, api_key, model_server, generate_cfg)
                    logger.info(f"复用现有的助手实例缓存: {cache_key} (bot_id: {bot_id})")
                    return agent

                logger.info(f"创建新的助手实例缓存: {cache_key}, bot_id: {bot_id}")
                current_time = time.time()
                # NOTE(review): this call is synchronous (not awaited), so the
                # event loop is blocked while the agent is initialised.
                agent = init_modified_agent_service_with_files(
                    model_name=model_name,
                    api_key=api_key,
                    model_server=model_server,
                    generate_cfg=generate_cfg,
                    system_prompt=final_system_prompt,
                    mcp=final_mcp_settings
                )
                self.agents[cache_key] = agent
                self.unique_ids[cache_key] = bot_id
                self.access_times[cache_key] = current_time
                self.creation_times[cache_key] = current_time
                # Evict after inserting so the cache never exceeds
                # max_cached_agents. The new entry has the newest access time,
                # so it is evicted only when caching is disabled (max <= 0).
                self._cleanup_old_agents()
                logger.info(f"助手实例缓存创建完成: {cache_key}")
                return agent
            finally:
                # Always release the per-key lock entry (reuse, create and
                # error paths alike) so _creation_locks cannot leak. The
                # identity check avoids removing a newer lock that another
                # request registered for the same key.
                if self._creation_locks.get(cache_key) is creation_lock:
                    del self._creation_locks[cache_key]

    def get_cache_stats(self) -> Dict:
        """Return a snapshot of the cache for monitoring.

        Returns:
            Dict: ``total_cached_agents``, ``max_cached_agents`` and, per cache
            key, the owning ``unique_id``, the creation and last-access
            timestamps, and the derived ``age_seconds`` / ``idle_seconds``.
        """
        current_time = time.time()
        agents_info = {}
        for cache_key in self.agents:
            agents_info[cache_key] = {
                "unique_id": self.unique_ids.get(cache_key, "unknown"),
                "created_at": self.creation_times.get(cache_key, 0),
                "last_accessed": self.access_times.get(cache_key, 0),
                "age_seconds": int(current_time - self.creation_times.get(cache_key, current_time)),
                "idle_seconds": int(current_time - self.access_times.get(cache_key, current_time))
            }
        return {
            "total_cached_agents": len(self.agents),
            "max_cached_agents": self.max_cached_agents,
            "agents": agents_info
        }

    def clear_cache(self) -> int:
        """Drop every cached assistant instance.

        Returns:
            int: Number of instances that were cached before clearing.
        """
        cache_count = len(self.agents)
        self.agents.clear()
        self.unique_ids.clear()
        self.access_times.clear()
        self.creation_times.clear()
        logger.info(f"已清空所有助手实例缓存,共清理 {cache_count} 个实例")
        return cache_count

    def remove_cache_by_unique_id(self, unique_id: str) -> int:
        """Remove every cached instance that belongs to ``unique_id``.

        The cache key also covers the system prompt and MCP settings, so one
        unique_id may own several cached instances.

        Args:
            unique_id: Project identifier (the ``bot_id`` used at creation).

        Returns:
            int: Number of instances removed.
        """
        keys_to_remove = [key for key, owner in self.unique_ids.items() if owner == unique_id]
        removed_count = 0
        for cache_key in keys_to_remove:
            self._creation_locks.pop(cache_key, None)
            if self._remove_entry(cache_key):
                removed_count += 1
                logger.info(f"已移除助手实例缓存: {cache_key} (unique_id: {unique_id})")
        if removed_count > 0:
            logger.info(f"已移除 unique_id={unique_id} 的 {removed_count} 个助手实例缓存")
        else:
            logger.warning(f"未找到 unique_id={unique_id} 的缓存实例")
        return removed_count
# Process-wide FileLoadedAgentManager instance. It is None until
# get_global_agent_manager() creates it lazily or init_global_agent_manager()
# installs one.
_global_agent_manager: Optional[FileLoadedAgentManager] = None
def get_global_agent_manager() -> FileLoadedAgentManager:
    """Return the process-wide manager, creating it with defaults on first use.

    Returns:
        FileLoadedAgentManager: The shared manager instance.
    """
    global _global_agent_manager
    manager = _global_agent_manager
    if manager is None:
        # First access: build one with the default cache size.
        manager = FileLoadedAgentManager()
        _global_agent_manager = manager
    return manager
def init_global_agent_manager(max_cached_agents: int = 20):
    """Install a new process-wide manager with the given cache size.

    Any previously installed global manager is replaced, and this module no
    longer references it.

    Args:
        max_cached_agents: Cache bound passed to the new manager.

    Returns:
        FileLoadedAgentManager: The newly installed manager.
    """
    global _global_agent_manager
    fresh_manager = FileLoadedAgentManager(max_cached_agents)
    _global_agent_manager = fresh_manager
    return fresh_manager