qwen_agent/routes/memory.py
autobee-sparticle a161e43421
feat: add POST /api/v1/memory endpoint for realtime conversation memory (#17)
* feat: add POST /api/v1/memory endpoint for realtime conversation memory

Add memory extraction API that accepts conversation messages and
stores them via Mem0. This enables realtime voice sessions to save
memories through the same pipeline as chat conversations.

Fixes: sparticleinc/mygpt-frontend#2126

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: address code review findings for memory API

- Use Literal["user","assistant"] for role field validation
- Add Field constraints (min_length, max_length=200)
- Track and report pairs_failed in response
- Hide internal exception details from HTTP response
- Remove unused authorization parameter (internal API)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: zhuchao <zhuchaowe@163.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-17 11:14:02 +09:00

339 lines
9.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Memory 管理 API 路由
提供记忆查看、添加和删除功能
"""
import logging
from typing import Literal, Optional, List, Dict, Any
from fastapi import APIRouter, HTTPException, Header, Query
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
# Module logger; uses the logger registered under the name 'app'.
logger = logging.getLogger('app')
# Router for the memory endpoints; every route below is mounted under /api/v1.
router = APIRouter(prefix="/api/v1", tags=["memory"])
class MemoryItem(BaseModel):
    """Data model describing a single stored memory."""

    # Identifier of the memory record.
    id: str
    # Text of the memory.
    content: str
    # Optional creation / last-update values; left as None when not supplied.
    created_at: Optional[str] = None
    updated_at: Optional[str] = None
class MemoryListResponse(BaseModel):
    """Response body for the memory listing endpoint."""

    # The returned memory entries.
    memories: List[MemoryItem]
    # Number of entries reported to the client.
    total: int
class DeleteAllResponse(BaseModel):
    """Response body returned after deleting all memories."""

    # How many memories were deleted.
    deleted_count: int
class ConversationMessage(BaseModel):
    """A single message of a conversation."""

    # Only the two speaker roles "user" and "assistant" are accepted.
    role: Literal["user", "assistant"]
    # Message text; required and must contain at least one character.
    content: str = Field(..., min_length=1)
class AddMemoryRequest(BaseModel):
    """Request body for adding memories from a conversation."""

    # Bot identifier; required and non-empty.
    bot_id: str = Field(..., min_length=1)
    # User identifier; required and non-empty.
    user_id: str = Field(..., min_length=1)
    # Conversation messages to process; the field is capped at 200 entries.
    messages: List[ConversationMessage] = Field(..., max_length=200)
class AddMemoryResponse(BaseModel):
    """Response body for the add-memory endpoint."""

    # Overall outcome flag.
    success: bool
    # Number of user/assistant pairs that were stored.
    pairs_processed: int
    # Number of pairs whose storage raised an error; defaults to 0.
    pairs_failed: int = 0
async def get_user_identifier_from_request(
    authorization: Optional[str],
    user_id: Optional[str] = None
) -> str:
    """
    Resolve the identifier of the calling user.

    The identifier is taken from the ``user_id`` request parameter. The
    ``authorization`` value is accepted but is not parsed by this function.

    Args:
        authorization: Authorization header value (currently unused).
        user_id: The user_id supplied with the request.

    Returns:
        The user identifier.

    Raises:
        HTTPException: 400 when ``user_id`` is missing or empty.
    """
    # Guard clause: without a user_id there is nothing to fall back on.
    if not user_id:
        raise HTTPException(
            status_code=400,
            detail="user_id is required"
        )
    return user_id
@router.post("/memory", response_model=AddMemoryResponse)
async def add_memory_from_conversation(data: AddMemoryRequest):
    """
    Extract and store memories from conversation messages.

    Each run of consecutive user messages is joined with newlines and paired
    with the assistant message that immediately follows it; every pair is
    handed to the Mem0 manager's ``add_memory``. User messages without a
    following assistant reply, and assistant messages without a preceding
    user message, are skipped.

    Meant for flows such as realtime voice conversations that do not pass
    through the Agent middleware. This endpoint is intended for internal
    service callers (e.g. felo-mygpt), not for external users.

    Returns:
        AddMemoryResponse: stored/failed pair counts; ``success`` is True
        only when no pair failed.

    Raises:
        HTTPException: 503 when the memory feature is disabled, 500 on any
            other unexpected error.
    """
    try:
        from agent.mem0_manager import get_mem0_manager
        from utils.settings import MEM0_ENABLED

        if not MEM0_ENABLED:
            raise HTTPException(
                status_code=503,
                detail="Memory feature is not enabled"
            )

        # Nothing to pair up: report success without touching the manager.
        if not data.messages:
            return AddMemoryResponse(success=True, pairs_processed=0)

        manager = get_mem0_manager()
        pairs_processed = 0
        pairs_failed = 0
        # User messages seen since the last non-user message.
        pending_user = []

        for message in data.messages:
            if message.role == 'user':
                pending_user.append(message.content)
                continue

            # Any non-user message closes the current run of user messages.
            user_run, pending_user = pending_user, []
            if not user_run or message.role != 'assistant':
                continue

            user_text = '\n'.join(user_run)
            assistant_text = message.content or ""
            if not (user_text and assistant_text):
                continue

            conversation_text = f"User: {user_text}\nAssistant: {assistant_text}"
            try:
                await manager.add_memory(
                    text=conversation_text,
                    user_id=data.user_id,
                    agent_id=data.bot_id,
                    metadata={"type": "realtime_conversation"},
                )
            except Exception as pair_error:
                # One failing pair must not abort the remaining pairs.
                pairs_failed += 1
                logger.error(
                    f"Failed to add memory for pair: {pair_error}"
                )
            else:
                pairs_processed += 1

        logger.info(
            f"Added {pairs_processed} memory pairs (failed={pairs_failed}) "
            f"for user={data.user_id}, bot={data.bot_id}"
        )
        return AddMemoryResponse(
            success=pairs_failed == 0,
            pairs_processed=pairs_processed,
            pairs_failed=pairs_failed,
        )
    except HTTPException:
        # Deliberate HTTP errors (e.g. the 503 above) pass through untouched.
        raise
    except Exception as e:
        logger.error(f"Failed to add memory from conversation: {e}")
        raise HTTPException(
            status_code=500,
            detail="Failed to add memory from conversation"
        )
@router.get("/memory", response_model=MemoryListResponse)
async def get_memories(
    bot_id: str = Query(..., description="Bot ID (对应 agent_id)"),
    user_id: str = Query(..., description="用户 ID"),
    authorization: Optional[str] = Header(None, description="Authorization header"),
):
    """
    List every memory stored for a user under the given bot.

    Args:
        bot_id: Bot ID; forwarded to the memory manager as ``agent_id``.
        user_id: Identifier of the user whose memories are listed.
        authorization: Authorization header. Accepted but not inspected by
            this handler.

    Returns:
        MemoryListResponse: the memories and their count.

    Raises:
        HTTPException: 503 when the memory feature is disabled; 500 on any
            unexpected failure. The 500 body carries a generic message only;
            the underlying error is written to the log.
    """
    try:
        from agent.mem0_manager import get_mem0_manager
        from utils.settings import MEM0_ENABLED

        # Refuse early when the memory feature is switched off.
        if not MEM0_ENABLED:
            raise HTTPException(
                status_code=503,
                detail="Memory feature is not enabled"
            )

        manager = get_mem0_manager()
        memories = await manager.get_all_memories(
            user_id=user_id,
            agent_id=bot_id
        )

        # Map the manager's dict records onto the response model; the text
        # of a memory lives under the "memory" key.
        memory_items = [
            MemoryItem(
                id=m.get("id", ""),
                content=m.get("memory", ""),
                created_at=m.get("created_at"),
                updated_at=m.get("updated_at"),
            )
            for m in memories
        ]
        return MemoryListResponse(
            memories=memory_items,
            total=len(memory_items)
        )
    except HTTPException:
        raise
    except Exception as e:
        # Keep the traceback in the log, but do not expose internal error
        # text to the HTTP client.
        logger.exception("Failed to get memories: %s", e)
        raise HTTPException(
            status_code=500,
            detail="Failed to get memories"
        )
@router.delete("/memory/{memory_id}", status_code=204)
async def delete_memory(
    memory_id: str,
    bot_id: str = Query(..., description="Bot ID (用于权限校验)"),
    user_id: str = Query(..., description="用户 ID"),
    authorization: Optional[str] = Header(None, description="Authorization header"),
):
    """
    Delete a single memory.

    Args:
        memory_id: ID of the memory to delete.
        bot_id: Bot ID; forwarded to the memory manager as ``agent_id``.
        user_id: Identifier of the user owning the memory.
        authorization: Authorization header. Accepted but not inspected by
            this handler.

    Returns:
        An empty 204 No Content response.

    Raises:
        HTTPException: 503 when the memory feature is disabled; 404 when the
            manager reports that the delete did not succeed; 500 on any
            unexpected failure. The 500 body carries a generic message only;
            the underlying error is written to the log.
    """
    from fastapi.responses import Response

    try:
        from agent.mem0_manager import get_mem0_manager
        from utils.settings import MEM0_ENABLED

        # Refuse early when the memory feature is switched off.
        if not MEM0_ENABLED:
            raise HTTPException(
                status_code=503,
                detail="Memory feature is not enabled"
            )

        manager = get_mem0_manager()
        success = await manager.delete_memory(
            memory_id=memory_id,
            user_id=user_id,
            agent_id=bot_id
        )
        if not success:
            raise HTTPException(
                status_code=404,
                detail="Memory not found or delete failed"
            )

        # A 204 response must not carry a body. JSONResponse(content=None)
        # would serialise to the JSON text "null", so use a bare Response.
        return Response(status_code=204)
    except HTTPException:
        raise
    except Exception as e:
        # Keep the traceback in the log, but do not expose internal error
        # text to the HTTP client.
        logger.exception("Failed to delete memory %s: %s", memory_id, e)
        raise HTTPException(
            status_code=500,
            detail="Failed to delete memory"
        )
@router.delete("/memory", response_model=DeleteAllResponse)
async def delete_all_memories(
    bot_id: str = Query(..., description="Bot ID"),
    user_id: str = Query(..., description="用户 ID"),
    authorization: Optional[str] = Header(None, description="Authorization header"),
):
    """
    Delete every memory of a user under the given bot.

    Args:
        bot_id: Bot ID; forwarded to the memory manager as ``agent_id``.
        user_id: Identifier of the user whose memories are removed.
        authorization: Authorization header. Accepted but not inspected by
            this handler.

    Returns:
        DeleteAllResponse: the number of deleted memories as reported by the
        memory manager.

    Raises:
        HTTPException: 503 when the memory feature is disabled; 500 on any
            unexpected failure. The 500 body carries a generic message only;
            the underlying error is written to the log.
    """
    try:
        from agent.mem0_manager import get_mem0_manager
        from utils.settings import MEM0_ENABLED

        # Refuse early when the memory feature is switched off.
        if not MEM0_ENABLED:
            raise HTTPException(
                status_code=503,
                detail="Memory feature is not enabled"
            )

        manager = get_mem0_manager()
        deleted_count = await manager.delete_all_memories(
            user_id=user_id,
            agent_id=bot_id
        )
        return DeleteAllResponse(deleted_count=deleted_count)
    except HTTPException:
        raise
    except Exception as e:
        # Keep the traceback in the log, but do not expose internal error
        # text to the HTTP client.
        logger.exception("Failed to delete all memories: %s", e)
        raise HTTPException(
            status_code=500,
            detail="Failed to delete all memories"
        )