logger = logging.getLogger('app')


class AgentMemoryCacheManager:
    """
    In-memory cache for Agent objects with per-key TTL and LRU eviction.

    - Size is bounded by a ``cachetools.LRUCache``; when it is full the least
      recently used entry is evicted.
    - Expiry is tracked by this class (``_expire_times``) rather than by the
      underlying cache. A ``TTLCache`` applies one fixed TTL per cache and
      does not refresh it on access, which silently defeated both the
      per-call ``ttl`` argument and ``auto_renew``.
    - Every public method takes a re-entrant lock, so one manager can be
      shared between threads.
    """

    def __init__(
        self,
        max_size: int = 1000,
        default_ttl: int = 180,
        auto_renew: bool = True
    ):
        """
        Create an empty cache.

        Args:
            max_size: Maximum number of cached items.
            default_ttl: Lifetime in seconds used when ``set`` gets no ``ttl``.
            auto_renew: Whether a successful ``get`` restarts the item's TTL.
        """
        # Only bounds the size / picks the LRU victim; expiry is handled here.
        self.cache = cachetools.LRUCache(maxsize=max_size)

        # Per-key bookkeeping, all on the time.monotonic() clock.
        self._expire_times: Dict[str, float] = {}   # absolute expiry time
        self._create_times: Dict[str, float] = {}   # time of the last set()
        self._ttls: Dict[str, float] = {}           # ttl used for renewal

        # Re-entrant because cleanup_old_entries() calls delete() while locked.
        self._lock = threading.RLock()

        self.default_ttl = default_ttl
        self.auto_renew = auto_renew
        self.max_size = max_size

        # Statistics counters.
        self._hits = 0
        self._misses = 0
        self._sets = 0
        self._evictions = 0

        logger.info(f"AgentMemoryCacheManager initialized with max_size: {max_size}, "
                    f"default_ttl: {default_ttl}s, auto_renew: {auto_renew}")

    def get(self, cache_key: str) -> Optional[Any]:
        """
        Return the cached Agent for ``cache_key``.

        With ``auto_renew`` enabled, a hit restarts the item's lifetime using
        the ttl it was stored with.

        Args:
            cache_key: Cache key.

        Returns:
            The cached object, or None when the key is missing or expired.
        """
        with self._lock:
            now = time.monotonic()

            expires_at = self._expire_times.get(cache_key)
            if expires_at is not None and now > expires_at:
                self._remove_expired(cache_key)
                self._misses += 1
                logger.debug(f"Cache miss (expired) for key: {cache_key}")
                return None

            try:
                # Item access (unlike ``in``) marks the key as recently used.
                value = self.cache[cache_key]
            except KeyError:
                # Drop metadata that outlived its value so it cannot pile up.
                self._cleanup_metadata(cache_key)
                self._misses += 1
                logger.debug(f"Cache miss for key: {cache_key}")
                return None

            if self.auto_renew:
                ttl = self._ttls.get(cache_key, self.default_ttl)
                self._expire_times[cache_key] = now + ttl
                logger.debug(f"Cache hit and renewed for key: {cache_key}")
            else:
                logger.debug(f"Cache hit for key: {cache_key}")

            self._hits += 1
            return value

    def set(self, cache_key: str, agent: Any, ttl: Optional[int] = None) -> bool:
        """
        Store ``agent`` under ``cache_key``.

        Args:
            cache_key: Cache key.
            agent: Object to cache.
            ttl: Lifetime in seconds; ``default_ttl`` is used when None.

        Returns:
            True when the item was stored, False when storing failed.
        """
        with self._lock:
            try:
                if ttl is None:
                    ttl = self.default_ttl

                now = time.monotonic()

                # Free slots held by dead entries before a live one is evicted.
                self._purge_expired(now)

                self.cache[cache_key] = agent
                self._expire_times[cache_key] = now + ttl
                self._create_times[cache_key] = now
                self._ttls[cache_key] = ttl

                # Whatever has metadata but no value was just evicted by the
                # underlying cache; reconciling afterwards avoids guessing the
                # victim beforehand.
                evicted_keys = [key for key in self._expire_times if key not in self.cache]
                for evicted_key in evicted_keys:
                    self._cleanup_metadata(evicted_key)
                    self._evictions += 1
                    logger.debug(f"Evicted cache key: {evicted_key}")

                self._sets += 1

                logger.info(f"Cached agent for key: {cache_key}, ttl: {ttl}s")
                return True

            except Exception as e:
                # Caching is best effort: report the failure to the caller
                # through the return value instead of raising.
                logger.error(f"Error setting cache for key {cache_key}: {e}")
                return False

    def delete(self, cache_key: str) -> bool:
        """
        Remove one item.

        Args:
            cache_key: Cache key.

        Returns:
            True when the key was cached and has been removed, else False.
        """
        with self._lock:
            try:
                deleted = cache_key in self.cache
                if deleted:
                    del self.cache[cache_key]

                # Metadata is dropped even when the value was already gone.
                self._cleanup_metadata(cache_key)

                if deleted:
                    logger.info(f"Deleted cache for key: {cache_key}")
                else:
                    logger.warning(f"Cache key not found for deletion: {cache_key}")

                return deleted
            except Exception as e:
                logger.error(f"Error deleting cache for key {cache_key}: {e}")
                return False

    def _remove_expired(self, cache_key: str):
        """Remove an expired item together with its metadata."""
        if cache_key in self.cache:
            del self.cache[cache_key]
        self._cleanup_metadata(cache_key)

    def _purge_expired(self, now: float) -> int:
        """Remove every item whose expiry time lies before ``now``; return the count."""
        expired_keys = [key for key, expires_at in self._expire_times.items() if now > expires_at]
        for key in expired_keys:
            self._remove_expired(key)
        return len(expired_keys)

    def _cleanup_metadata(self, cache_key: str):
        """Forget the bookkeeping entries of ``cache_key``."""
        self._expire_times.pop(cache_key, None)
        self._create_times.pop(cache_key, None)
        self._ttls.pop(cache_key, None)

    def clear_all(self) -> bool:
        """
        Remove every item and reset the statistics counters.

        Returns:
            True on success, False when clearing failed.
        """
        with self._lock:
            try:
                count = len(self.cache)
                self.cache.clear()
                self._expire_times.clear()
                self._create_times.clear()
                self._ttls.clear()

                self._hits = 0
                self._misses = 0
                self._sets = 0
                self._evictions = 0

                logger.info(f"Cleared all cache entries, total: {count}")
                return True
            except Exception as e:
                logger.error(f"Error clearing all cache: {e}")
                return False

    def get_stats(self) -> Dict[str, Any]:
        """
        Return cache statistics.

        Returns:
            Dict with item count, configuration, hit/miss/set/eviction
            counters, hit rate in percent and a rough memory estimate in MB.
        """
        with self._lock:
            # Expired items are not reported as cached.
            self._purge_expired(time.monotonic())

            total_requests = self._hits + self._misses
            hit_rate = (self._hits / total_requests * 100) if total_requests > 0 else 0

            return {
                "type": "memory",
                "total_items": len(self.cache),
                "max_size": self.max_size,
                "default_ttl": self.default_ttl,
                "auto_renew": self.auto_renew,
                "hits": self._hits,
                "misses": self._misses,
                "hit_rate_percent": round(hit_rate, 2),
                "sets": self._sets,
                "evictions": self._evictions,
                "memory_usage_mb": round(self._estimate_memory_usage() / 1024 / 1024, 2)
            }

    def _estimate_memory_usage(self) -> int:
        """
        Roughly estimate the memory used, in bytes.

        ``sys.getsizeof`` is shallow: objects referenced by a cached value are
        not counted, so the result is a lower bound.
        """
        import sys

        total_size = 0
        for key, value in self.cache.items():
            total_size += sys.getsizeof(key)
            total_size += sys.getsizeof(value)

        total_size += sys.getsizeof(self._expire_times)
        total_size += sys.getsizeof(self._create_times)
        total_size += sys.getsizeof(self._ttls)

        return total_size

    def cleanup_old_entries(self, max_age_seconds: int = 3600) -> int:
        """
        Remove items that were stored more than ``max_age_seconds`` ago.

        Age is measured from the last ``set`` of the key; renewals do not
        reset it.

        Args:
            max_age_seconds: Maximum age in seconds.

        Returns:
            Number of items removed.
        """
        with self._lock:
            now = time.monotonic()
            # Collect first: delete() mutates the dict being scanned.
            keys_to_delete = [
                cache_key
                for cache_key, create_time in self._create_times.items()
                if now - create_time > max_age_seconds
            ]

            deleted_count = 0
            for key in keys_to_delete:
                if self.delete(key):
                    deleted_count += 1

            logger.info(f"Cleaned up {deleted_count} old cache entries older than {max_age_seconds}s")
            return deleted_count

    def get_keys(self) -> list:
        """
        Return the keys of all live items.

        Returns:
            List of cache keys.
        """
        with self._lock:
            self._purge_expired(time.monotonic())
            return list(self.cache.keys())

    def __len__(self) -> int:
        """Return the number of live items."""
        with self._lock:
            self._purge_expired(time.monotonic())
            return len(self.cache)


# Process-wide cache manager, created lazily by get_memory_cache_manager().
_global_cache_manager: Optional[AgentMemoryCacheManager] = None


def get_memory_cache_manager() -> AgentMemoryCacheManager:
    """
    Return the process-wide AgentMemoryCacheManager, creating it on first use.

    The configuration is read from the environment once, at creation time:
    ``AGENT_CACHE_MAX_SIZE`` (default 20), ``AGENT_CACHE_TTL`` (default 180
    seconds) and ``AGENT_CACHE_AUTO_RENEW`` (default "true").

    Returns:
        The shared AgentMemoryCacheManager instance.
    """
    global _global_cache_manager

    if _global_cache_manager is None:
        max_size = int(os.getenv("AGENT_CACHE_MAX_SIZE", "20"))
        default_ttl = int(os.getenv("AGENT_CACHE_TTL", "180"))
        auto_renew = os.getenv("AGENT_CACHE_AUTO_RENEW", "true").lower() == "true"

        _global_cache_manager = AgentMemoryCacheManager(
            max_size=max_size,
            default_ttl=default_ttl,
            auto_renew=auto_renew
        )

    return _global_cache_manager
+logger = logging.getLogger('app') + +# 全局 MemorySaver 实例 +_global_checkpointer = MemorySaver() +# 使用内存缓存管理器 # Utility functions def read_system_prompt(): @@ -68,6 +70,25 @@ async def init_agent(config: AgentConfig): config: AgentConfig 对象,包含所有初始化参数 mcp: MCP配置(如果为None则使用配置中的mcp_settings) """ + # 获取缓存管理器 + cache_manager = get_memory_cache_manager() + + # 获取唯一的缓存键 + cache_key = config.get_unique_cache_id() + + # 如果有缓存键,检查缓存 + if cache_key: + # 尝试从缓存中获取 agent + cached_agent = cache_manager.get(cache_key) + if cached_agent is not None: + logger.info(f"Using cached agent for session: {config.session_id}, cache_key: {cache_key}") + return cached_agent + else: + logger.info(f"Cache miss for session: {config.session_id}, cache_key: {cache_key}") + + # 没有缓存或缓存已过期,创建新的 agent + logger.info(f"Creating new agent for session: {getattr(config, 'session_id', 'no-session')}") + final_system_prompt = await load_system_prompt_async( config.project_dir, config.language, config.system_prompt, config.robot_type, config.bot_id, config.user_identifier ) @@ -137,8 +158,10 @@ async def init_agent(config: AgentConfig): checkpointer=checkpointer # 传入 checkpointer 以启用持久化 ) - # 将 checkpointer 作为属性附加到 agent 上,方便访问 - if checkpointer: - agent._checkpointer = checkpointer + # 如果有缓存键,将 agent 加入缓存 + if cache_key: + # 使用 DiskCache 缓存管理器存储 agent + cache_manager.set(cache_key, agent) + logger.info(f"Cached agent for session: {config.session_id}, cache_key: {cache_key}") return agent \ No newline at end of file diff --git a/docs/agent_cache.md b/docs/agent_cache.md new file mode 100644 index 0000000..d888b5d --- /dev/null +++ b/docs/agent_cache.md @@ -0,0 +1,183 @@ +# Agent 缓存系统 + +本文档描述了使用 DiskCache 实现的 Agent 缓存系统。 + +## 概述 + +Agent 缓存系统使用 DiskCache 来持久化缓存 Agent 实例,提供以下功能: + +- ✅ **文件持久化**:缓存存储在磁盘上,重启后仍然保留 +- ✅ **自动过期**:支持设置过期时间,自动清理过期项 +- ✅ **缓存大小限制**:可设置最大缓存大小,自动淘汰旧数据 +- ✅ **访问时自动续期**:每次访问时自动延长过期时间 +- ✅ **线程安全**:支持多线程并发访问 + +## 配置 + +### 环境变量 + +可以通过以下环境变量配置缓存行为: + +- 
`AGENT_CACHE_DIR`:缓存目录路径(默认:`./projects/agent_cache`) +- `AGENT_CACHE_SIZE_LIMIT`:缓存大小限制,字节(默认:`1073741824` = 1GB) +- `AGENT_CACHE_EXPIRE`:默认过期时间,秒(默认:`180` = 3分钟) + +示例: +```bash +export AGENT_CACHE_DIR="/var/cache/agent" +export AGENT_CACHE_SIZE_LIMIT="5368709120" # 5GB +export AGENT_CACHE_EXPIRE="600" # 10分钟 +``` + +### 代码配置 + +```python +from agent.agent_cache_manager import AgentCacheManager + +# 创建缓存管理器 +cache = AgentCacheManager( + cache_dir="./cache/agent_cache", + size_limit=1024*1024*1024, # 1GB + default_expire=180, # 3分钟 + eviction_policy="least-recently-used" # LRU淘汰策略 +) +``` + +## 使用方法 + +### 1. 缓存 Agent + +缓存系统已集成到 `init_agent` 函数中,会自动根据配置生成缓存键并缓存 Agent。 + +```python +from agent.deep_assistant import init_agent +from agent.agent_config import AgentConfig + +# 创建配置 +config = AgentConfig( + bot_id="your-bot-id", + session_id="user-session-123", + # ... 其他配置 +) + +# 初始化 Agent(会自动使用缓存) +agent = await init_agent(config) +``` + +### 2. 手动使用缓存 + +如果需要在其他地方使用缓存: + +```python +from agent.agent_cache_manager import get_cache_manager + +# 获取全局缓存管理器 +cache = get_cache_manager() + +# 设置缓存 +cache.set("my-key", {"data": "value"}, expire=300) # 5分钟过期 + +# 获取缓存 +value = cache.get("my-key") +if value is not None: + print("缓存命中!") +else: + print("缓存未命中") + +# 删除缓存 +cache.delete("my-key") + +# 获取统计信息 +stats = cache.get_stats() +print(f"缓存大小: {stats['size_mb']} MB") +print(f"缓存项数: {stats['total_items']}") +``` + +## 缓存管理工具 + +项目提供了缓存管理脚本: + +```bash +# 查看缓存统计 +poetry run python scripts/cache_manager.py stats + +# 清理过期项 +poetry run python scripts/cache_manager.py expired + +# 清理超过1小时的项 +poetry run python scripts/cache_manager.py cleanup --max-age 3600 + +# 清空所有缓存 +poetry run python scripts/cache_manager.py clear + +# 列出缓存键 +poetry run python scripts/cache_manager.py list + +# 删除特定缓存键 +poetry run python scripts/cache_manager.py delete my-cache-key +``` + +## 缓存键生成 + +缓存键基于以下配置生成唯一哈希值: + +- `bot_id` +- `system_prompt` +- `mcp_settings` +- `model_name` +- `language` +- 
`generate_cfg` +- `enable_thinking` +- `user_identifier` +- `session_id` + +如果 `session_id` 为 None,则不使用缓存。 + +## 注意事项 + +1. **性能考虑**: + - 缓存存储在磁盘上,首次访问会有 I/O 开销 + - 建议使用 SSD 存储缓存以提高性能 + - 过期时间不宜设置过短,避免频繁重新创建 Agent + +2. **内存管理**: + - Agent 对象可能占用较大内存 + - 设置合理的缓存大小限制 + - 定期清理不用的缓存项 + +3. **并发安全**: + - DiskCache 支持多进程并发访问 + - 但在高并发场景下可能成为瓶颈 + +4. **调试**: + - 查看日志中的缓存相关信息 + - 使用 `stats` 命令监控缓存使用情况 + - 必要时清空缓存重置状态 + +## 故障排查 + +### 缓存不生效 + +1. 检查 `session_id` 是否设置(必须有 session_id 才会使用缓存) +2. 检查缓存目录是否存在且有写权限 +3. 查看日志中是否有缓存相关错误 + +### 缓存过多 + +1. 检查缓存大小限制设置 +2. 使用 `cleanup` 命令清理旧缓存 +3. 调整默认过期时间 + +### 性能问题 + +1. 监控缓存命中率 +2. 考虑增加缓存大小 +3. 优化 Agent 创建速度 + +## 示例输出 + +``` +INFO: Using cached agent for session: user-123, cache_key: agent_cache_a1b2c3d4 +INFO: Cache stats - Total items: 5, Size: 256.78 MB +INFO: Cached agent for session: user-456, cache_key: agent_cache_e5f6g7h8 +``` \ No newline at end of file diff --git a/docs/agent_memory_cache.md b/docs/agent_memory_cache.md new file mode 100644 index 0000000..9fd5b30 --- /dev/null +++ b/docs/agent_memory_cache.md @@ -0,0 +1,251 @@ +# Agent 内存缓存系统 + +本文档描述了基于 cachetools 实现的 Agent 内存缓存系统。 + +## 概述 + +Agent 内存缓存系统使用 cachetools 库在内存中缓存 Agent 实例,提供以下功能: + +- ✅ **高性能访问**:基于内存存储,访问速度极快(纳秒级) +- ✅ **自动过期**:支持设置 TTL(Time To Live),自动清理过期项 +- ✅ **LRU 淘汰策略**:当缓存满时,自动淘汰最久未使用的项 +- ✅ **访问时自动续期**:每次访问时自动延长过期时间 +- ✅ **线程安全**:使用 RLock 确保多线程并发安全 +- ✅ **统计信息**:提供详细的缓存命中率等统计信息 + +## 配置 + +### 环境变量 + +可以通过以下环境变量配置缓存行为: + +- `AGENT_CACHE_MAX_SIZE`:最大缓存项数(默认:`20`) +- `AGENT_CACHE_TTL`:默认过期时间,秒(默认:`180` = 3分钟) +- `AGENT_CACHE_AUTO_RENEW`:是否自动续期(默认:`true`) + +示例: +```bash +export AGENT_CACHE_MAX_SIZE="500" +export AGENT_CACHE_TTL="300" # 5分钟 +export AGENT_CACHE_AUTO_RENEW="false" +``` + +### 代码配置 + +```python +from agent.agent_memory_cache import AgentMemoryCacheManager + +# 创建缓存管理器 +cache = AgentMemoryCacheManager( + max_size=1000, # 最多缓存 1000 个 Agent + default_ttl=180, # 3分钟过期 + auto_renew=True # 访问时自动续期 +) +``` + +## 使用方法 + +### 1. 
自动缓存(推荐) + +缓存系统已集成到 `init_agent` 函数中,会自动根据配置生成缓存键并缓存 Agent。 + +```python +from agent.deep_assistant import init_agent +from agent.agent_config import AgentConfig + +# 创建配置 +config = AgentConfig( + bot_id="your-bot-id", + session_id="user-session-123", + # ... 其他配置 +) + +# 初始化 Agent(会自动使用内存缓存) +agent = await init_agent(config) +``` + +### 2. 手动使用缓存 + +```python +from agent.agent_memory_cache import get_memory_cache_manager + +# 获取全局缓存管理器 +cache = get_memory_cache_manager() + +# 设置缓存 +cache.set("my-key", {"data": "value"}, ttl=300) # 5分钟过期 + +# 获取缓存 +value = cache.get("my-key") +if value is not None: + print("缓存命中!") + print(f"数据: {value}") +else: + print("缓存未命中") + +# 删除缓存 +cache.delete("my-key") + +# 获取统计信息 +stats = cache.get_stats() +print(f"命中率: {stats['hit_rate_percent']}%") +print(f"内存使用: {stats['memory_usage_mb']} MB") +print(f"缓存项数: {stats['total_items']}") +``` + +## 缓存特性详解 + +### 1. TTL(Time To Live)过期 + +每个缓存项都有生存时间,超过时间后自动失效: + +```python +# 设置 10 秒过期的缓存 +cache.set("temp_data", "value", ttl=10) + +# 10 秒后访问会返回 None +``` + +### 2. 自动续期 + +当 `auto_renew=True` 时,每次访问缓存会自动延长其过期时间: + +```python +cache.set("data", {"key": "value"}, ttl=60) # 1分钟过期 + +# 50 秒后访问 +value = cache.get("data") # 成功获取,且过期时间重置为 60 秒 +``` + +### 3. LRU 淘汰策略 + +当缓存达到最大容量时,会自动淘汰最久未使用的项: + +```python +cache = AgentMemoryCacheManager(max_size=2) + +cache.set("key1", "value1") # 缓存: [key1] +cache.set("key2", "value2") # 缓存: [key1, key2] +cache.set("key3", "value3") # 淘汰 key1,缓存: [key2, key3] +``` + +### 4. 
线程安全 + +缓存管理器使用 RLock 确保线程安全,可以在多线程环境中安全使用: + +```python +import threading + +def worker(cache, thread_id): + for i in range(100): + cache.set(f"key_{thread_id}_{i}", f"value_{i}") + value = cache.get(f"key_{thread_id}_{i}") + +threads = [] +for i in range(10): + t = threading.Thread(target=worker, args=(cache, i)) + threads.append(t) + t.start() + +for t in threads: + t.join() # 安全等待所有线程完成 +``` + +## 缓存管理工具 + +项目提供了缓存管理脚本: + +```bash +# 查看缓存统计 +poetry run python scripts/cache_manager.py stats + +# 清空所有缓存 +poetry run python scripts/cache_manager.py clear + +# 列出缓存键 +poetry run python scripts/cache_manager.py list + +# 删除特定缓存键 +poetry run python scripts/cache_manager.py delete my-cache-key +``` + +## 性能考虑 + +### 内存使用估算 + +内存缓存会占用系统内存,每个 Agent 对象的大小取决于其内部状态: + +```python +# 获取内存使用估算 +stats = cache.get_stats() +print(f"内存使用: {stats['memory_usage_mb']} MB") +``` + +### 性能优化建议 + +1. **合理的缓存大小**: + - 根据可用内存设置 `max_size` + - 建议不超过可用内存的 20-30% + +2. **适当的 TTL**: + - 频繁访问的 Agent 可以设置较长的 TTL + - 不常用的 Agent 使用较短的 TTL + +3. **监控命中率**: + ```python + stats = cache.get_stats() + if stats['hit_rate_percent'] < 70: + logger.warning("缓存命中率较低,考虑调整 TTL 或缓存大小") + ``` + +4. **定期清理**: + ```python + # 清理超过 1 小时的缓存 + cache.cleanup_old_entries(max_age_seconds=3600) + ``` + +## 与磁盘缓存的比较 + +| 特性 | 内存缓存 | 磁盘缓存 | +|------|----------|----------| +| 访问速度 | 极快(纳秒级) | 较慢(毫秒级) | +| 持久化 | 进程重启后丢失 | 持久化保存 | +| 内存占用 | 较高 | 较低 | +| 并发性能 | 优秀 | 受 I/O 限制 | +| 使用场景 | 高频访问、临时缓存 | 长期存储、低频访问 | + +## 注意事项 + +1. **内存限制**:内存缓存受限于系统可用内存,注意监控内存使用 +2. **进程隔离**:每个进程有独立的缓存,多进程环境下不共享 +3. **重启丢失**:服务重启后缓存会丢失,首次访问需要重建 +4. **合理配置**:根据实际需求调整缓存大小和 TTL + +## 故障排查 + +### 缓存命中率低 + +1. 检查 TTL 是否过短 +2. 检查缓存大小是否过小 +3. 查看日志中的缓存键是否一致 + +### 内存使用过高 + +1. 减少 `max_size` 配置 +2. 缩短 TTL 时间 +3. 定期调用 `cleanup_old_entries` + +### 性能问题 + +1. 检查是否有大量并发访问 +2. 考虑使用连接池或分布式缓存方案 +3. 
监控 GC(垃圾回收)压力 + +## 示例输出 + +``` +INFO: AgentMemoryCacheManager initialized with max_size: 1000, default_ttl: 180s, auto_renew: True +INFO: Using cached agent for session: user-123, cache_key: agent_cache_a1b2c3d4 +INFO: Cached agent for key: agent_cache_e5f6g7h8, ttl: 180s +INFO: Cache stats - Total items: 15, Hit rate: 85.5%, Memory: 12.34 MB +``` \ No newline at end of file diff --git a/poetry.lock b/poetry.lock index f052c70..eb2a21b 100644 --- a/poetry.lock +++ b/poetry.lock @@ -265,6 +265,18 @@ files = [ {file = "bracex-2.6.tar.gz", hash = "sha256:98f1347cd77e22ee8d967a30ad4e310b233f7754dbf31ff3fceb76145ba47dc7"}, ] +[[package]] +name = "cachetools" +version = "6.2.4" +description = "Extensible memoizing collections and decorators" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "cachetools-6.2.4-py3-none-any.whl", hash = "sha256:69a7a52634fed8b8bf6e24a050fb60bff1c9bd8f6d24572b99c32d4e71e62a51"}, + {file = "cachetools-6.2.4.tar.gz", hash = "sha256:82c5c05585e70b6ba2d3ae09ea60b79548872185d2f24ae1f2709d37299fd607"}, +] + [[package]] name = "certifi" version = "2025.10.5" @@ -4288,4 +4300,4 @@ cffi = ["cffi (>=1.17,<2.0) ; platform_python_implementation != \"PyPy\" and pyt [metadata] lock-version = "2.1" python-versions = ">=3.12,<4.0" -content-hash = "0bdd60e5655503f6d5b5a1aa401cf5dfc5b51f4de5de1fa9970d0ceb10914c28" +content-hash = "a68bc624d1e70c475a496d81739797e72492ac7b39c133132a6297287d2eaf52" diff --git a/pyproject.toml b/pyproject.toml index 45746b6..2f4a976 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,6 +29,7 @@ dependencies = [ "deepagents (>=0.3.0,<0.4.0)", "langchain-mcp-adapters (>=0.2.1,<0.3.0)", "langchain-openai (>=1.1.1,<2.0.0)", + "cachetools (>=6.2.4,<7.0.0)", ] [tool.poetry.requires-plugins] diff --git a/routes/chat.py b/routes/chat.py index 692d71f..d50e153 100644 --- a/routes/chat.py +++ b/routes/chat.py @@ -17,8 +17,8 @@ from utils.fastapi_utils import ( call_preamble_llm, create_stream_chunk ) 
#!/usr/bin/env python3
"""
Agent cache management tool.

Inspects and maintains the agent caches: the in-memory cache and, when the
optional disk-cache module can be imported, the disk cache.

NOTE(review): the memory cache manager is a per-process singleton, so when
this script runs as its own process it presumably sees an empty memory cache
rather than the one held by the server process - confirm before relying on
the memory-cache output of `stats`, `list` and `clear`.
"""
import argparse
import json
import os
import sys

# Make the project root importable when the script is run directly.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# Number of keys printed per cache by the `list` command.
KEY_PREVIEW_LIMIT = 10


def _get_memory_cache():
    """Return the in-memory cache manager (imported on first use)."""
    from agent.agent_memory_cache import get_memory_cache_manager
    return get_memory_cache_manager()


def _get_disk_cache():
    """
    Return the disk cache manager, or None when its module is not installed.

    Only a missing module counts as "no disk cache"; any other failure is
    raised so that it is reported by main() instead of being hidden.
    """
    try:
        from agent.agent_cache_manager import get_cache_manager
    except ImportError:
        return None
    return get_cache_manager()


def show_stats():
    """Print statistics for the memory cache and, if present, the disk cache."""
    print("\n=== Agent Cache Statistics ===")

    print("\n--- Memory Cache ---")
    mem_stats = _get_memory_cache().get_stats()
    print(f"Cache Type: {mem_stats.get('type', 'memory')}")
    print(f"Total Items: {mem_stats.get('total_items', 0)}")
    print(f"Max Size: {mem_stats.get('max_size', 0)}")
    print(f"Default TTL: {mem_stats.get('default_ttl', 0)} seconds")
    print(f"Auto Renew: {mem_stats.get('auto_renew', False)}")
    print(f"Hits: {mem_stats.get('hits', 0)}")
    print(f"Misses: {mem_stats.get('misses', 0)}")
    print(f"Hit Rate: {mem_stats.get('hit_rate_percent', 0)}%")
    print(f"Memory Usage: {mem_stats.get('memory_usage_mb', 0)} MB")
    print(f"Evictions: {mem_stats.get('evictions', 0)}")

    print("\n--- Disk Cache (if used) ---")
    disk_cache = _get_disk_cache()
    if disk_cache is None:
        print("No disk cache configured")
    else:
        disk_stats = disk_cache.get_stats()
        size_mb = disk_stats.get('size_mb', 0)
        limit_mb = disk_stats.get('size_limit_mb', 1)
        # A zero limit must not abort the report with a division error.
        usage = size_mb / limit_mb * 100 if limit_mb else 0.0
        print(f"Cache Directory: {disk_stats.get('cache_dir', 'N/A')}")
        print(f"Total Items: {disk_stats.get('total_items', 0)}")
        print(f"Current Size: {size_mb} MB")
        print(f"Size Limit: {disk_stats.get('size_limit_mb', 0)} MB")
        print(f"Usage: {usage:.1f}%")

    print("=" * 30)


def clear_all():
    """Clear the memory cache and the disk cache after an interactive confirmation."""
    print("\nWarning: This will clear ALL cached agents!")
    confirm = input("Are you sure? (yes/no): ")

    if confirm.lower() != 'yes':
        print("Operation cancelled")
        return

    mem_success = _get_memory_cache().clear_all()
    print(f"Memory cache cleared: {'✓' if mem_success else '✗'}")

    disk_cache = _get_disk_cache()
    if disk_cache is None:
        print("No disk cache to clear")
    else:
        disk_success = disk_cache.clear_all()
        print(f"Disk cache cleared: {'✓' if disk_success else '✗'}")


def clear_expired():
    """Remove expired entries from the disk cache."""
    cache_manager = _get_disk_cache()
    if cache_manager is None:
        print("No disk cache configured")
        return

    count = cache_manager.clear_expired()
    if count > 0:
        print(f"✓ Cleared {count} expired cache entries")
    else:
        print("No expired entries found")


def cleanup_old(argv=None):
    """
    Remove disk-cache entries older than ``--max-age`` seconds.

    Args:
        argv: Arguments following the command name; defaults to ``sys.argv[2:]``.
    """
    parser = argparse.ArgumentParser(description='Clean up old cache entries')
    parser.add_argument('--max-age', type=int, default=3600,
                        help='Maximum age in seconds (default: 3600 = 1 hour)')
    args = parser.parse_args(sys.argv[2:] if argv is None else argv)

    cache_manager = _get_disk_cache()
    if cache_manager is None:
        print("No disk cache configured")
        return

    count = cache_manager.cleanup_old_entries(args.max_age)
    if count > 0:
        print(f"✓ Cleaned up {count} old cache entries (older than {args.max_age} seconds)")
    else:
        print("No old entries found")


def list_keys():
    """Print the first keys of the memory cache and, if present, the disk cache."""
    print("\n=== Cache Keys (showing first 10) ===")

    print("\n--- Memory Cache Keys ---")
    mem_keys = _get_memory_cache().get_keys()

    for key in mem_keys[:KEY_PREVIEW_LIMIT]:
        print(f" - {key}")

    if len(mem_keys) > KEY_PREVIEW_LIMIT:
        print(f" ... and {len(mem_keys) - KEY_PREVIEW_LIMIT} more in memory cache")

    print(f"\nTotal memory cache keys: {len(mem_keys)}")

    print("\n--- Disk Cache Keys ---")
    disk_cache = _get_disk_cache()
    if disk_cache is None:
        print("No disk cache configured")
    else:
        disk_keys = list(disk_cache.cache.iterkeys())

        for key in disk_keys[:KEY_PREVIEW_LIMIT]:
            print(f" - {key}")

        if len(disk_keys) > KEY_PREVIEW_LIMIT:
            print(f" ... and {len(disk_keys) - KEY_PREVIEW_LIMIT} more in disk cache")

        print(f"\nTotal disk cache keys: {len(disk_keys)}")

    print("=" * 30)


def delete_key(argv=None):
    """
    Delete one key from the disk cache.

    Args:
        argv: Arguments following the command name; defaults to ``sys.argv[2:]``.
            The first one is the cache key.
    """
    args = sys.argv[2:] if argv is None else list(argv)
    if not args:
        print("Usage: python cache_manager.py delete <cache_key>")
        return

    cache_key = args[0]
    cache_manager = _get_disk_cache()
    if cache_manager is None:
        print("No disk cache configured")
        return

    if cache_manager.delete(cache_key):
        print(f"✓ Deleted cache key: {cache_key}")
    else:
        print(f"✗ Cache key not found: {cache_key}")


def _print_usage():
    """Print the command overview."""
    print("\nAgent Cache Manager")
    print("\nUsage:")
    print("  python cache_manager.py <command> [options]")
    print("\nCommands:")
    print("  stats        - Show cache statistics")
    print("  clear        - Clear ALL cache entries")
    print("  expired      - Clear expired entries")
    print("  cleanup      - Clean up old entries (use --max-age)")
    print("  list         - List cache keys")
    print("  delete <key> - Delete specific cache key")
    print("\nExamples:")
    print("  python cache_manager.py stats")
    print("  python cache_manager.py cleanup --max-age 7200")
    print("  python cache_manager.py delete my-cache-key")


def main(argv=None):
    """
    Run the command named by the first argument.

    Args:
        argv: Command-line arguments without the program name; defaults to
            ``sys.argv[1:]``.
    """
    args = sys.argv[1:] if argv is None else list(argv)
    if not args:
        _print_usage()
        return

    command, rest = args[0], args[1:]
    handlers = {
        'stats': show_stats,
        'clear': clear_all,
        'expired': clear_expired,
        'cleanup': lambda: cleanup_old(rest),
        'list': list_keys,
        'delete': lambda: delete_key(rest),
    }

    handler = handlers.get(command)
    if handler is None:
        print(f"Unknown command: {command}")
        print("Use 'python cache_manager.py' to see available commands")
        return

    try:
        handler()
    except Exception as e:
        # Top-level boundary of the CLI: report the failure instead of a traceback.
        print(f"Error: {e}")


if __name__ == "__main__":
    main()