Replace Memori with Mem0 for memory management: - Delete memori_config.py, memori_manager.py, memori_middleware.py - Add mem0_config.py, mem0_manager.py, mem0_middleware.py - Update environment variables (MEMORI_* -> MEM0_*) - Integrate Mem0 with LangGraph middleware - Add sync connection pool for Mem0 in DBPoolManager - Move checkpoint message prep to config creation Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
12 KiB
12 KiB
feat: Memori 长期记忆系统集成
概述
将 Memori (https://github.com/MemoriLabs/Memori) 的 SQL 原生长期记忆能力集成到现有 Agent 系统 (create_agent 和 create_deep_agent) 中,为用户提供跨会话的持久化记忆功能。
背景
当前状态
系统目前使用两种 Agent 类型:
- general_agent: 基于
langchain.agents.create_agent - deep_agent: 基于
deepagents.create_deep_agent
现有的记忆管理:
CheckpointerManager: PostgreSQL checkpoint 持久化ChatHistoryManager: 聊天记录存储AgentMemoryCacheManager: MCP 工具缓存
痛点:
- Agent 无法跨会话记住用户偏好、历史事实
- 没有语义搜索能力来检索相关历史信息
- 需要手动管理记忆提取和存储逻辑
Memori 简介
Memori 是一个开源 Python SDK,提供:
- 自动对话跟踪 - 捕获所有 LLM 交互
- 高级增强 - AI 驱动的事实、偏好、技能、事件提取
- 语义搜索 - 基于 FAISS/向量嵌入的召回
- 零延迟 - 后台处理不阻塞响应
- 基础设施无关 - 支持 SQLite/PostgreSQL/MySQL/MongoDB
拟议方案
架构设计
graph TB
subgraph "FastAPI 应用"
A[AgentConfig] --> B[init_agent]
B --> C{robot_type}
C -->|general_agent| D[create_agent]
C -->|deep_agent| E[create_deep_agent]
end
subgraph "Memori 集成层"
F[MemoriManager] --> G[MemoriMiddleware]
G --> H[before_agent - 记忆召回]
G --> I[after_agent - 记忆存储]
end
subgraph "数据存储"
J[(PostgreSQL)]
J1[memori_* 表]
J2[checkpoints 表]
J3[chat_messages 表]
J --> J1
J --> J2
J --> J3
end
D --> G
E --> G
F --> J
核心组件
1. MemoriManager (agent/memori_manager.py)
class MemoriManager:
"""Memori 连接和实例管理"""
def __init__(self, db_pool: AsyncConnectionPool):
self.pool = db_pool
self._instances: Dict[str, Memori] = {}
async def get_memori(
self,
bot_id: str,
user_identifier: str,
session_id: str
) -> Memori:
"""获取或创建 Memori 实例"""
cache_key = f"{bot_id}:{user_identifier}"
if cache_key not in self._instances:
# 创建 Memori 实例
mem = Memori(conn=self._get_session())
mem.attribution(
entity_id=user_identifier, # 用户
process_id=bot_id # Bot
)
self._instances[cache_key] = mem
# 设置会话
self._instances[cache_key].set_session(session_id)
return self._instances[cache_key]
2. MemoriMiddleware (agent/memori_middleware.py)
class MemoriMiddleware(AgentMiddleware):
"""Agent 记忆中间件"""
def __init__(self, memori: Memori, config: MemoriConfig):
self.memori = memori
self.config = config
async def abefore_agent(
self,
state: AgentState,
runtime: AgentRuntime
) -> AgentState:
"""Agent 执行前:召回相关记忆"""
query = state["messages"][-1]["content"]
# 语义搜索
memories = self.memori.recall(
query=query,
limit=self.config.semantic_search_top_k
)
# 注入到系统提示
if memories:
memory_context = self._format_memories(memories)
state = self._inject_memory_context(state, memory_context)
return state
async def aafter_agent(
self,
state: AgentState,
runtime: AgentRuntime
) -> AgentState:
"""Agent 执行后:后台提取记忆(非阻塞)"""
# 触发后台增强
asyncio.create_task(self._async_augment(state))
return state
3. 配置扩展 (agent/agent_config.py)
@dataclass
class AgentConfig:
# ... 现有字段 ...
# Memori 配置
enable_memori: bool = False
memori_api_key: Optional[str] = None
memori_semantic_search_top_k: int = 5
集成点修改
init_agent() 函数修改 (agent/deep_assistant.py)
async def init_agent(config: AgentConfig) -> tuple[agent, checkpointer]:
# ... 现有代码 ...
# 新增:Memori 初始化
memori_middleware = None
if config.enable_memori:
memori_manager = get_memori_manager()
memori = await memori_manager.get_memori(
bot_id=config.bot_id,
user_identifier=config.user_identifier,
session_id=config.session_id
)
memori_config = MemoriConfig(
semantic_search_top_k=config.memori_semantic_search_top_k,
)
memori_middleware = MemoriMiddleware(memori, memori_config)
# 中间件顺序调整
middleware = [
ToolUseCleanupMiddleware(),
ToolOutputLengthMiddleware(),
# Memori 在指南之前注入记忆
*(memori_middleware,) if memori_middleware else (),
]
if config.enable_thinking:
middleware.append(GuidelineMiddleware(...))
# ... 其余代码 ...
技术考虑
多租户隔离
| 隔离级别 | 字段 | 用途 |
|---|---|---|
| 实体级 | entity_id = user_identifier |
用户数据隔离 |
| 进程级 | process_id = bot_id |
Bot/Agent 隔离 |
| 会话级 | session_id |
对话隔离 |
查询模式:
-- 用户在某 Bot 下的所有记忆
SELECT * FROM memori_entity_fact
WHERE entity_id = :user_identifier
AND process_id = :bot_id;
-- 用户在特定会话的记忆
SELECT * FROM memori_entity_fact
WHERE entity_id = :user_identifier
AND process_id = :bot_id
AND session_id = :session_id;
中间件执行顺序
ToolUseCleanupMiddleware → 清理孤立的 tool_use 块
↓
MemoriMiddleware → 召回相关记忆,注入上下文
↓
ToolOutputLengthMiddleware → 控制工具输出长度
↓
GuidelineMiddleware → 添加思考(如果启用)
↓
Agent 执行
↓
MemoriMiddleware → 后台提取新记忆(非阻塞)
性能考量
| 操作 | 目标延迟 | 缓存策略 |
|---|---|---|
| 记忆召回 | <200ms p95 | Redis 常用查询缓存 |
| 记忆存储 | 后台异步 | 批量写入 |
| 向量搜索 | <100ms | FAISS 索引 |
安全考虑
- PII 检测 - 存储前过滤敏感信息(邮箱、电话、信用卡)
- 数据隔离 - 应用层 WHERE 子句强制租户隔离
- GDPR 合规 - 提供记忆删除/导出 API
验收标准
功能需求
-
基本记忆功能
- Agent 能记住用户跨会话提供的信息(姓名、偏好等)
- 支持语义搜索召回相关历史记忆
- 记忆在后台异步提取,不阻塞响应
-
多租户支持
- 用户数据按
user_identifier隔离 - Bot 数据按
bot_id隔离 - 会话数据按
session_id隔离
- 用户数据按
-
配置控制
enable_memori开关控制功能启用- 可配置召回记忆数量 (
semantic_search_top_k)
非功能需求
-
性能
- 记忆召回延迟 <200ms p95
- 不增加 Agent 首字响应延迟
-
可靠性
- Memori 服务故障时 Agent 仍可工作(降级模式)
- 连接池管理和重试机制
-
安全
- PII 检测和过滤
- 租户数据隔离验证
-
测试
- 单元测试覆盖率 >80%
- 集成测试覆盖主要用户流程
成功指标
| 指标 | 测量方法 | 目标 |
|---|---|---|
| 记忆召回准确率 | 用户反馈/A/B测试 | >80% 相关度 |
| 记忆提取覆盖率 | 对话分析 | >60% 关键信息 |
| 响应延迟影响 | 延迟对比 | <50ms 增加 |
| 用户满意度 | 反馈评分 | >4.0/5.0 |
依赖关系与风险
依赖项
| 依赖 | 类型 | 状态 |
|---|---|---|
memori Python 包 |
外部 | 需添加 |
| PostgreSQL 数据库 | 内部 | ✅ 已有 |
| 连接池管理 | 内部 | ✅ 已有 |
| Agent 中间件框架 | 内部 | ✅ 已有 |
风险分析
| 风险 | 影响 | 缓解措施 |
|---|---|---|
| Memori API 配额超限 | 记忆提取失败 | 实现降级模式,监控配额 |
| 多租户数据泄露 | 安全问题 | 应用层强制隔离,添加测试 |
| 语义搜索延迟高 | UX 下降 | 添加缓存,设置超时 |
| 与 AgentMemoryMiddleware 冲突 | 功能异常 | 明确职责分离,集成测试 |
实施阶段
Phase 1: 技术验证
- 设置 Memori 开发环境
- 创建 PoC 验证基本功能
- 数据库 Schema 兼容性检查
- 中间件集成原型
Phase 2: 核心实现
- 实现
MemoriManager - 实现
MemoriMiddleware - 修改
init_agent()集成逻辑 - 添加配置到
AgentConfig和settings.py
Phase 3: 测试与优化
- 单元测试
- 集成测试
- 性能基准测试
- 安全测试
Phase 4: 逐步上线
- 添加功能开关
- 内部测试 (1% 流量)
- Beta 用户 (10% 流量)
- 全量上线
文件清单
新建文件
qwen-agent/
├── agent/
│ ├── memori_manager.py # Memori 连接管理
│ ├── memori_middleware.py # Agent 中间件实现
│ └── memori_config.py # 配置数据类
├── tests/
│ ├── test_memori_manager.py # 单元测试
│ ├── test_memori_middleware.py # 中间件测试
│ └── test_memori_integration.py # 集成测试
└── utils/
└── settings.py # [修改] 添加 MEMORI_* 配置
修改文件
qwen-agent/
├── agent/
│ ├── deep_assistant.py # [修改] init_agent() 集成 Memori
│ └── agent_config.py # [修改] 添加 memori 配置字段
└── pyproject.toml # [修改] 添加 memori 依赖
配置示例
环境变量 (utils/settings.py)
# Memori 配置
MEMORI_ENABLED = os.getenv("MEMORI_ENABLED", "true") == "true"
MEMORI_API_KEY = os.getenv("MEMORI_API_KEY", "")
MEMORI_SEMANTIC_SEARCH_TOP_K = int(os.getenv("MEMORI_SEMANTIC_SEARCH_TOP_K", "5"))
MEMORI_EMBEDDING_MODEL = os.getenv("MEMORI_EMBEDDING_MODEL", "paraphrase-multilingual-MiniLM-L12-v2")
依赖添加 (pyproject.toml)
[tool.poetry.dependencies]
memori = "^3.1.0"
待解决问题
优先级 1: 关键问题
- 多租户隔离执行位置 - 应用层 vs 数据库层 RLS?
- 连接池共享 - 与 CheckpointerManager 共享还是独立?
- async/await 兼容性 - Memori 的同步 wait() 如何与 FastAPI 集成?
- PII 检测策略 - 存储前/后检测,允许/拒绝列表?
- 中间件执行顺序 - 与 GuidelineMiddleware/SummarizationMiddleware 的相对顺序
优先级 2: 重要问题
- 功能开关策略 - 请求级/会话级/全局?
- 延迟预算 - 记忆召回目标延迟?
- AgentMemoryMiddleware 兼容性 - 如何避免冲突?
- 多语言嵌入模型 - 使用哪个模型支持 ja/zh/en?
- 记忆保留策略 - 保留多久,如何归档?
参考资料
内部参考
- Agent 创建:
/Users/moshui/Documents/felo/qwen-agent/agent/deep_assistant.py - Agent 配置:
/Users/moshui/Documents/felo/qwen-agent/agent/agent_config.py - 中间件基类:
deepagents.agent.AgentMiddleware - Checkpointer:
/Users/moshui/Documents/felo/qwen-agent/agent/checkpoint_manager.py