Convert all Chinese comments, docstrings, logger/print output, HTTPException detail messages, and API response messages to English across the entire codebase. Functional zh/ja localized strings (e.g. prompt templates, timezone display names, date formats) are preserved as-is. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
111 lines
3.5 KiB
Python
111 lines
3.5 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Configuration file cache module.
|
|
Provides asynchronous cached file reads to reduce file I/O overhead under concurrent load.
|
|
"""
|
|
import asyncio
|
|
import os
|
|
import json
|
|
from typing import Dict, Tuple, Optional, Any
|
|
import logging
|
|
|
|
logger = logging.getLogger('app')
|
|
|
|
|
|
class ConfigFileCache:
|
|
"""Configuration file cache.
|
|
|
|
Provides a cache based on file modification time to avoid re-reading unchanged files.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self._cache: Dict[str, Tuple[Any, float]] = {} # {file_path: (content, mtime)}
|
|
self._lock = asyncio.Lock()
|
|
|
|
async def get_text_file(self, file_path: str) -> Optional[str]:
|
|
"""Get text file content with caching.
|
|
|
|
Args:
|
|
file_path: File path
|
|
|
|
Returns:
|
|
File content as a string, or None if the file does not exist or cannot be read
|
|
"""
|
|
if not os.path.exists(file_path):
|
|
return None
|
|
|
|
current_mtime = os.path.getmtime(file_path)
|
|
|
|
# Check whether the cache is still valid without acquiring the lock.
|
|
if file_path in self._cache:
|
|
cached_content, cached_mtime = self._cache[file_path]
|
|
if current_mtime == cached_mtime:
|
|
logger.debug(f"Using cached file: {file_path}")
|
|
return cached_content
|
|
|
|
# Read the file and update the cache while holding the lock.
|
|
async with self._lock:
|
|
# Check the cache again in case another coroutine updated it while we were waiting.
|
|
if file_path in self._cache:
|
|
cached_content, cached_mtime = self._cache[file_path]
|
|
if current_mtime == cached_mtime:
|
|
logger.debug(f"Using cached file: {file_path}")
|
|
return cached_content
|
|
|
|
try:
|
|
with open(file_path, 'r', encoding='utf-8') as f:
|
|
content = f.read()
|
|
self._cache[file_path] = (content, current_mtime)
|
|
logger.debug(f"Cached file: {file_path}")
|
|
return content
|
|
except Exception as e:
|
|
logger.error(f"Failed to read text file {file_path}: {e}")
|
|
return None
|
|
|
|
async def get_json_file(self, file_path: str) -> Optional[Dict]:
|
|
"""Get JSON file content with caching.
|
|
|
|
Args:
|
|
file_path: JSON file path
|
|
|
|
Returns:
|
|
Parsed dictionary, or None if the file does not exist, cannot be read,
|
|
or contains invalid JSON
|
|
"""
|
|
content = await self.get_text_file(file_path)
|
|
if content:
|
|
try:
|
|
return json.loads(content)
|
|
except json.JSONDecodeError as e:
|
|
logger.error(f"Failed to parse JSON from {file_path}: {e}")
|
|
return None
|
|
|
|
def clear_cache(self, file_path: str = None):
|
|
"""Clear the cache.
|
|
|
|
Args:
|
|
file_path: File path to clear. If None, clear the entire cache.
|
|
"""
|
|
if file_path:
|
|
self._cache.pop(file_path, None)
|
|
logger.debug(f"Cleared file cache: {file_path}")
|
|
else:
|
|
cleared_count = len(self._cache)
|
|
self._cache.clear()
|
|
logger.debug(f"Cleared all cache entries, total files: {cleared_count}")
|
|
|
|
def get_cache_stats(self) -> Dict:
|
|
"""Get cache statistics.
|
|
|
|
Returns:
|
|
Dictionary containing cache statistics
|
|
"""
|
|
return {
|
|
"cached_files": len(self._cache),
|
|
"cached_paths": list(self._cache.keys())
|
|
}
|
|
|
|
|
|
# Global cache instance
|
|
config_cache = ConfigFileCache()
|