""" Memory 管理 API 路由 提供记忆查看、添加和删除功能 """ import logging from typing import Literal, Optional, List, Dict, Any from fastapi import APIRouter, HTTPException, Header, Query from fastapi.responses import JSONResponse from pydantic import BaseModel, Field logger = logging.getLogger('app') router = APIRouter(prefix="/api/v1", tags=["memory"]) class MemoryItem(BaseModel): """单条记忆的数据模型""" id: str content: str created_at: Optional[str] = None updated_at: Optional[str] = None class MemoryListResponse(BaseModel): """记忆列表响应""" memories: List[MemoryItem] total: int class DeleteAllResponse(BaseModel): """删除所有记忆响应""" deleted_count: int class ConversationMessage(BaseModel): """对话消息""" role: Literal["user", "assistant"] content: str = Field(..., min_length=1) class AddMemoryRequest(BaseModel): """添加记忆的请求体""" bot_id: str = Field(..., min_length=1) user_id: str = Field(..., min_length=1) messages: List[ConversationMessage] = Field(..., max_length=200) class AddMemoryResponse(BaseModel): """添加记忆的响应""" success: bool pairs_processed: int pairs_failed: int = 0 async def get_user_identifier_from_request( authorization: Optional[str], user_id: Optional[str] = None ) -> str: """ 获取用户标识符 优先使用请求参数中的 user_id,否则尝试从 Authorization header 解析 Args: authorization: Authorization header user_id: 请求参数中的 user_id Returns: 用户标识符 Raises: HTTPException: 如果无法获取用户标识符 """ if user_id: return user_id # 如果没有提供 user_id,抛出异常 # 注意:根据 PRD,user_id 从前端传入 raise HTTPException( status_code=400, detail="user_id is required" ) @router.post("/memory", response_model=AddMemoryResponse) async def add_memory_from_conversation(data: AddMemoryRequest): """ 从对话消息中提取并保存记忆 将用户和助手的对话配对,通过 Mem0 提取关键事实并存储。 用于 realtime 语音对话等不经过 Agent 中间件的场景。 此端点供内部服务调用(如 felo-mygpt),不暴露给外部用户。 """ try: from agent.mem0_manager import get_mem0_manager from utils.settings import MEM0_ENABLED if not MEM0_ENABLED: raise HTTPException( status_code=503, detail="Memory feature is not enabled" ) if not data.messages: return AddMemoryResponse(success=True, 
pairs_processed=0) manager = get_mem0_manager() # 将消息配对为 user-assistant 对,然后调用 add_memory pairs_processed = 0 pairs_failed = 0 i = 0 while i < len(data.messages): msg = data.messages[i] if msg.role == 'user': # 收集连续的 user 消息 user_contents = [msg.content] j = i + 1 while j < len(data.messages) and data.messages[j].role == 'user': user_contents.append(data.messages[j].content) j += 1 user_text = '\n'.join(user_contents) # 检查是否有对应的 assistant 回复 assistant_text = "" if j < len(data.messages) and data.messages[j].role == 'assistant': assistant_text = data.messages[j].content or "" j += 1 if user_text and assistant_text: conversation_text = f"User: {user_text}\nAssistant: {assistant_text}" try: await manager.add_memory( text=conversation_text, user_id=data.user_id, agent_id=data.bot_id, metadata={"type": "realtime_conversation"}, ) pairs_processed += 1 except Exception as pair_error: pairs_failed += 1 logger.error( f"Failed to add memory for pair: {pair_error}" ) i = j else: i += 1 logger.info( f"Added {pairs_processed} memory pairs (failed={pairs_failed}) " f"for user={data.user_id}, bot={data.bot_id}" ) return AddMemoryResponse( success=pairs_failed == 0, pairs_processed=pairs_processed, pairs_failed=pairs_failed, ) except HTTPException: raise except Exception as e: logger.error(f"Failed to add memory from conversation: {e}") raise HTTPException( status_code=500, detail="Failed to add memory from conversation" ) @router.get("/memory", response_model=MemoryListResponse) async def get_memories( bot_id: str = Query(..., description="Bot ID (对应 agent_id)"), user_id: str = Query(..., description="用户 ID"), authorization: Optional[str] = Header(None, description="Authorization header"), ): """ 获取当前用户在指定 Bot 下的所有记忆 Args: bot_id: Bot ID(对应 agent_id) user_id: 用户标识符 authorization: Authorization header(用于鉴权) Returns: MemoryListResponse: 记忆列表 """ try: from agent.mem0_manager import get_mem0_manager from utils.settings import MEM0_ENABLED # 检查 Memory 功能是否启用 if not MEM0_ENABLED: raise 
HTTPException( status_code=503, detail="Memory feature is not enabled" ) # 获取 Mem0Manager 实例 manager = get_mem0_manager() # 获取所有记忆 memories = await manager.get_all_memories( user_id=user_id, agent_id=bot_id ) # 转换为响应格式 memory_items = [] for m in memories: memory_items.append(MemoryItem( id=m.get("id", ""), content=m.get("memory", ""), created_at=m.get("created_at"), updated_at=m.get("updated_at") )) return MemoryListResponse( memories=memory_items, total=len(memory_items) ) except HTTPException: raise except Exception as e: logger.error(f"Failed to get memories: {e}") raise HTTPException( status_code=500, detail=f"Failed to get memories: {str(e)}" ) @router.delete("/memory/{memory_id}", status_code=204) async def delete_memory( memory_id: str, bot_id: str = Query(..., description="Bot ID (用于权限校验)"), user_id: str = Query(..., description="用户 ID"), authorization: Optional[str] = Header(None, description="Authorization header"), ): """ 删除单条记忆 Args: memory_id: 记忆 ID bot_id: Bot ID(用于权限校验) user_id: 用户标识符 authorization: Authorization header Returns: 204 No Content """ try: from agent.mem0_manager import get_mem0_manager from utils.settings import MEM0_ENABLED # 检查 Memory 功能是否启用 if not MEM0_ENABLED: raise HTTPException( status_code=503, detail="Memory feature is not enabled" ) # 获取 Mem0Manager 实例 manager = get_mem0_manager() # 删除记忆 success = await manager.delete_memory( memory_id=memory_id, user_id=user_id, agent_id=bot_id ) if not success: raise HTTPException( status_code=404, detail="Memory not found or delete failed" ) return JSONResponse(status_code=204, content=None) except HTTPException: raise except Exception as e: logger.error(f"Failed to delete memory {memory_id}: {e}") raise HTTPException( status_code=500, detail=f"Failed to delete memory: {str(e)}" ) @router.delete("/memory", response_model=DeleteAllResponse) async def delete_all_memories( bot_id: str = Query(..., description="Bot ID"), user_id: str = Query(..., description="用户 ID"), authorization: 
Optional[str] = Header(None, description="Authorization header"), ): """ 清除指定 Bot 下当前用户的所有记忆 Args: bot_id: Bot ID user_id: 用户标识符 authorization: Authorization header Returns: DeleteAllResponse: 删除的记忆数量 """ try: from agent.mem0_manager import get_mem0_manager from utils.settings import MEM0_ENABLED # 检查 Memory 功能是否启用 if not MEM0_ENABLED: raise HTTPException( status_code=503, detail="Memory feature is not enabled" ) # 获取 Mem0Manager 实例 manager = get_mem0_manager() # 删除所有记忆 deleted_count = await manager.delete_all_memories( user_id=user_id, agent_id=bot_id ) return DeleteAllResponse(deleted_count=deleted_count) except HTTPException: raise except Exception as e: logger.error(f"Failed to delete all memories: {e}") raise HTTPException( status_code=500, detail=f"Failed to delete all memories: {str(e)}" )
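

# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module).
#
# The pairing rule used by add_memory_from_conversation is easier to reason
# about and to unit-test in isolation. The helper below is a minimal sketch of
# that rule, assuming messages arrive as plain (role, content) tuples instead
# of ConversationMessage objects so it runs without FastAPI or Mem0.
# `pair_messages` is a hypothetical name introduced here for illustration.
# ---------------------------------------------------------------------------
from typing import List, Tuple


def pair_messages(messages: List[Tuple[str, str]]) -> List[str]:
    """Merge consecutive user messages and pair them with the next assistant reply."""
    pairs: List[str] = []
    i = 0
    while i < len(messages):
        role, content = messages[i]
        if role != "user":
            # Assistant message with no preceding user turn: skip it
            i += 1
            continue

        # Collect the run of consecutive user messages starting at i
        user_contents = [content]
        j = i + 1
        while j < len(messages) and messages[j][0] == "user":
            user_contents.append(messages[j][1])
            j += 1
        user_text = "\n".join(user_contents)

        # Only emit a pair when an assistant reply directly follows the run
        assistant_text = ""
        if j < len(messages) and messages[j][0] == "assistant":
            assistant_text = messages[j][1]
            j += 1
        if user_text and assistant_text:
            pairs.append(f"User: {user_text}\nAssistant: {assistant_text}")

        i = j
    return pairs


# Usage sketch: two user messages merge into one turn, the second assistant
# message and the trailing unanswered user message are both dropped.
#   pair_messages([
#       ("user", "hi"), ("user", "I like tea"), ("assistant", "Noted!"),
#       ("assistant", "extra"), ("user", "bye"),
#   ])
#   returns ["User: hi\nI like tea\nAssistant: Noted!"]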