qwen_agent/routes/database.py
朱潮 425f3c5bb4 chore: replace Chinese comments and log messages with English
Convert all Chinese comments, docstrings, logger/print output,
HTTPException detail messages, and API response messages to English
across the entire codebase. Functional zh/ja localized strings
(e.g. prompt templates, timezone display names, date formats) are
preserved as-is.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-30 19:45:35 +08:00

291 lines
9.4 KiB
Python

"""
Database operations API routes.
Provides features such as database migrations and schema changes.
"""
import logging
from typing import Optional
from fastapi import APIRouter, HTTPException, Header
from pydantic import BaseModel
from agent.db_pool_manager import get_db_pool_manager
from utils.settings import MASTERKEY
from utils.fastapi_utils import extract_api_key_from_auth
# Logger obtained under the fixed name 'app' (not the module's own name).
logger = logging.getLogger('app')
# Router on which the database maintenance endpoints in this module are registered.
router = APIRouter()
def verify_database_auth(authorization: Optional[str]) -> None:
    """Verify authentication for database operation APIs.

    The token extracted from the ``Authorization`` header is checked directly
    against ``MASTERKEY``.

    Args:
        authorization: Raw ``Authorization`` header value, or ``None`` when the
            header was not sent.

    Raises:
        HTTPException: 401 when no token can be extracted from the header,
            403 when the extracted token does not match ``MASTERKEY``.
    """
    # Function-scope import keeps this block self-contained.
    import hmac

    provided_token = extract_api_key_from_auth(authorization)
    if not provided_token:
        raise HTTPException(
            status_code=401,
            detail="Authorization header is required"
        )

    # Compare in constant time so response timing does not reveal how many
    # leading characters of the master key a guess got right. Both sides are
    # encoded to bytes because compare_digest rejects non-ASCII str values.
    # A non-string on either side can never match, as with the previous `!=`.
    token_matches = (
        isinstance(provided_token, str)
        and isinstance(MASTERKEY, str)
        and hmac.compare_digest(
            provided_token.encode("utf-8"),
            MASTERKEY.encode("utf-8"),
        )
    )
    if not token_matches:
        raise HTTPException(
            status_code=403,
            detail="Invalid authorization token"
        )
class DatabaseMigrationResponse(BaseModel):
    """Database migration response.

    Returned by the pgvector migration endpoint for both successful and
    failed runs.
    """
    # True when the migration ran to completion, False when an error was caught.
    success: bool
    # Short summary of the overall outcome.
    message: str
    # Descriptions of the steps performed or skipped, in the order they ran.
    steps_completed: list[str]
    # Stringified exception when the migration failed; None otherwise.
    error: Optional[str] = None
class ExecuteSQLRequest(BaseModel):
    """Execute SQL request."""
    # SQL text that the execute-sql endpoint passes to cursor.execute() as-is.
    sql: str
    # When True (the default) the endpoint commits after executing the statement.
    autocommit: bool = True
class ExecuteSQLResponse(BaseModel):
    """Execute SQL response."""
    # Whether the statement executed without error.
    success: bool
    # Value of cursor.rowcount after execution.
    rows_affected: Optional[int] = None
    # Human-readable summary of the execution.
    message: str
    # Column names taken from cursor.description; None when the statement
    # produced no result set.
    columns: Optional[list[str]] = None
    # Result rows, each converted to a list; None when there is no result set.
    data: Optional[list[list]] = None
    # Optional error text. The execute-sql handler in this module does not
    # populate it: failures there are raised as HTTPException instead.
    error: Optional[str] = None
async def _table_exists(cursor, table_name: str) -> bool:
    """Return whether a table named *table_name* exists in the ``public`` schema.

    The name is sent as a bind parameter (``%s`` placeholder, as used by
    psycopg-style cursors) so one query serves every lookup.
    """
    await cursor.execute(
        """
        SELECT EXISTS (
            SELECT FROM information_schema.tables
            WHERE table_schema = 'public' AND table_name = %s
        )
        """,
        (table_name,),
    )
    return (await cursor.fetchone())[0]


@router.post("/api/v1/database/migrate-pgvector", response_model=DatabaseMigrationResponse)
async def migrate_pgvector(authorization: Optional[str] = Header(None)):
    """
    Run the pgvector extension installation migration

    Steps performed:
    1. Rename the public.vector table to public.vector_legacy
    2. Create the pgvector extension (CREATE EXTENSION vector)

    Each step runs on its own pooled connection and is committed separately,
    so a failure in step 2 does not undo a rename performed in step 1.

    Note: This operation modifies the database schema. Ensure a backup has been created before running it.

    Authentication:
    - Authorization header should contain: Bearer {MASTERKEY}

    Args:
        authorization: Raw Authorization header value.

    Returns:
        DatabaseMigrationResponse: Migration result. Database errors are not
        raised; they are reported with ``success=False``, the steps finished
        so far, and the error text.

    Raises:
        HTTPException: 401/403 when authentication fails.
    """
    verify_database_auth(authorization)

    steps_completed = []
    pool_manager = get_db_pool_manager()

    try:
        pool = pool_manager.pool

        # Step 1: move an existing public.vector table out of the way.
        # NOTE(review): presumably needed because a table named "vector" would
        # clash with the type the extension creates - confirm.
        async with pool.connection() as conn:
            async with conn.cursor() as cursor:
                if not await _table_exists(cursor, "vector"):
                    steps_completed.append("public.vector does not exist, skipping rename")
                elif await _table_exists(cursor, "vector_legacy"):
                    # Never overwrite a previously preserved legacy table.
                    steps_completed.append("vector_legacy table already exists, skipping rename")
                else:
                    await cursor.execute("ALTER TABLE public.vector RENAME TO vector_legacy")
                    steps_completed.append("Renamed public.vector to public.vector_legacy")
                    logger.info("Renamed public.vector to public.vector_legacy")
            await conn.commit()

        # Step 2: install the extension unless it is already present.
        async with pool.connection() as conn:
            async with conn.cursor() as cursor:
                await cursor.execute("""
                    SELECT EXISTS (
                        SELECT 1 FROM pg_extension WHERE extname = 'vector'
                    )
                """)
                extension_exists = (await cursor.fetchone())[0]

                if extension_exists:
                    steps_completed.append("pgvector extension is already installed")
                else:
                    await cursor.execute("CREATE EXTENSION vector")
                    steps_completed.append("Successfully installed the pgvector extension")
                    logger.info("Created pgvector extension")
            await conn.commit()

        return DatabaseMigrationResponse(
            success=True,
            message="pgvector migration completed",
            steps_completed=steps_completed
        )
    except HTTPException:
        raise
    except Exception as e:
        # logger.exception keeps the traceback, which the response body omits.
        logger.exception(f"pgvector migration failed: {e}")
        return DatabaseMigrationResponse(
            success=False,
            message="pgvector migration failed",
            steps_completed=steps_completed,
            error=str(e)
        )
@router.post("/api/v1/database/execute-sql", response_model=ExecuteSQLResponse)
async def execute_sql(request: ExecuteSQLRequest, authorization: Optional[str] = Header(None)):
    """
    Execute custom SQL statements

    Note: This endpoint has elevated privileges. Use it with caution.
    The SQL text is executed verbatim; the MASTERKEY check is the only guard.

    Authentication:
    - Authorization header should contain: Bearer {MASTERKEY}

    Args:
        request: Request containing the SQL statement and whether to autocommit.
            With ``autocommit=False`` the statement is executed and its result
            returned, but the transaction is rolled back instead of committed.
        authorization: Raw Authorization header value.

    Returns:
        ExecuteSQLResponse: Execution result, including column names and rows
        when the statement produced a result set.

    Raises:
        HTTPException: 401/403 when authentication fails, 500 when the
            statement fails to execute.
    """
    verify_database_auth(authorization)

    pool_manager = get_db_pool_manager()

    try:
        pool = pool_manager.pool

        async with pool.connection() as conn:
            async with conn.cursor() as cursor:
                await cursor.execute(request.sql)
                rows_affected = cursor.rowcount

                columns = None
                data = None
                # description is only set when the statement returns rows.
                if cursor.description:
                    columns = [desc.name for desc in cursor.description]
                    rows = await cursor.fetchall()
                    data = [list(row) for row in rows]

            if request.autocommit:
                await conn.commit()
            else:
                # Discard the work explicitly. Without this the open
                # transaction is left to the pool's connection context.
                # NOTE(review): if this is psycopg_pool (the call pattern
                # suggests so - confirm), that context commits on a clean
                # exit, which would make autocommit=False persist the changes.
                await conn.rollback()

        return ExecuteSQLResponse(
            success=True,
            rows_affected=rows_affected,
            message=f"SQL executed successfully, rows affected: {rows_affected}, returned rows: {len(data) if data else 0} rows",
            columns=columns,
            data=data
        )
    except HTTPException:
        raise
    except Exception as e:
        # Keep the traceback in the log; the client only receives the message.
        logger.exception(f"SQL execution failed: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"SQL execution failed: {str(e)}"
        ) from e
@router.get("/api/v1/database/check-pgvector")
async def check_pgvector(authorization: Optional[str] = Header(None)):
    """
    Check the pgvector extension installation status

    Authentication:
    - Authorization header should contain: Bearer {MASTERKEY}

    Args:
        authorization: Raw Authorization header value.

    Returns:
        Status information for the pgvector extension: a dict with
        ``extension_installed`` (bool), ``extension_version`` (version string
        or None) and ``vector_tables`` (name/type entries for every table in
        the public schema whose name contains "vector").

    Raises:
        HTTPException: 401/403 when authentication fails, 500 when the
            status queries fail.
    """
    verify_database_auth(authorization)

    pool_manager = get_db_pool_manager()

    try:
        pool = pool_manager.pool

        async with pool.connection() as conn:
            async with conn.cursor() as cursor:
                # At most one row: the installed extension and its version.
                await cursor.execute("""
                    SELECT
                        extname,
                        extversion
                    FROM pg_extension
                    WHERE extname = 'vector'
                """)
                extension_result = await cursor.fetchone()

                # Tables in the public schema with "vector" in their name,
                # which covers both `vector` and `vector_legacy`.
                await cursor.execute("""
                    SELECT
                        table_name,
                        table_type
                    FROM information_schema.tables
                    WHERE table_schema = 'public'
                    AND table_name LIKE '%vector%'
                    ORDER BY table_name
                """)
                tables = await cursor.fetchall()

        return {
            "extension_installed": extension_result is not None,
            "extension_version": extension_result[1] if extension_result else None,
            "vector_tables": [
                {"name": row[0], "type": row[1]}
                for row in tables
            ]
        }
    except HTTPException:
        raise
    except Exception as e:
        # Keep the traceback in the log; the client only receives the message.
        logger.exception(f"Failed to check pgvector status: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Check failed: {str(e)}"
        ) from e