This commit is contained in:
朱潮 2025-12-17 23:05:42 +08:00
parent 436dab1a2f
commit 47f1d1c7eb
9 changed files with 1053 additions and 28 deletions

View File

@ -4,6 +4,7 @@ from typing import Optional, List, Dict, Any, TYPE_CHECKING
from dataclasses import dataclass, field
import logging
import json
import hashlib
logger = logging.getLogger('app')
@ -141,4 +142,36 @@ class AgentConfig:
config["callbacks"] = [self.logging_handler]
if self.session_id:
config["configurable"] = {"thread_id": self.session_id}
return config
return config
def get_unique_cache_id(self) -> Optional[str]:
"""
生成唯一的缓存键
基于session_id, bot_id, system_prompt, mcp_settings生成唯一的缓存键
如果没有session_id返回None表示不使用缓存
Returns:
str: 唯一的缓存键如果没有session_id则返回None
"""
# 创建用于生成哈希的数据
cache_components = {
'bot_id': self.bot_id,
'system_prompt': self.system_prompt,
'mcp_settings': self.mcp_settings,
'model_name': self.model_name,
'language': self.language,
'generate_cfg': self.generate_cfg,
'enable_thinking': self.enable_thinking,
'user_identifier': self.user_identifier,
'session_id': self.session_id,
}
# 将组件转换为字符串并连接
cache_string = json.dumps(cache_components, sort_keys=True)
# 生成SHA256哈希作为缓存键
cache_hash = hashlib.sha256(cache_string.encode('utf-8')).hexdigest()
# 返回带有前缀的缓存键,便于调试
return f"agent_cache_{cache_hash[:16]}" # 使用前16位哈希值

339
agent/agent_memory_cache.py Normal file
View File

@ -0,0 +1,339 @@
"""
基于内存的 Agent 缓存管理模块
使用 cachetools 库实现 TTLCache LRUCache
"""
import os
import logging
import time
import threading
from typing import Any, Optional, Dict, Tuple
from collections import OrderedDict
from datetime import datetime, timedelta
import cachetools
logger = logging.getLogger('app')
class AgentMemoryCacheManager:
"""
使用 cachetools 实现的内存缓存管理器
- 基于内存存储访问速度快
- 支持自动过期时间TTL
- 支持缓存大小限制和 LRU 淘汰策略
- 支持访问时自动延长过期时间
- 线程安全使用 threading.Lock
"""
def __init__(
self,
max_size: int = 1000, # 默认最多缓存 1000 个 Agent
default_ttl: int = 180, # 默认 3 分钟过期
auto_renew: bool = True # 访问时自动延长过期时间
):
"""
初始化内存缓存管理器
Args:
max_size: 最大缓存项数
default_ttl: 默认过期时间
auto_renew: 是否在访问时自动延长过期时间
"""
# 使用 TTLCache 实现带过期时间的缓存
self.cache = cachetools.TTLCache(
maxsize=max_size,
ttl=default_ttl,
timer=time.monotonic
)
# 用于存储每个键的过期时间信息(支持自动续期)
self._expire_times: Dict[str, float] = {}
# 用于存储创建时间
self._create_times: Dict[str, float] = {}
# 锁,确保线程安全
self._lock = threading.RLock()
self.default_ttl = default_ttl
self.auto_renew = auto_renew
self.max_size = max_size
# 统计信息
self._hits = 0
self._misses = 0
self._sets = 0
self._evictions = 0
logger.info(f"AgentMemoryCacheManager initialized with max_size: {max_size}, "
f"default_ttl: {default_ttl}s, auto_renew: {auto_renew}")
def get(self, cache_key: str) -> Optional[Any]:
"""
获取缓存的 Agent
Args:
cache_key: 缓存键
Returns:
Agent 对象或 None
"""
with self._lock:
current_time = time.monotonic()
# 首先检查是否过期
if cache_key in self._expire_times:
if current_time > self._expire_times[cache_key]:
# 已过期,清理
self._remove_expired(cache_key)
self._misses += 1
logger.debug(f"Cache miss (expired) for key: {cache_key}")
return None
# 尝试从缓存获取
try:
value = self.cache[cache_key]
# 如果启用自动续期
if self.auto_renew:
self._expire_times[cache_key] = current_time + self.default_ttl
logger.debug(f"Cache hit and renewed for key: {cache_key}")
else:
logger.debug(f"Cache hit for key: {cache_key}")
self._hits += 1
return value
except KeyError:
self._misses += 1
logger.debug(f"Cache miss for key: {cache_key}")
return None
def set(self, cache_key: str, agent: Any, ttl: Optional[int] = None) -> bool:
"""
缓存 Agent 对象
Args:
cache_key: 缓存键
agent: 要缓存的 Agent 对象
ttl: 过期时间如果为 None 则使用默认值
Returns:
是否成功设置缓存
"""
with self._lock:
try:
if ttl is None:
ttl = self.default_ttl
current_time = time.monotonic()
expire_time = current_time + ttl
# 检查是否需要驱逐项
evicted_key = None
if cache_key not in self.cache and len(self.cache) >= self.max_size:
# cachetools 的 TTLCache 会自动驱逐,但我们要记录
# 先获取可能被驱逐的键
oldest_key = next(iter(self.cache)) if self.cache else None
if oldest_key:
evicted_key = oldest_key
# 设置缓存
self.cache[cache_key] = agent
self._expire_times[cache_key] = expire_time
self._create_times[cache_key] = current_time
# 清理被驱逐的项的元数据
if evicted_key and evicted_key != cache_key:
self._cleanup_metadata(evicted_key)
self._evictions += 1
logger.debug(f"Evicted cache key: {evicted_key}")
self._sets += 1
logger.info(f"Cached agent for key: {cache_key}, ttl: {ttl}s")
return True
except Exception as e:
logger.error(f"Error setting cache for key {cache_key}: {e}")
return False
def delete(self, cache_key: str) -> bool:
"""
删除特定的缓存项
Args:
cache_key: 缓存键
Returns:
是否成功删除
"""
with self._lock:
try:
# 从缓存中删除
deleted = cache_key in self.cache
if deleted:
del self.cache[cache_key]
# 清理元数据
self._cleanup_metadata(cache_key)
if deleted:
logger.info(f"Deleted cache for key: {cache_key}")
else:
logger.warning(f"Cache key not found for deletion: {cache_key}")
return deleted
except Exception as e:
logger.error(f"Error deleting cache for key {cache_key}: {e}")
return False
def _remove_expired(self, cache_key: str):
"""移除过期的缓存项"""
if cache_key in self.cache:
del self.cache[cache_key]
self._cleanup_metadata(cache_key)
def _cleanup_metadata(self, cache_key: str):
"""清理指定键的元数据"""
self._expire_times.pop(cache_key, None)
self._create_times.pop(cache_key, None)
def clear_all(self) -> bool:
"""
清空所有缓存
Returns:
是否成功清空
"""
with self._lock:
try:
count = len(self.cache)
self.cache.clear()
self._expire_times.clear()
self._create_times.clear()
# 重置统计信息
self._hits = 0
self._misses = 0
self._sets = 0
self._evictions = 0
logger.info(f"Cleared all cache entries, total: {count}")
return True
except Exception as e:
logger.error(f"Error clearing all cache: {e}")
return False
def get_stats(self) -> Dict[str, Any]:
"""
获取缓存统计信息
Returns:
包含统计信息的字典
"""
with self._lock:
total_requests = self._hits + self._misses
hit_rate = (self._hits / total_requests * 100) if total_requests > 0 else 0
return {
"type": "memory",
"total_items": len(self.cache),
"max_size": self.max_size,
"default_ttl": self.default_ttl,
"auto_renew": self.auto_renew,
"hits": self._hits,
"misses": self._misses,
"hit_rate_percent": round(hit_rate, 2),
"sets": self._sets,
"evictions": self._evictions,
"memory_usage_mb": round(self._estimate_memory_usage() / 1024 / 1024, 2)
}
def _estimate_memory_usage(self) -> int:
"""估算内存使用量(字节)"""
# 这是一个粗略的估算
import sys
total_size = 0
# 估算缓存项的大小
for key, value in self.cache.items():
total_size += sys.getsizeof(key)
total_size += sys.getsizeof(value)
# 估算元数据的大小
total_size += sys.getsizeof(self._expire_times)
total_size += sys.getsizeof(self._create_times)
return total_size
def cleanup_old_entries(self, max_age_seconds: int = 3600) -> int:
"""
清理超过指定时间的所有缓存项
Args:
max_age_seconds: 最大存在时间
Returns:
清理的缓存项数量
"""
with self._lock:
current_time = time.monotonic()
keys_to_delete = []
# 查找超过最大时间的项
for cache_key, create_time in self._create_times.items():
age_seconds = current_time - create_time
if age_seconds > max_age_seconds:
keys_to_delete.append(cache_key)
# 删除旧项
deleted_count = 0
for key in keys_to_delete:
if self.delete(key):
deleted_count += 1
logger.info(f"Cleaned up {deleted_count} old cache entries older than {max_age_seconds}s")
return deleted_count
def get_keys(self) -> list:
"""
获取所有缓存键
Returns:
缓存键列表
"""
with self._lock:
return list(self.cache.keys())
def __len__(self) -> int:
"""返回缓存中的项数"""
return len(self.cache)
# 全局缓存管理器实例
_global_cache_manager: Optional[AgentMemoryCacheManager] = None
def get_memory_cache_manager() -> AgentMemoryCacheManager:
"""
获取全局内存缓存管理器实例单例模式
Returns:
AgentMemoryCacheManager 实例
"""
global _global_cache_manager
if _global_cache_manager is None:
# 从环境变量或使用默认值
max_size = int(os.getenv("AGENT_CACHE_MAX_SIZE", "20"))
default_ttl = int(os.getenv("AGENT_CACHE_TTL", "180"))
auto_renew = os.getenv("AGENT_CACHE_AUTO_RENEW", "true").lower() == "true"
_global_cache_manager = AgentMemoryCacheManager(
max_size=max_size,
default_ttl=default_ttl,
auto_renew=auto_renew
)
return _global_cache_manager

View File

@ -1,8 +1,7 @@
import json
import logging
import os
import sqlite3
from typing import Any, Dict, Optional, List
import time
from typing import Any, Dict
from langchain.chat_models import init_chat_model
# from deepagents import create_deep_agent
from langchain.agents import create_agent
@ -10,15 +9,18 @@ from langchain.agents.middleware import SummarizationMiddleware
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.checkpoint.memory import MemorySaver
from utils.fastapi_utils import detect_provider
# 全局 MemorySaver 实例
_global_checkpointer = MemorySaver()
from .guideline_middleware import GuidelineMiddleware
from .tool_output_length_middleware import ToolOutputLengthMiddleware
from utils.settings import SUMMARIZATION_MAX_TOKENS, TOOL_OUTPUT_MAX_LENGTH
from agent.agent_config import AgentConfig
from agent.prompt_loader import load_system_prompt_async, load_mcp_settings_async
from agent.agent_memory_cache import get_memory_cache_manager
logger = logging.getLogger('app')
# 全局 MemorySaver 实例
_global_checkpointer = MemorySaver()
# 使用内存缓存管理器
# Utility functions
def read_system_prompt():
@ -68,6 +70,25 @@ async def init_agent(config: AgentConfig):
config: AgentConfig 对象包含所有初始化参数
mcp: MCP配置如果为None则使用配置中的mcp_settings
"""
# 获取缓存管理器
cache_manager = get_memory_cache_manager()
# 获取唯一的缓存键
cache_key = config.get_unique_cache_id()
# 如果有缓存键,检查缓存
if cache_key:
# 尝试从缓存中获取 agent
cached_agent = cache_manager.get(cache_key)
if cached_agent is not None:
logger.info(f"Using cached agent for session: {config.session_id}, cache_key: {cache_key}")
return cached_agent
else:
logger.info(f"Cache miss for session: {config.session_id}, cache_key: {cache_key}")
# 没有缓存或缓存已过期,创建新的 agent
logger.info(f"Creating new agent for session: {getattr(config, 'session_id', 'no-session')}")
final_system_prompt = await load_system_prompt_async(
config.project_dir, config.language, config.system_prompt, config.robot_type, config.bot_id, config.user_identifier
)
@ -137,8 +158,10 @@ async def init_agent(config: AgentConfig):
checkpointer=checkpointer # 传入 checkpointer 以启用持久化
)
# 将 checkpointer 作为属性附加到 agent 上,方便访问
if checkpointer:
agent._checkpointer = checkpointer
# 如果有缓存键,将 agent 加入缓存
if cache_key:
# 使用 DiskCache 缓存管理器存储 agent
cache_manager.set(cache_key, agent)
logger.info(f"Cached agent for session: {config.session_id}, cache_key: {cache_key}")
return agent

183
docs/agent_cache.md Normal file
View File

@ -0,0 +1,183 @@
# Agent 缓存系统
本文档描述了使用 DiskCache 实现的 Agent 缓存系统。
## 概述
Agent 缓存系统使用 DiskCache 来持久化缓存 Agent 实例,提供以下功能:
- ✅ **文件持久化**:缓存存储在磁盘上,重启后仍然保留
- ✅ **自动过期**:支持设置过期时间,自动清理过期项
- ✅ **缓存大小限制**:可设置最大缓存大小,自动淘汰旧数据
- ✅ **访问时自动续期**:每次访问时自动延长过期时间
- ✅ **线程安全**:支持多线程并发访问
## 配置
### 环境变量
可以通过以下环境变量配置缓存行为:
- `AGENT_CACHE_DIR`:缓存目录路径(默认:`./projects/agent_cache`
- `AGENT_CACHE_SIZE_LIMIT`:缓存大小限制,字节(默认:`1073741824` = 1GB
- `AGENT_CACHE_EXPIRE`:默认过期时间,秒(默认:`180` = 3分钟
示例:
```bash
export AGENT_CACHE_DIR="/var/cache/agent"
export AGENT_CACHE_SIZE_LIMIT="5368709120" # 5GB
export AGENT_CACHE_EXPIRE="600" # 10分钟
```
### 代码配置
```python
from agent.agent_cache_manager import AgentCacheManager
# 创建缓存管理器
cache = AgentCacheManager(
cache_dir="./cache/agent_cache",
size_limit=1024*1024*1024, # 1GB
default_expire=180, # 3分钟
eviction_policy="least-recently-used" # LRU淘汰策略
)
```
## 使用方法
### 1. 缓存 Agent
缓存系统已集成到 `init_agent` 函数中,会自动根据配置生成缓存键并缓存 Agent。
```python
from agent.deep_assistant import init_agent
from agent.agent_config import AgentConfig
# 创建配置
config = AgentConfig(
bot_id="your-bot-id",
session_id="user-session-123",
# ... 其他配置
)
# 初始化 Agent会自动使用缓存
agent = await init_agent(config)
```
### 2. 手动使用缓存
如果需要在其他地方使用缓存:
```python
from agent.agent_cache_manager import get_cache_manager
# 获取全局缓存管理器
cache = get_cache_manager()
# 设置缓存
cache.set("my-key", {"data": "value"}, expire=300) # 5分钟过期
# 获取缓存
value = cache.get("my-key")
if value is not None:
print("缓存命中!")
else:
print("缓存未命中")
# 删除缓存
cache.delete("my-key")
# 获取统计信息
stats = cache.get_stats()
print(f"缓存大小: {stats['size_mb']} MB")
print(f"缓存项数: {stats['total_items']}")
```
## 缓存管理工具
项目提供了缓存管理脚本:
```bash
# 查看缓存统计
poetry run python scripts/cache_manager.py stats
# 清理过期项
poetry run python scripts/cache_manager.py expired
# 清理超过1小时的项
poetry run python scripts/cache_manager.py cleanup --max-age 3600
# 清空所有缓存
poetry run python scripts/cache_manager.py clear
# 列出缓存键
poetry run python scripts/cache_manager.py list
# 删除特定缓存键
poetry run python scripts/cache_manager.py delete my-cache-key
```
## 缓存键生成
缓存键基于以下配置生成唯一哈希值:
- `bot_id`
- `system_prompt`
- `mcp_settings`
- `model_name`
- `language`
- `generate_cfg`
- `enable_thinking`
- `user_identifier`
- `session_id`
如果 `session_id` 为 None则不使用缓存。
## 注意事项
1. **性能考虑**
- 缓存存储在磁盘上,首次访问会有 I/O 开销
- 建议使用 SSD 存储缓存以提高性能
- 过期时间不宜设置过短,避免频繁重新创建 Agent
2. **内存管理**
- Agent 对象可能占用较大内存
- 设置合理的缓存大小限制
- 定期清理不用的缓存项
3. **并发安全**
- DiskCache 支持多进程并发访问
- 但在高并发场景下可能成为瓶颈
4. **调试**
- 查看日志中的缓存相关信息
- 使用 `stats` 命令监控缓存使用情况
- 必要时清空缓存重置状态
## 故障排查
### 缓存不生效
1. 检查 `session_id` 是否设置(必须有 session_id 才会使用缓存)
2. 检查缓存目录是否存在且有写权限
3. 查看日志中是否有缓存相关错误
### 缓存过多
1. 检查缓存大小限制设置
2. 使用 `cleanup` 命令清理旧缓存
3. 调整默认过期时间
### 性能问题
1. 监控缓存命中率
2. 考虑增加缓存大小
3. 优化 Agent 创建速度
## 示例输出
```
INFO: Using cached agent for session: user-123, cache_key: agent_cache_a1b2c3d4
INFO: Cache stats - Total items: 5, Size: 256.78 MB
INFO: Cached agent for session: user-456, cache_key: agent_cache_e5f6g7h8
```

251
docs/agent_memory_cache.md Normal file
View File

@ -0,0 +1,251 @@
# Agent 内存缓存系统
本文档描述了基于 cachetools 实现的 Agent 内存缓存系统。
## 概述
Agent 内存缓存系统使用 cachetools 库在内存中缓存 Agent 实例,提供以下功能:
- ✅ **高性能访问**:基于内存存储,访问速度极快(纳秒级)
- ✅ **自动过期**:支持设置 TTLTime To Live自动清理过期项
- ✅ **LRU 淘汰策略**:当缓存满时,自动淘汰最久未使用的项
- ✅ **访问时自动续期**:每次访问时自动延长过期时间
- ✅ **线程安全**:使用 RLock 确保多线程并发安全
- ✅ **统计信息**:提供详细的缓存命中率等统计信息
## 配置
### 环境变量
可以通过以下环境变量配置缓存行为:
- `AGENT_CACHE_MAX_SIZE`:最大缓存项数(默认:`1000`
- `AGENT_CACHE_TTL`:默认过期时间,秒(默认:`180` = 3分钟
- `AGENT_CACHE_AUTO_RENEW`:是否自动续期(默认:`true`
示例:
```bash
export AGENT_CACHE_MAX_SIZE="500"
export AGENT_CACHE_TTL="300" # 5分钟
export AGENT_CACHE_AUTO_RENEW="false"
```
### 代码配置
```python
from agent.agent_memory_cache import AgentMemoryCacheManager
# 创建缓存管理器
cache = AgentMemoryCacheManager(
max_size=1000, # 最多缓存 1000 个 Agent
default_ttl=180, # 3分钟过期
auto_renew=True # 访问时自动续期
)
```
## 使用方法
### 1. 自动缓存(推荐)
缓存系统已集成到 `init_agent` 函数中,会自动根据配置生成缓存键并缓存 Agent。
```python
from agent.deep_assistant import init_agent
from agent.agent_config import AgentConfig
# 创建配置
config = AgentConfig(
bot_id="your-bot-id",
session_id="user-session-123",
# ... 其他配置
)
# 初始化 Agent会自动使用内存缓存
agent = await init_agent(config)
```
### 2. 手动使用缓存
```python
from agent.agent_memory_cache import get_memory_cache_manager
# 获取全局缓存管理器
cache = get_memory_cache_manager()
# 设置缓存
cache.set("my-key", {"data": "value"}, ttl=300) # 5分钟过期
# 获取缓存
value = cache.get("my-key")
if value is not None:
print("缓存命中!")
print(f"数据: {value}")
else:
print("缓存未命中")
# 删除缓存
cache.delete("my-key")
# 获取统计信息
stats = cache.get_stats()
print(f"命中率: {stats['hit_rate_percent']}%")
print(f"内存使用: {stats['memory_usage_mb']} MB")
print(f"缓存项数: {stats['total_items']}")
```
## 缓存特性详解
### 1. TTLTime To Live过期
每个缓存项都有生存时间,超过时间后自动失效:
```python
# 设置 10 秒过期的缓存
cache.set("temp_data", "value", ttl=10)
# 10 秒后访问会返回 None
```
### 2. 自动续期
`auto_renew=True` 时,每次访问缓存会自动延长其过期时间:
```python
cache.set("data", {"key": "value"}, ttl=60) # 1分钟过期
# 50 秒后访问
value = cache.get("data") # 成功获取,且过期时间重置为 60 秒
```
### 3. LRU 淘汰策略
当缓存达到最大容量时,会自动淘汰最久未使用的项:
```python
cache = AgentMemoryCacheManager(max_size=2)
cache.set("key1", "value1") # 缓存: [key1]
cache.set("key2", "value2") # 缓存: [key1, key2]
cache.set("key3", "value3") # 淘汰 key1缓存: [key2, key3]
```
### 4. 线程安全
缓存管理器使用 RLock 确保线程安全,可以在多线程环境中安全使用:
```python
import threading
def worker(cache, thread_id):
for i in range(100):
cache.set(f"key_{thread_id}_{i}", f"value_{i}")
value = cache.get(f"key_{thread_id}_{i}")
threads = []
for i in range(10):
t = threading.Thread(target=worker, args=(cache, i))
threads.append(t)
t.start()
for t in threads:
t.join() # 安全等待所有线程完成
```
## 缓存管理工具
项目提供了缓存管理脚本:
```bash
# 查看缓存统计
poetry run python scripts/cache_manager.py stats
# 清空所有缓存
poetry run python scripts/cache_manager.py clear
# 列出缓存键
poetry run python scripts/cache_manager.py list
# 删除特定缓存键
poetry run python scripts/cache_manager.py delete my-cache-key
```
## 性能考虑
### 内存使用估算
内存缓存会占用系统内存,每个 Agent 对象的大小取决于其内部状态:
```python
# 获取内存使用估算
stats = cache.get_stats()
print(f"内存使用: {stats['memory_usage_mb']} MB")
```
### 性能优化建议
1. **合理的缓存大小**
- 根据可用内存设置 `max_size`
- 建议不超过可用内存的 20-30%
2. **适当的 TTL**
- 频繁访问的 Agent 可以设置较长的 TTL
- 不常用的 Agent 使用较短的 TTL
3. **监控命中率**
```python
stats = cache.get_stats()
if stats['hit_rate_percent'] < 70:
logger.warning("缓存命中率较低,考虑调整 TTL 或缓存大小")
```
4. **定期清理**
```python
# 清理超过 1 小时的缓存
cache.cleanup_old_entries(max_age_seconds=3600)
```
## 与磁盘缓存的比较
| 特性 | 内存缓存 | 磁盘缓存 |
|------|----------|----------|
| 访问速度 | 极快(纳秒级) | 较慢(毫秒级) |
| 持久化 | 进程重启后丢失 | 持久化保存 |
| 内存占用 | 较高 | 较低 |
| 并发性能 | 优秀 | 受 I/O 限制 |
| 使用场景 | 高频访问、临时缓存 | 长期存储、低频访问 |
## 注意事项
1. **内存限制**:内存缓存受限于系统可用内存,注意监控内存使用
2. **进程隔离**:每个进程有独立的缓存,多进程环境下不共享
3. **重启丢失**:服务重启后缓存会丢失,首次访问需要重建
4. **合理配置**:根据实际需求调整缓存大小和 TTL
## 故障排查
### 缓存命中率低
1. 检查 TTL 是否过短
2. 检查缓存大小是否过小
3. 查看日志中的缓存键是否一致
### 内存使用过高
1. 减少 `max_size` 配置
2. 缩短 TTL 时间
3. 定期调用 `cleanup_old_entries`
### 性能问题
1. 检查是否有大量并发访问
2. 考虑使用连接池或分布式缓存方案
3. 监控 GC垃圾回收压力
## 示例输出
```
INFO: AgentMemoryCacheManager initialized with max_size: 1000, default_ttl: 180s, auto_renew: True
INFO: Using cached agent for session: user-123, cache_key: agent_cache_a1b2c3d4
INFO: Cached agent for key: agent_cache_e5f6g7h8, ttl: 180s
INFO: Cache stats - Total items: 15, Hit rate: 85.5%, Memory: 12.34 MB
```

14
poetry.lock generated
View File

@ -265,6 +265,18 @@ files = [
{file = "bracex-2.6.tar.gz", hash = "sha256:98f1347cd77e22ee8d967a30ad4e310b233f7754dbf31ff3fceb76145ba47dc7"},
]
[[package]]
name = "cachetools"
version = "6.2.4"
description = "Extensible memoizing collections and decorators"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "cachetools-6.2.4-py3-none-any.whl", hash = "sha256:69a7a52634fed8b8bf6e24a050fb60bff1c9bd8f6d24572b99c32d4e71e62a51"},
{file = "cachetools-6.2.4.tar.gz", hash = "sha256:82c5c05585e70b6ba2d3ae09ea60b79548872185d2f24ae1f2709d37299fd607"},
]
[[package]]
name = "certifi"
version = "2025.10.5"
@ -4288,4 +4300,4 @@ cffi = ["cffi (>=1.17,<2.0) ; platform_python_implementation != \"PyPy\" and pyt
[metadata]
lock-version = "2.1"
python-versions = ">=3.12,<4.0"
content-hash = "0bdd60e5655503f6d5b5a1aa401cf5dfc5b51f4de5de1fa9970d0ceb10914c28"
content-hash = "a68bc624d1e70c475a496d81739797e72492ac7b39c133132a6297287d2eaf52"

View File

@ -29,6 +29,7 @@ dependencies = [
"deepagents (>=0.3.0,<0.4.0)",
"langchain-mcp-adapters (>=0.2.1,<0.3.0)",
"langchain-openai (>=1.1.1,<2.0.0)",
"cachetools (>=6.2.4,<7.0.0)",
]
[tool.poetry.requires-plugins]

View File

@ -17,8 +17,8 @@ from utils.fastapi_utils import (
call_preamble_llm,
create_stream_chunk
)
from langchain_core.messages import AIMessageChunk, HumanMessage, ToolMessage, AIMessage
from utils.settings import MAX_OUTPUT_TOKENS, MAX_CACHED_AGENTS, SHARD_COUNT
from langchain_core.messages import AIMessageChunk, ToolMessage, AIMessage
from utils.settings import MAX_OUTPUT_TOKENS
from agent.agent_config import AgentConfig
from agent.deep_assistant import init_agent
@ -265,22 +265,14 @@ async def create_agent_and_generate_response(
agent = await init_agent(config)
# 如果有 checkpointer检查是否有历史记录
if config.session_id:
# 检查 checkpointer
checkpointer = None
if hasattr(agent, '_checkpointer'):
checkpointer = agent._checkpointer
if config.session_id and agent.checkpointer:
from agent.checkpoint_utils import check_checkpoint_history, prepare_messages_for_agent
has_history = await check_checkpoint_history(agent.checkpointer, config.session_id)
if checkpointer:
from agent.checkpoint_utils import check_checkpoint_history, prepare_messages_for_agent
has_history = await check_checkpoint_history(checkpointer, config.session_id)
# 更新 config.messages
config.messages = prepare_messages_for_agent(config.messages, has_history)
# 更新 config.messages
config.messages = prepare_messages_for_agent(config.messages, has_history)
logger.info(f"Session {config.session_id}: has_history={has_history}, sending {len(config.messages)} messages")
else:
logger.warning(f"No checkpointer found for session {config.session_id}")
logger.info(f"Session {config.session_id}: has_history={has_history}, sending {len(config.messages)} messages")
else:
logger.debug(f"No session_id provided, skipping checkpoint check")

191
scripts/cache_manager.py Normal file
View File

@ -0,0 +1,191 @@
#!/usr/bin/env python3
"""
Agent 缓存管理工具
用于管理和维护 DiskAgent 缓存
"""
import sys
import os
import argparse
import json
# 添加项目根目录到 Python 路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from agent.agent_cache_manager import get_cache_manager
from agent.agent_memory_cache import get_memory_cache_manager
def show_stats():
"""显示缓存统计信息"""
print("\n=== Agent Cache Statistics ===")
print("\n--- Memory Cache ---")
mem_cache = get_memory_cache_manager()
mem_stats = mem_cache.get_stats()
print(f"Cache Type: {mem_stats.get('type', 'memory')}")
print(f"Total Items: {mem_stats.get('total_items', 0)}")
print(f"Max Size: {mem_stats.get('max_size', 0)}")
print(f"Default TTL: {mem_stats.get('default_ttl', 0)} seconds")
print(f"Auto Renew: {mem_stats.get('auto_renew', False)}")
print(f"Hits: {mem_stats.get('hits', 0)}")
print(f"Misses: {mem_stats.get('misses', 0)}")
print(f"Hit Rate: {mem_stats.get('hit_rate_percent', 0)}%")
print(f"Memory Usage: {mem_stats.get('memory_usage_mb', 0)} MB")
print(f"Evictions: {mem_stats.get('evictions', 0)}")
print("\n--- Disk Cache (if used) ---")
try:
disk_cache = get_cache_manager()
disk_stats = disk_cache.get_stats()
print(f"Cache Directory: {disk_stats.get('cache_dir', 'N/A')}")
print(f"Total Items: {disk_stats.get('total_items', 0)}")
print(f"Current Size: {disk_stats.get('size_mb', 0)} MB")
print(f"Size Limit: {disk_stats.get('size_limit_mb', 0)} MB")
print(f"Usage: {disk_stats.get('size_mb', 0) / disk_stats.get('size_limit_mb', 1) * 100:.1f}%")
except:
print("No disk cache configured")
print("=" * 30)
def clear_all():
"""清空所有缓存"""
print("\nWarning: This will clear ALL cached agents!")
confirm = input("Are you sure? (yes/no): ")
if confirm.lower() == 'yes':
# 清空内存缓存
mem_cache = get_memory_cache_manager()
mem_success = mem_cache.clear_all()
print(f"Memory cache cleared: {'' if mem_success else ''}")
# 清空磁盘缓存(如果存在)
try:
disk_cache = get_cache_manager()
disk_success = disk_cache.clear_all()
print(f"Disk cache cleared: {'' if disk_success else ''}")
except:
print("No disk cache to clear")
else:
print("Operation cancelled")
def clear_expired():
"""清理过期的缓存"""
cache_manager = get_cache_manager()
count = cache_manager.clear_expired()
if count > 0:
print(f"✓ Cleared {count} expired cache entries")
else:
print("No expired entries found")
def cleanup_old():
"""清理旧的缓存项"""
parser = argparse.ArgumentParser(description='Clean up old cache entries')
parser.add_argument('--max-age', type=int, default=3600,
help='Maximum age in seconds (default: 3600 = 1 hour)')
args = parser.parse_args(sys.argv[2:])
cache_manager = get_cache_manager()
count = cache_manager.cleanup_old_entries(args.max_age)
if count > 0:
print(f"✓ Cleaned up {count} old cache entries (older than {args.max_age} seconds)")
else:
print("No old entries found")
def list_keys():
"""列出所有缓存键前10个"""
print("\n=== Cache Keys (showing first 10) ===")
print("\n--- Memory Cache Keys ---")
mem_cache = get_memory_cache_manager()
mem_keys = mem_cache.get_keys()
for i, key in enumerate(mem_keys[:10]):
print(f" - {key}")
if len(mem_keys) > 10:
print(f" ... and {len(mem_keys) - 10} more in memory cache")
print(f"\nTotal memory cache keys: {len(mem_keys)}")
print("\n--- Disk Cache Keys ---")
try:
disk_cache = get_cache_manager()
disk_keys = list(disk_cache.cache.iterkeys())
for i, key in enumerate(disk_keys[:10]):
print(f" - {key}")
if len(disk_keys) > 10:
print(f" ... and {len(disk_keys) - 10} more in disk cache")
print(f"\nTotal disk cache keys: {len(disk_keys)}")
except:
print("No disk cache configured")
print("=" * 30)
def delete_key():
"""删除特定的缓存键"""
if len(sys.argv) < 3:
print("Usage: python cache_manager.py delete <cache_key>")
return
cache_key = sys.argv[2]
cache_manager = get_cache_manager()
if cache_manager.delete(cache_key):
print(f"✓ Deleted cache key: {cache_key}")
else:
print(f"✗ Cache key not found: {cache_key}")
def main():
"""主函数"""
if len(sys.argv) < 2:
print("\nAgent Cache Manager")
print("\nUsage:")
print(" python cache_manager.py <command> [options]")
print("\nCommands:")
print(" stats - Show cache statistics")
print(" clear - Clear ALL cache entries")
print(" expired - Clear expired entries")
print(" cleanup - Clean up old entries (use --max-age)")
print(" list - List cache keys")
print(" delete - Delete specific cache key")
print("\nExamples:")
print(" python cache_manager.py stats")
print(" python cache_manager.py cleanup --max-age 7200")
print(" python cache_manager.py delete my-cache-key")
return
command = sys.argv[1]
try:
if command == 'stats':
show_stats()
elif command == 'clear':
clear_all()
elif command == 'expired':
clear_expired()
elif command == 'cleanup':
cleanup_old()
elif command == 'list':
list_keys()
elif command == 'delete':
delete_key()
else:
print(f"Unknown command: {command}")
print("Use 'python cache_manager.py' to see available commands")
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
main()