"""External bot sharing and customer (end-user) account management API.

Three groups of endpoints live here:

1. Share token CRUD: a bot owner creates / lists / revokes external access
   credentials.
2. End-user CRUD: a bot owner adds / lists / resets / deletes customer
   accounts.
3. Public bot-share info: a logged-in customer exchanges a share_token for
   the minimal set of fields needed to render the bot.

Design notes:

- Customer accounts are physically stored in the agent_user table, but with
  is_end_user=True, and they do not take part in New API synchronisation.
- A share_token is a one-off, revocable string associated with
  agent_bots.owner_id.
- LLM calls still go through the bot owner's new_api_token, consistent with
  the existing chat flow.
"""
import logging
import secrets
from datetime import datetime, timezone
from typing import Optional, List

from fastapi import APIRouter, HTTPException, Header
from pydantic import BaseModel

from agent.db_pool_manager import get_db_pool_manager
from routes.bot_manager import (
    verify_user_auth,
    is_bot_owner,
    hash_password,
    get_parent_user_id,
)

logger = logging.getLogger(__name__)
router = APIRouter()

# Sentinel user id that verify_user_auth yields for masterkey requests
# (this module compares against it everywhere a masterkey bypass applies).
_MASTERKEY_USER_ID = "__masterkey__"

# Bot settings keys that are safe to expose to customers; everything else
# (which may contain prompts or other sensitive data) is filtered out.
_SAFE_SETTING_KEYS = frozenset(
    ("language", "enable_thinking", "enable_memori", "tool_response")
)


# ============== Pydantic models ==============

class CreateShareTokenRequest(BaseModel):
    """Request body for creating a share token."""

    name: Optional[str] = None
    expires_at: Optional[str] = None  # ISO 8601


class ShareTokenItem(BaseModel):
    """One share token as returned to the bot owner."""

    id: str
    bot_id: str
    share_token: str
    name: Optional[str] = None
    created_at: str
    expires_at: Optional[str] = None
    revoked_at: Optional[str] = None


class ShareTokenListResponse(BaseModel):
    """List wrapper for share tokens."""

    items: List[ShareTokenItem]


class CreateEndUserRequest(BaseModel):
    """Request body for creating a customer account."""

    username: str
    password: str
    email: Optional[str] = None


class EndUserItem(BaseModel):
    """One customer account as returned to its owner."""

    id: str
    username: str
    email: Optional[str] = None
    is_active: bool
    created_at: str
    last_login: Optional[str] = None


class EndUserListResponse(BaseModel):
    """List wrapper for customer accounts."""

    items: List[EndUserItem]


class ResetEndUserPasswordRequest(BaseModel):
    """Request body for resetting a customer account password."""

    password: str


class PublicBotInfoResponse(BaseModel):
    """Minimal bot fields exposed to a customer through a share token."""

    bot_id: str
    bot_uuid: str
    name: str
    avatar_url: Optional[str] = None
    welcome: Optional[str] = None
    suggested_questions: Optional[list] = None
    settings: Optional[dict] = None  # minimal, display-only fields


class SimpleSuccessResponse(BaseModel):
    """Generic success envelope."""

    success: bool = True
    message: Optional[str] = None


# ============== Internal helpers ==============

def _iso(value: Optional[datetime]) -> Optional[str]:
    """Return ``value.isoformat()``, or None when ``value`` is None."""
    return value.isoformat() if value is not None else None


def _parse_expires_at(raw: Optional[str]) -> Optional[datetime]:
    """Parse an optional ISO 8601 string into a datetime.

    A trailing ``Z`` is rewritten to ``+00:00`` before parsing. Empty or
    missing input yields None.

    Raises:
        HTTPException: 400 when the string is not valid ISO 8601.
    """
    if not raw:
        return None
    try:
        return datetime.fromisoformat(raw.replace("Z", "+00:00"))
    except ValueError:
        # The parser's own message adds nothing for the client; drop the chain.
        raise HTTPException(
            status_code=400, detail="expires_at 格式无效,应为 ISO 8601"
        ) from None


def _is_expired(expires_at: Optional[datetime], now: Optional[datetime] = None) -> bool:
    """Tell whether ``expires_at`` lies strictly before ``now``.

    ``None`` means "never expires". ``now`` defaults to the current UTC time.
    Comparing a naive datetime with an aware one raises TypeError, so a naive
    ``expires_at`` is normalised first.
    """
    if expires_at is None:
        return False
    if now is None:
        now = datetime.now(timezone.utc)
    if expires_at.tzinfo is None:
        # NOTE(review): assumes naive timestamps coming back from the database
        # are in UTC -- confirm against the agent_bot_share_tokens schema.
        expires_at = expires_at.replace(tzinfo=timezone.utc)
    return expires_at < now


async def _is_end_user(user_id: str) -> bool:
    """Return True when agent_user.is_end_user is set for ``user_id``.

    A missing row counts as "not an end-user".
    """
    pool = get_db_pool_manager().pool
    async with pool.connection() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute(
                "SELECT is_end_user FROM agent_user WHERE id = %s", (user_id,)
            )
            row = await cursor.fetchone()
    return bool(row and row[0])


async def _ensure_bot_owner(bot_id: str, user_id: str) -> None:
    """Raise 403 unless ``user_id`` is the masterkey or owns ``bot_id``."""
    if user_id == _MASTERKEY_USER_ID:
        return
    if not await is_bot_owner(bot_id, user_id):
        raise HTTPException(status_code=403, detail="您不是该 bot 的所有者")


async def _require_app_user(authorization: Optional[str]) -> str:
    """Require an in-app user and return the effective user id.

    Any in-app identity is accepted (admin / regular user / sub-account /
    masterkey); external customer accounts are rejected. Sub-accounts are
    folded onto their parent account id so isolation can be done per
    "parent account = resource owner".

    Returns:
        The effective user id, or '__masterkey__' for masterkey requests.

    Raises:
        HTTPException: 401 when the token is missing/invalid, 403 when the
            caller is a customer account.
    """
    valid, user_id, _ = await verify_user_auth(authorization)
    if not valid or not user_id:
        raise HTTPException(status_code=401, detail="未登录或 token 无效")
    if user_id == _MASTERKEY_USER_ID:
        return user_id
    if await _is_end_user(user_id):
        raise HTTPException(status_code=403, detail="客户账号不能访问该接口")
    # Fold sub-accounts onto their parent account.
    return await get_parent_user_id(user_id)


async def _require_end_user(authorization: Optional[str]) -> str:
    """Require the caller to be an end-user (customer) and return its user id.

    Raises:
        HTTPException: 401 when the token is missing/invalid or belongs to
            the masterkey, 403 when the caller is not a customer account.
    """
    valid, user_id, _ = await verify_user_auth(authorization)
    if not valid or not user_id or user_id == _MASTERKEY_USER_ID:
        raise HTTPException(status_code=401, detail="未登录或 token 无效")
    if not await _is_end_user(user_id):
        raise HTTPException(status_code=403, detail="仅客户账号可访问此接口")
    return user_id


# ============== Share token CRUD (for admins) ==============

@router.post("/api/v1/bots/{bot_id}/share-tokens", response_model=ShareTokenItem)
async def create_share_token(
    bot_id: str,
    body: CreateShareTokenRequest,
    authorization: Optional[str] = Header(None),
):
    """Create a new share token for ``bot_id``.

    Only the bot owner (or the masterkey) may call this.

    Raises:
        HTTPException: 400 for a malformed ``expires_at``, 401/403 for auth
            failures, 404 when a masterkey caller targets a missing bot.
    """
    user_id = await _require_app_user(authorization)
    await _ensure_bot_owner(bot_id, user_id)

    expires_at = _parse_expires_at(body.expires_at)
    share_token = secrets.token_urlsafe(24)

    # The masterkey has no real user row to record as created_by, so fall
    # back to the bot's owner_id in that case.
    created_by = user_id
    pool = get_db_pool_manager().pool
    async with pool.connection() as conn:
        async with conn.cursor() as cursor:
            if user_id == _MASTERKEY_USER_ID:
                await cursor.execute(
                    "SELECT owner_id FROM agent_bots WHERE id = %s", (bot_id,)
                )
                owner_row = await cursor.fetchone()
                if not owner_row:
                    raise HTTPException(status_code=404, detail="bot 不存在")
                created_by = str(owner_row[0])

            await cursor.execute(
                """
                INSERT INTO agent_bot_share_tokens
                    (bot_id, share_token, name, created_by, expires_at)
                VALUES (%s, %s, %s, %s, %s)
                RETURNING id, created_at
                """,
                (bot_id, share_token, body.name, created_by, expires_at),
            )
            row = await cursor.fetchone()
        await conn.commit()

    return ShareTokenItem(
        id=str(row[0]),
        bot_id=bot_id,
        share_token=share_token,
        name=body.name,
        created_at=row[1].isoformat(),
        expires_at=_iso(expires_at),
        revoked_at=None,
    )


@router.get("/api/v1/bots/{bot_id}/share-tokens", response_model=ShareTokenListResponse)
async def list_share_tokens(
    bot_id: str,
    authorization: Optional[str] = Header(None),
):
    """List every share token of ``bot_id``, newest first (owner/masterkey only)."""
    user_id = await _require_app_user(authorization)
    await _ensure_bot_owner(bot_id, user_id)

    pool = get_db_pool_manager().pool
    async with pool.connection() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute(
                """
                SELECT id, share_token, name, created_at, expires_at, revoked_at
                FROM agent_bot_share_tokens
                WHERE bot_id = %s
                ORDER BY created_at DESC
                """,
                (bot_id,),
            )
            rows = await cursor.fetchall()

    items = [
        ShareTokenItem(
            id=str(token_id),
            bot_id=bot_id,
            share_token=token,
            name=name,
            created_at=created_at.isoformat(),
            expires_at=_iso(expires_at),
            revoked_at=_iso(revoked_at),
        )
        for token_id, token, name, created_at, expires_at, revoked_at in rows
    ]
    return ShareTokenListResponse(items=items)


@router.delete("/api/v1/bot-share-tokens/{share_token_id}", response_model=SimpleSuccessResponse)
async def revoke_share_token(
    share_token_id: str,
    authorization: Optional[str] = Header(None),
):
    """Revoke a share token by setting its revoked_at to NOW().

    Raises:
        HTTPException: 404 when the token does not exist, 403 when the
            caller does not own the token's bot.
    """
    user_id = await _require_app_user(authorization)
    pool = get_db_pool_manager().pool

    async with pool.connection() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute(
                "SELECT bot_id FROM agent_bot_share_tokens WHERE id = %s",
                (share_token_id,),
            )
            row = await cursor.fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="share token 不存在")

    # The ownership check runs with no connection held: is_bot_owner
    # presumably acquires its own pooled connection (TODO confirm), and
    # nesting acquisitions can starve a small pool under load.
    await _ensure_bot_owner(str(row[0]), user_id)

    async with pool.connection() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute(
                "UPDATE agent_bot_share_tokens SET revoked_at = NOW() WHERE id = %s",
                (share_token_id,),
            )
        await conn.commit()
    return SimpleSuccessResponse(message="已吊销")


# ============== End-user CRUD (for bot owners) ==============
# Ownership of a customer account is expressed through agent_user.parent_id
# (an end-user's parent_id points at its owner's user id).
# Note: sub-accounts also use parent_id to point at their parent account; the
# is_subaccount / is_end_user flags tell the two apart.

@router.post("/api/v1/end-users", response_model=EndUserItem)
async def create_end_user(
    body: CreateEndUserRequest,
    authorization: Optional[str] = Header(None),
):
    """Create a customer account owned by the calling (parent) account.

    Raises:
        HTTPException: 400 for masterkey callers or missing fields, 409 when
            the username is already taken.
    """
    owner_id = await _require_app_user(authorization)
    if owner_id == _MASTERKEY_USER_ID:
        raise HTTPException(status_code=400, detail="masterkey 不能拥有客户账号,请用普通账号登录")
    if not body.username or not body.password:
        raise HTTPException(status_code=400, detail="username 和 password 必填")

    pool = get_db_pool_manager().pool
    async with pool.connection() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute(
                "SELECT id FROM agent_user WHERE username = %s",
                (body.username,),
            )
            if await cursor.fetchone():
                raise HTTPException(status_code=409, detail="用户名已存在")

            await cursor.execute(
                """
                INSERT INTO agent_user
                    (username, email, password_hash, is_admin, is_subaccount, is_end_user, parent_id)
                VALUES (%s, %s, %s, FALSE, FALSE, TRUE, %s)
                RETURNING id, created_at, is_active
                """,
                (body.username, body.email, hash_password(body.password), owner_id),
            )
            new_id, created_at, is_active = await cursor.fetchone()
        await conn.commit()

    return EndUserItem(
        id=str(new_id),
        username=body.username,
        email=body.email,
        is_active=is_active,
        created_at=created_at.isoformat(),
        last_login=None,
    )


@router.get("/api/v1/end-users", response_model=EndUserListResponse)
async def list_end_users(authorization: Optional[str] = Header(None)):
    """List customer accounts, newest first.

    The masterkey sees every customer account; any other caller only sees
    the ones whose parent_id equals its effective user id.
    """
    owner_id = await _require_app_user(authorization)

    pool = get_db_pool_manager().pool
    async with pool.connection() as conn:
        async with conn.cursor() as cursor:
            if owner_id == _MASTERKEY_USER_ID:
                await cursor.execute(
                    """
                    SELECT id, username, email, is_active, created_at, last_login
                    FROM agent_user
                    WHERE is_end_user = TRUE
                    ORDER BY created_at DESC
                    """
                )
            else:
                await cursor.execute(
                    """
                    SELECT id, username, email, is_active, created_at, last_login
                    FROM agent_user
                    WHERE is_end_user = TRUE AND parent_id = %s
                    ORDER BY created_at DESC
                    """,
                    (owner_id,),
                )
            rows = await cursor.fetchall()

    items = [
        EndUserItem(
            id=str(uid),
            username=username,
            email=email,
            is_active=is_active,
            created_at=created_at.isoformat(),
            last_login=_iso(last_login),
        )
        for uid, username, email, is_active, created_at, last_login in rows
    ]
    return EndUserListResponse(items=items)


async def _ensure_owns_end_user(cursor, end_user_id: str, owner_id: str):
    """Check that ``end_user_id`` exists and belongs to ``owner_id``.

    The masterkey passes the ownership check unconditionally.

    Raises:
        HTTPException: 404 when no such customer account exists, 403 when it
            belongs to another owner.
    """
    await cursor.execute(
        "SELECT parent_id FROM agent_user WHERE id = %s AND is_end_user = TRUE",
        (end_user_id,),
    )
    row = await cursor.fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="客户账号不存在")
    if owner_id != _MASTERKEY_USER_ID and str(row[0]) != owner_id:
        raise HTTPException(status_code=403, detail="您不是该客户账号的所有者")


@router.post("/api/v1/end-users/{end_user_id}/reset-password", response_model=SimpleSuccessResponse)
async def reset_end_user_password(
    end_user_id: str,
    body: ResetEndUserPasswordRequest,
    authorization: Optional[str] = Header(None),
):
    """Set a new password for a customer account and drop its login tokens."""
    owner_id = await _require_app_user(authorization)
    if not body.password:
        raise HTTPException(status_code=400, detail="password 必填")

    pool = get_db_pool_manager().pool
    async with pool.connection() as conn:
        async with conn.cursor() as cursor:
            await _ensure_owns_end_user(cursor, end_user_id, owner_id)
            await cursor.execute(
                "UPDATE agent_user SET password_hash = %s WHERE id = %s",
                (hash_password(body.password), end_user_id),
            )
            # Remove existing tokens to force a fresh login.
            await cursor.execute(
                "DELETE FROM agent_user_tokens WHERE user_id = %s",
                (end_user_id,),
            )
        await conn.commit()
    return SimpleSuccessResponse(message="密码已重置,原 token 已失效")


@router.delete("/api/v1/end-users/{end_user_id}", response_model=SimpleSuccessResponse)
async def delete_end_user(
    end_user_id: str,
    authorization: Optional[str] = Header(None),
):
    """Delete a customer account together with its login tokens."""
    owner_id = await _require_app_user(authorization)

    pool = get_db_pool_manager().pool
    async with pool.connection() as conn:
        async with conn.cursor() as cursor:
            await _ensure_owns_end_user(cursor, end_user_id, owner_id)
            # Drop tokens first, mirroring reset_end_user_password, so no
            # session outlives the account whether or not the schema
            # cascades the delete.
            await cursor.execute(
                "DELETE FROM agent_user_tokens WHERE user_id = %s",
                (end_user_id,),
            )
            await cursor.execute(
                "DELETE FROM agent_user WHERE id = %s AND is_end_user = TRUE",
                (end_user_id,),
            )
        await conn.commit()
    return SimpleSuccessResponse(message="已删除")


# ============== Public bot-share info (for customers) ==============

async def _resolve_share_token(share_token: str) -> str:
    """Validate ``share_token`` and return the bot id it points at.

    Raises:
        HTTPException: 404 when the token is unknown, 403 when it has been
            revoked or has expired.
    """
    pool = get_db_pool_manager().pool
    async with pool.connection() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute(
                """
                SELECT bot_id, expires_at, revoked_at
                FROM agent_bot_share_tokens
                WHERE share_token = %s
                """,
                (share_token,),
            )
            row = await cursor.fetchone()

    if not row:
        raise HTTPException(status_code=404, detail="分享链接无效")
    bot_id, expires_at, revoked_at = row
    if revoked_at is not None:
        raise HTTPException(status_code=403, detail="分享链接已被吊销")
    if _is_expired(expires_at):
        raise HTTPException(status_code=403, detail="分享链接已过期")
    return str(bot_id)


async def validate_share_token_for_bot(share_token: str, bot_id: str) -> bool:
    """Check that ``share_token`` is valid and belongs to ``bot_id``.

    Intended for chat.py. Returns True on success; every failure is
    reported by raising HTTPException rather than returning False.
    """
    resolved = await _resolve_share_token(share_token)
    if resolved != bot_id:
        raise HTTPException(status_code=403, detail="分享链接与 bot 不匹配")
    return True


@router.get("/api/v1/public/bot-share/{share_token}", response_model=PublicBotInfoResponse)
async def get_public_bot_by_share(
    share_token: str,
    authorization: Optional[str] = Header(None),
):
    """Return the display-only bot info behind ``share_token``.

    The caller must be a logged-in customer account.

    Raises:
        HTTPException: 401/403 for auth failures, 403/404 for an unusable
            share token, 404 when the bot row no longer exists.
    """
    # NOTE(review): this only checks that the caller is *some* end-user; it
    # does not verify the end-user belongs to the bot's owner -- confirm
    # that this is the intended access model.
    await _require_end_user(authorization)
    bot_id = await _resolve_share_token(share_token)

    pool = get_db_pool_manager().pool
    async with pool.connection() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute(
                """
                SELECT id, name, settings
                FROM agent_bots
                WHERE id = %s
                """,
                (bot_id,),
            )
            row = await cursor.fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="bot 不存在")

    bot_uuid, name, settings = row
    # presumably the driver hands back ``settings`` as a dict (JSON column)
    # -- verify against the schema.
    settings = settings or {}
    # Only return display-safe fields; drop keys that may hold prompts or
    # other sensitive information.
    safe_settings = {
        key: value for key, value in settings.items() if key in _SAFE_SETTING_KEYS
    }
    return PublicBotInfoResponse(
        bot_id=str(bot_uuid),
        bot_uuid=str(bot_uuid),
        name=name,
        avatar_url=settings.get("avatar_url"),
        welcome=settings.get("welcome") or settings.get("welcome_message"),
        suggested_questions=settings.get("suggested_questions")
        or settings.get("guide_questions"),
        settings=safe_settings,
    )