339 lines
10 KiB
Python
339 lines
10 KiB
Python
"""
|
||
基于内存的 Agent 缓存管理模块
|
||
使用 cachetools 库实现 TTLCache 和 LRUCache
|
||
"""
|
||
import os
|
||
import logging
|
||
import time
|
||
import threading
|
||
from typing import Any, Optional, Dict, Tuple
|
||
from collections import OrderedDict
|
||
from datetime import datetime, timedelta
|
||
|
||
import cachetools
|
||
|
||
logger = logging.getLogger('app')
|
||
|
||
|
||
class AgentMemoryCacheManager:
|
||
"""
|
||
使用 cachetools 实现的内存缓存管理器
|
||
- 基于内存存储,访问速度快
|
||
- 支持自动过期时间(TTL)
|
||
- 支持缓存大小限制和 LRU 淘汰策略
|
||
- 支持访问时自动延长过期时间
|
||
- 线程安全(使用 threading.Lock)
|
||
"""
|
||
|
||
def __init__(
|
||
self,
|
||
max_size: int = 1000, # 默认最多缓存 1000 个 Agent
|
||
default_ttl: int = 180, # 默认 3 分钟过期
|
||
auto_renew: bool = True # 访问时自动延长过期时间
|
||
):
|
||
"""
|
||
初始化内存缓存管理器
|
||
|
||
Args:
|
||
max_size: 最大缓存项数
|
||
default_ttl: 默认过期时间(秒)
|
||
auto_renew: 是否在访问时自动延长过期时间
|
||
"""
|
||
# 使用 TTLCache 实现带过期时间的缓存
|
||
self.cache = cachetools.TTLCache(
|
||
maxsize=max_size,
|
||
ttl=default_ttl,
|
||
timer=time.monotonic
|
||
)
|
||
|
||
# 用于存储每个键的过期时间信息(支持自动续期)
|
||
self._expire_times: Dict[str, float] = {}
|
||
|
||
# 用于存储创建时间
|
||
self._create_times: Dict[str, float] = {}
|
||
|
||
# 锁,确保线程安全
|
||
self._lock = threading.RLock()
|
||
|
||
self.default_ttl = default_ttl
|
||
self.auto_renew = auto_renew
|
||
self.max_size = max_size
|
||
|
||
# 统计信息
|
||
self._hits = 0
|
||
self._misses = 0
|
||
self._sets = 0
|
||
self._evictions = 0
|
||
|
||
logger.info(f"AgentMemoryCacheManager initialized with max_size: {max_size}, "
|
||
f"default_ttl: {default_ttl}s, auto_renew: {auto_renew}")
|
||
|
||
def get(self, cache_key: str) -> Optional[Any]:
|
||
"""
|
||
获取缓存的 Agent
|
||
|
||
Args:
|
||
cache_key: 缓存键
|
||
|
||
Returns:
|
||
Agent 对象或 None
|
||
"""
|
||
with self._lock:
|
||
current_time = time.monotonic()
|
||
|
||
# 首先检查是否过期
|
||
if cache_key in self._expire_times:
|
||
if current_time > self._expire_times[cache_key]:
|
||
# 已过期,清理
|
||
self._remove_expired(cache_key)
|
||
self._misses += 1
|
||
logger.debug(f"Cache miss (expired) for key: {cache_key}")
|
||
return None
|
||
|
||
# 尝试从缓存获取
|
||
try:
|
||
value = self.cache[cache_key]
|
||
|
||
# 如果启用自动续期
|
||
if self.auto_renew:
|
||
self._expire_times[cache_key] = current_time + self.default_ttl
|
||
logger.debug(f"Cache hit and renewed for key: {cache_key}")
|
||
else:
|
||
logger.debug(f"Cache hit for key: {cache_key}")
|
||
|
||
self._hits += 1
|
||
return value
|
||
|
||
except KeyError:
|
||
self._misses += 1
|
||
logger.debug(f"Cache miss for key: {cache_key}")
|
||
return None
|
||
|
||
def set(self, cache_key: str, agent: Any, ttl: Optional[int] = None) -> bool:
|
||
"""
|
||
缓存 Agent 对象
|
||
|
||
Args:
|
||
cache_key: 缓存键
|
||
agent: 要缓存的 Agent 对象
|
||
ttl: 过期时间(秒),如果为 None 则使用默认值
|
||
|
||
Returns:
|
||
是否成功设置缓存
|
||
"""
|
||
with self._lock:
|
||
try:
|
||
if ttl is None:
|
||
ttl = self.default_ttl
|
||
|
||
current_time = time.monotonic()
|
||
expire_time = current_time + ttl
|
||
|
||
# 检查是否需要驱逐项
|
||
evicted_key = None
|
||
if cache_key not in self.cache and len(self.cache) >= self.max_size:
|
||
# cachetools 的 TTLCache 会自动驱逐,但我们要记录
|
||
# 先获取可能被驱逐的键
|
||
oldest_key = next(iter(self.cache)) if self.cache else None
|
||
if oldest_key:
|
||
evicted_key = oldest_key
|
||
|
||
# 设置缓存
|
||
self.cache[cache_key] = agent
|
||
self._expire_times[cache_key] = expire_time
|
||
self._create_times[cache_key] = current_time
|
||
|
||
# 清理被驱逐的项的元数据
|
||
if evicted_key and evicted_key != cache_key:
|
||
self._cleanup_metadata(evicted_key)
|
||
self._evictions += 1
|
||
logger.debug(f"Evicted cache key: {evicted_key}")
|
||
|
||
self._sets += 1
|
||
|
||
logger.info(f"Cached agent for key: {cache_key}, ttl: {ttl}s")
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"Error setting cache for key {cache_key}: {e}")
|
||
return False
|
||
|
||
def delete(self, cache_key: str) -> bool:
|
||
"""
|
||
删除特定的缓存项
|
||
|
||
Args:
|
||
cache_key: 缓存键
|
||
|
||
Returns:
|
||
是否成功删除
|
||
"""
|
||
with self._lock:
|
||
try:
|
||
# 从缓存中删除
|
||
deleted = cache_key in self.cache
|
||
if deleted:
|
||
del self.cache[cache_key]
|
||
|
||
# 清理元数据
|
||
self._cleanup_metadata(cache_key)
|
||
|
||
if deleted:
|
||
logger.info(f"Deleted cache for key: {cache_key}")
|
||
else:
|
||
logger.warning(f"Cache key not found for deletion: {cache_key}")
|
||
|
||
return deleted
|
||
except Exception as e:
|
||
logger.error(f"Error deleting cache for key {cache_key}: {e}")
|
||
return False
|
||
|
||
def _remove_expired(self, cache_key: str):
|
||
"""移除过期的缓存项"""
|
||
if cache_key in self.cache:
|
||
del self.cache[cache_key]
|
||
self._cleanup_metadata(cache_key)
|
||
|
||
def _cleanup_metadata(self, cache_key: str):
|
||
"""清理指定键的元数据"""
|
||
self._expire_times.pop(cache_key, None)
|
||
self._create_times.pop(cache_key, None)
|
||
|
||
def clear_all(self) -> bool:
|
||
"""
|
||
清空所有缓存
|
||
|
||
Returns:
|
||
是否成功清空
|
||
"""
|
||
with self._lock:
|
||
try:
|
||
count = len(self.cache)
|
||
self.cache.clear()
|
||
self._expire_times.clear()
|
||
self._create_times.clear()
|
||
|
||
# 重置统计信息
|
||
self._hits = 0
|
||
self._misses = 0
|
||
self._sets = 0
|
||
self._evictions = 0
|
||
|
||
logger.info(f"Cleared all cache entries, total: {count}")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"Error clearing all cache: {e}")
|
||
return False
|
||
|
||
def get_stats(self) -> Dict[str, Any]:
|
||
"""
|
||
获取缓存统计信息
|
||
|
||
Returns:
|
||
包含统计信息的字典
|
||
"""
|
||
with self._lock:
|
||
total_requests = self._hits + self._misses
|
||
hit_rate = (self._hits / total_requests * 100) if total_requests > 0 else 0
|
||
|
||
return {
|
||
"type": "memory",
|
||
"total_items": len(self.cache),
|
||
"max_size": self.max_size,
|
||
"default_ttl": self.default_ttl,
|
||
"auto_renew": self.auto_renew,
|
||
"hits": self._hits,
|
||
"misses": self._misses,
|
||
"hit_rate_percent": round(hit_rate, 2),
|
||
"sets": self._sets,
|
||
"evictions": self._evictions,
|
||
"memory_usage_mb": round(self._estimate_memory_usage() / 1024 / 1024, 2)
|
||
}
|
||
|
||
def _estimate_memory_usage(self) -> int:
|
||
"""估算内存使用量(字节)"""
|
||
# 这是一个粗略的估算
|
||
import sys
|
||
total_size = 0
|
||
|
||
# 估算缓存项的大小
|
||
for key, value in self.cache.items():
|
||
total_size += sys.getsizeof(key)
|
||
total_size += sys.getsizeof(value)
|
||
|
||
# 估算元数据的大小
|
||
total_size += sys.getsizeof(self._expire_times)
|
||
total_size += sys.getsizeof(self._create_times)
|
||
|
||
return total_size
|
||
|
||
def cleanup_old_entries(self, max_age_seconds: int = 3600) -> int:
|
||
"""
|
||
清理超过指定时间的所有缓存项
|
||
|
||
Args:
|
||
max_age_seconds: 最大存在时间(秒)
|
||
|
||
Returns:
|
||
清理的缓存项数量
|
||
"""
|
||
with self._lock:
|
||
current_time = time.monotonic()
|
||
keys_to_delete = []
|
||
|
||
# 查找超过最大时间的项
|
||
for cache_key, create_time in self._create_times.items():
|
||
age_seconds = current_time - create_time
|
||
if age_seconds > max_age_seconds:
|
||
keys_to_delete.append(cache_key)
|
||
|
||
# 删除旧项
|
||
deleted_count = 0
|
||
for key in keys_to_delete:
|
||
if self.delete(key):
|
||
deleted_count += 1
|
||
|
||
logger.info(f"Cleaned up {deleted_count} old cache entries older than {max_age_seconds}s")
|
||
return deleted_count
|
||
|
||
def get_keys(self) -> list:
|
||
"""
|
||
获取所有缓存键
|
||
|
||
Returns:
|
||
缓存键列表
|
||
"""
|
||
with self._lock:
|
||
return list(self.cache.keys())
|
||
|
||
def __len__(self) -> int:
|
||
"""返回缓存中的项数"""
|
||
return len(self.cache)
|
||
|
||
|
||
# 全局缓存管理器实例
|
||
_global_cache_manager: Optional[AgentMemoryCacheManager] = None
|
||
|
||
|
||
def get_memory_cache_manager() -> AgentMemoryCacheManager:
|
||
"""
|
||
获取全局内存缓存管理器实例(单例模式)
|
||
|
||
Returns:
|
||
AgentMemoryCacheManager 实例
|
||
"""
|
||
global _global_cache_manager
|
||
|
||
if _global_cache_manager is None:
|
||
# 从环境变量或使用默认值
|
||
max_size = int(os.getenv("AGENT_CACHE_MAX_SIZE", "20"))
|
||
default_ttl = int(os.getenv("AGENT_CACHE_TTL", "180"))
|
||
auto_renew = os.getenv("AGENT_CACHE_AUTO_RENEW", "true").lower() == "true"
|
||
|
||
_global_cache_manager = AgentMemoryCacheManager(
|
||
max_size=max_size,
|
||
default_ttl=default_ttl,
|
||
auto_renew=auto_renew
|
||
)
|
||
|
||
return _global_cache_manager |