""" 对外 Bot 分享与客户账号管理接口 包含三组接口: 1. share token CRUD:bot 主人生成/列出/吊销对外访问凭证 2. end-user CRUD:bot 主人添加/查看/重置/删除客户账号 3. public bot-share 信息:客户登录后通过 share_token 获取 bot 渲染所需的最小字段 设计要点: - 客户账号物理上仍写入 agent_user 表,但 is_end_user=True,且不参与 New API 同步 - share_token 是一次性可吊销的字符串,与 agent_bots.owner_id 关联 - 大模型调用时仍按 bot owner 的 new_api_token 走,与现有 chat 流程一致 """ import logging import secrets from datetime import datetime from typing import Optional, List from fastapi import APIRouter, HTTPException, Header from pydantic import BaseModel from agent.db_pool_manager import get_db_pool_manager from routes.bot_manager import ( verify_user_auth, is_bot_owner, is_admin_user, hash_password, get_parent_user_id, ) logger = logging.getLogger(__name__) router = APIRouter() # ============== Pydantic 模型 ============== class CreateShareTokenRequest(BaseModel): name: Optional[str] = None expires_at: Optional[str] = None # ISO 8601 class ShareTokenItem(BaseModel): id: str bot_id: str share_token: str name: Optional[str] = None created_at: str expires_at: Optional[str] = None revoked_at: Optional[str] = None class ShareTokenListResponse(BaseModel): items: List[ShareTokenItem] class CreateEndUserRequest(BaseModel): username: str password: str email: Optional[str] = None class EndUserItem(BaseModel): id: str username: str email: Optional[str] = None is_active: bool created_at: str last_login: Optional[str] = None class EndUserListResponse(BaseModel): items: List[EndUserItem] class ResetEndUserPasswordRequest(BaseModel): password: str class PublicBotInfoResponse(BaseModel): bot_id: str bot_uuid: str name: str avatar_url: Optional[str] = None welcome: Optional[str] = None suggested_questions: Optional[list] = None settings: Optional[dict] = None # 仅展示用最小字段 class SimpleSuccessResponse(BaseModel): success: bool = True message: Optional[str] = None # ============== 内部权限辅助 ============== async def _require_admin_user(authorization: Optional[str]) -> str: """要求当前请求是 admin(is_admin=True 或 masterkey),返回 user_id""" valid, user_id, _ = await verify_user_auth(authorization) if not valid or not user_id: raise HTTPException(status_code=401, detail="未登录或 token 无效") if user_id == "__masterkey__": return user_id if not await is_admin_user(authorization): raise HTTPException(status_code=403, detail="仅 admin 可执行此操作") return user_id async def _require_end_user(authorization: Optional[str]) -> str: """要求当前请求是 end-user,返回 user_id""" valid, user_id, _ = await verify_user_auth(authorization) if not valid or not user_id or user_id == "__masterkey__": raise HTTPException(status_code=401, detail="未登录或 token 无效") pool = get_db_pool_manager().pool async with pool.connection() as conn: async with conn.cursor() as cursor: await cursor.execute( "SELECT is_end_user FROM agent_user WHERE id = %s", (user_id,) ) row = await cursor.fetchone() if not row or not row[0]: raise HTTPException(status_code=403, detail="仅客户账号可访问此接口") return user_id # ============== Share Token CRUD(admin 用) ============== @router.post("/api/v1/bots/{bot_id}/share-tokens", response_model=ShareTokenItem) async def create_share_token( bot_id: str, body: CreateShareTokenRequest, authorization: Optional[str] = Header(None), ): user_id = await _require_admin_user(authorization) if user_id != "__masterkey__" and not await is_bot_owner(bot_id, user_id): raise HTTPException(status_code=403, detail="您不是该 bot 的所有者") share_token = secrets.token_urlsafe(24) expires_at = None if body.expires_at: try: expires_at = datetime.fromisoformat(body.expires_at.replace("Z", "+00:00")) except ValueError: raise HTTPException(status_code=400, detail="expires_at 格式无效,应为 ISO 8601") # masterkey 用户没有真实 created_by,需要回退到 bot owner_id created_by = user_id pool = get_db_pool_manager().pool async with pool.connection() as conn: async with conn.cursor() as cursor: if user_id == "__masterkey__": await cursor.execute( "SELECT owner_id FROM agent_bots WHERE id = %s", (bot_id,) ) row = await cursor.fetchone() if not row: raise HTTPException(status_code=404, detail="bot 不存在") created_by = str(row[0]) await cursor.execute( """ INSERT INTO agent_bot_share_tokens (bot_id, share_token, name, created_by, expires_at) VALUES (%s, %s, %s, %s, %s) RETURNING id, created_at """, (bot_id, share_token, body.name, created_by, expires_at), ) row = await cursor.fetchone() await conn.commit() return ShareTokenItem( id=str(row[0]), bot_id=bot_id, share_token=share_token, name=body.name, created_at=row[1].isoformat(), expires_at=expires_at.isoformat() if expires_at else None, revoked_at=None, ) @router.get("/api/v1/bots/{bot_id}/share-tokens", response_model=ShareTokenListResponse) async def list_share_tokens( bot_id: str, authorization: Optional[str] = Header(None), ): user_id = await _require_admin_user(authorization) if user_id != "__masterkey__" and not await is_bot_owner(bot_id, user_id): raise HTTPException(status_code=403, detail="您不是该 bot 的所有者") pool = get_db_pool_manager().pool async with pool.connection() as conn: async with conn.cursor() as cursor: await cursor.execute( """ SELECT id, share_token, name, created_at, expires_at, revoked_at FROM agent_bot_share_tokens WHERE bot_id = %s ORDER BY created_at DESC """, (bot_id,), ) rows = await cursor.fetchall() items = [ ShareTokenItem( id=str(r[0]), bot_id=bot_id, share_token=r[1], name=r[2], created_at=r[3].isoformat(), expires_at=r[4].isoformat() if r[4] else None, revoked_at=r[5].isoformat() if r[5] else None, ) for r in rows ] return ShareTokenListResponse(items=items) @router.delete("/api/v1/bot-share-tokens/{share_token_id}", response_model=SimpleSuccessResponse) async def revoke_share_token( share_token_id: str, authorization: Optional[str] = Header(None), ): user_id = await _require_admin_user(authorization) pool = get_db_pool_manager().pool async with pool.connection() as conn: async with conn.cursor() as cursor: await cursor.execute( "SELECT bot_id FROM agent_bot_share_tokens WHERE id = %s", (share_token_id,), ) row = await cursor.fetchone() if not row: raise HTTPException(status_code=404, detail="share token 不存在") bot_id = str(row[0]) if user_id != "__masterkey__" and not await is_bot_owner(bot_id, user_id): raise HTTPException(status_code=403, detail="您不是该 bot 的所有者") await cursor.execute( "UPDATE agent_bot_share_tokens SET revoked_at = NOW() WHERE id = %s", (share_token_id,), ) await conn.commit() return SimpleSuccessResponse(message="已吊销") # ============== End-user CRUD(admin 用) ============== @router.post("/api/v1/end-users", response_model=EndUserItem) async def create_end_user( body: CreateEndUserRequest, authorization: Optional[str] = Header(None), ): await _require_admin_user(authorization) if not body.username or not body.password: raise HTTPException(status_code=400, detail="username 和 password 必填") pool = get_db_pool_manager().pool async with pool.connection() as conn: async with conn.cursor() as cursor: await cursor.execute( "SELECT id FROM agent_user WHERE username = %s", (body.username,), ) if await cursor.fetchone(): raise HTTPException(status_code=409, detail="用户名已存在") await cursor.execute( """ INSERT INTO agent_user (username, email, password_hash, is_admin, is_subaccount, is_end_user) VALUES (%s, %s, %s, FALSE, FALSE, TRUE) RETURNING id, created_at, is_active """, (body.username, body.email, hash_password(body.password)), ) row = await cursor.fetchone() await conn.commit() return EndUserItem( id=str(row[0]), username=body.username, email=body.email, is_active=row[2], created_at=row[1].isoformat(), last_login=None, ) @router.get("/api/v1/end-users", response_model=EndUserListResponse) async def list_end_users(authorization: Optional[str] = Header(None)): await _require_admin_user(authorization) pool = get_db_pool_manager().pool async with pool.connection() as conn: async with conn.cursor() as cursor: await cursor.execute( """ SELECT id, username, email, is_active, created_at, last_login FROM agent_user WHERE is_end_user = TRUE ORDER BY created_at DESC """ ) rows = await cursor.fetchall() items = [ EndUserItem( id=str(r[0]), username=r[1], email=r[2], is_active=r[3], created_at=r[4].isoformat(), last_login=r[5].isoformat() if r[5] else None, ) for r in rows ] return EndUserListResponse(items=items) @router.post("/api/v1/end-users/{end_user_id}/reset-password", response_model=SimpleSuccessResponse) async def reset_end_user_password( end_user_id: str, body: ResetEndUserPasswordRequest, authorization: Optional[str] = Header(None), ): await _require_admin_user(authorization) if not body.password: raise HTTPException(status_code=400, detail="password 必填") pool = get_db_pool_manager().pool async with pool.connection() as conn: async with conn.cursor() as cursor: await cursor.execute( "SELECT id FROM agent_user WHERE id = %s AND is_end_user = TRUE", (end_user_id,), ) if not await cursor.fetchone(): raise HTTPException(status_code=404, detail="客户账号不存在") await cursor.execute( "UPDATE agent_user SET password_hash = %s WHERE id = %s", (hash_password(body.password), end_user_id), ) # 清理已有 token,强制重新登录 await cursor.execute( "DELETE FROM agent_user_tokens WHERE user_id = %s", (end_user_id,), ) await conn.commit() return SimpleSuccessResponse(message="密码已重置,原 token 已失效") @router.delete("/api/v1/end-users/{end_user_id}", response_model=SimpleSuccessResponse) async def delete_end_user( end_user_id: str, authorization: Optional[str] = Header(None), ): await _require_admin_user(authorization) pool = get_db_pool_manager().pool async with pool.connection() as conn: async with conn.cursor() as cursor: await cursor.execute( "DELETE FROM agent_user WHERE id = %s AND is_end_user = TRUE", (end_user_id,), ) await conn.commit() return SimpleSuccessResponse(message="已删除") # ============== Public bot-share 信息(客户用) ============== async def _resolve_share_token(share_token: str) -> str: """校验 share_token 有效,返回对应的 bot_id""" pool = get_db_pool_manager().pool async with pool.connection() as conn: async with conn.cursor() as cursor: await cursor.execute( """ SELECT bot_id, expires_at, revoked_at FROM agent_bot_share_tokens WHERE share_token = %s """, (share_token,), ) row = await cursor.fetchone() if not row: raise HTTPException(status_code=404, detail="分享链接无效") bot_id, expires_at, revoked_at = row if revoked_at is not None: raise HTTPException(status_code=403, detail="分享链接已被吊销") if expires_at is not None: from datetime import timezone if expires_at < datetime.now(timezone.utc): raise HTTPException(status_code=403, detail="分享链接已过期") return str(bot_id) async def validate_share_token_for_bot(share_token: str, bot_id: str) -> bool: """供 chat.py 校验 share_token 与 bot_id 一致,返回是否有效(无效抛 HTTPException)""" resolved = await _resolve_share_token(share_token) if resolved != bot_id: raise HTTPException(status_code=403, detail="分享链接与 bot 不匹配") return True @router.get("/api/v1/public/bot-share/{share_token}", response_model=PublicBotInfoResponse) async def get_public_bot_by_share( share_token: str, authorization: Optional[str] = Header(None), ): await _require_end_user(authorization) bot_id = await _resolve_share_token(share_token) pool = get_db_pool_manager().pool async with pool.connection() as conn: async with conn.cursor() as cursor: await cursor.execute( """ SELECT id, name, settings FROM agent_bots WHERE id = %s """, (bot_id,), ) row = await cursor.fetchone() if not row: raise HTTPException(status_code=404, detail="bot 不存在") bot_uuid, name, settings = row settings = settings or {} # 只返回展示安全的字段,过滤掉可能含 prompt 等敏感信息的 key safe_settings = { k: v for k, v in settings.items() if k in ("language", "enable_thinking", "enable_memori", "tool_response") } return PublicBotInfoResponse( bot_id=str(bot_uuid), bot_uuid=str(bot_uuid), name=name, avatar_url=settings.get("avatar_url"), welcome=settings.get("welcome") or settings.get("welcome_message"), suggested_questions=settings.get("suggested_questions") or settings.get("guide_questions"), settings=safe_settings, )