qwen_agent/agent/deep_assistant.py
2026-03-04 14:08:51 +08:00

502 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import logging
import time
import copy
import os
import tempfile
from pathlib import Path
from typing import Any, Dict
from langchain.chat_models import init_chat_model
from deepagents import create_deep_agent
from deepagents.backends import CompositeBackend, LocalShellBackend
from deepagents.backends.filesystem import FilesystemBackend
from deepagents.backends.sandbox import SandboxBackendProtocol
from deepagents_cli.agent import create_cli_agent
from langchain.agents import create_agent
from langgraph.store.base import BaseStore
from langchain.agents.middleware import SummarizationMiddleware as LangchainSummarizationMiddleware
from .summarization_middleware import SummarizationMiddleware
from langchain_mcp_adapters.client import MultiServerMCPClient
from sympy.printing.cxx import none
from utils.fastapi_utils import detect_provider
from .guideline_middleware import GuidelineMiddleware
from .tool_output_length_middleware import ToolOutputLengthMiddleware
from .tool_use_cleanup_middleware import ToolUseCleanupMiddleware
from utils.settings import (
SUMMARIZATION_MAX_TOKENS,
SUMMARIZATION_TOKENS_TO_KEEP,
TOOL_OUTPUT_MAX_LENGTH,
MCP_HTTP_TIMEOUT,
MCP_SSE_READ_TIMEOUT,
DEFAULT_TRIM_TOKEN_LIMIT
)
from utils.token_counter import create_token_counter
from agent.agent_config import AgentConfig
from .mem0_manager import get_mem0_manager
from .mem0_middleware import create_mem0_middleware
from .mem0_config import Mem0Config
from agent.prompt_loader import load_system_prompt_async, load_mcp_settings_async
from agent.agent_memory_cache import get_memory_cache_manager
from .checkpoint_manager import get_checkpointer_manager
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langgraph.checkpoint.memory import InMemorySaver
from langchain.tools import BaseTool
from langchain_core.language_models import BaseChatModel
from langgraph.pregel import Pregel
# 新版本导入MemoryMiddleware 和 SkillsMiddleware 已迁移到 deepagents.middleware
from deepagents.middleware import MemoryMiddleware, SkillsMiddleware
from langchain.agents.middleware import AgentMiddleware
from langgraph.types import Checkpointer
from deepagents_cli.config import settings, get_default_coding_instructions
from langchain.agents.middleware import HumanInTheLoopMiddleware, InterruptOnConfig, TodoListMiddleware
from deepagents.middleware.filesystem import FilesystemMiddleware
from deepagents.middleware.patch_tool_calls import PatchToolCallsMiddleware
from langchain_anthropic.middleware import AnthropicPromptCachingMiddleware
from deepagents.graph import BASE_AGENT_PROMPT
# 新增LocalContextMiddleware
from deepagents_cli.local_context import LocalContextMiddleware
# 自定义:支持 SKILL.md 完整读取的 FilesystemMiddleware
from .custom_filesystem_middleware import CustomFilesystemMiddleware
# 全局 MemorySaver 实例
# from langgraph.checkpoint.memory import MemorySaver
# _global_checkpointer = MemorySaver()
logger = logging.getLogger('app')
# Utility functions
def read_system_prompt():
    """Load the generic, stateless system prompt from its default file.

    Returns:
        str: The prompt text with surrounding whitespace stripped.
    """
    prompt_file = Path("./prompt/system_prompt_default.md")
    return prompt_file.read_text(encoding="utf-8").strip()
def read_mcp_settings():
    """Load the MCP tool configuration from its JSON settings file.

    Returns:
        The parsed JSON content of ./mcp/mcp_settings.json.
    """
    settings_file = Path("./mcp/mcp_settings.json")
    with settings_file.open("r") as fh:
        return json.load(fh)
async def get_tools_from_mcp(mcp):
    """Extract tool objects from an MCP configuration (with caching).

    Args:
        mcp: MCP configuration; expected shape is a non-empty list whose
            first element is a dict containing an "mcpServers" dict.

    Returns:
        list: The loaded MCP tools, or [] when the configuration is invalid
        or loading fails (errors are logged, never raised to the caller).
    """
    start_time = time.time()
    # Defensive validation: mcp must be a non-empty list whose first
    # element carries "mcpServers".
    if not isinstance(mcp, list) or len(mcp) == 0 or "mcpServers" not in mcp[0]:
        logger.info(f"get_tools_from_mcp: invalid mcp config, elapsed: {time.time() - start_time:.3f}s")
        return []
    # BUGFIX: this dict check previously ran AFTER the .values() loop below,
    # so a non-dict "mcpServers" crashed with AttributeError instead of
    # returning []. Validate the type before touching the mapping.
    if not isinstance(mcp[0]["mcpServers"], dict):
        logger.info(f"get_tools_from_mcp: mcpServers is not dict, elapsed: {time.time() - start_time:.3f}s")
        return []
    # Try the cache first (keyed on the original, unmodified config).
    cache_manager = get_memory_cache_manager()
    cached_tools = cache_manager.get_mcp_tools(mcp)
    if cached_tools is not None:
        logger.info(f"get_tools_from_mcp: cached {len(cached_tools)} tools, elapsed: {time.time() - start_time:.3f}s")
        return cached_tools
    # Deep-copy so the normalization below does not mutate the caller's
    # config (which would also corrupt the cache key).
    mcp_copy = copy.deepcopy(mcp)
    # Normalize every server entry: drop the legacy "type" field and ensure
    # "transport" is set ("http" when a url is present, otherwise "stdio").
    for cfg in mcp_copy[0]["mcpServers"].values():
        if "type" in cfg:
            cfg.pop("type")
        if "transport" not in cfg:
            cfg["transport"] = "http" if "url" in cfg else "stdio"
        # Apply global default timeouts for HTTP/SSE transports when the
        # entry does not specify its own.
        if cfg.get("transport") in ("http", "sse"):
            if "timeout" not in cfg:
                cfg["timeout"] = MCP_HTTP_TIMEOUT
            if "sse_read_timeout" not in cfg:
                cfg["sse_read_timeout"] = MCP_SSE_READ_TIMEOUT
    try:
        mcp_client = MultiServerMCPClient(mcp_copy[0]["mcpServers"])
        mcp_tools = await mcp_client.get_tools()
        # Cache against the original config so future lookups hit.
        cache_manager.set_mcp_tools(mcp, mcp_tools)
        logger.info(f"get_tools_from_mcp: loaded {len(mcp_tools)} tools, elapsed: {time.time() - start_time:.3f}s")
        return mcp_tools
    except Exception as e:
        # Return [] rather than propagating so callers do not crash on a
        # misconfigured or unreachable MCP server.
        logger.error(f"get_tools_from_mcp: error {e}, elapsed: {time.time() - start_time:.3f}s")
        return []
async def init_agent(config: AgentConfig):
    """Initialize an agent with persistent memory and conversation summarization.

    Note: the agent itself is no longer cached; only mcp_tools are cached.
    The returned checkpointer must be handed back by the caller when done.

    Args:
        config: AgentConfig object carrying all initialization parameters.

    Returns:
        (agent, checkpointer) tuple.
    """
    # Load prompt / MCP configuration, falling back to the on-disk defaults.
    final_system_prompt = await load_system_prompt_async(config)
    final_mcp_settings = await load_mcp_settings_async(config)
    mcp_settings = final_mcp_settings if final_mcp_settings else read_mcp_settings()
    system_prompt = final_system_prompt if final_system_prompt else read_system_prompt()
    # BUGFIX: these two assignments were swapped (system_prompt received the
    # MCP settings and vice versa); store each value in its matching field.
    config.system_prompt = system_prompt
    config.mcp_settings = mcp_settings
    # Fetch MCP tools (caching is handled inside get_tools_from_mcp).
    mcp_tools = await get_tools_from_mcp(mcp_settings)
    logger.info(f"Loaded {len(mcp_tools)} MCP tools")
    # Detect (or honor the explicitly specified) model provider.
    model_provider, base_url = detect_provider(config.model_name, config.model_server)
    # Build model construction parameters.
    model_kwargs = {
        "model": config.model_name,
        "model_provider": model_provider,
        "temperature": 0.8,
        "base_url": base_url,
        "api_key": config.api_key
    }
    if config.generate_cfg:
        # Internal-only parameters that must never be forwarded to any LLM.
        internal_params = {
            'tool_output_max_length',
            'tool_output_truncation_strategy',
            'tool_output_filters',
            'tool_output_exclude',
            'preserve_code_blocks',
            'preserve_json',
        }
        # OpenAI-specific parameters that Anthropic does not accept.
        openai_only_params = {
            'n',  # number of completions to generate
            'presence_penalty',
            'frequency_penalty',
            'logprobs',
            'top_logprobs',
            'logit_bias',
            'seed',
            'suffix',
            'best_of',
            'echo',
            'user',
        }
        # Decide which parameters to filter based on the provider.
        params_to_filter = internal_params.copy()
        if model_provider == 'anthropic':
            params_to_filter.update(openai_only_params)
        filtered_cfg = {k: v for k, v in config.generate_cfg.items() if k not in params_to_filter}
        model_kwargs.update(filtered_cfg)
    llm_instance = init_chat_model(**model_kwargs)
    # Create a fresh agent (agents are no longer cached).
    logger.info(f"Creating new agent for session: {getattr(config, 'session_id', 'no-session')}")
    checkpointer = None
    create_start = time.time()
    # Borrow a checkpointer from the pool (prepare_checkpoint_message has
    # already been called in from_v1/from_v2_request).
    if config.session_id:
        try:
            manager = get_checkpointer_manager()
            checkpointer = manager.checkpointer
        except Exception as e:
            logger.warning(f"Failed to load checkpointer: {e}")
    # Build the middleware stack.
    middleware = []
    # ToolUseCleanupMiddleware goes first to clean up orphaned tool_use blocks.
    middleware.append(ToolUseCleanupMiddleware())
    # BUGFIX: generate_cfg is a dict (see .items() above), so the previous
    # getattr() calls always returned their defaults and every configured
    # tool-output option was silently ignored; use dict.get() instead.
    gen_cfg = config.generate_cfg or {}
    tool_output_middleware = ToolOutputLengthMiddleware(
        max_length=gen_cfg.get('tool_output_max_length') or TOOL_OUTPUT_MAX_LENGTH,
        truncation_strategy=gen_cfg.get('tool_output_truncation_strategy', 'smart'),
        tool_filters=gen_cfg.get('tool_output_filters'),
        exclude_tools=gen_cfg.get('tool_output_exclude', []),
        preserve_code_blocks=gen_cfg.get('preserve_code_blocks', True),
        preserve_json=gen_cfg.get('preserve_json', True)
    )
    middleware.append(tool_output_middleware)
    # Mem0 memory middleware (only when enabled).
    if config.enable_memori:
        try:
            # A user_identifier is required for per-user memory.
            if not config.user_identifier:
                logger.warning("Mem0 enabled but user_identifier is missing, skipping Mem0")
            else:
                # Global Mem0Manager is initialized in fastapi_app.py.
                mem0_manager = get_mem0_manager()
                # Reuse the existing llm_instance and config for the middleware.
                mem0_middleware = create_mem0_middleware(
                    bot_id=config.bot_id,
                    user_identifier=config.user_identifier,
                    session_id=config.session_id or "default",
                    agent_config=config,  # lets middlewares share data via the config
                    enabled=config.enable_memori,
                    semantic_search_top_k=config.memori_semantic_search_top_k,
                    mem0_manager=mem0_manager,
                    llm_instance=llm_instance,  # reuse the existing LLM instance
                )
                if mem0_middleware:
                    middleware.append(mem0_middleware)
                    logger.info("Mem0 middleware added to agent")
        except Exception as e:
            logger.error(f"Failed to create Mem0 middleware: {e}, continuing without Mem0")
    # GuidelineMiddleware is only added when thinking is enabled.
    if config.enable_thinking:
        middleware.append(GuidelineMiddleware(llm_instance, config, system_prompt))
    summarization_middleware = SummarizationMiddleware(
        model=llm_instance,
        trigger=('tokens', SUMMARIZATION_MAX_TOKENS),
        trim_tokens_to_summarize=DEFAULT_TRIM_TOKEN_LIMIT,
        keep=('tokens', SUMMARIZATION_TOKENS_TO_KEEP),
        token_counter=create_token_counter(config.model_name)
    )
    middleware.append(summarization_middleware)
    workspace_root = str(Path.cwd() / "projects" / "robot" / config.bot_id)
    agent, composite_backend = create_custom_cli_agent(
        model=llm_instance,
        assistant_id=config.bot_id,
        system_prompt=system_prompt,
        tools=mcp_tools,
        auto_approve=True,
        workspace_root=workspace_root,
        middleware=middleware,
        checkpointer=checkpointer,
        shell_env={
            "ASSISTANT_ID": config.bot_id,
            "USER_IDENTIFIER": config.user_identifier,
            "TRACE_ID": config.trace_id
        }
    )
    logger.info(f"create agent elapsed: {time.time() - create_start:.3f}s")
    return agent, checkpointer
class CustomSkillsMiddleware(SkillsMiddleware):
    """SkillsMiddleware variant that uses the new hook signature.

    Before delegating to the parent, an empty ``skills_metadata`` entry is
    dropped from the state so the parent hook repopulates it.
    """

    def before_agent(self, state, runtime, config):
        """Load skills metadata before agent execution (sync path).

        Args:
            state: Current agent state.
            runtime: Runtime context.
            config: Runnable config.

        Returns:
            Updated state with skills_metadata populated.
        """
        # A present-but-empty list would mask the parent's reload; remove it.
        has_empty_metadata = "skills_metadata" in state and len(state["skills_metadata"]) == 0
        if has_empty_metadata:
            del state["skills_metadata"]
        return super().before_agent(state, runtime, config)

    async def abefore_agent(self, state, runtime, config):
        """Load skills metadata before agent execution (async path).

        Args:
            state: Current agent state.
            runtime: Runtime context.
            config: Runnable config.

        Returns:
            Updated state with skills_metadata populated.
        """
        # Same cleanup as the sync path before delegating to the parent.
        has_empty_metadata = "skills_metadata" in state and len(state["skills_metadata"]) == 0
        if has_empty_metadata:
            del state["skills_metadata"]
        return await super().abefore_agent(state, runtime, config)
def create_custom_cli_agent(
    model: str | BaseChatModel,
    assistant_id: str,
    *,
    tools: list[BaseTool] | None = None,
    sandbox: SandboxBackendProtocol | None = None,
    sandbox_type: str | None = None,
    system_prompt: str | None = None,
    auto_approve: bool = False,
    enable_skills: bool = True,
    enable_shell: bool = True,
    middleware: list[AgentMiddleware] | None = None,
    workspace_root: str | None = None,
    checkpointer: Checkpointer | None = None,
    store: BaseStore | None = None,
    shell_env: dict[str, str] | None = None,
) -> tuple[Pregel, CompositeBackend]:
    """Create a CLI-configured agent with custom workspace_root for shell commands.

    This is a custom version of create_cli_agent that allows specifying a custom
    workspace_root for shell commands instead of using Path.cwd().

    Args:
        model: LLM model to use (e.g., "anthropic:claude-sonnet-4-5-20250929")
        assistant_id: Agent identifier for memory/state storage
        tools: Additional tools to provide to agent (default: empty list)
        sandbox: Optional sandbox backend for remote execution (e.g., ModalBackend).
            If None, uses local filesystem + shell.
        sandbox_type: Type of sandbox provider ("modal", "runloop", "daytona").
            Used for system prompt generation.
        system_prompt: Override the default system prompt. If None, generates one
            based on sandbox_type and assistant_id.
        auto_approve: If True, automatically approves all tool calls without human
            confirmation. Useful for automated workflows.
        enable_skills: Enable SkillsMiddleware for custom agent skills
        enable_shell: Enable ShellMiddleware for local shell execution (only in local mode)
        middleware: Extra AgentMiddleware instances to install (default: none).
        workspace_root: Working directory for shell commands. If None, uses Path.cwd().
        checkpointer: Optional checkpointer for persisting conversation state
        store: Optional BaseStore for persisting user preferences and agent memory
        shell_env: Optional custom environment variables to pass to ShellMiddleware.
            These will be merged with os.environ. Custom vars take precedence.

    Returns:
        2-tuple of (agent_graph, composite_backend)
        - agent_graph: Configured LangGraph Pregel instance ready for execution
        - composite_backend: CompositeBackend for file operations
    """
    if tools is None:
        tools = []
    # BUGFIX: `middleware` previously used a mutable default argument ([])
    # that was appended to below, so middlewares leaked across calls and
    # mutated the caller's list. Copy into a fresh list instead.
    agent_middleware = list(middleware) if middleware else []
    # Prepare workspace root
    if workspace_root is None:
        workspace_root = str(Path.cwd())
    # CONDITIONAL SETUP: Local vs Remote Sandbox
    if sandbox is None:
        # ========== LOCAL MODE ==========
        if enable_shell:
            # Build the shell environment: os.environ overlaid with custom vars.
            final_shell_env = os.environ.copy()
            if shell_env:
                final_shell_env.update(shell_env)
            # Use LocalShellBackend for filesystem + shell execution
            backend = LocalShellBackend(
                root_dir=workspace_root,
                inherit_env=True,
                env=final_shell_env,
            )
        else:
            # No shell access - use plain FilesystemBackend
            backend = FilesystemBackend(root_dir=workspace_root)
        # Set up composite backend with routing: large tool results and
        # conversation history are written to throwaway temp directories.
        large_results_backend = FilesystemBackend(
            root_dir=tempfile.mkdtemp(prefix="deepagents_large_results_"),
        )
        conversation_history_backend = FilesystemBackend(
            root_dir=tempfile.mkdtemp(prefix="deepagents_conversation_history_"),
        )
        composite_backend = CompositeBackend(
            default=backend,
            routes={
                "/large_tool_results/": large_results_backend,
                "/conversation_history/": conversation_history_backend,
            },
        )
        # Add skills middleware (using new signature)
        if enable_skills:
            skills_sources = ["./skills"]
            agent_middleware.append(
                CustomSkillsMiddleware(
                    backend=FilesystemBackend(root_dir=workspace_root),
                    sources=skills_sources,
                )
            )
        # Add LocalContextMiddleware only when the backend can execute
        # commands (i.e. it is an _ExecutableBackend).
        if enable_shell:
            from deepagents_cli.local_context import _ExecutableBackend
            if isinstance(backend, _ExecutableBackend):
                agent_middleware.append(LocalContextMiddleware(backend=backend))
    else:
        # ========== REMOTE SANDBOX MODE ==========
        composite_backend = CompositeBackend(
            default=sandbox,  # Remote sandbox (ModalBackend, etc.)
            routes={},  # No virtualization
        )
        # Add skills middleware
        if enable_skills:
            skills_sources = ["/skills"]
            agent_middleware.append(
                CustomSkillsMiddleware(
                    backend=sandbox,
                    sources=skills_sources,
                )
            )
        # Note: Shell middleware not used in sandbox mode
        # File operations and execute tool are provided by the sandbox backend
    # Get or use custom system prompt
    if system_prompt is None:
        from deepagents_cli.agent import get_system_prompt as _get_system_prompt
        system_prompt = _get_system_prompt(assistant_id=assistant_id, sandbox_type=sandbox_type)
    # Configure interrupt_on based on auto_approve setting
    if auto_approve:
        # No interrupts - all tools run automatically
        interrupt_on = {}
    else:
        # Full HITL for destructive operations
        from deepagents_cli.agent import _add_interrupt_on
        interrupt_on = _add_interrupt_on()
    deepagent_middleware = [
        TodoListMiddleware(),
        # Custom FilesystemMiddleware that supports reading SKILL.md in full.
        CustomFilesystemMiddleware(backend=composite_backend),
        AnthropicPromptCachingMiddleware(unsupported_model_behavior="ignore"),
        PatchToolCallsMiddleware(),
    ]
    if agent_middleware:
        deepagent_middleware.extend(agent_middleware)
    # BUGFIX: the previous `is not None` check was always true (interrupt_on
    # is always a dict here), so a no-op HumanInTheLoopMiddleware with an
    # empty config was installed even under auto_approve. Only add HITL when
    # at least one tool actually requires approval.
    if interrupt_on:
        deepagent_middleware.append(HumanInTheLoopMiddleware(interrupt_on=interrupt_on))
    agent = create_agent(
        model,
        system_prompt=(system_prompt + "\n\n" + BASE_AGENT_PROMPT) if system_prompt else BASE_AGENT_PROMPT,
        tools=tools,
        middleware=deepagent_middleware,
        checkpointer=checkpointer,
        store=store,
    ).with_config({"recursion_limit": 1000})
    return agent, composite_backend