"""Postgres 连接 helper。 配置全部走环境变量,默认指向 docker-compose 起的本地实例: PMDA_PG_HOST (默认 localhost) PMDA_PG_PORT (默认 5432) PMDA_PG_DB (默认 pmda) PMDA_PG_USER (默认 pmda) PMDA_PG_PASSWORD (默认 pmda_local_dev — 仅本地开发,生产由 secret 注入) `connect()` 返回 psycopg3 connection(autocommit=False)。 长时跑批时使用 `pool()` 取 ConnectionPool。 """ from __future__ import annotations import os from contextlib import contextmanager from typing import Iterator import psycopg from psycopg import Connection from psycopg_pool import ConnectionPool PG_HOST = os.environ.get("PMDA_PG_HOST", "localhost") PG_PORT = int(os.environ.get("PMDA_PG_PORT", "5432")) PG_DB = os.environ.get("PMDA_PG_DB", "pmda") PG_USER = os.environ.get("PMDA_PG_USER", "pmda") PG_PASSWORD = os.environ.get("PMDA_PG_PASSWORD", "pmda_local_dev") def conninfo() -> str: return ( f"host={PG_HOST} port={PG_PORT} dbname={PG_DB} " f"user={PG_USER} password={PG_PASSWORD}" ) def connect(*, autocommit: bool = False) -> Connection: """Open a single connection. Caller is responsible for closing.""" return psycopg.connect(conninfo(), autocommit=autocommit) @contextmanager def session(*, autocommit: bool = False) -> Iterator[Connection]: """`with session() as conn:` — auto close on exit.""" conn = connect(autocommit=autocommit) try: yield conn if not autocommit: conn.commit() except Exception: if not autocommit: conn.rollback() raise finally: conn.close() _pool: ConnectionPool | None = None def pool(min_size: int = 1, max_size: int = 8) -> ConnectionPool: """Lazy-init module-level pool. Use for batch / agent-loop hot path.""" global _pool if _pool is None: _pool = ConnectionPool( conninfo(), min_size=min_size, max_size=max_size, kwargs={"autocommit": False}, open=True, ) return _pool def close_pool() -> None: global _pool if _pool is not None: _pool.close() _pool = None