223 lines
8.5 KiB
Python
223 lines
8.5 KiB
Python
# Copyright 2023
|
||
#
|
||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||
# you may not use this file except in compliance with the License.
|
||
# You may obtain a copy of the License at
|
||
#
|
||
# http://www.apache.org/licenses/LICENSE-2.0
|
||
#
|
||
# Unless required by applicable law or agreed to in writing, software
|
||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
# See the License for the specific language governing permissions and
|
||
# limitations under the License.
|
||
|
||
"""文件预加载助手管理器 - 管理基于unique_id的助手实例缓存"""
|
||
|
||
import hashlib
|
||
import time
|
||
import json
|
||
from typing import Dict, List, Optional
|
||
|
||
from qwen_agent.agents import Assistant
|
||
from qwen_agent.log import logger
|
||
|
||
from modified_assistant import init_modified_agent_service_with_files, update_agent_llm
|
||
from .prompt_loader import load_system_prompt, load_mcp_settings
|
||
|
||
|
||
class FileLoadedAgentManager:
|
||
"""文件预加载助手管理器
|
||
|
||
基于 unique_id 缓存助手实例,避免重复创建和文件解析
|
||
"""
|
||
|
||
def __init__(self, max_cached_agents: int = 20):
|
||
self.agents: Dict[str, Assistant] = {} # {cache_key: assistant_instance}
|
||
self.unique_ids: Dict[str, str] = {} # {cache_key: unique_id}
|
||
self.access_times: Dict[str, float] = {} # LRU 访问时间管理
|
||
self.creation_times: Dict[str, float] = {} # 创建时间记录
|
||
self.max_cached_agents = max_cached_agents
|
||
|
||
def _get_cache_key(self, unique_id: str) -> str:
|
||
"""获取 unique_id 的哈希值作为缓存键"""
|
||
return hashlib.md5(unique_id.encode('utf-8')).hexdigest()[:16]
|
||
|
||
def _update_access_time(self, cache_key: str):
|
||
"""更新访问时间(LRU 管理)"""
|
||
self.access_times[cache_key] = time.time()
|
||
|
||
def _cleanup_old_agents(self):
|
||
"""清理旧的助手实例,基于 LRU 策略"""
|
||
if len(self.agents) <= self.max_cached_agents:
|
||
return
|
||
|
||
# 按 LRU 顺序排序,删除最久未访问的实例
|
||
sorted_keys = sorted(self.access_times.keys(), key=lambda k: self.access_times[k])
|
||
|
||
keys_to_remove = sorted_keys[:-self.max_cached_agents]
|
||
removed_count = 0
|
||
|
||
for cache_key in keys_to_remove:
|
||
try:
|
||
del self.agents[cache_key]
|
||
del self.unique_ids[cache_key]
|
||
del self.access_times[cache_key]
|
||
del self.creation_times[cache_key]
|
||
removed_count += 1
|
||
logger.info(f"清理过期的助手实例缓存: {cache_key}")
|
||
except KeyError:
|
||
continue
|
||
|
||
if removed_count > 0:
|
||
logger.info(f"已清理 {removed_count} 个过期的助手实例缓存")
|
||
|
||
async def get_or_create_agent(self,
|
||
unique_id: str,
|
||
project_dir: str,
|
||
model_name: str = "qwen3-next",
|
||
api_key: Optional[str] = None,
|
||
model_server: Optional[str] = None,
|
||
generate_cfg: Optional[Dict] = None,
|
||
language: Optional[str] = None,
|
||
system_prompt: Optional[str] = None,
|
||
mcp_settings: Optional[List[Dict]] = None,
|
||
robot_type: Optional[str] = "AGENT") -> Assistant:
|
||
"""获取或创建文件预加载的助手实例
|
||
|
||
Args:
|
||
unique_id: 项目的唯一标识符
|
||
project_dir: 项目目录路径,用于读取system_prompt.md和mcp_settings.json
|
||
model_name: 模型名称
|
||
api_key: API 密钥
|
||
model_server: 模型服务器地址
|
||
generate_cfg: 生成配置
|
||
language: 语言代码,用于选择对应的系统提示词
|
||
system_prompt: 可选的系统提示词,优先级高于项目配置
|
||
mcp_settings: 可选的MCP设置,优先级高于项目配置
|
||
robot_type: 机器人类型,取值 AGENT/CATALOG_AGENT
|
||
|
||
Returns:
|
||
Assistant: 配置好的助手实例
|
||
"""
|
||
import os
|
||
|
||
# 实现参数优先级逻辑:传入参数 > 项目配置 > 默认配置
|
||
final_system_prompt = load_system_prompt(project_dir, language, system_prompt, robot_type)
|
||
final_mcp_settings = load_mcp_settings(project_dir, mcp_settings)
|
||
|
||
cache_key = self._get_cache_key(unique_id)
|
||
|
||
# 检查是否已存在该助手实例
|
||
if cache_key in self.agents:
|
||
self._update_access_time(cache_key)
|
||
agent = self.agents[cache_key]
|
||
|
||
# 动态更新 LLM 配置和系统设置(如果参数有变化)
|
||
update_agent_llm(agent, model_name, api_key, model_server, generate_cfg, final_system_prompt, mcp_settings)
|
||
|
||
logger.info(f"复用现有的助手实例缓存: {cache_key} (unique_id: {unique_id}")
|
||
return agent
|
||
|
||
# 清理过期实例
|
||
self._cleanup_old_agents()
|
||
|
||
# 创建新的助手实例,预加载文件
|
||
logger.info(f"创建新的助手实例缓存: {cache_key}, unique_id: {unique_id}")
|
||
current_time = time.time()
|
||
|
||
agent = init_modified_agent_service_with_files(
|
||
model_name=model_name,
|
||
api_key=api_key,
|
||
model_server=model_server,
|
||
generate_cfg=generate_cfg,
|
||
system_prompt=final_system_prompt,
|
||
mcp=final_mcp_settings
|
||
)
|
||
|
||
# 缓存实例
|
||
self.agents[cache_key] = agent
|
||
self.unique_ids[cache_key] = unique_id
|
||
self.access_times[cache_key] = current_time
|
||
self.creation_times[cache_key] = current_time
|
||
|
||
logger.info(f"助手实例缓存创建完成: {cache_key}")
|
||
return agent
|
||
|
||
def get_cache_stats(self) -> Dict:
|
||
"""获取缓存统计信息"""
|
||
current_time = time.time()
|
||
stats = {
|
||
"total_cached_agents": len(self.agents),
|
||
"max_cached_agents": self.max_cached_agents,
|
||
"agents": {}
|
||
}
|
||
|
||
for cache_key, agent in self.agents.items():
|
||
stats["agents"][cache_key] = {
|
||
"unique_id": self.unique_ids.get(cache_key, "unknown"),
|
||
"created_at": self.creation_times.get(cache_key, 0),
|
||
"last_accessed": self.access_times.get(cache_key, 0),
|
||
"age_seconds": int(current_time - self.creation_times.get(cache_key, current_time)),
|
||
"idle_seconds": int(current_time - self.access_times.get(cache_key, current_time))
|
||
}
|
||
|
||
return stats
|
||
|
||
def clear_cache(self) -> int:
|
||
"""清空所有缓存
|
||
|
||
Returns:
|
||
int: 清理的实例数量
|
||
"""
|
||
cache_count = len(self.agents)
|
||
|
||
self.agents.clear()
|
||
self.unique_ids.clear()
|
||
self.access_times.clear()
|
||
self.creation_times.clear()
|
||
|
||
logger.info(f"已清空所有助手实例缓存,共清理 {cache_count} 个实例")
|
||
return cache_count
|
||
|
||
def remove_cache_by_unique_id(self, unique_id: str) -> bool:
|
||
"""根据 unique_id 移除特定的缓存
|
||
|
||
Args:
|
||
unique_id: 项目的唯一标识符
|
||
|
||
Returns:
|
||
bool: 是否成功移除
|
||
"""
|
||
cache_key = self._get_cache_key(unique_id)
|
||
|
||
if cache_key in self.agents:
|
||
del self.agents[cache_key]
|
||
del self.unique_ids[cache_key]
|
||
del self.access_times[cache_key]
|
||
del self.creation_times[cache_key]
|
||
|
||
logger.info(f"已移除特定 unique_id 的助手实例缓存: {unique_id}")
|
||
return True
|
||
|
||
return False
|
||
|
||
|
||
# 全局文件预加载助手管理器实例
|
||
_global_agent_manager: Optional[FileLoadedAgentManager] = None
|
||
|
||
|
||
def get_global_agent_manager() -> FileLoadedAgentManager:
|
||
"""获取全局文件预加载助手管理器实例"""
|
||
global _global_agent_manager
|
||
if _global_agent_manager is None:
|
||
_global_agent_manager = FileLoadedAgentManager()
|
||
return _global_agent_manager
|
||
|
||
|
||
def init_global_agent_manager(max_cached_agents: int = 20):
|
||
"""初始化全局文件预加载助手管理器"""
|
||
global _global_agent_manager
|
||
_global_agent_manager = FileLoadedAgentManager(max_cached_agents)
|
||
return _global_agent_manager
|