feat: add POST /api/v1/memory endpoint for realtime conversation memory (#17)

* feat: add POST /api/v1/memory endpoint for realtime conversation memory

Add memory extraction API that accepts conversation messages and
stores them via Mem0. This enables realtime voice sessions to save
memories through the same pipeline as chat conversations.

Fixes: sparticleinc/mygpt-frontend#2126

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: address code review findings for memory API

- Use Literal["user","assistant"] for role field validation
- Add Field constraints (min_length, max_length=200)
- Track and report pairs_failed in response
- Hide internal exception details from HTTP response
- Remove unused authorization parameter (internal API)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: zhuchao <zhuchaowe@163.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
autobee-sparticle 2026-03-17 11:14:02 +09:00 committed by GitHub
parent deb78a7625
commit a161e43421
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1,13 +1,13 @@
"""
Memory 管理 API 路由
提供记忆查看和删除功能
提供记忆查看添加和删除功能
"""
import logging
from typing import Optional, List, Dict, Any
from typing import Literal, Optional, List, Dict, Any
from fastapi import APIRouter, HTTPException, Header, Query
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from pydantic import BaseModel, Field
logger = logging.getLogger('app')
@ -33,6 +33,26 @@ class DeleteAllResponse(BaseModel):
deleted_count: int
class ConversationMessage(BaseModel):
"""对话消息"""
role: Literal["user", "assistant"]
content: str = Field(..., min_length=1)
class AddMemoryRequest(BaseModel):
"""添加记忆的请求体"""
bot_id: str = Field(..., min_length=1)
user_id: str = Field(..., min_length=1)
messages: List[ConversationMessage] = Field(..., max_length=200)
class AddMemoryResponse(BaseModel):
"""添加记忆的响应"""
success: bool
pairs_processed: int
pairs_failed: int = 0
async def get_user_identifier_from_request(
authorization: Optional[str],
user_id: Optional[str] = None
@ -63,6 +83,92 @@ async def get_user_identifier_from_request(
)
@router.post("/memory", response_model=AddMemoryResponse)
async def add_memory_from_conversation(data: AddMemoryRequest):
"""
从对话消息中提取并保存记忆
将用户和助手的对话配对通过 Mem0 提取关键事实并存储
用于 realtime 语音对话等不经过 Agent 中间件的场景
此端点供内部服务调用 felo-mygpt不暴露给外部用户
"""
try:
from agent.mem0_manager import get_mem0_manager
from utils.settings import MEM0_ENABLED
if not MEM0_ENABLED:
raise HTTPException(
status_code=503,
detail="Memory feature is not enabled"
)
if not data.messages:
return AddMemoryResponse(success=True, pairs_processed=0)
manager = get_mem0_manager()
# 将消息配对为 user-assistant 对,然后调用 add_memory
pairs_processed = 0
pairs_failed = 0
i = 0
while i < len(data.messages):
msg = data.messages[i]
if msg.role == 'user':
# 收集连续的 user 消息
user_contents = [msg.content]
j = i + 1
while j < len(data.messages) and data.messages[j].role == 'user':
user_contents.append(data.messages[j].content)
j += 1
user_text = '\n'.join(user_contents)
# 检查是否有对应的 assistant 回复
assistant_text = ""
if j < len(data.messages) and data.messages[j].role == 'assistant':
assistant_text = data.messages[j].content or ""
j += 1
if user_text and assistant_text:
conversation_text = f"User: {user_text}\nAssistant: {assistant_text}"
try:
await manager.add_memory(
text=conversation_text,
user_id=data.user_id,
agent_id=data.bot_id,
metadata={"type": "realtime_conversation"},
)
pairs_processed += 1
except Exception as pair_error:
pairs_failed += 1
logger.error(
f"Failed to add memory for pair: {pair_error}"
)
i = j
else:
i += 1
logger.info(
f"Added {pairs_processed} memory pairs (failed={pairs_failed}) "
f"for user={data.user_id}, bot={data.bot_id}"
)
return AddMemoryResponse(
success=pairs_failed == 0,
pairs_processed=pairs_processed,
pairs_failed=pairs_failed,
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to add memory from conversation: {e}")
raise HTTPException(
status_code=500,
detail="Failed to add memory from conversation"
)
@router.get("/memory", response_model=MemoryListResponse)
async def get_memories(
bot_id: str = Query(..., description="Bot ID (对应 agent_id)"),