""" Database operations API routes. Provides features such as database migrations and schema changes. """ import logging from typing import Optional from fastapi import APIRouter, HTTPException, Header from pydantic import BaseModel from agent.db_pool_manager import get_db_pool_manager from utils.settings import MASTERKEY from utils.fastapi_utils import extract_api_key_from_auth logger = logging.getLogger('app') router = APIRouter() def verify_database_auth(authorization: Optional[str]) -> None: """Verify authentication for database operation APIs. Uses MASTERKEY directly for verification. Args: authorization: Authorization header value Raises: HTTPException: Raises 401/403 when authentication fails """ # Extract the provided token provided_token = extract_api_key_from_auth(authorization) if not provided_token: raise HTTPException( status_code=401, detail="Authorization header is required" ) if provided_token != MASTERKEY: raise HTTPException( status_code=403, detail="Invalid authorization token" ) class DatabaseMigrationResponse(BaseModel): """Database migration response.""" success: bool message: str steps_completed: list[str] error: Optional[str] = None class ExecuteSQLRequest(BaseModel): """Execute SQL request.""" sql: str autocommit: bool = True class ExecuteSQLResponse(BaseModel): """Execute SQL response.""" success: bool rows_affected: Optional[int] = None message: str columns: Optional[list[str]] = None data: Optional[list[list]] = None error: Optional[str] = None @router.post("/api/v1/database/migrate-pgvector", response_model=DatabaseMigrationResponse) async def migrate_pgvector(authorization: Optional[str] = Header(None)): """ Run the pgvector extension installation migration Steps performed: 1. Rename the public.vector table to public.vector_legacy 2. Create the pgvector extension (CREATE EXTENSION vector) Note: This operation modifies the database schema. Ensure a backup has been created before running it. Authentication: - Authorization header should contain: Bearer {MASTERKEY} Returns: DatabaseMigrationResponse: Migration result """ # Verify authentication verify_database_auth(authorization) steps_completed = [] pool_manager = get_db_pool_manager() try: # Get the async connection pool = pool_manager.pool # Step 1: Rename the vector table to vector_legacy async with pool.connection() as conn: async with conn.cursor() as cursor: # Check whether the vector table exists await cursor.execute(""" SELECT EXISTS ( SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'vector' ) """) vector_exists = (await cursor.fetchone())[0] if vector_exists: # Check whether vector_legacy already exists await cursor.execute(""" SELECT EXISTS ( SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'vector_legacy' ) """) legacy_exists = (await cursor.fetchone())[0] if legacy_exists: steps_completed.append("vector_legacy table already exists, skipping rename") else: # Perform the rename await cursor.execute("ALTER TABLE public.vector RENAME TO vector_legacy") steps_completed.append("Renamed public.vector to public.vector_legacy") logger.info("Renamed public.vector to public.vector_legacy") else: steps_completed.append("public.vector does not exist, skipping rename") # Commit the transaction await conn.commit() # Step 2: Create the pgvector extension async with pool.connection() as conn: async with conn.cursor() as cursor: # Check whether the pgvector extension is already installed await cursor.execute(""" SELECT EXISTS ( SELECT 1 FROM pg_extension WHERE extname = 'vector' ) """) extension_exists = (await cursor.fetchone())[0] if extension_exists: steps_completed.append("pgvector extension is already installed") else: # Create the pgvector extension await cursor.execute("CREATE EXTENSION vector") steps_completed.append("Successfully installed the pgvector extension") logger.info("Created pgvector extension") # Commit the transaction await conn.commit() return DatabaseMigrationResponse( success=True, message="pgvector migration completed", steps_completed=steps_completed ) except HTTPException: raise except Exception as e: logger.error(f"pgvector migration failed: {e}") return DatabaseMigrationResponse( success=False, message="pgvector migration failed", steps_completed=steps_completed, error=str(e) ) @router.post("/api/v1/database/execute-sql", response_model=ExecuteSQLResponse) async def execute_sql(request: ExecuteSQLRequest, authorization: Optional[str] = Header(None)): """ Execute custom SQL statements Note: This endpoint has elevated privileges. Use it with caution. Authentication: - Authorization header should contain: Bearer {MASTERKEY} Args: request: Request containing the SQL statement and whether to autocommit Returns: ExecuteSQLResponse: Execution result """ # Verify authentication verify_database_auth(authorization) pool_manager = get_db_pool_manager() try: pool = pool_manager.pool async with pool.connection() as conn: async with conn.cursor() as cursor: await cursor.execute(request.sql) rows_affected = cursor.rowcount # Get column names columns = None data = None if cursor.description: columns = [desc.name for desc in cursor.description] # Get all row data rows = await cursor.fetchall() data = [list(row) for row in rows] if request.autocommit: await conn.commit() return ExecuteSQLResponse( success=True, rows_affected=rows_affected, message=f"SQL executed successfully, rows affected: {rows_affected}, returned rows: {len(data) if data else 0} rows", columns=columns, data=data ) except HTTPException: raise except Exception as e: logger.error(f"SQL execution failed: {e}") raise HTTPException( status_code=500, detail=f"SQL execution failed: {str(e)}" ) @router.get("/api/v1/database/check-pgvector") async def check_pgvector(authorization: Optional[str] = Header(None)): """ Check the pgvector extension installation status Authentication: - Authorization header should contain: Bearer {MASTERKEY} Returns: Status information for the pgvector extension """ # Verify authentication verify_database_auth(authorization) pool_manager = get_db_pool_manager() try: pool = pool_manager.pool async with pool.connection() as conn: async with conn.cursor() as cursor: # Check whether the extension exists await cursor.execute(""" SELECT extname, extversion FROM pg_extension WHERE extname = 'vector' """) extension_result = await cursor.fetchone() # Check vector-related tables await cursor.execute(""" SELECT table_name, table_type FROM information_schema.tables WHERE table_schema = 'public' AND table_name LIKE '%vector%' ORDER BY table_name """) tables = await cursor.fetchall() return { "extension_installed": extension_result is not None, "extension_version": extension_result[1] if extension_result else None, "vector_tables": [ {"name": row[0], "type": row[1]} for row in tables ] } except HTTPException: raise except Exception as e: logger.error(f"Failed to check pgvector status: {e}") raise HTTPException( status_code=500, detail=f"Check failed: {str(e)}" )