qwen_agent/agent/agent_memory_cache.py
朱潮 425f3c5bb4 chore: replace Chinese comments and log messages with English
Convert all Chinese comments, docstrings, logger/print output,
HTTPException detail messages, and API response messages to English
across the entire codebase. Functional zh/ja localized strings
(e.g. prompt templates, timezone display names, date formats) are
preserved as-is.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-30 19:45:35 +08:00

378 lines
12 KiB
Python

"""
In-memory agent cache management module.
Uses the cachetools library to provide TTLCache and LRUCache behavior.
"""
import hashlib
import json
import logging
import time
import threading
from typing import Any, Optional, Dict, List
import cachetools
from utils.settings import TOOL_CACHE_MAX_SIZE, TOOL_CACHE_TTL, TOOL_CACHE_AUTO_RENEW
# Module logger: every record from this module goes to the logger named "app".
logger = logging.getLogger(name="app")
class AgentMemoryCacheManager:
    """
    In-memory cache manager implemented with cachetools.

    - Uses in-memory storage (``cachetools.LRUCache``) for fast access
    - Supports per-entry automatic expiration (TTL)
    - Supports cache size limits and LRU eviction
    - Supports automatic TTL renewal on access
    - Thread-safe via a re-entrant ``threading.RLock``

    Expiration is enforced by this class (``_expire_times``) rather than being
    delegated to ``cachetools.TTLCache``: TTLCache applies one cache-wide TTL
    and never extends it on reads, which silently capped per-entry ``ttl``
    values at ``default_ttl``, made ``auto_renew`` ineffective, and let its
    internal expirations/evictions drift out of sync with our metadata.
    """

    def __init__(
        self,
        max_size: int = 1000,  # Cache up to 1000 agents by default
        default_ttl: int = 180,  # Expire after 3 minutes by default
        auto_renew: bool = True  # Automatically renew TTL on access
    ):
        """
        Initialize the in-memory cache manager.

        Args:
            max_size: Maximum number of cached items
            default_ttl: Default expiration time in seconds
            auto_renew: Whether to automatically renew TTL on access
        """
        # Size-bounded LRU storage; expiration is enforced via _expire_times
        self.cache = cachetools.LRUCache(maxsize=max_size)
        # Per-key absolute expiration timestamps (time.monotonic based)
        self._expire_times: Dict[str, float] = {}
        # Per-key creation timestamps (used by cleanup_old_entries)
        self._create_times: Dict[str, float] = {}
        # Per-key TTL in seconds so auto-renewal reuses the entry's own TTL
        self._ttls: Dict[str, float] = {}
        # Re-entrant because cleanup_old_entries() calls delete() while locked
        self._lock = threading.RLock()

        self.default_ttl = default_ttl
        self.auto_renew = auto_renew
        self.max_size = max_size

        # Statistics
        self._hits = 0
        self._misses = 0
        self._sets = 0
        self._evictions = 0

        logger.info(f"AgentMemoryCacheManager initialized with max_size: {max_size}, "
                    f"default_ttl: {default_ttl}s, auto_renew: {auto_renew}")

    def get(self, cache_key: str) -> Optional[Any]:
        """
        Get a cached agent.

        On a hit with ``auto_renew`` enabled, the entry's expiration is pushed
        forward by the TTL it was stored with.

        Args:
            cache_key: Cache key

        Returns:
            Agent object, or None on a miss or when the entry has expired
        """
        with self._lock:
            current_time = time.monotonic()

            # Check expiration first
            if self._expire_if_stale(cache_key, current_time):
                self._misses += 1
                logger.debug(f"Cache miss (expired) for key: {cache_key}")
                return None

            try:
                # LRUCache.__getitem__ also marks the key as most recently used
                value = self.cache[cache_key]
            except KeyError:
                self._misses += 1
                logger.debug(f"Cache miss for key: {cache_key}")
                return None

            if self.auto_renew:
                ttl = self._ttls.get(cache_key, self.default_ttl)
                self._expire_times[cache_key] = current_time + ttl
                logger.debug(f"Cache hit and renewed for key: {cache_key}")
            else:
                logger.debug(f"Cache hit for key: {cache_key}")

            self._hits += 1
            return value

    def set(self, cache_key: str, agent: Any, ttl: Optional[int] = None) -> bool:
        """
        Cache an agent object.

        Args:
            cache_key: Cache key
            agent: Agent object to cache
            ttl: Expiration time in seconds; uses default if None

        Returns:
            Whether the cache entry was set successfully
        """
        with self._lock:
            try:
                if ttl is None:
                    ttl = self.default_ttl

                current_time = time.monotonic()

                # Free a slot ourselves so every eviction is recorded exactly
                if cache_key not in self.cache:
                    self._make_room(current_time)

                self.cache[cache_key] = agent
                self._expire_times[cache_key] = current_time + ttl
                self._create_times[cache_key] = current_time
                self._ttls[cache_key] = ttl

                self._sets += 1
                logger.info(f"Cached agent for key: {cache_key}, ttl: {ttl}s")
                return True
            except Exception as e:
                logger.error(f"Error setting cache for key {cache_key}: {e}")
                return False

    def delete(self, cache_key: str) -> bool:
        """
        Delete a specific cache entry.

        An entry whose TTL has already elapsed is treated as absent.

        Args:
            cache_key: Cache key

        Returns:
            Whether a live entry was deleted
        """
        with self._lock:
            try:
                self._expire_if_stale(cache_key, time.monotonic())

                deleted = cache_key in self.cache
                if deleted:
                    del self.cache[cache_key]

                # Clean metadata
                self._cleanup_metadata(cache_key)

                if deleted:
                    logger.info(f"Deleted cache for key: {cache_key}")
                else:
                    logger.warning(f"Cache key not found for deletion: {cache_key}")

                return deleted
            except Exception as e:
                logger.error(f"Error deleting cache for key {cache_key}: {e}")
                return False

    def _expire_if_stale(self, cache_key: str, current_time: float) -> bool:
        """Remove ``cache_key`` if its TTL has elapsed; return True if it was removed.

        Caller must hold ``self._lock``.
        """
        expire_time = self._expire_times.get(cache_key)
        if expire_time is not None and current_time > expire_time:
            self._remove_expired(cache_key)
            return True
        return False

    def _purge_expired(self, current_time: Optional[float] = None) -> int:
        """Remove every expired entry and return how many were removed.

        Caller must hold ``self._lock``.
        """
        if current_time is None:
            current_time = time.monotonic()
        expired_keys = [
            key for key, expire_time in self._expire_times.items()
            if current_time > expire_time
        ]
        for key in expired_keys:
            self._remove_expired(key)
        return len(expired_keys)

    def _make_room(self, current_time: float) -> None:
        """Ensure one free slot, dropping expired entries before evicting LRU ones.

        Caller must hold ``self._lock``.
        """
        if len(self.cache) < self.max_size:
            return
        self._purge_expired(current_time)
        while self.cache and len(self.cache) >= self.max_size:
            # LRUCache.popitem() removes the least recently used entry, so the
            # key we record is exactly the one evicted (no metadata drift).
            evicted_key, _ = self.cache.popitem()
            self._cleanup_metadata(evicted_key)
            self._evictions += 1
            logger.debug(f"Evicted cache key: {evicted_key}")

    def _remove_expired(self, cache_key: str):
        """Remove a cache entry together with its metadata."""
        if cache_key in self.cache:
            del self.cache[cache_key]
        self._cleanup_metadata(cache_key)

    def _cleanup_metadata(self, cache_key: str):
        """Clean metadata for the specified key."""
        self._expire_times.pop(cache_key, None)
        self._create_times.pop(cache_key, None)
        self._ttls.pop(cache_key, None)

    def clear_all(self) -> bool:
        """
        Clear all cache entries and reset statistics.

        Returns:
            Whether clearing succeeded
        """
        with self._lock:
            try:
                count = len(self.cache)
                self.cache.clear()
                self._expire_times.clear()
                self._create_times.clear()
                self._ttls.clear()

                # Reset statistics
                self._hits = 0
                self._misses = 0
                self._sets = 0
                self._evictions = 0

                logger.info(f"Cleared all cache entries, total: {count}")
                return True
            except Exception as e:
                logger.error(f"Error clearing all cache: {e}")
                return False

    def get_stats(self) -> Dict[str, Any]:
        """
        Get cache statistics.

        Returns:
            Dictionary containing cache statistics
        """
        with self._lock:
            # Drop expired entries so total_items counts live entries only
            self._purge_expired()

            total_requests = self._hits + self._misses
            hit_rate = (self._hits / total_requests * 100) if total_requests > 0 else 0

            return {
                "type": "memory",
                "total_items": len(self.cache),
                "max_size": self.max_size,
                "default_ttl": self.default_ttl,
                "auto_renew": self.auto_renew,
                "hits": self._hits,
                "misses": self._misses,
                "hit_rate_percent": round(hit_rate, 2),
                "sets": self._sets,
                "evictions": self._evictions,
                "memory_usage_mb": round(self._estimate_memory_usage() / 1024 / 1024, 2)
            }

    def _estimate_memory_usage(self) -> int:
        """Roughly estimate memory usage in bytes (shallow ``sys.getsizeof``)."""
        import sys

        total_size = 0
        for key in self.cache:
            # Read through the base Cache.__getitem__ so that measuring does
            # not refresh LRU recency and reshuffle the eviction order.
            value = cachetools.Cache.__getitem__(self.cache, key)
            total_size += sys.getsizeof(key)
            total_size += sys.getsizeof(value)

        # Estimate metadata sizes
        total_size += sys.getsizeof(self._expire_times)
        total_size += sys.getsizeof(self._create_times)
        total_size += sys.getsizeof(self._ttls)

        return total_size

    def cleanup_old_entries(self, max_age_seconds: int = 3600) -> int:
        """
        Remove all live cache entries older than the specified age.

        Already-expired entries are purged first and are not counted.

        Args:
            max_age_seconds: Maximum age in seconds

        Returns:
            Number of removed cache entries
        """
        with self._lock:
            current_time = time.monotonic()
            self._purge_expired(current_time)

            keys_to_delete = [
                key for key, create_time in self._create_times.items()
                if current_time - create_time > max_age_seconds
            ]

            deleted_count = 0
            for key in keys_to_delete:
                if self.delete(key):
                    deleted_count += 1

            logger.info(f"Cleaned up {deleted_count} old cache entries older than {max_age_seconds}s")
            return deleted_count

    def get_keys(self) -> list:
        """
        Get all live (non-expired) cache keys.

        Returns:
            List of cache keys
        """
        with self._lock:
            self._purge_expired()
            return list(self.cache.keys())

    def __len__(self) -> int:
        """Return the number of live (non-expired) items in the cache."""
        with self._lock:
            self._purge_expired()
            return len(self.cache)

    def get_mcp_tools(self, mcp_settings: dict) -> Optional[List]:
        """
        Get cached MCP tools.

        Args:
            mcp_settings: MCP settings dictionary

        Returns:
            Cached list of tools or None
        """
        cache_key = self._get_mcp_cache_key(mcp_settings)
        return self.get(cache_key)

    def set_mcp_tools(self, mcp_settings: dict, tools: List, ttl: Optional[int] = None) -> bool:
        """
        Cache MCP tools.

        Args:
            mcp_settings: MCP settings dictionary
            tools: List of tools to cache
            ttl: Expiration time in seconds; uses default if None

        Returns:
            Whether the cache entry was set successfully
        """
        cache_key = self._get_mcp_cache_key(mcp_settings)
        return self.set(cache_key, tools, ttl=ttl)

    def _get_mcp_cache_key(self, mcp_settings: dict) -> str:
        """
        Generate a cache key from mcp_settings.

        Args:
            mcp_settings: MCP settings dictionary (must be JSON-serializable)

        Returns:
            Cache key string
        """
        # sort_keys makes the key independent of dict insertion order;
        # MD5 is used only as a compact fingerprint, not for security.
        settings_str = json.dumps(mcp_settings, sort_keys=True)
        return f"mcp_tools:{hashlib.md5(settings_str.encode()).hexdigest()}"
# Global cache manager instance (created lazily by get_memory_cache_manager)
_global_cache_manager: Optional[AgentMemoryCacheManager] = None
# Serializes lazy creation so concurrent first callers share one instance
_global_cache_lock = threading.Lock()


def get_memory_cache_manager() -> AgentMemoryCacheManager:
    """
    Get the global in-memory cache manager instance as a singleton.

    The instance is built on first use from the TOOL_CACHE_* settings.
    Creation is guarded by a lock so that threads racing on the first call
    cannot each build (and then lose entries in) a separate manager.

    Returns:
        AgentMemoryCacheManager instance
    """
    global _global_cache_manager
    if _global_cache_manager is None:
        with _global_cache_lock:
            # Re-check inside the lock: another thread may have won the race
            if _global_cache_manager is None:
                _global_cache_manager = AgentMemoryCacheManager(
                    max_size=TOOL_CACHE_MAX_SIZE,
                    default_ttl=TOOL_CACHE_TTL,
                    auto_renew=TOOL_CACHE_AUTO_RENEW
                )
    return _global_cache_manager