""" Global PostgreSQL checkpointer manager. Uses the shared database connection pool. """ import logging from typing import Optional from psycopg_pool import AsyncConnectionPool from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver logger = logging.getLogger('app') class CheckpointerManager: """ Global checkpointer manager using the shared PostgreSQL connection pool. Main features: 1. Uses the shared connection pool from DBPoolManager 2. AsyncPostgresSaver natively supports connection pools and acquires/releases connections automatically 3. No manual return is required, avoiding long-running requests occupying connections """ def __init__(self): self._pool: Optional[AsyncConnectionPool] = None self._initialized = False self._closed = False async def initialize(self, pool: AsyncConnectionPool) -> None: """Initialize the checkpointer using an externally provided connection pool Args: pool: AsyncConnectionPool instance (from DBPoolManager) """ if self._initialized: return self._pool = pool logger.info("Initializing PostgreSQL checkpointer (using shared connection pool)...") try: # Create the table schema (autocommit is required for CREATE INDEX CONCURRENTLY) async with self._pool.connection() as conn: await conn.set_autocommit(True) checkpointer = AsyncPostgresSaver(conn=conn) await checkpointer.setup() self._initialized = True logger.info("PostgreSQL checkpointer initialized successfully") except Exception as e: logger.error(f"Failed to initialize PostgreSQL checkpointer: {e}") raise @property def checkpointer(self) -> AsyncPostgresSaver: """Get the global AsyncPostgresSaver instance.""" if self._closed: raise RuntimeError("CheckpointerManager is closed") if not self._initialized: raise RuntimeError("CheckpointerManager not initialized, call initialize() first") return AsyncPostgresSaver(conn=self._pool) @property def pool(self) -> AsyncConnectionPool: """Get the connection pool for sharing with other managers.""" if self._closed: raise RuntimeError("CheckpointerManager is closed") if not self._initialized: raise RuntimeError("CheckpointerManager not initialized, call initialize() first") return self._pool async def close(self) -> None: """Close the checkpointer manager without closing the pool managed by DBPoolManager.""" if self._closed: return logger.info("Closing CheckpointerManager...") self._closed = True self._initialized = False logger.info("CheckpointerManager closed (pool managed by DBPoolManager)") # Global singleton _global_manager: Optional[CheckpointerManager] = None def get_checkpointer_manager() -> CheckpointerManager: """Get the global CheckpointerManager singleton.""" global _global_manager if _global_manager is None: _global_manager = CheckpointerManager() return _global_manager async def init_global_checkpointer(pool: AsyncConnectionPool) -> None: """Initialize the global checkpointer manager Args: pool: AsyncConnectionPool instance (from DBPoolManager) """ manager = get_checkpointer_manager() await manager.initialize(pool) async def close_global_checkpointer() -> None: """Close the global checkpointer manager.""" global _global_manager if _global_manager is not None: await _global_manager.close()