qwen_agent/plans/memori-integration.md
朱潮 f694101747 refactor: migrate from Memori to Mem0 for long-term memory
Replace Memori with Mem0 for memory management:
- Delete memori_config.py, memori_manager.py, memori_middleware.py
- Add mem0_config.py, mem0_manager.py, mem0_middleware.py
- Update environment variables (MEMORI_* -> MEM0_*)
- Integrate Mem0 with LangGraph middleware
- Add sync connection pool for Mem0 in DBPoolManager
- Move checkpoint message prep to config creation

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-20 21:15:30 +08:00

12 KiB
Raw Permalink Blame History

feat: Memori 长期记忆系统集成

概述

将 Memori (https://github.com/MemoriLabs/Memori) 的 SQL 原生长期记忆能力集成到现有 Agent 系统 (create_agentcreate_deep_agent) 中,为用户提供跨会话的持久化记忆功能。

背景

当前状态

系统目前使用两种 Agent 类型:

  • general_agent: 基于 langchain.agents.create_agent
  • deep_agent: 基于 deepagents.create_deep_agent

现有的记忆管理:

  • CheckpointerManager: PostgreSQL checkpoint 持久化
  • ChatHistoryManager: 聊天记录存储
  • AgentMemoryCacheManager: MCP 工具缓存

痛点

  • Agent 无法跨会话记住用户偏好、历史事实
  • 没有语义搜索能力来检索相关历史信息
  • 需要手动管理记忆提取和存储逻辑

Memori 简介

Memori 是一个开源 Python SDK提供

  • 自动对话跟踪 - 捕获所有 LLM 交互
  • 高级增强 - AI 驱动的事实、偏好、技能、事件提取
  • 语义搜索 - 基于 FAISS/向量嵌入的召回
  • 零延迟 - 后台处理不阻塞响应
  • 基础设施无关 - 支持 SQLite/PostgreSQL/MySQL/MongoDB

拟议方案

架构设计

graph TB
    subgraph "FastAPI 应用"
        A[AgentConfig] --> B[init_agent]
        B --> C{robot_type}
        C -->|general_agent| D[create_agent]
        C -->|deep_agent| E[create_deep_agent]
    end

    subgraph "Memori 集成层"
        F[MemoriManager] --> G[MemoriMiddleware]
        G --> H[before_agent - 记忆召回]
        G --> I[after_agent - 记忆存储]
    end

    subgraph "数据存储"
        J[(PostgreSQL)]
        J1[memori_* 表]
        J2[checkpoints 表]
        J3[chat_messages 表]
        J --> J1
        J --> J2
        J --> J3
    end

    D --> G
    E --> G
    F --> J

核心组件

1. MemoriManager (agent/memori_manager.py)

class MemoriManager:
    """Memori 连接和实例管理"""

    def __init__(self, db_pool: AsyncConnectionPool):
        self.pool = db_pool
        self._instances: Dict[str, Memori] = {}

    async def get_memori(
        self,
        bot_id: str,
        user_identifier: str,
        session_id: str
    ) -> Memori:
        """获取或创建 Memori 实例"""
        cache_key = f"{bot_id}:{user_identifier}"

        if cache_key not in self._instances:
            # 创建 Memori 实例
            mem = Memori(conn=self._get_session())
            mem.attribution(
                entity_id=user_identifier,  # 用户
                process_id=bot_id           # Bot
            )
            self._instances[cache_key] = mem

        # 设置会话
        self._instances[cache_key].set_session(session_id)
        return self._instances[cache_key]

2. MemoriMiddleware (agent/memori_middleware.py)

class MemoriMiddleware(AgentMiddleware):
    """Agent 记忆中间件"""

    def __init__(self, memori: Memori, config: MemoriConfig):
        self.memori = memori
        self.config = config

    async def abefore_agent(
        self,
        state: AgentState,
        runtime: AgentRuntime
    ) -> AgentState:
        """Agent 执行前:召回相关记忆"""
        query = state["messages"][-1]["content"]

        # 语义搜索
        memories = self.memori.recall(
            query=query,
            limit=self.config.semantic_search_top_k
        )

        # 注入到系统提示
        if memories:
            memory_context = self._format_memories(memories)
            state = self._inject_memory_context(state, memory_context)

        return state

    async def aafter_agent(
        self,
        state: AgentState,
        runtime: AgentRuntime
    ) -> AgentState:
        """Agent 执行后:后台提取记忆(非阻塞)"""
        # 触发后台增强
        asyncio.create_task(self._async_augment(state))
        return state

3. 配置扩展 (agent/agent_config.py)

@dataclass
class AgentConfig:
    # ... 现有字段 ...

    # Memori 配置
    enable_memori: bool = False
    memori_api_key: Optional[str] = None
    memori_semantic_search_top_k: int = 5

集成点修改

init_agent() 函数修改 (agent/deep_assistant.py)

async def init_agent(config: AgentConfig) -> tuple[agent, checkpointer]:
    # ... 现有代码 ...

    # 新增Memori 初始化
    memori_middleware = None
    if config.enable_memori:
        memori_manager = get_memori_manager()
        memori = await memori_manager.get_memori(
            bot_id=config.bot_id,
            user_identifier=config.user_identifier,
            session_id=config.session_id
        )

        memori_config = MemoriConfig(
            semantic_search_top_k=config.memori_semantic_search_top_k,
        )
        memori_middleware = MemoriMiddleware(memori, memori_config)

    # 中间件顺序调整
    middleware = [
        ToolUseCleanupMiddleware(),
        ToolOutputLengthMiddleware(),
        # Memori 在指南之前注入记忆
        *(memori_middleware,) if memori_middleware else (),
    ]

    if config.enable_thinking:
        middleware.append(GuidelineMiddleware(...))

    # ... 其余代码 ...

技术考虑

多租户隔离

隔离级别 字段 用途
实体级 entity_id = user_identifier 用户数据隔离
进程级 process_id = bot_id Bot/Agent 隔离
会话级 session_id 对话隔离

查询模式

-- 用户在某 Bot 下的所有记忆
SELECT * FROM memori_entity_fact
WHERE entity_id = :user_identifier
AND process_id = :bot_id;

-- 用户在特定会话的记忆
SELECT * FROM memori_entity_fact
WHERE entity_id = :user_identifier
AND process_id = :bot_id
AND session_id = :session_id;

中间件执行顺序

ToolUseCleanupMiddleware → 清理孤立的 tool_use 块
         ↓
MemoriMiddleware → 召回相关记忆,注入上下文
         ↓
ToolOutputLengthMiddleware → 控制工具输出长度
         ↓
GuidelineMiddleware → 添加思考(如果启用)
         ↓
Agent 执行
         ↓
MemoriMiddleware → 后台提取新记忆(非阻塞)

性能考量

操作 目标延迟 缓存策略
记忆召回 <200ms p95 Redis 常用查询缓存
记忆存储 后台异步 批量写入
向量搜索 <100ms FAISS 索引

安全考虑

  1. PII 检测 - 存储前过滤敏感信息(邮箱、电话、信用卡)
  2. 数据隔离 - 应用层 WHERE 子句强制租户隔离
  3. GDPR 合规 - 提供记忆删除/导出 API

验收标准

功能需求

  • 基本记忆功能

    • Agent 能记住用户跨会话提供的信息(姓名、偏好等)
    • 支持语义搜索召回相关历史记忆
    • 记忆在后台异步提取,不阻塞响应
  • 多租户支持

    • 用户数据按 user_identifier 隔离
    • Bot 数据按 bot_id 隔离
    • 会话数据按 session_id 隔离
  • 配置控制

    • enable_memori 开关控制功能启用
    • 可配置召回记忆数量 (semantic_search_top_k)

非功能需求

  • 性能

    • 记忆召回延迟 <200ms p95
    • 不增加 Agent 首字响应延迟
  • 可靠性

    • Memori 服务故障时 Agent 仍可工作(降级模式)
    • 连接池管理和重试机制
  • 安全

    • PII 检测和过滤
    • 租户数据隔离验证
  • 测试

    • 单元测试覆盖率 >80%
    • 集成测试覆盖主要用户流程

成功指标

指标 测量方法 目标
记忆召回准确率 用户反馈/A/B测试 >80% 相关度
记忆提取覆盖率 对话分析 >60% 关键信息
响应延迟影响 延迟对比 <50ms 增加
用户满意度 反馈评分 >4.0/5.0

依赖关系与风险

依赖项

依赖 类型 状态
memori Python 包 外部 需添加
PostgreSQL 数据库 内部 已有
连接池管理 内部 已有
Agent 中间件框架 内部 已有

风险分析

风险 影响 缓解措施
Memori API 配额超限 记忆提取失败 实现降级模式,监控配额
多租户数据泄露 安全问题 应用层强制隔离,添加测试
语义搜索延迟高 UX 下降 添加缓存,设置超时
与 AgentMemoryMiddleware 冲突 功能异常 明确职责分离,集成测试

实施阶段

Phase 1: 技术验证

  • 设置 Memori 开发环境
  • 创建 PoC 验证基本功能
  • 数据库 Schema 兼容性检查
  • 中间件集成原型

Phase 2: 核心实现

  • 实现 MemoriManager
  • 实现 MemoriMiddleware
  • 修改 init_agent() 集成逻辑
  • 添加配置到 AgentConfigsettings.py

Phase 3: 测试与优化

  • 单元测试
  • 集成测试
  • 性能基准测试
  • 安全测试

Phase 4: 逐步上线

  • 添加功能开关
  • 内部测试 (1% 流量)
  • Beta 用户 (10% 流量)
  • 全量上线

文件清单

新建文件

qwen-agent/
├── agent/
│   ├── memori_manager.py          # Memori 连接管理
│   ├── memori_middleware.py       # Agent 中间件实现
│   └── memori_config.py           # 配置数据类
├── tests/
│   ├── test_memori_manager.py     # 单元测试
│   ├── test_memori_middleware.py  # 中间件测试
│   └── test_memori_integration.py # 集成测试
└── utils/
    └── settings.py                # [修改] 添加 MEMORI_* 配置

修改文件

qwen-agent/
├── agent/
│   ├── deep_assistant.py          # [修改] init_agent() 集成 Memori
│   └── agent_config.py            # [修改] 添加 memori 配置字段
└── pyproject.toml                 # [修改] 添加 memori 依赖

配置示例

环境变量 (utils/settings.py)

# Memori 配置
MEMORI_ENABLED = os.getenv("MEMORI_ENABLED", "true") == "true"
MEMORI_API_KEY = os.getenv("MEMORI_API_KEY", "")
MEMORI_SEMANTIC_SEARCH_TOP_K = int(os.getenv("MEMORI_SEMANTIC_SEARCH_TOP_K", "5"))
MEMORI_EMBEDDING_MODEL = os.getenv("MEMORI_EMBEDDING_MODEL", "paraphrase-multilingual-MiniLM-L12-v2")

依赖添加 (pyproject.toml)

[tool.poetry.dependencies]
memori = "^3.1.0"

待解决问题

优先级 1: 关键问题

  1. 多租户隔离执行位置 - 应用层 vs 数据库层 RLS
  2. 连接池共享 - 与 CheckpointerManager 共享还是独立?
  3. async/await 兼容性 - Memori 的同步 wait() 如何与 FastAPI 集成?
  4. PII 检测策略 - 存储前/后检测,允许/拒绝列表?
  5. 中间件执行顺序 - 与 GuidelineMiddleware/SummarizationMiddleware 的相对顺序

优先级 2: 重要问题

  1. 功能开关策略 - 请求级/会话级/全局?
  2. 延迟预算 - 记忆召回目标延迟?
  3. AgentMemoryMiddleware 兼容性 - 如何避免冲突?
  4. 多语言嵌入模型 - 使用哪个模型支持 ja/zh/en
  5. 记忆保留策略 - 保留多久,如何归档?

参考资料

内部参考

  • Agent 创建: /Users/moshui/Documents/felo/qwen-agent/agent/deep_assistant.py
  • Agent 配置: /Users/moshui/Documents/felo/qwen-agent/agent/agent_config.py
  • 中间件基类: deepagents.agent.AgentMiddleware
  • Checkpointer: /Users/moshui/Documents/felo/qwen-agent/agent/checkpoint_manager.py

外部参考