qwen_agent/routes/memory.py
autobee-sparticle 108c675c3d
feat(memory): add memory management API endpoints (#10)
* chore: add .worktrees/ to .gitignore

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat(memory): add memory management API endpoints

Add new /api/v1/memory endpoints for viewing and managing user memories:
- GET /api/v1/memory - list all memories for a bot
- DELETE /api/v1/memory/{memory_id} - delete single memory
- DELETE /api/v1/memory - delete all memories for a bot

Also add delete_memory and delete_all_memories methods to Mem0Manager.

Issue: #1844

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: zhuchao <zhuchaowe@163.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 18:01:12 +09:00

233 lines
6.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Memory 管理 API 路由
提供记忆查看和删除功能
"""
import logging
from typing import Optional, List, Dict, Any
from fastapi import APIRouter, HTTPException, Header, Query
from fastapi.responses import JSONResponse
from pydantic import BaseModel
# Module-level logger, looked up under the fixed name 'app'.
logger = logging.getLogger('app')
# Router holding the memory endpoints; every route registered on it is
# served under the /api/v1 prefix and tagged "memory".
router = APIRouter(prefix="/api/v1", tags=["memory"])
class MemoryItem(BaseModel):
    """Data model for a single memory entry returned by the memory API."""

    # Identifier of the memory entry.
    id: str
    # Text content of the memory.
    content: str
    # Creation and last-update timestamps as strings; None when not provided.
    created_at: Optional[str] = None
    updated_at: Optional[str] = None
class MemoryListResponse(BaseModel):
    """Response body for the memory listing endpoint."""

    # The memory entries being returned.
    memories: List[MemoryItem]
    # Number of entries in the response.
    total: int
class DeleteAllResponse(BaseModel):
    """Response body for the delete-all-memories endpoint."""

    # Number of memories reported as deleted.
    deleted_count: int
async def get_user_identifier_from_request(
    authorization: Optional[str],
    user_id: Optional[str] = None
) -> str:
    """
    Resolve the user identifier for a request.

    Only the explicit ``user_id`` value is considered. The ``authorization``
    argument is accepted but never read by this function, so there is no
    fallback to the Authorization header.

    Args:
        authorization: Authorization header value (currently unused).
        user_id: The user_id supplied with the request.

    Returns:
        The given ``user_id`` when it is truthy.

    Raises:
        HTTPException: 400 when ``user_id`` is missing or empty.
    """
    # Guard clause: per the original note, the PRD has the frontend pass
    # user_id, so a missing value is treated as a client error.
    if not user_id:
        raise HTTPException(
            status_code=400,
            detail="user_id is required"
        )
    return user_id
@router.get("/memory", response_model=MemoryListResponse)
async def get_memories(
    bot_id: str = Query(..., description="Bot ID (对应 agent_id)"),
    user_id: str = Query(..., description="用户 ID"),
    authorization: Optional[str] = Header(None, description="Authorization header"),
):
    """
    List all memories stored for a user under the given bot.

    Args:
        bot_id: Bot ID; forwarded to the memory manager as ``agent_id``.
        user_id: Identifier of the user whose memories are listed.
        authorization: Authorization header; accepted but not read here.

    Returns:
        MemoryListResponse: The converted memories and their count.

    Raises:
        HTTPException: 503 when ``MEM0_ENABLED`` is falsy; 500 when any
            other error occurs while loading or converting the memories.
    """
    try:
        # Imported lazily inside the handler, as in the other memory routes.
        from agent.mem0_manager import get_mem0_manager
        from utils.settings import MEM0_ENABLED

        # Refuse the request when the memory feature is switched off.
        if not MEM0_ENABLED:
            raise HTTPException(
                status_code=503,
                detail="Memory feature is not enabled"
            )

        records = await get_mem0_manager().get_all_memories(
            user_id=user_id,
            agent_id=bot_id
        )

        # Map each raw record onto the response model; the text is read
        # from the record's "memory" key.
        items = [
            MemoryItem(
                id=record.get("id", ""),
                content=record.get("memory", ""),
                created_at=record.get("created_at"),
                updated_at=record.get("updated_at")
            )
            for record in records
        ]
        return MemoryListResponse(memories=items, total=len(items))
    except HTTPException:
        # Deliberate HTTP errors (e.g. the 503 above) pass through untouched.
        raise
    except Exception as exc:
        message = f"Failed to get memories: {exc}"
        logger.error(message)
        raise HTTPException(status_code=500, detail=message)
@router.delete("/memory/{memory_id}", status_code=204)
async def delete_memory(
    memory_id: str,
    bot_id: str = Query(..., description="Bot ID (用于权限校验)"),
    user_id: str = Query(..., description="用户 ID"),
    authorization: Optional[str] = Header(None, description="Authorization header"),
):
    """
    Delete a single memory.

    Args:
        memory_id: ID of the memory to delete.
        bot_id: Bot ID; forwarded to the memory manager as ``agent_id``
            (the parameter description says it is used for permission
            checking; any such check happens inside the manager).
        user_id: Identifier of the user owning the memory.
        authorization: Authorization header; accepted but not read here.

    Returns:
        An empty 204 No Content response.

    Raises:
        HTTPException: 503 when ``MEM0_ENABLED`` is falsy; 404 when the
            manager reports the deletion did not succeed; 500 on any
            other error.
    """
    # Local import so this handler does not rely on the module's import list.
    from fastapi import Response

    try:
        from agent.mem0_manager import get_mem0_manager
        from utils.settings import MEM0_ENABLED

        # Refuse the request when the memory feature is switched off.
        if not MEM0_ENABLED:
            raise HTTPException(
                status_code=503,
                detail="Memory feature is not enabled"
            )

        manager = get_mem0_manager()

        success = await manager.delete_memory(
            memory_id=memory_id,
            user_id=user_id,
            agent_id=bot_id
        )
        if not success:
            raise HTTPException(
                status_code=404,
                detail="Memory not found or delete failed"
            )

        # A 204 response must not carry a body. JSONResponse(content=None)
        # would serialize None to the JSON text "null" and send it as the
        # body, so return a plain, body-less Response instead.
        return Response(status_code=204)
    except HTTPException:
        # Deliberate HTTP errors (503 / 404 above) pass through untouched.
        raise
    except Exception as e:
        logger.error(f"Failed to delete memory {memory_id}: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to delete memory: {str(e)}"
        )
@router.delete("/memory", response_model=DeleteAllResponse)
async def delete_all_memories(
    bot_id: str = Query(..., description="Bot ID"),
    user_id: str = Query(..., description="用户 ID"),
    authorization: Optional[str] = Header(None, description="Authorization header"),
):
    """
    Remove every memory stored for a user under the given bot.

    Args:
        bot_id: Bot ID; forwarded to the memory manager as ``agent_id``.
        user_id: Identifier of the user whose memories are removed.
        authorization: Authorization header; accepted but not read here.

    Returns:
        DeleteAllResponse: The count returned by the memory manager.

    Raises:
        HTTPException: 503 when ``MEM0_ENABLED`` is falsy; 500 on any
            other error.
    """
    try:
        # Imported lazily inside the handler, as in the other memory routes.
        from agent.mem0_manager import get_mem0_manager
        from utils.settings import MEM0_ENABLED

        # Refuse the request when the memory feature is switched off.
        if not MEM0_ENABLED:
            raise HTTPException(
                status_code=503,
                detail="Memory feature is not enabled"
            )

        removed = await get_mem0_manager().delete_all_memories(
            user_id=user_id,
            agent_id=bot_id
        )
        return DeleteAllResponse(deleted_count=removed)
    except HTTPException:
        # Deliberate HTTP errors (e.g. the 503 above) pass through untouched.
        raise
    except Exception as exc:
        message = f"Failed to delete all memories: {exc}"
        logger.error(message)
        raise HTTPException(status_code=500, detail=message)