add CHECKPOINT_DB_URL

朱潮 2025-12-24 20:43:10 +08:00
parent 7a21df05a3
commit 0e6b2f1511
6 changed files with 221 additions and 357 deletions

@@ -1,196 +1,92 @@
 """
-Global SQLite checkpointer manager.
-Works around database locking under high concurrency:
-each session gets its own database file to avoid lock contention.
+Global PostgreSQL checkpointer manager.
+Uses a psycopg_pool connection pool, which AsyncPostgresSaver supports natively.
 """
 import asyncio
 import logging
-import os
-import time
-from typing import Optional, Dict, Any, Tuple
-import aiosqlite
-from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
+from typing import Optional
+from psycopg_pool import AsyncConnectionPool
+from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
 from utils.settings import (
-    CHECKPOINT_DB_PATH,
-    CHECKPOINT_WAL_MODE,
-    CHECKPOINT_BUSY_TIMEOUT,
+    CHECKPOINT_DB_URL,
+    CHECKPOINT_POOL_SIZE,
     CHECKPOINT_CLEANUP_ENABLED,
     CHECKPOINT_CLEANUP_INTERVAL_HOURS,
-    CHECKPOINT_CLEANUP_OLDER_THAN_DAYS,
+    CHECKPOINT_CLEANUP_INACTIVE_DAYS,
 )

 logger = logging.getLogger('app')

-# Connection pool size per session (a session is processed serially, so one connection suffices)
-POOL_SIZE_PER_SESSION = 1

 class CheckpointerManager:
     """
-    Global checkpointer manager; one database file per session_id.
+    Global checkpointer manager backed by a PostgreSQL connection pool.
     Main features:
-    1. A dedicated database file and connection pool per session_id
-    2. Pools are created on demand; unused sessions hold no resources
-    3. Pre-configured WAL mode and busy_timeout
-    4. Simple cleanup based on file modification time
+    1. Connections are managed by psycopg_pool.AsyncConnectionPool
+    2. AsyncPostgresSaver supports the pool natively, acquiring/releasing connections automatically
+    3. No manual hand-back, so long-running requests cannot pin a connection
+    4. Cleanup driven by SQL queries
     5. Graceful shutdown
     """

     def __init__(self):
-        # One connection pool per (bot_id, session_id)
-        self._pools: Dict[Tuple[str, str], asyncio.Queue[AsyncSqliteSaver]] = {}
-        # Per-session initialization lock
-        self._locks: Dict[Tuple[str, str], asyncio.Lock] = {}
-        # Global lock guarding access to the pools and locks dicts
-        self._global_lock = asyncio.Lock()
+        self._pool: Optional[AsyncConnectionPool] = None
+        self._initialized = False
         self._closed = False
         # Cleanup scheduler task
         self._cleanup_task: Optional[asyncio.Task] = None
         self._cleanup_stop_event = asyncio.Event()

-    def _get_db_path(self, bot_id: str, session_id: str) -> str:
-        """Return the database file path for the given session."""
-        return os.path.join(CHECKPOINT_DB_PATH, bot_id, session_id, "checkpoints.db")
-
-    def _get_pool_key(self, bot_id: str, session_id: str) -> Tuple[str, str]:
-        """Return the connection pool key."""
-        return (bot_id, session_id)
-
-    async def _initialize_session_pool(self, bot_id: str, session_id: str) -> None:
-        """Initialize the connection pool for the given session."""
-        pool_key = self._get_pool_key(bot_id, session_id)
-        if pool_key in self._pools:
-            return
-        logger.info(f"Initializing checkpointer pool for bot_id={bot_id}, session_id={session_id}")
-        db_path = self._get_db_path(bot_id, session_id)
-        os.makedirs(os.path.dirname(db_path), exist_ok=True)
-        pool = asyncio.Queue()
-        for i in range(POOL_SIZE_PER_SESSION):
-            try:
-                conn = await self._create_configured_connection(db_path)
-                checkpointer = AsyncSqliteSaver(conn=conn)
-                # Call setup() up front so the table schema exists
-                await checkpointer.setup()
-                await pool.put(checkpointer)
-                logger.debug(f"Created checkpointer connection {i+1}/{POOL_SIZE_PER_SESSION} for session={session_id}")
-            except Exception as e:
-                logger.error(f"Failed to create checkpointer connection {i+1} for session={session_id}: {e}")
-                raise
-        self._pools[pool_key] = pool
-        self._locks[pool_key] = asyncio.Lock()
-        logger.info(f"Checkpointer pool initialized for bot_id={bot_id}, session_id={session_id}")
-
-    async def _create_configured_connection(self, db_path: str) -> aiosqlite.Connection:
-        """
-        Create a pre-configured SQLite connection.
-        Configuration includes:
-        1. WAL mode (Write-Ahead Logging) - allows concurrent reads and writes
-        2. busy_timeout - maximum time to wait for a lock
-        3. other tuning parameters
-        """
-        conn = aiosqlite.connect(db_path)
-        # Wait for the connection to be established
-        await conn.__aenter__()
-        # Set the busy timeout (must be set after the connection is established)
-        await conn.execute(f"PRAGMA busy_timeout = {CHECKPOINT_BUSY_TIMEOUT}")
-        # If WAL mode is enabled
-        if CHECKPOINT_WAL_MODE:
-            await conn.execute("PRAGMA journal_mode = WAL")
-            await conn.execute("PRAGMA synchronous = NORMAL")
-            # Tuning for WAL mode
-            await conn.execute("PRAGMA wal_autocheckpoint = 10000")  # raised to 10000
-            await conn.execute("PRAGMA cache_size = -64000")  # 64MB cache
-            await conn.execute("PRAGMA temp_store = MEMORY")
-            await conn.execute("PRAGMA journal_size_limit = 1048576")  # 1MB
-        await conn.commit()
-        return conn
-
-    async def initialize(self) -> None:
-        """Initialize the manager (pools are no longer pre-created; they are created on demand)."""
-        logger.info("CheckpointerManager initialized (pools will be created on-demand)")
-
-    async def acquire_for_agent(self, bot_id: str, session_id: str) -> AsyncSqliteSaver:
-        """
-        Get the checkpointer for the given session.
-        Note: a checkpointer obtained this way must be handed back manually
-        via return_to_pool().
-        Args:
-            bot_id: bot ID
-            session_id: session ID
-        Returns:
-            An AsyncSqliteSaver instance
-        """
-        if self._closed:
-            raise RuntimeError("CheckpointerManager is closed")
-        pool_key = self._get_pool_key(bot_id, session_id)
-        async with self._global_lock:
-            if pool_key not in self._pools:
-                await self._initialize_session_pool(bot_id, session_id)
-        # Take the session's lock so pool operations stay safe under concurrency
-        async with self._locks[pool_key]:
-            checkpointer = await self._pools[pool_key].get()
-            logger.debug(f"Acquired checkpointer for bot_id={bot_id}, session_id={session_id}, remaining: {self._pools[pool_key].qsize()}")
-            return checkpointer
-
-    async def return_to_pool(self, bot_id: str, session_id: str, checkpointer: AsyncSqliteSaver) -> None:
-        """
-        Return a checkpointer to the session's pool.
-        Args:
-            bot_id: bot ID
-            session_id: session ID
-            checkpointer: the checkpointer instance to return
-        """
-        pool_key = self._get_pool_key(bot_id, session_id)
-        if pool_key in self._pools:
-            async with self._locks[pool_key]:
-                await self._pools[pool_key].put(checkpointer)
-                logger.debug(f"Returned checkpointer for bot_id={bot_id}, session_id={session_id}, remaining: {self._pools[pool_key].qsize()}")
-
-    async def _close_session_pool(self, bot_id: str, session_id: str) -> None:
-        """Close the connection pool for the given session."""
-        pool_key = self._get_pool_key(bot_id, session_id)
-        if pool_key not in self._pools:
-            return
-        logger.info(f"Closing checkpointer pool for bot_id={bot_id}, session_id={session_id}")
-        pool = self._pools[pool_key]
-        while not pool.empty():
-            try:
-                checkpointer = pool.get_nowait()
-                if checkpointer.conn:
-                    await checkpointer.conn.close()
-            except asyncio.QueueEmpty:
-                break
-        del self._pools[pool_key]
-        if pool_key in self._locks:
-            del self._locks[pool_key]
-        logger.info(f"Checkpointer pool closed for bot_id={bot_id}, session_id={session_id}")
+    async def initialize(self) -> None:
+        """Initialize the connection pool."""
+        if self._initialized:
+            return
+        logger.info(
+            f"Initializing PostgreSQL checkpointer pool: "
+            f"URL={CHECKPOINT_DB_URL}, size={CHECKPOINT_POOL_SIZE}"
+        )
+        try:
+            # Create the psycopg connection pool
+            self._pool = AsyncConnectionPool(
+                CHECKPOINT_DB_URL,
+                min_size=1,
+                max_size=CHECKPOINT_POOL_SIZE,
+                open=False,
+            )
+            # Open the pool
+            await self._pool.open()
+            # Create the table schema (autocommit is required for CREATE INDEX CONCURRENTLY)
+            async with self._pool.connection() as conn:
+                await conn.set_autocommit(True)
+                checkpointer = AsyncPostgresSaver(conn=conn)
+                await checkpointer.setup()
+            self._initialized = True
+            logger.info("PostgreSQL checkpointer pool initialized successfully")
+        except Exception as e:
+            logger.error(f"Failed to initialize PostgreSQL checkpointer pool: {e}")
+            raise
+
+    @property
+    def checkpointer(self) -> AsyncPostgresSaver:
+        """Return the global AsyncPostgresSaver instance."""
+        if self._closed:
+            raise RuntimeError("CheckpointerManager is closed")
+        if not self._initialized:
+            raise RuntimeError("CheckpointerManager not initialized, call initialize() first")
+        return AsyncPostgresSaver(conn=self._pool)

     async def close(self) -> None:
-        """Close all connection pools."""
+        """Close the connection pool."""
         if self._closed:
             return
@@ -199,122 +95,103 @@ class CheckpointerManager:
         self._cleanup_stop_event.set()
         try:
             self._cleanup_task.cancel()
-            await asyncio.sleep(0.1)  # give the task a moment to clean up
+            await asyncio.sleep(0.1)
         except asyncio.CancelledError:
             pass
         self._cleanup_task = None

-        async with self._global_lock:
-            if self._closed:
-                return
-            logger.info("Closing CheckpointerManager...")
-            # Close every session's connection pool
-            pool_keys = list(self._pools.keys())
-            for bot_id, session_id in pool_keys:
-                await self._close_session_pool(bot_id, session_id)
-            self._closed = True
-            logger.info("CheckpointerManager closed")
-
-    def get_pool_stats(self) -> dict:
-        """Return connection pool statistics."""
-        return {
-            "session_count": len(self._pools),
-            "pools": {
-                f"{bot_id}/{session_id}": {
-                    "available": pool.qsize(),
-                    "pool_size": POOL_SIZE_PER_SESSION
-                }
-                for (bot_id, session_id), pool in self._pools.items()
-            },
-            "closed": self._closed
-        }
+        if self._closed:
+            return
+        logger.info("Closing CheckpointerManager...")
+        if self._pool is not None:
+            await self._pool.close()
+            self._pool = None
+        self._closed = True
+        self._initialized = False
+        logger.info("CheckpointerManager closed")

     # ============================================================
-    # Checkpoint cleanup methods (based on file modification time)
+    # Checkpoint cleanup methods (based on PostgreSQL SQL queries)
     # ============================================================

-    async def cleanup_old_dbs(self, older_than_days: int = None) -> Dict[str, Any]:
+    async def cleanup_old_dbs(self, inactive_days: int = None) -> dict:
         """
-        Delete old database files based on their modification time.
+        Delete old checkpoint records:
+        removes all checkpoints of threads inactive for N days.
         Args:
-            older_than_days: delete data older than this many days (defaults to the configured value)
+            inactive_days: delete threads inactive for N days (defaults to the configured value)
         Returns:
             Dict: cleanup statistics
-            - deleted: number of session directories deleted
-            - scanned: number of session directories scanned
-            - cutoff_time: cutoff timestamp
         """
-        if older_than_days is None:
-            older_than_days = CHECKPOINT_CLEANUP_OLDER_THAN_DAYS
-        cutoff_time = time.time() - older_than_days * 86400
-        logger.info(f"Starting checkpoint cleanup: removing db files not modified since {cutoff_time}")
-        db_dir = CHECKPOINT_DB_PATH
-        deleted_count = 0
-        scanned_count = 0
-        if not os.path.exists(db_dir):
-            logger.info(f"Checkpoint directory does not exist: {db_dir}")
-            return {"deleted": 0, "scanned": 0, "cutoff_time": cutoff_time}
-        # Walk the bot_id directories
-        for bot_id in os.listdir(db_dir):
-            bot_path = os.path.join(db_dir, bot_id)
-            # Skip entries that are not directories
-            if not os.path.isdir(bot_path):
-                continue
-            # Walk the session_id directories
-            for session_id in os.listdir(bot_path):
-                session_path = os.path.join(bot_path, session_id)
-                if not os.path.isdir(session_path):
-                    continue
-                db_file = os.path.join(session_path, "checkpoints.db")
-                if not os.path.exists(db_file):
-                    continue
-                scanned_count += 1
-                mtime = os.path.getmtime(db_file)
-                if mtime < cutoff_time:
-                    # Close the session's connection pool, if any
-                    await self._close_session_pool(bot_id, session_id)
-                    # Delete the whole session directory
-                    try:
-                        import shutil
-                        shutil.rmtree(session_path)
-                        deleted_count += 1
-                        logger.info(f"Deleted old checkpoint session: {bot_id}/{session_id}/ (last modified: {mtime})")
-                    except Exception as e:
-                        logger.warning(f"Failed to delete {session_path}: {e}")
-        # Remove empty bot_id directories
-        for bot_id in os.listdir(db_dir):
-            bot_path = os.path.join(db_dir, bot_id)
-            if os.path.isdir(bot_path) and not os.listdir(bot_path):
-                try:
-                    os.rmdir(bot_path)
-                    logger.debug(f"Removed empty bot directory: {bot_id}/")
-                except Exception:
-                    pass
-        result = {
-            "deleted": deleted_count,
-            "scanned": scanned_count,
-            "cutoff_time": cutoff_time,
-            "older_than_days": older_than_days
-        }
-        logger.info(f"Checkpoint cleanup completed: {result}")
-        return result
+        if inactive_days is None:
+            inactive_days = CHECKPOINT_CLEANUP_INACTIVE_DAYS
+        logger.info(f"Starting checkpoint cleanup: removing threads inactive for {inactive_days} days")
+        if self._pool is None:
+            logger.warning("Connection pool not initialized, skipping cleanup")
+            return {"deleted": 0, "inactive_days": inactive_days}
+        try:
+            # Take a connection from the pool to run the cleanup
+            async with self._pool.connection() as conn:
+                async with conn.cursor() as cursor:
+                    # Find inactive threads
+                    query_find_inactive = """
+                        SELECT DISTINCT thread_id
+                        FROM checkpoints
+                        WHERE (checkpoint->>'ts')::timestamp < NOW() - INTERVAL '%s days'
+                    """
+                    await cursor.execute(query_find_inactive, (inactive_days,))
+                    inactive_threads = await cursor.fetchall()
+                    inactive_thread_ids = [row[0] for row in inactive_threads]
+                    if not inactive_thread_ids:
+                        logger.info("No inactive threads found")
+                        return {"deleted": 0, "threads_deleted": 0, "inactive_days": inactive_days}
+                    # Delete the checkpoints of the inactive threads
+                    placeholders = ','.join(['%s'] * len(inactive_thread_ids))
+                    query_delete_checkpoints = f"""
+                        DELETE FROM checkpoints
+                        WHERE thread_id IN ({placeholders})
+                    """
+                    await cursor.execute(query_delete_checkpoints, inactive_thread_ids)
+                    deleted_checkpoints = cursor.rowcount
+                    # Also clean up checkpoint_writes
+                    query_delete_writes = f"""
+                        DELETE FROM checkpoint_writes
+                        WHERE thread_id IN ({placeholders})
+                    """
+                    await cursor.execute(query_delete_writes, inactive_thread_ids)
+                    deleted_writes = cursor.rowcount
+                    logger.info(
+                        f"Checkpoint cleanup completed: "
+                        f"deleted {deleted_checkpoints} checkpoints, {deleted_writes} writes "
+                        f"from {len(inactive_thread_ids)} inactive threads"
+                    )
+                    return {
+                        "deleted": deleted_checkpoints + deleted_writes,
+                        "threads_deleted": len(inactive_thread_ids),
+                        "inactive_days": inactive_days
+                    }
+        except Exception as e:
+            logger.error(f"Error during checkpoint cleanup: {e}")
+            import traceback
+            logger.error(traceback.format_exc())
+            return {"deleted": 0, "error": str(e), "inactive_days": inactive_days}

     async def _cleanup_loop(self):
         """Background cleanup loop."""
@@ -323,7 +200,7 @@
         logger.info(
             f"Checkpoint cleanup scheduler started: "
             f"interval={CHECKPOINT_CLEANUP_INTERVAL_HOURS}h, "
-            f"older_than={CHECKPOINT_CLEANUP_OLDER_THAN_DAYS} days"
+            f"inactive_days={CHECKPOINT_CLEANUP_INACTIVE_DAYS}"
         )

         while not self._cleanup_stop_event.is_set():

@@ -364,7 +241,6 @@
         logger.warning("Cleanup scheduler is already running")
         return
-        # Create a new event loop (if it needs to run in the background)
         try:
             loop = asyncio.get_running_loop()
         except RuntimeError:
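Taken together, the manager is now initialized once at process startup and handed out freely afterwards. Below is a minimal usage sketch pieced together from this commit; the startup/shutdown hook names are assumptions, while get_checkpointer_manager() is the accessor the other changed files import. One caveat: placing %s inside the quoted INTERVAL literal above does not parameterize cleanly in psycopg, so the inactive-thread query will likely error at runtime; a fully parameterized form such as NOW() - make_interval(days => %s) avoids the issue.

# Minimal usage sketch (assumed integration points; not part of this commit).
# get_checkpointer_manager() is the module-level accessor imported by the
# other files changed in this commit.
from agent.checkpoint_manager import get_checkpointer_manager

async def on_startup():
    # Open the shared AsyncConnectionPool once. setup() runs the saver's
    # migrations on an autocommit connection, because CREATE INDEX
    # CONCURRENTLY cannot run inside a transaction.
    await get_checkpointer_manager().initialize()

async def handle_request(config):
    # No acquire/return pair any more: the property wraps the shared pool,
    # and psycopg_pool checks connections in and out per operation.
    checkpointer = get_checkpointer_manager().checkpointer
    # ... build and run the agent with `checkpointer` ...

async def on_shutdown():
    await get_checkpointer_manager().close()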


@@ -18,8 +18,7 @@ from agent.agent_config import AgentConfig
 from agent.prompt_loader import load_system_prompt_async, load_mcp_settings_async
 from agent.agent_memory_cache import get_memory_cache_manager
 from .checkpoint_utils import prepare_checkpoint_message
-import aiosqlite
-from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
+from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
 import os

 # Global MemorySaver instance

@@ -180,7 +179,7 @@ async def init_agent(config: AgentConfig):
     if config.session_id:
         from .checkpoint_manager import get_checkpointer_manager
         manager = get_checkpointer_manager()
-        checkpointer = await manager.acquire_for_agent(config.bot_id, config.session_id)
+        checkpointer = manager.checkpointer
         await prepare_checkpoint_message(config, checkpointer)
     summarization_middleware = SummarizationMiddleware(
         model=llm_instance,
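With a single shared saver replacing per-session database files, isolation between conversations now rides on the thread_id carried in the run config rather than on the filesystem. The real invoke_config() is not shown in this diff; a hypothetical shape:

# Hypothetical AgentConfig.invoke_config() (the actual implementation is not
# part of this diff): LangGraph checkpointers key saved state by thread_id.
def invoke_config(self) -> dict:
    return {"configurable": {"thread_id": f"{self.bot_id}:{self.session_id}"}}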

poetry.lock (generated)

@@ -197,25 +197,6 @@ files = [
 frozenlist = ">=1.1.0"
 typing-extensions = {version = ">=4.2", markers = "python_version < \"3.13\""}

-[[package]]
-name = "aiosqlite"
-version = "0.21.0"
-description = "asyncio bridge to the standard sqlite3 module"
-optional = false
-python-versions = ">=3.9"
-groups = ["main"]
-files = [
-    {file = "aiosqlite-0.21.0-py3-none-any.whl", hash = "sha256:2549cf4057f95f53dcba16f2b64e8e2791d7e1adedb13197dd8ed77bb226d7d0"},
-    {file = "aiosqlite-0.21.0.tar.gz", hash = "sha256:131bb8056daa3bc875608c631c678cda73922a2d4ba8aec373b19f18c17e7aa3"},
-]
-
-[package.dependencies]
-typing_extensions = ">=4.0"
-
-[package.extras]
-dev = ["attribution (==1.7.1)", "black (==24.3.0)", "build (>=1.2)", "coverage[toml] (==7.6.10)", "flake8 (==7.0.0)", "flake8-bugbear (==24.12.12)", "flit (==3.10.1)", "mypy (==1.14.1)", "ufmt (==2.5.1)", "usort (==1.0.8.post1)"]
-docs = ["sphinx (==8.1.3)", "sphinx-mdinclude (==0.6.1)"]
-
 [[package]]
 name = "annotated-types"
 version = "0.7.0"
@@ -1568,36 +1549,37 @@ xxhash = ">=3.5.0"

 [[package]]
 name = "langgraph-checkpoint"
-version = "3.0.1"
+version = "2.1.2"
 description = "Library with base interfaces for LangGraph checkpoint savers."
 optional = false
-python-versions = ">=3.10"
+python-versions = ">=3.9"
 groups = ["main"]
 files = [
-    {file = "langgraph_checkpoint-3.0.1-py3-none-any.whl", hash = "sha256:9b04a8d0edc0474ce4eaf30c5d731cee38f11ddff50a6177eead95b5c4e4220b"},
-    {file = "langgraph_checkpoint-3.0.1.tar.gz", hash = "sha256:59222f875f85186a22c494aedc65c4e985a3df27e696e5016ba0b98a5ed2cee0"},
+    {file = "langgraph_checkpoint-2.1.2-py3-none-any.whl", hash = "sha256:911ebffb069fd01775d4b5184c04aaafc2962fcdf50cf49d524cd4367c4d0c60"},
+    {file = "langgraph_checkpoint-2.1.2.tar.gz", hash = "sha256:112e9d067a6eff8937caf198421b1ffba8d9207193f14ac6f89930c1260c06f9"},
 ]

 [package.dependencies]
 langchain-core = ">=0.2.38"
-ormsgpack = ">=1.12.0"
+ormsgpack = ">=1.10.0"

 [[package]]
-name = "langgraph-checkpoint-sqlite"
-version = "3.0.1"
-description = "Library with a SQLite implementation of LangGraph checkpoint saver."
+name = "langgraph-checkpoint-postgres"
+version = "2.0.25"
+description = "Library with a Postgres implementation of LangGraph checkpoint saver."
 optional = false
-python-versions = ">=3.10"
+python-versions = ">=3.9"
 groups = ["main"]
 files = [
-    {file = "langgraph_checkpoint_sqlite-3.0.1-py3-none-any.whl", hash = "sha256:616124676e5827294966997ed853f5d41490cc61f73b3c79359f4ff307728508"},
-    {file = "langgraph_checkpoint_sqlite-3.0.1.tar.gz", hash = "sha256:c6580138e6abfd2ade7ea49186c664d47ef28dc44538674fa47e50a8a5f8af83"},
+    {file = "langgraph_checkpoint_postgres-2.0.25-py3-none-any.whl", hash = "sha256:cf1248a58fe828c9cfc36ee57ff118d7799ce214d4b35718e57ec98407130fb5"},
+    {file = "langgraph_checkpoint_postgres-2.0.25.tar.gz", hash = "sha256:916b80f73a641a589301f6c54414974768b6d646d82db7b301ff8d47105c3613"},
 ]

 [package.dependencies]
-aiosqlite = ">=0.20"
-langgraph-checkpoint = ">=3,<4.0.0"
-sqlite-vec = ">=0.1.6"
+langgraph-checkpoint = ">=2.1.2,<3.0.0"
+orjson = ">=3.10.1"
+psycopg = ">=3.2.0"
+psycopg-pool = ">=3.2.0"

 [[package]]
 name = "langgraph-prebuilt"
@@ -3015,6 +2997,48 @@
 dev = ["abi3audit", "black", "check-manifest", "colorama ; os_name == \"nt\"", "coverage", "packaging", "pylint", "pyperf", "pypinfo", "pyreadline ; os_name == \"nt\"", "pytest", "pytest-cov", "pytest-instafail", "pytest-subtests", "pytest-xdist", "pywin32 ; os_name == \"nt\" and platform_python_implementation != \"PyPy\"", "requests", "rstcheck", "ruff", "setuptools", "sphinx", "sphinx_rtd_theme", "toml-sort", "twine", "validate-pyproject[all]", "virtualenv", "vulture", "wheel", "wheel ; os_name == \"nt\" and platform_python_implementation != \"PyPy\"", "wmi ; os_name == \"nt\" and platform_python_implementation != \"PyPy\""]
 test = ["pytest", "pytest-instafail", "pytest-subtests", "pytest-xdist", "pywin32 ; os_name == \"nt\" and platform_python_implementation != \"PyPy\"", "setuptools", "wheel ; os_name == \"nt\" and platform_python_implementation != \"PyPy\"", "wmi ; os_name == \"nt\" and platform_python_implementation != \"PyPy\""]

+[[package]]
+name = "psycopg"
+version = "3.3.2"
+description = "PostgreSQL database adapter for Python"
+optional = false
+python-versions = ">=3.10"
+groups = ["main"]
+files = [
+    {file = "psycopg-3.3.2-py3-none-any.whl", hash = "sha256:3e94bc5f4690247d734599af56e51bae8e0db8e4311ea413f801fef82b14a99b"},
+    {file = "psycopg-3.3.2.tar.gz", hash = "sha256:707a67975ee214d200511177a6a80e56e654754c9afca06a7194ea6bbfde9ca7"},
+]
+
+[package.dependencies]
+typing-extensions = {version = ">=4.6", markers = "python_version < \"3.13\""}
+tzdata = {version = "*", markers = "sys_platform == \"win32\""}
+
+[package.extras]
+binary = ["psycopg-binary (==3.3.2) ; implementation_name != \"pypy\""]
+c = ["psycopg-c (==3.3.2) ; implementation_name != \"pypy\""]
+dev = ["ast-comments (>=1.1.2)", "black (>=24.1.0)", "codespell (>=2.2)", "cython-lint (>=0.16)", "dnspython (>=2.1)", "flake8 (>=4.0)", "isort-psycopg", "isort[colors] (>=6.0)", "mypy (>=1.19.0)", "pre-commit (>=4.0.1)", "types-setuptools (>=57.4)", "types-shapely (>=2.0)", "wheel (>=0.37)"]
+docs = ["Sphinx (>=5.0)", "furo (==2022.6.21)", "sphinx-autobuild (>=2021.3.14)", "sphinx-autodoc-typehints (>=1.12)"]
+pool = ["psycopg-pool"]
+test = ["anyio (>=4.0)", "mypy (>=1.19.0) ; implementation_name != \"pypy\"", "pproxy (>=2.7)", "pytest (>=6.2.5)", "pytest-cov (>=3.0)", "pytest-randomly (>=3.5)"]
+
+[[package]]
+name = "psycopg-pool"
+version = "3.3.0"
+description = "Connection Pool for Psycopg"
+optional = false
+python-versions = ">=3.10"
+groups = ["main"]
+files = [
+    {file = "psycopg_pool-3.3.0-py3-none-any.whl", hash = "sha256:2e44329155c410b5e8666372db44276a8b1ebd8c90f1c3026ebba40d4bc81063"},
+    {file = "psycopg_pool-3.3.0.tar.gz", hash = "sha256:fa115eb2860bd88fce1717d75611f41490dec6135efb619611142b24da3f6db5"},
+]
+
+[package.dependencies]
+typing-extensions = ">=4.6"
+
+[package.extras]
+test = ["anyio (>=4.0)", "mypy (>=1.14)", "pproxy (>=2.7)", "pytest (>=6.2.5)", "pytest-cov (>=3.0)", "pytest-randomly (>=3.5)"]
+
 [[package]]
 name = "pydantic"
 version = "2.10.5"
@@ -3994,21 +4018,6 @@
     {file = "soupsieve-2.8.1.tar.gz", hash = "sha256:4cf733bc50fa805f5df4b8ef4740fc0e0fa6218cf3006269afd3f9d6d80fd350"},
 ]

-[[package]]
-name = "sqlite-vec"
-version = "0.1.6"
-description = ""
-optional = false
-python-versions = "*"
-groups = ["main"]
-files = [
-    {file = "sqlite_vec-0.1.6-py3-none-macosx_10_6_x86_64.whl", hash = "sha256:77491bcaa6d496f2acb5cc0d0ff0b8964434f141523c121e313f9a7d8088dee3"},
-    {file = "sqlite_vec-0.1.6-py3-none-macosx_11_0_arm64.whl", hash = "sha256:fdca35f7ee3243668a055255d4dee4dea7eed5a06da8cad409f89facf4595361"},
-    {file = "sqlite_vec-0.1.6-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:7b0519d9cd96164cd2e08e8eed225197f9cd2f0be82cb04567692a0a4be02da3"},
-    {file = "sqlite_vec-0.1.6-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux1_x86_64.whl", hash = "sha256:823b0493add80d7fe82ab0fe25df7c0703f4752941aee1c7b2b02cec9656cb24"},
-    {file = "sqlite_vec-0.1.6-py3-none-win_amd64.whl", hash = "sha256:c65bcfd90fa2f41f9000052bcb8bb75d38240b2dae49225389eca6c3136d3f0c"},
-]
-
 [[package]]
 name = "sse-starlette"
 version = "3.0.2"
@@ -5419,4 +5428,4 @@ cffi = ["cffi (>=1.17,<2.0) ; platform_python_implementation != \"PyPy\" and pyt
 [metadata]
 lock-version = "2.1"
 python-versions = ">=3.12,<4.0"
-content-hash = "364fb7c7d9d639dda8917ff9fee399135b6406a16917b10fb537029277eec5b1"
+content-hash = "02ca6f912b2b258fd07ba45bece31692b3f8a937d4c77d4f50714fe49eeab77e"


@@ -19,7 +19,6 @@ dependencies = [
     "numpy<2",
     "aiohttp",
     "aiofiles",
-    "aiosqlite==0.21.0",
     "huey (>=2.5.3,<3.0.0)",
     "pandas>=1.5.0",
     "openpyxl>=3.0.0",

@@ -31,7 +30,7 @@ dependencies = [
     "langchain-mcp-adapters (>=0.2.1,<0.3.0)",
     "langchain-openai (>=1.1.1,<2.0.0)",
     "cachetools (>=6.2.4,<7.0.0)",
-    "langgraph-checkpoint-sqlite (>=3.0.1,<4.0.0)",
+    "langgraph-checkpoint-postgres (>=2.0.0,<3.0.0)",
     "deepagents-cli (>=0.0.11,<0.0.12)",
 ]


@@ -116,11 +116,6 @@ async def enhanced_generate_stream_response(
         except Exception as e:
             logger.error(f"Error in agent task: {e}")
             await output_queue.put(("agent_done", None))
-        finally:
-            if checkpointer:
-                from agent.checkpoint_manager import get_checkpointer_manager
-                manager = get_checkpointer_manager()
-                await manager.return_to_pool(config.bot_id, config.session_id, checkpointer)

     # Run the tasks concurrently
     # Run the preamble task only when enable_thinking is True

@@ -210,7 +205,6 @@
     )
     agent, checkpointer = await init_agent(config)
-    try:
     # Use the updated messages
     agent_responses = await agent.ainvoke({"messages": config.messages}, config=config.invoke_config(), max_tokens=MAX_OUTPUT_TOKENS)
     append_messages = agent_responses["messages"][len(config.messages):]

@@ -245,11 +239,6 @@
     )
     else:
         raise HTTPException(status_code=500, detail="No response from agent")
-    finally:
-        if checkpointer:
-            from agent.checkpoint_manager import get_checkpointer_manager
-            manager = get_checkpointer_manager()
-            await manager.return_to_pool(config.bot_id, config.session_id, checkpointer)
     return result
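With nothing to hand back, the try/finally scaffolding around agent invocation disappears and the call path flattens to roughly the following (names taken from the diff above; illustrative only):

# Illustrative post-commit call path, using names from the diff above.
agent, checkpointer = await init_agent(config)
agent_responses = await agent.ainvoke(
    {"messages": config.messages},
    config=config.invoke_config(),
    max_tokens=MAX_OUTPUT_TOKENS,
)
# No manager.return_to_pool(...): psycopg_pool releases each connection
# back to the pool as soon as the checkpoint operation using it finishes.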


@@ -42,35 +42,27 @@ MCP_HTTP_TIMEOUT = int(os.getenv("MCP_HTTP_TIMEOUT", 60))  # HTTP request timeout (seconds)
 MCP_SSE_READ_TIMEOUT = int(os.getenv("MCP_SSE_READ_TIMEOUT", 300))  # SSE read timeout (seconds)

 # ============================================================
-# SQLite Checkpoint Configuration
+# PostgreSQL Checkpoint Configuration
 # ============================================================

-# Checkpoint database path
-CHECKPOINT_DB_PATH = os.getenv("CHECKPOINT_DB_PATH", "./projects/memory/")
+# PostgreSQL connection string
+# Format: postgresql://user:password@host:port/database
+CHECKPOINT_DB_URL = os.getenv("CHECKPOINT_DB_URL", "postgresql://postgres:AeEGDB0b7Z5GK0E2tblt@dev-circleo-pg.celp3nik7oaq.ap-northeast-1.rds.amazonaws.com:5432/gptbase")
+#CHECKPOINT_DB_URL = os.getenv("CHECKPOINT_DB_URL", "postgresql://moshui:@localhost:5432/moshui")

-# Enable WAL mode (Write-Ahead Logging)
-# WAL mode allows concurrent reads and writes, greatly improving concurrency
-CHECKPOINT_WAL_MODE = os.getenv("CHECKPOINT_WAL_MODE", "true") == "true"
-
-# Busy timeout (milliseconds)
-# Maximum time to wait when the database is locked (milliseconds)
-CHECKPOINT_BUSY_TIMEOUT = int(os.getenv("CHECKPOINT_BUSY_TIMEOUT", "10000"))

 # Connection pool size
 # Maximum number of connections held at the same time
-CHECKPOINT_POOL_SIZE = int(os.getenv("CHECKPOINT_POOL_SIZE", "15"))
+CHECKPOINT_POOL_SIZE = int(os.getenv("CHECKPOINT_POOL_SIZE", "20"))

 # Checkpoint auto-cleanup configuration
 # Whether to automatically clean up old sessions
 CHECKPOINT_CLEANUP_ENABLED = os.getenv("CHECKPOINT_CLEANUP_ENABLED", "true") == "true"

+# Threads inactive for more than this many days are cleaned up (days)
+CHECKPOINT_CLEANUP_INACTIVE_DAYS = int(os.getenv("CHECKPOINT_CLEANUP_INACTIVE_DAYS", "3"))
+
 # Cleanup interval (hours)
 # Run the cleanup task every N hours
 CHECKPOINT_CLEANUP_INTERVAL_HOURS = int(os.getenv("CHECKPOINT_CLEANUP_INTERVAL_HOURS", "24"))
-
-# How many days back to clean
-# Threads inactive for more than N days are deleted
-CHECKPOINT_CLEANUP_OLDER_THAN_DAYS = int(os.getenv("CHECKPOINT_CLEANUP_OLDER_THAN_DAYS", "3"))
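For deployment, the new knobs can be overridden through the environment. An illustrative .env fragment (placeholder values, not the in-repo defaults above):

CHECKPOINT_DB_URL=postgresql://app_user:app_password@db.internal:5432/gptbase
CHECKPOINT_POOL_SIZE=20
CHECKPOINT_CLEANUP_ENABLED=true
CHECKPOINT_CLEANUP_INACTIVE_DAYS=3
CHECKPOINT_CLEANUP_INTERVAL_HOURS=24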