diff --git a/agent/checkpoint_manager.py b/agent/checkpoint_manager.py
index 4962d45..7627267 100644
--- a/agent/checkpoint_manager.py
+++ b/agent/checkpoint_manager.py
@@ -1,196 +1,92 @@
 """
-Global SQLite checkpointer manager
-Works around database locking under high concurrency
-
-Each session uses its own database file to avoid lock contention
+Global PostgreSQL checkpointer manager
+Uses a psycopg_pool connection pool, which AsyncPostgresSaver supports natively
 """
 import asyncio
 import logging
-import os
-import time
-from typing import Optional, Dict, Any, Tuple
+from typing import Optional
 
-import aiosqlite
-from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
+from psycopg_pool import AsyncConnectionPool
+from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
 
 from utils.settings import (
-    CHECKPOINT_DB_PATH,
-    CHECKPOINT_WAL_MODE,
-    CHECKPOINT_BUSY_TIMEOUT,
+    CHECKPOINT_DB_URL,
+    CHECKPOINT_POOL_SIZE,
    CHECKPOINT_CLEANUP_ENABLED,
     CHECKPOINT_CLEANUP_INTERVAL_HOURS,
-    CHECKPOINT_CLEANUP_OLDER_THAN_DAYS,
+    CHECKPOINT_CLEANUP_INACTIVE_DAYS,
 )
 
 logger = logging.getLogger('app')
 
-# Connection-pool size per session (a session is processed serially, so one connection is enough)
-POOL_SIZE_PER_SESSION = 1
-
 
 class CheckpointerManager:
     """
-    Global checkpointer manager with one database file per session_id
+    Global checkpointer manager backed by a PostgreSQL connection pool
 
     Main features:
-    1. A separate database file and connection pool per session_id
-    2. Pools are created on demand; unused sessions consume no resources
-    3. Pre-configured WAL mode and busy_timeout
-    4. Simple cleanup based on file modification time
+    1. Connections are managed by psycopg_pool.AsyncConnectionPool
+    2. AsyncPostgresSaver supports the pool natively, acquiring/releasing connections automatically
+    3. No manual return step, so long requests cannot pin a connection
+    4. SQL-query-based cleanup
     5. Graceful shutdown
     """
 
     def __init__(self):
-        # One connection pool per (bot_id, session_id)
-        self._pools: Dict[Tuple[str, str], asyncio.Queue[AsyncSqliteSaver]] = {}
-        # Per-session initialization lock
-        self._locks: Dict[Tuple[str, str], asyncio.Lock] = {}
-        # Global lock guarding access to the pools and locks dicts
-        self._global_lock = asyncio.Lock()
+        self._pool: Optional[AsyncConnectionPool] = None
+        self._initialized = False
         self._closed = False
 
         # Cleanup scheduler task
         self._cleanup_task: Optional[asyncio.Task] = None
         self._cleanup_stop_event = asyncio.Event()
 
-    def _get_db_path(self, bot_id: str, session_id: str) -> str:
-        """Return the database file path for the given session"""
-        return os.path.join(CHECKPOINT_DB_PATH, bot_id, session_id, "checkpoints.db")
-
-    def _get_pool_key(self, bot_id: str, session_id: str) -> Tuple[str, str]:
-        """Return the connection-pool key"""
-        return (bot_id, session_id)
-
-    async def _initialize_session_pool(self, bot_id: str, session_id: str) -> None:
-        """Initialize the connection pool for the given session"""
-        pool_key = self._get_pool_key(bot_id, session_id)
-        if pool_key in self._pools:
+    async def initialize(self) -> None:
+        """Initialize the connection pool"""
+        if self._initialized:
             return
 
-        logger.info(f"Initializing checkpointer pool for bot_id={bot_id}, session_id={session_id}")
+        # Deliberately not logging CHECKPOINT_DB_URL: it contains credentials
+        logger.info(
+            f"Initializing PostgreSQL checkpointer pool: size={CHECKPOINT_POOL_SIZE}"
+        )
 
-        db_path = self._get_db_path(bot_id, session_id)
-        os.makedirs(os.path.dirname(db_path), exist_ok=True)
+        try:
+            # Create the psycopg connection pool
+            self._pool = AsyncConnectionPool(
+                CHECKPOINT_DB_URL,
+                min_size=1,
+                max_size=CHECKPOINT_POOL_SIZE,
+                open=False,
+            )
 
-        pool = asyncio.Queue()
-        for i in range(POOL_SIZE_PER_SESSION):
-            try:
-                conn = await self._create_configured_connection(db_path)
-                checkpointer = AsyncSqliteSaver(conn=conn)
-                # Call setup up front to make sure the tables exist
+            # Open the pool
+            await self._pool.open()
+
+            # Create the schema (autocommit is required for CREATE INDEX CONCURRENTLY)
+            async with self._pool.connection() as conn:
+                await conn.set_autocommit(True)
+                checkpointer = AsyncPostgresSaver(conn=conn)
                 await checkpointer.setup()
-                await pool.put(checkpointer)
-                logger.debug(f"Created checkpointer connection {i+1}/{POOL_SIZE_PER_SESSION} for session={session_id}")
-            except Exception as e:
-                logger.error(f"Failed to create checkpointer connection {i+1} for session={session_id}: {e}")
-                raise
 
-        self._pools[pool_key] = pool
-        self._locks[pool_key] = asyncio.Lock()
-        logger.info(f"Checkpointer pool initialized for bot_id={bot_id}, session_id={session_id}")
+            self._initialized = True
+            logger.info("PostgreSQL checkpointer pool initialized successfully")
+        except Exception as e:
+            logger.error(f"Failed to initialize PostgreSQL checkpointer pool: {e}")
+            raise
 
-    async def _create_configured_connection(self, db_path: str) -> aiosqlite.Connection:
-        """
-        Create a pre-configured SQLite connection
-
-        Configuration:
-        1. WAL mode (Write-Ahead Logging) - allows concurrent reads and writes
-        2. busy_timeout - maximum time to wait for a lock
-        3. Other tuning parameters
-        """
-        conn = aiosqlite.connect(db_path)
-
-        # Wait for the connection to be established
-        await conn.__aenter__()
-
-        # Set the busy timeout (must be set after the connection is established)
-        await conn.execute(f"PRAGMA busy_timeout = {CHECKPOINT_BUSY_TIMEOUT}")
-
-        # If WAL mode is enabled
-        if CHECKPOINT_WAL_MODE:
-            await conn.execute("PRAGMA journal_mode = WAL")
-            await conn.execute("PRAGMA synchronous = NORMAL")
-            # WAL-mode tuning
-            await conn.execute("PRAGMA wal_autocheckpoint = 10000")  # raised to 10000
-            await conn.execute("PRAGMA cache_size = -64000")  # 64MB cache
-            await conn.execute("PRAGMA temp_store = MEMORY")
-            await conn.execute("PRAGMA journal_size_limit = 1048576")  # 1MB
-
-        await conn.commit()
-
-        return conn
-
-    async def initialize(self) -> None:
-        """Initialize the manager (pools are no longer pre-created; they are created on demand)"""
-        logger.info("CheckpointerManager initialized (pools will be created on-demand)")
-
-    async def acquire_for_agent(self, bot_id: str, session_id: str) -> AsyncSqliteSaver:
-        """
-        Acquire the checkpointer for the given session
-
-        Note: a checkpointer acquired through this method must be returned
-        manually via return_to_pool()
-
-        Args:
-            bot_id: bot ID
-            session_id: session ID
-
-        Returns:
-            An AsyncSqliteSaver instance
-        """
+    @property
+    def checkpointer(self) -> AsyncPostgresSaver:
+        """Return the global AsyncPostgresSaver instance"""
         if self._closed:
             raise RuntimeError("CheckpointerManager is closed")
 
-        pool_key = self._get_pool_key(bot_id, session_id)
-        async with self._global_lock:
-            if pool_key not in self._pools:
-                await self._initialize_session_pool(bot_id, session_id)
+        if not self._initialized:
+            raise RuntimeError("CheckpointerManager not initialized, call initialize() first")
 
-        # Take the session lock to keep pool operations thread-safe
-        async with self._locks[pool_key]:
-            checkpointer = await self._pools[pool_key].get()
-            logger.debug(f"Acquired checkpointer for bot_id={bot_id}, session_id={session_id}, remaining: {self._pools[pool_key].qsize()}")
-            return checkpointer
-
-    async def return_to_pool(self, bot_id: str, session_id: str, checkpointer: AsyncSqliteSaver) -> None:
-        """
-        Return a checkpointer to its session's pool
-
-        Args:
-            bot_id: bot ID
-            session_id: session ID
-            checkpointer: the checkpointer instance to return
-        """
-        pool_key = self._get_pool_key(bot_id, session_id)
-        if pool_key in self._pools:
-            async with self._locks[pool_key]:
-                await self._pools[pool_key].put(checkpointer)
-                logger.debug(f"Returned checkpointer for bot_id={bot_id}, session_id={session_id}, remaining: {self._pools[pool_key].qsize()}")
-
-    async def _close_session_pool(self, bot_id: str, session_id: str) -> None:
-        """Close the connection pool of the given session"""
-        pool_key = self._get_pool_key(bot_id, session_id)
-        if pool_key not in self._pools:
-            return
-
-        logger.info(f"Closing checkpointer pool for bot_id={bot_id}, session_id={session_id}")
-
-        pool = self._pools[pool_key]
-        while not pool.empty():
-            try:
-                checkpointer = pool.get_nowait()
-                if checkpointer.conn:
-                    await checkpointer.conn.close()
-            except asyncio.QueueEmpty:
-                break
-
-        del self._pools[pool_key]
-        if pool_key in self._locks:
-            del self._locks[pool_key]
-
-        logger.info(f"Checkpointer pool closed for bot_id={bot_id}, session_id={session_id}")
+        return AsyncPostgresSaver(conn=self._pool)
 
     async def close(self) -> None:
-        """Close all connection pools"""
+        """Close the connection pool"""
         if self._closed:
             return
 
@@ -199,122 +95,103 @@ class CheckpointerManager:
             self._cleanup_stop_event.set()
             try:
                 self._cleanup_task.cancel()
-                await asyncio.sleep(0.1)  # give the task a moment to clean up
+                await asyncio.sleep(0.1)
             except asyncio.CancelledError:
                 pass
             self._cleanup_task = None
 
-        async with self._global_lock:
-            if self._closed:
-                return
+        if self._closed:
+            return
 
-            logger.info("Closing CheckpointerManager...")
+        logger.info("Closing CheckpointerManager...")
 
-            # Close the connection pool of every session
-            pool_keys = list(self._pools.keys())
-            for bot_id, session_id in pool_keys:
-                await self._close_session_pool(bot_id, session_id)
+        if self._pool is not None:
+            await self._pool.close()
+            self._pool = None
 
-            self._closed = True
-            logger.info("CheckpointerManager closed")
-
-    def get_pool_stats(self) -> dict:
-        """Return connection-pool statistics"""
-        return {
-            "session_count": len(self._pools),
-            "pools": {
-                f"{bot_id}/{session_id}": {
-                    "available": pool.qsize(),
-                    "pool_size": POOL_SIZE_PER_SESSION
-                }
-                for (bot_id, session_id), pool in self._pools.items()
-            },
-            "closed": self._closed
-        }
+        self._closed = True
+        self._initialized = False
+        logger.info("CheckpointerManager closed")
 
     # ============================================================
-    # Checkpoint cleanup (based on file modification time)
+    # Checkpoint cleanup (based on PostgreSQL SQL queries)
     # ============================================================
 
-    async def cleanup_old_dbs(self, older_than_days: int = None) -> Dict[str, Any]:
+    async def cleanup_old_dbs(self, inactive_days: int = None) -> dict:
         """
-        Delete old database files based on their modification time
+        Delete stale checkpoint records
+
+        Removes every checkpoint belonging to threads inactive for N days.
 
         Args:
-            older_than_days: delete data older than this many days; defaults to the configured value
+            inactive_days: delete threads inactive for N days; defaults to the configured value
 
         Returns:
             Dict: cleanup statistics
-            - deleted: number of session directories deleted
-            - scanned: number of session directories scanned
-            - cutoff_time: cutoff timestamp
         """
-        if older_than_days is None:
-            older_than_days = CHECKPOINT_CLEANUP_OLDER_THAN_DAYS
+        if inactive_days is None:
+            inactive_days = CHECKPOINT_CLEANUP_INACTIVE_DAYS
 
-        cutoff_time = time.time() - older_than_days * 86400
-        logger.info(f"Starting checkpoint cleanup: removing db files not modified since {cutoff_time}")
+        logger.info(f"Starting checkpoint cleanup: removing threads inactive for {inactive_days} days")
 
-        db_dir = CHECKPOINT_DB_PATH
-        deleted_count = 0
-        scanned_count = 0
+        if self._pool is None:
+            logger.warning("Connection pool not initialized, skipping cleanup")
+            return {"deleted": 0, "inactive_days": inactive_days}
 
-        if not os.path.exists(db_dir):
-            logger.info(f"Checkpoint directory does not exist: {db_dir}")
-            return {"deleted": 0, "scanned": 0, "cutoff_time": cutoff_time}
+        try:
+            # Run the cleanup on a pooled connection
+            async with self._pool.connection() as conn:
+                async with conn.cursor() as cursor:
+                    # Find inactive threads. A thread is inactive only when its
+                    # *newest* checkpoint is older than the cutoff, hence the
+                    # MAX() aggregate. A bound parameter cannot appear inside an
+                    # INTERVAL '...' literal, so bind the day count via
+                    # make_interval() instead.
+                    query_find_inactive = """
+                        SELECT thread_id
+                        FROM checkpoints
+                        GROUP BY thread_id
+                        HAVING MAX((checkpoint->>'ts')::timestamptz) < NOW() - make_interval(days => %s)
+                    """
 
-        # Walk the bot_id directories
-        for bot_id in os.listdir(db_dir):
-            bot_path = os.path.join(db_dir, bot_id)
-            # Skip entries that are not directories
-            if not os.path.isdir(bot_path):
-                continue
+                    await cursor.execute(query_find_inactive, (inactive_days,))
+                    inactive_threads = await cursor.fetchall()
+                    inactive_thread_ids = [row[0] for row in inactive_threads]
 
-            # Walk the session_id directories
-            for session_id in os.listdir(bot_path):
-                session_path = os.path.join(bot_path, session_id)
-                if not os.path.isdir(session_path):
-                    continue
+                    if not inactive_thread_ids:
+                        logger.info("No inactive threads found")
+                        return {"deleted": 0, "threads_deleted": 0, "inactive_days": inactive_days}
 
-                db_file = os.path.join(session_path, "checkpoints.db")
-                if not os.path.exists(db_file):
-                    continue
+                    # Delete the checkpoints of the inactive threads
+                    placeholders = ','.join(['%s'] * len(inactive_thread_ids))
+                    query_delete_checkpoints = f"""
+                        DELETE FROM checkpoints
+                        WHERE thread_id IN ({placeholders})
+                    """
 
-                scanned_count += 1
-                mtime = os.path.getmtime(db_file)
+                    await cursor.execute(query_delete_checkpoints, inactive_thread_ids)
+                    deleted_checkpoints = cursor.rowcount
 
-                if mtime < cutoff_time:
-                    # Close this session's connection pool, if one exists
-                    await self._close_session_pool(bot_id, session_id)
+                    # Also clean up checkpoint_blobs and checkpoint_writes
+                    query_delete_blobs = f"""
+                        DELETE FROM checkpoint_blobs
+                        WHERE thread_id IN ({placeholders})
+                    """
+                    await cursor.execute(query_delete_blobs, inactive_thread_ids)
+                    deleted_blobs = cursor.rowcount
+
+                    query_delete_writes = f"""
+                        DELETE FROM checkpoint_writes
+                        WHERE thread_id IN ({placeholders})
+                    """
+                    await cursor.execute(query_delete_writes, inactive_thread_ids)
+                    deleted_writes = cursor.rowcount
 
-                    # Delete the whole session directory
-                    try:
-                        import shutil
-                        shutil.rmtree(session_path)
-                        deleted_count += 1
-                        logger.info(f"Deleted old checkpoint session: {bot_id}/{session_id}/ (last modified: {mtime})")
-                    except Exception as e:
-                        logger.warning(f"Failed to delete {session_path}: {e}")
+                    logger.info(
+                        f"Checkpoint cleanup completed: "
+                        f"deleted {deleted_checkpoints} checkpoints, {deleted_blobs} blobs, {deleted_writes} writes "
+                        f"from {len(inactive_thread_ids)} inactive threads"
+                    )
 
-        # Remove empty bot_id directories
-        for bot_id in os.listdir(db_dir):
-            bot_path = os.path.join(db_dir, bot_id)
-            if os.path.isdir(bot_path) and not os.listdir(bot_path):
-                try:
-                    os.rmdir(bot_path)
-                    logger.debug(f"Removed empty bot directory: {bot_id}/")
-                except Exception:
-                    pass
+                    return {
+                        "deleted": deleted_checkpoints + deleted_blobs + deleted_writes,
+                        "threads_deleted": len(inactive_thread_ids),
+                        "inactive_days": inactive_days
+                    }
 
-        result = {
-            "deleted": deleted_count,
-            "scanned": scanned_count,
-            "cutoff_time": cutoff_time,
-            "older_than_days": older_than_days
-        }
-
-        logger.info(f"Checkpoint cleanup completed: {result}")
-        return result
+        except Exception as e:
+            logger.error(f"Error during checkpoint cleanup: {e}")
+            import traceback
+            logger.error(traceback.format_exc())
+            return {"deleted": 0, "error": str(e), "inactive_days": inactive_days}
 
     async def _cleanup_loop(self):
         """Background cleanup loop"""
@@ -323,7 +200,7 @@ class CheckpointerManager:
         logger.info(
             f"Checkpoint cleanup scheduler started: "
             f"interval={CHECKPOINT_CLEANUP_INTERVAL_HOURS}h, "
-            f"older_than={CHECKPOINT_CLEANUP_OLDER_THAN_DAYS} days"
+            f"inactive_days={CHECKPOINT_CLEANUP_INACTIVE_DAYS}"
         )
 
         while not self._cleanup_stop_event.is_set():
@@ -364,7 +241,6 @@ class CheckpointerManager:
             logger.warning("Cleanup scheduler is already running")
             return
 
-        # Create a new event loop (if the scheduler needs to run in the background)
         try:
             loop = asyncio.get_running_loop()
         except RuntimeError:
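Usage sketch (illustrative, not part of the patch): how the rewritten manager is expected to be wired into application startup and shutdown. Only get_checkpointer_manager(), initialize(), the checkpointer property, and close() come from this diff; the FastAPI lifespan wiring is an assumption for the example.

import uvicorn
from contextlib import asynccontextmanager
from fastapi import FastAPI

from agent.checkpoint_manager import get_checkpointer_manager


@asynccontextmanager
async def lifespan(app: FastAPI):
    manager = get_checkpointer_manager()
    await manager.initialize()  # opens the psycopg pool and creates the checkpoint tables
    try:
        yield
    finally:
        await manager.close()  # closes the pool and stops the cleanup task


app = FastAPI(lifespan=lifespan)

if __name__ == "__main__":
    uvicorn.run(app)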
diff --git a/agent/deep_assistant.py b/agent/deep_assistant.py
index b0943d6..f7c7498 100644
--- a/agent/deep_assistant.py
+++ b/agent/deep_assistant.py
@@ -18,8 +18,7 @@ from agent.agent_config import AgentConfig
 from agent.prompt_loader import load_system_prompt_async, load_mcp_settings_async
 from agent.agent_memory_cache import get_memory_cache_manager
 from .checkpoint_utils import prepare_checkpoint_message
-import aiosqlite
-from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
+from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
 import os
 
 # Global MemorySaver instance
@@ -180,7 +179,7 @@ async def init_agent(config: AgentConfig):
     if config.session_id:
         from .checkpoint_manager import get_checkpointer_manager
         manager = get_checkpointer_manager()
-        checkpointer = await manager.acquire_for_agent(config.bot_id, config.session_id)
+        checkpointer = manager.checkpointer
         await prepare_checkpoint_message(config, checkpointer)
         summarization_middleware = SummarizationMiddleware(
             model=llm_instance,
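Per-request sketch (illustrative; the toy State graph below is invented, only manager.checkpointer comes from this diff, and it assumes initialize() has already run): because the AsyncPostgresSaver holds the pool rather than one connection, a single saver can be shared by every request and the acquire/return cycle disappears.

import asyncio
from typing import TypedDict

from langgraph.graph import StateGraph, START, END

from agent.checkpoint_manager import get_checkpointer_manager


class State(TypedDict):
    value: int


async def main() -> None:
    builder = StateGraph(State)
    builder.add_node("bump", lambda state: {"value": state["value"] + 1})
    builder.add_edge(START, "bump")
    builder.add_edge("bump", END)

    # Each checkpoint read/write borrows a pooled connection and returns it
    # immediately, so a long-running request never pins a connection.
    graph = builder.compile(checkpointer=get_checkpointer_manager().checkpointer)

    result = await graph.ainvoke(
        {"value": 1},
        config={"configurable": {"thread_id": "session-123"}},
    )
    print(result)


asyncio.run(main())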
diff --git a/poetry.lock b/poetry.lock
index a88a060..6692974 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -197,25 +197,6 @@ files = [
 frozenlist = ">=1.1.0"
 typing-extensions = {version = ">=4.2", markers = "python_version < \"3.13\""}
 
-[[package]]
-name = "aiosqlite"
-version = "0.21.0"
-description = "asyncio bridge to the standard sqlite3 module"
-optional = false
-python-versions = ">=3.9"
-groups = ["main"]
-files = [
-    {file = "aiosqlite-0.21.0-py3-none-any.whl", hash = "sha256:2549cf4057f95f53dcba16f2b64e8e2791d7e1adedb13197dd8ed77bb226d7d0"},
-    {file = "aiosqlite-0.21.0.tar.gz", hash = "sha256:131bb8056daa3bc875608c631c678cda73922a2d4ba8aec373b19f18c17e7aa3"},
-]
-
-[package.dependencies]
-typing_extensions = ">=4.0"
-
-[package.extras]
-dev = ["attribution (==1.7.1)", "black (==24.3.0)", "build (>=1.2)", "coverage[toml] (==7.6.10)", "flake8 (==7.0.0)", "flake8-bugbear (==24.12.12)", "flit (==3.10.1)", "mypy (==1.14.1)", "ufmt (==2.5.1)", "usort (==1.0.8.post1)"]
-docs = ["sphinx (==8.1.3)", "sphinx-mdinclude (==0.6.1)"]
-
 [[package]]
 name = "annotated-types"
 version = "0.7.0"
@@ -1568,36 +1549,37 @@ xxhash = ">=3.5.0"
 
 [[package]]
 name = "langgraph-checkpoint"
-version = "3.0.1"
+version = "2.1.2"
 description = "Library with base interfaces for LangGraph checkpoint savers."
 optional = false
-python-versions = ">=3.10"
+python-versions = ">=3.9"
 groups = ["main"]
 files = [
-    {file = "langgraph_checkpoint-3.0.1-py3-none-any.whl", hash = "sha256:9b04a8d0edc0474ce4eaf30c5d731cee38f11ddff50a6177eead95b5c4e4220b"},
-    {file = "langgraph_checkpoint-3.0.1.tar.gz", hash = "sha256:59222f875f85186a22c494aedc65c4e985a3df27e696e5016ba0b98a5ed2cee0"},
+    {file = "langgraph_checkpoint-2.1.2-py3-none-any.whl", hash = "sha256:911ebffb069fd01775d4b5184c04aaafc2962fcdf50cf49d524cd4367c4d0c60"},
+    {file = "langgraph_checkpoint-2.1.2.tar.gz", hash = "sha256:112e9d067a6eff8937caf198421b1ffba8d9207193f14ac6f89930c1260c06f9"},
 ]
 
 [package.dependencies]
 langchain-core = ">=0.2.38"
-ormsgpack = ">=1.12.0"
+ormsgpack = ">=1.10.0"
 
 [[package]]
-name = "langgraph-checkpoint-sqlite"
-version = "3.0.1"
-description = "Library with a SQLite implementation of LangGraph checkpoint saver."
+name = "langgraph-checkpoint-postgres"
+version = "2.0.25"
+description = "Library with a Postgres implementation of LangGraph checkpoint saver."
 optional = false
-python-versions = ">=3.10"
+python-versions = ">=3.9"
 groups = ["main"]
 files = [
-    {file = "langgraph_checkpoint_sqlite-3.0.1-py3-none-any.whl", hash = "sha256:616124676e5827294966997ed853f5d41490cc61f73b3c79359f4ff307728508"},
-    {file = "langgraph_checkpoint_sqlite-3.0.1.tar.gz", hash = "sha256:c6580138e6abfd2ade7ea49186c664d47ef28dc44538674fa47e50a8a5f8af83"},
+    {file = "langgraph_checkpoint_postgres-2.0.25-py3-none-any.whl", hash = "sha256:cf1248a58fe828c9cfc36ee57ff118d7799ce214d4b35718e57ec98407130fb5"},
+    {file = "langgraph_checkpoint_postgres-2.0.25.tar.gz", hash = "sha256:916b80f73a641a589301f6c54414974768b6d646d82db7b301ff8d47105c3613"},
 ]
 
 [package.dependencies]
-aiosqlite = ">=0.20"
-langgraph-checkpoint = ">=3,<4.0.0"
-sqlite-vec = ">=0.1.6"
+langgraph-checkpoint = ">=2.1.2,<3.0.0"
+orjson = ">=3.10.1"
+psycopg = ">=3.2.0"
+psycopg-pool = ">=3.2.0"
 
 [[package]]
 name = "langgraph-prebuilt"
@@ -3015,6 +2997,48 @@ files = [
 dev = ["abi3audit", "black", "check-manifest", "colorama ; os_name == \"nt\"", "coverage", "packaging", "pylint", "pyperf", "pypinfo", "pyreadline ; os_name == \"nt\"", "pytest", "pytest-cov", "pytest-instafail", "pytest-subtests", "pytest-xdist", "pywin32 ; os_name == \"nt\" and platform_python_implementation != \"PyPy\"", "requests", "rstcheck", "ruff", "setuptools", "sphinx", "sphinx_rtd_theme", "toml-sort", "twine", "validate-pyproject[all]", "virtualenv", "vulture", "wheel", "wheel ; os_name == \"nt\" and platform_python_implementation != \"PyPy\"", "wmi ; os_name == \"nt\" and platform_python_implementation != \"PyPy\""]
 test = ["pytest", "pytest-instafail", "pytest-subtests", "pytest-xdist", "pywin32 ; os_name == \"nt\" and platform_python_implementation != \"PyPy\"", "setuptools", "wheel ; os_name == \"nt\" and platform_python_implementation != \"PyPy\"", "wmi ; os_name == \"nt\" and platform_python_implementation != \"PyPy\""]
 
+[[package]]
+name = "psycopg"
+version = "3.3.2"
+description = "PostgreSQL database adapter for Python"
+optional = false
+python-versions = ">=3.10"
+groups = ["main"]
+files = [
+    {file = "psycopg-3.3.2-py3-none-any.whl", hash = "sha256:3e94bc5f4690247d734599af56e51bae8e0db8e4311ea413f801fef82b14a99b"},
+    {file = "psycopg-3.3.2.tar.gz", hash = "sha256:707a67975ee214d200511177a6a80e56e654754c9afca06a7194ea6bbfde9ca7"},
+]
+
+[package.dependencies]
+typing-extensions = {version = ">=4.6", markers = "python_version < \"3.13\""}
+tzdata = {version = "*", markers = "sys_platform == \"win32\""}
+
+[package.extras]
+binary = ["psycopg-binary (==3.3.2) ; implementation_name != \"pypy\""]
+c = ["psycopg-c (==3.3.2) ; implementation_name != \"pypy\""]
+dev = ["ast-comments (>=1.1.2)", "black (>=24.1.0)", "codespell (>=2.2)", "cython-lint (>=0.16)", "dnspython (>=2.1)", "flake8 (>=4.0)", "isort-psycopg", "isort[colors] (>=6.0)", "mypy (>=1.19.0)", "pre-commit (>=4.0.1)", "types-setuptools (>=57.4)", "types-shapely (>=2.0)", "wheel (>=0.37)"]
+docs = ["Sphinx (>=5.0)", "furo (==2022.6.21)", "sphinx-autobuild (>=2021.3.14)", "sphinx-autodoc-typehints (>=1.12)"]
+pool = ["psycopg-pool"]
+test = ["anyio (>=4.0)", "mypy (>=1.19.0) ; implementation_name != \"pypy\"", "pproxy (>=2.7)", "pytest (>=6.2.5)", "pytest-cov (>=3.0)", "pytest-randomly (>=3.5)"]
+
+[[package]]
+name = "psycopg-pool"
+version = "3.3.0"
+description = "Connection Pool for Psycopg"
+optional = false
+python-versions = ">=3.10"
+groups = ["main"]
+files = [
+    {file = "psycopg_pool-3.3.0-py3-none-any.whl", hash = "sha256:2e44329155c410b5e8666372db44276a8b1ebd8c90f1c3026ebba40d4bc81063"},
+    {file = "psycopg_pool-3.3.0.tar.gz", hash = "sha256:fa115eb2860bd88fce1717d75611f41490dec6135efb619611142b24da3f6db5"},
+]
+
+[package.dependencies]
+typing-extensions = ">=4.6"
+
+[package.extras]
+test = ["anyio (>=4.0)", "mypy (>=1.14)", "pproxy (>=2.7)", "pytest (>=6.2.5)", "pytest-cov (>=3.0)", "pytest-randomly (>=3.5)"]
+
 [[package]]
 name = "pydantic"
 version = "2.10.5"
@@ -3994,21 +4018,6 @@ files = [
 {file = "soupsieve-2.8.1.tar.gz", hash = "sha256:4cf733bc50fa805f5df4b8ef4740fc0e0fa6218cf3006269afd3f9d6d80fd350"},
 ]
 
-[[package]]
-name = "sqlite-vec"
-version = "0.1.6"
-description = ""
-optional = false
-python-versions = "*"
-groups = ["main"]
-files = [
-    {file = "sqlite_vec-0.1.6-py3-none-macosx_10_6_x86_64.whl", hash = "sha256:77491bcaa6d496f2acb5cc0d0ff0b8964434f141523c121e313f9a7d8088dee3"},
-    {file = "sqlite_vec-0.1.6-py3-none-macosx_11_0_arm64.whl", hash = "sha256:fdca35f7ee3243668a055255d4dee4dea7eed5a06da8cad409f89facf4595361"},
-    {file = "sqlite_vec-0.1.6-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:7b0519d9cd96164cd2e08e8eed225197f9cd2f0be82cb04567692a0a4be02da3"},
-    {file = "sqlite_vec-0.1.6-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux1_x86_64.whl", hash = "sha256:823b0493add80d7fe82ab0fe25df7c0703f4752941aee1c7b2b02cec9656cb24"},
-    {file = "sqlite_vec-0.1.6-py3-none-win_amd64.whl", hash = "sha256:c65bcfd90fa2f41f9000052bcb8bb75d38240b2dae49225389eca6c3136d3f0c"},
-]
-
 [[package]]
 name = "sse-starlette"
 version = "3.0.2"
@@ -5419,4 +5428,4 @@ cffi = ["cffi (>=1.17,<2.0) ; platform_python_implementation != \"PyPy\" and pyt
 [metadata]
 lock-version = "2.1"
 python-versions = ">=3.12,<4.0"
-content-hash = "364fb7c7d9d639dda8917ff9fee399135b6406a16917b10fb537029277eec5b1"
+content-hash = "02ca6f912b2b258fd07ba45bece31692b3f8a937d4c77d4f50714fe49eeab77e"
diff --git a/pyproject.toml b/pyproject.toml
index 0ceefaf..f0f5fac 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -19,7 +19,6 @@ dependencies = [
     "numpy<2",
     "aiohttp",
     "aiofiles",
-    "aiosqlite==0.21.0",
     "huey (>=2.5.3,<3.0.0)",
     "pandas>=1.5.0",
     "openpyxl>=3.0.0",
@@ -31,7 +30,7 @@ dependencies = [
     "langchain-mcp-adapters (>=0.2.1,<0.3.0)",
     "langchain-openai (>=1.1.1,<2.0.0)",
     "cachetools (>=6.2.4,<7.0.0)",
-    "langgraph-checkpoint-sqlite (>=3.0.1,<4.0.0)",
+    "langgraph-checkpoint-postgres (>=2.0.0,<3.0.0)",
     "deepagents-cli (>=0.0.11,<0.0.12)",
 ]
diff --git a/routes/chat.py b/routes/chat.py
index 5e82605..746f5df 100644
--- a/routes/chat.py
+++ b/routes/chat.py
@@ -116,11 +116,6 @@ async def enhanced_generate_stream_response(
         except Exception as e:
             logger.error(f"Error in agent task: {e}")
             await output_queue.put(("agent_done", None))
-        finally:
-            if checkpointer:
-                from agent.checkpoint_manager import get_checkpointer_manager
-                manager = get_checkpointer_manager()
-                await manager.return_to_pool(config.bot_id, config.session_id, checkpointer)
 
     # Run the tasks concurrently
     # The preamble task only runs when enable_thinking is True
@@ -210,46 +205,40 @@ async def create_agent_and_generate_response(
     )
 
     agent, checkpointer = await init_agent(config)
-    try:
-        # Use the updated messages
-        agent_responses = await agent.ainvoke({"messages": config.messages}, config=config.invoke_config(), max_tokens=MAX_OUTPUT_TOKENS)
-        append_messages = agent_responses["messages"][len(config.messages):]
-        response_text = ""
-        for msg in append_messages:
-            if isinstance(msg,AIMessage):
-                if len(msg.text)>0:
-                    meta_message_tag = msg.additional_kwargs.get("message_tag", "ANSWER")
-                    output_text = msg.text.replace("````","").replace("````","") if meta_message_tag == "THINK" else msg.text
-                    response_text += f"[{meta_message_tag}]\n"+output_text+ "\n"
-                if len(msg.tool_calls)>0:
-                    response_text += "".join([f"[TOOL_CALL] {tool['name']}\n{json.dumps(tool["args"]) if isinstance(tool["args"],dict) else tool["args"]}\n" for tool in msg.tool_calls])
-            elif isinstance(msg,ToolMessage) and config.tool_response:
-                response_text += f"[TOOL_RESPONSE] {msg.name}\n{msg.text}\n"
+    # Use the updated messages
+    agent_responses = await agent.ainvoke({"messages": config.messages}, config=config.invoke_config(), max_tokens=MAX_OUTPUT_TOKENS)
+    append_messages = agent_responses["messages"][len(config.messages):]
+    response_text = ""
+    for msg in append_messages:
+        if isinstance(msg,AIMessage):
+            if len(msg.text)>0:
+                meta_message_tag = msg.additional_kwargs.get("message_tag", "ANSWER")
+                output_text = msg.text.replace("````","").replace("````","") if meta_message_tag == "THINK" else msg.text
+                response_text += f"[{meta_message_tag}]\n"+output_text+ "\n"
+            if len(msg.tool_calls)>0:
+                response_text += "".join([f"[TOOL_CALL] {tool['name']}\n{json.dumps(tool["args"]) if isinstance(tool["args"],dict) else tool["args"]}\n" for tool in msg.tool_calls])
+        elif isinstance(msg,ToolMessage) and config.tool_response:
+            response_text += f"[TOOL_RESPONSE] {msg.name}\n{msg.text}\n"
 
-        if len(response_text) > 0:
-            # Build an OpenAI-format response
-            result = ChatResponse(
-                choices=[{
-                    "index": 0,
-                    "message": {
-                        "role": "assistant",
-                        "content": response_text
-                    },
-                    "finish_reason": "stop"
-                }],
-                usage={
-                    "prompt_tokens": sum(len(msg.get("content", "")) for msg in config.messages),
-                    "completion_tokens": len(response_text),
-                    "total_tokens": sum(len(msg.get("content", "")) for msg in config.messages) + len(response_text)
-                }
-            )
-        else:
-            raise HTTPException(status_code=500, detail="No response from agent")
-    finally:
-        if checkpointer:
-            from agent.checkpoint_manager import get_checkpointer_manager
-            manager = get_checkpointer_manager()
-            await manager.return_to_pool(config.bot_id, config.session_id, checkpointer)
+    if len(response_text) > 0:
+        # Build an OpenAI-format response
+        result = ChatResponse(
+            choices=[{
+                "index": 0,
+                "message": {
+                    "role": "assistant",
+                    "content": response_text
+                },
+                "finish_reason": "stop"
+            }],
+            usage={
+                "prompt_tokens": sum(len(msg.get("content", "")) for msg in config.messages),
+                "completion_tokens": len(response_text),
+                "total_tokens": sum(len(msg.get("content", "")) for msg in config.messages) + len(response_text)
+            }
+        )
+    else:
+        raise HTTPException(status_code=500, detail="No response from agent")
 
     return result
diff --git a/utils/settings.py b/utils/settings.py
index 75e708c..d5f8be3 100644
--- a/utils/settings.py
+++ b/utils/settings.py
@@ -42,35 +42,27 @@ MCP_HTTP_TIMEOUT = int(os.getenv("MCP_HTTP_TIMEOUT", 60))  # HTTP request timeout
 MCP_SSE_READ_TIMEOUT = int(os.getenv("MCP_SSE_READ_TIMEOUT", 300))  # SSE read timeout (seconds)
 
 # ============================================================
-# SQLite Checkpoint Configuration
+# PostgreSQL Checkpoint Configuration
 # ============================================================
 
-# Checkpoint database path
-CHECKPOINT_DB_PATH = os.getenv("CHECKPOINT_DB_PATH", "./projects/memory/")
-
-# Enable WAL mode (Write-Ahead Logging)
-# WAL mode allows concurrent reads and writes, greatly improving concurrency
-CHECKPOINT_WAL_MODE = os.getenv("CHECKPOINT_WAL_MODE", "true") == "true"
-
-# Busy timeout (milliseconds)
-# Maximum time to wait when the database is locked
-CHECKPOINT_BUSY_TIMEOUT = int(os.getenv("CHECKPOINT_BUSY_TIMEOUT", "10000"))
-
+# PostgreSQL connection string
+# Format: postgresql://user:password@host:port/database
+# The fallback below is a local placeholder; always set CHECKPOINT_DB_URL in
+# the environment and never commit real credentials as a default
+CHECKPOINT_DB_URL = os.getenv("CHECKPOINT_DB_URL", "postgresql://postgres:postgres@localhost:5432/gptbase")
 
 # Connection pool size
 # Maximum number of connections held at the same time
-CHECKPOINT_POOL_SIZE = int(os.getenv("CHECKPOINT_POOL_SIZE", "15"))
+CHECKPOINT_POOL_SIZE = int(os.getenv("CHECKPOINT_POOL_SIZE", "20"))
 
 # Automatic checkpoint cleanup configuration
 # Whether to automatically clean up old sessions
 CHECKPOINT_CLEANUP_ENABLED = os.getenv("CHECKPOINT_CLEANUP_ENABLED", "true") == "true"
 
+# Delete threads inactive for this many days
+CHECKPOINT_CLEANUP_INACTIVE_DAYS = int(os.getenv("CHECKPOINT_CLEANUP_INACTIVE_DAYS", "3"))
+
 # Cleanup interval (hours)
 # Run the cleanup task every N hours
 CHECKPOINT_CLEANUP_INTERVAL_HOURS = int(os.getenv("CHECKPOINT_CLEANUP_INTERVAL_HOURS", "24"))
-
-# Delete data older than this many days
-# Threads inactive for more than N days are removed
-CHECKPOINT_CLEANUP_OLDER_THAN_DAYS = int(os.getenv("CHECKPOINT_CLEANUP_OLDER_THAN_DAYS", "3"))
-