qwen_agent/agent/deep_assistant.py
朱潮 06974e9744 feat: recover tool exceptions into ToolMessage so SSE stream keeps flowing
Add ToolErrorRecoveryMiddleware as the outermost agent middleware so any
tool-call exception (notably MCP ToolException) is converted into a
ToolMessage with status="error" carrying the raw error text. The agent
can then loop once more and reply to the user in natural language about
what failed, instead of bubbling the exception up through agent.astream
and breaking the SSE response in routes/chat.py.

The recovery layer extracts the inner `text="..."` payload out of the MCP
TextContent repr when present, falling back to str(error) otherwise. It
deliberately re-raises asyncio.CancelledError so task cancellation still
propagates, and sits *outside* ToolMetricsMiddleware so the existing
status=error metric is still emitted before recovery kicks in.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-20 13:07:19 +08:00

600 lines
25 KiB
Python

import json
import logging
import time
import copy
import os
import tempfile
import asyncio
from pathlib import Path
from typing import Any, Dict
from langchain.chat_models import init_chat_model
from deepagents import create_deep_agent
from deepagents.backends import CompositeBackend, LocalShellBackend
from deepagents.backends.filesystem import FilesystemBackend
from deepagents.backends.sandbox import SandboxBackendProtocol
from deepagents_cli.agent import create_cli_agent
from langchain.agents import create_agent
from langgraph.store.base import BaseStore
from langchain.agents.middleware import SummarizationMiddleware as LangchainSummarizationMiddleware
from .summarization_middleware import SummarizationMiddleware
from langchain_mcp_adapters.client import MultiServerMCPClient
from utils.fastapi_utils import detect_provider, sanitize_model_kwargs
from .guideline_middleware import GuidelineMiddleware
from .tool_output_length_middleware import ToolOutputLengthMiddleware
from .tool_use_cleanup_middleware import ToolUseCleanupMiddleware
from .tool_metrics_middleware import ToolMetricsMiddleware
from .tool_error_recovery_middleware import ToolErrorRecoveryMiddleware
from .filepath_fix_middleware import FilePathFixMiddleware
from .mcp_trace_meta import patch_mcp_client_session_trace_meta
from utils.settings import (
SUMMARIZATION_MAX_TOKENS,
SUMMARIZATION_TOKENS_TO_KEEP,
TOOL_OUTPUT_MAX_LENGTH,
MCP_HTTP_TIMEOUT,
MCP_SSE_READ_TIMEOUT,
DEFAULT_TRIM_TOKEN_LIMIT,
DAYTONA_API_KEY,
DAYTONA_SERVER_URL,
DAYTONA_ENABLED,
)
from utils.token_counter import create_token_counter
from agent.agent_config import AgentConfig
from .mem0_manager import get_mem0_manager
from .mem0_middleware import create_mem0_middleware
from .mem0_config import Mem0Config
from agent.prompt_loader import load_system_prompt_async, load_mcp_settings_async
from agent.agent_memory_cache import get_memory_cache_manager
from .subagent_loader import load_subagents
from agent.plugin_hook_loader import collect_main_agent_hidden_tools
from .checkpoint_manager import get_checkpointer_manager
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langgraph.checkpoint.memory import InMemorySaver
from langchain.tools import BaseTool
from langchain_core.language_models import BaseChatModel
from langgraph.pregel import Pregel
# New version imports: MemoryMiddleware and SkillsMiddleware were moved to deepagents.middleware
from deepagents.middleware import MemoryMiddleware, SkillsMiddleware
from langchain.agents.middleware import AgentMiddleware, ModelRequest, ModelResponse
from langchain.agents.middleware import HumanInTheLoopMiddleware, InterruptOnConfig, TodoListMiddleware
from langchain_core.messages import AIMessage, HumanMessage
from langgraph.types import Checkpointer
from deepagents_cli.config import settings, get_default_coding_instructions
from deepagents.middleware.filesystem import FilesystemMiddleware
from deepagents.middleware.patch_tool_calls import PatchToolCallsMiddleware
from langchain_anthropic.middleware import AnthropicPromptCachingMiddleware
from deepagents.graph import BASE_AGENT_PROMPT
# Added: LocalContextMiddleware
from deepagents_cli.local_context import LocalContextMiddleware
# Custom: FilesystemMiddleware with full SKILL.md reading support
from .custom_filesystem_middleware import CustomFilesystemMiddleware
# Sub-agent support
from deepagents.middleware.subagents import SubAgent, SubAgentMiddleware
# Global MemorySaver instance
# from langgraph.checkpoint.memory import MemorySaver
# _global_checkpointer = MemorySaver()
logger = logging.getLogger('app')
class EmptyResponseRetryMiddleware(AgentMiddleware):
    """Retry the model call when it comes back empty.

    A response is considered empty when it contains no messages, or when its
    first message is an ``AIMessage`` that has neither content nor tool calls.
    Every retry re-sends the *original* request plus exactly one nudge message,
    so repeated retries do not stack several identical nudges at the end of
    the conversation. After ``MAX_RETRIES`` attempts the last result is
    returned as-is, even if it is still empty.
    """

    MAX_RETRIES = 5
    # Text of the human message appended to the conversation for a retry.
    RETRY_PROMPT = "Please continue your response."

    def _is_empty_response(self, result: ModelResponse) -> bool:
        """Return True when ``result`` has no text and no tool calls.

        Only the first message of ``result.result`` is inspected. A first
        message that is not an ``AIMessage`` is never treated as empty.
        """
        if not result.result:
            return True
        msg = result.result[0]
        if not isinstance(msg, AIMessage):
            return False
        content = msg.content or ""
        # String content is empty when it is blank; non-string content
        # (e.g. a list of blocks) is empty only when it has no elements.
        has_text = bool(content.strip()) if isinstance(content, str) else bool(content)
        return not has_text and len(msg.tool_calls) == 0

    def _build_retry_request(self, request, attempt: int):
        """Log the retry and return ``request`` extended with one nudge message.

        Args:
            request: The original model request (never a previous retry).
            attempt: 1-based retry counter, used for logging only.
        """
        logger.warning(f"Empty response detected, retrying ({attempt}/{self.MAX_RETRIES})")
        retry_messages = list(request.messages) + [
            HumanMessage(content=self.RETRY_PROMPT)
        ]
        return request.override(messages=retry_messages)

    def wrap_model_call(self, request, handler):
        """Synchronous hook: call ``handler`` and retry while the result is empty."""
        result = handler(request)
        for attempt in range(1, self.MAX_RETRIES + 1):
            if not self._is_empty_response(result):
                break
            result = handler(self._build_retry_request(request, attempt))
        return result

    async def awrap_model_call(self, request, handler):
        """Asynchronous hook: await ``handler`` and retry while the result is empty."""
        result = await handler(request)
        for attempt in range(1, self.MAX_RETRIES + 1):
            if not self._is_empty_response(result):
                break
            result = await handler(self._build_retry_request(request, attempt))
        return result
# Utility functions
def read_system_prompt():
    """Return the default system prompt with surrounding whitespace removed.

    The prompt is read as UTF-8 from ``./prompt/system_prompt_default.md``,
    resolved against the current working directory.
    """
    prompt_path = Path("./prompt/system_prompt_default.md")
    raw_prompt = prompt_path.read_text(encoding="utf-8")
    return raw_prompt.strip()
async def get_tools_from_mcp(mcp):
    """Load the MCP tools described by ``mcp``, using the memory cache when possible.

    Args:
        mcp: Expected to be a non-empty list whose first element contains an
            ``"mcpServers"`` mapping of server name -> server config. The
            object is never mutated; a deep copy is normalised instead so the
            cache key stays stable.

    Returns:
        The list of tools returned by ``MultiServerMCPClient.get_tools()``
        (or the cached list). An empty list is returned when the configuration
        is invalid or when loading fails; errors are logged, not raised.
    """
    patch_mcp_client_session_trace_meta()
    start_time = time.time()

    # Defensive handling: ensure mcp is a non-empty list containing mcpServers
    if not isinstance(mcp, list) or len(mcp) == 0 or "mcpServers" not in mcp[0]:
        logger.info(f"get_tools_from_mcp: invalid mcp config, elapsed: {time.time() - start_time:.3f}s")
        return []

    # Try to load tools from cache
    cache_manager = get_memory_cache_manager()
    cached_tools = cache_manager.get_mcp_tools(mcp)
    if cached_tools is not None:
        logger.info(f"get_tools_from_mcp: cached {len(cached_tools)} tools, elapsed: {time.time() - start_time:.3f}s")
        return cached_tools

    # Validate the shape BEFORE iterating: calling .values() on a non-dict
    # would raise instead of taking this graceful early return.
    if not isinstance(mcp[0]["mcpServers"], dict):
        logger.info(f"get_tools_from_mcp: mcpServers is not dict, elapsed: {time.time() - start_time:.3f}s")
        return []

    # Deep-copy the MCP config to avoid mutating the original configuration and affecting the cache key.
    mcp_copy = copy.deepcopy(mcp)
    servers = mcp_copy[0]["mcpServers"]

    for cfg in servers.values():
        # The "type" key is dropped; its value is NOT carried over.
        # NOTE(review): a config such as {"type": "sse", "url": ...} therefore
        # ends up with transport "http" - confirm this is intended.
        cfg.pop("type", None)
        # If transport is missing, default to http when url exists, otherwise stdio.
        if "transport" not in cfg:
            cfg["transport"] = "http" if "url" in cfg else "stdio"
        # Add timeout settings for MCP servers using HTTP or SSE transport.
        # If timeout values are not configured, use the global defaults.
        if cfg.get("transport") in ("http", "sse"):
            cfg.setdefault("timeout", MCP_HTTP_TIMEOUT)
            cfg.setdefault("sse_read_timeout", MCP_SSE_READ_TIMEOUT)

    try:
        mcp_client = MultiServerMCPClient(servers)
        mcp_tools = await mcp_client.get_tools()
        # Cache under the original (un-normalised) config so later lookups hit.
        cache_manager.set_mcp_tools(mcp, mcp_tools)
        logger.info(f"get_tools_from_mcp: loaded {len(mcp_tools)} tools, elapsed: {time.time() - start_time:.3f}s")
        return mcp_tools
    except Exception as e:
        # Return an empty list on exception to avoid propagating errors to callers.
        logger.error(f"get_tools_from_mcp: error {e}, elapsed: {time.time() - start_time:.3f}s")
        return []
from utils.daytona_sync import init_daytona_sandbox, sync_sandbox_to_local
async def init_agent(config: AgentConfig):
    """Build a fresh agent from ``config``.

    The agent itself is not cached; only the MCP tools are (inside
    ``get_tools_from_mcp``). MCP tool loading and sandbox initialization are
    started as background tasks so they overlap with model and middleware
    construction.

    Side effects: ``config.system_prompt`` and ``config.mcp_settings`` are
    overwritten with the resolved values.

    Args:
        config: AgentConfig object containing all initialization parameters.

    Returns:
        A 3-tuple ``(agent, checkpointer, sandbox)``:
        - agent: the configured agent graph.
        - checkpointer: the shared checkpointer when ``config.session_id`` is
          set and the manager lookup succeeds, otherwise ``None``. The caller
          must release the checkpointer after use.
        - sandbox: the first element returned by ``init_daytona_sandbox``.
    """
    # Function-scope import: only the Daytona BASH_ENV injection needs it.
    import shlex

    create_start = time.time()

    # Load configuration concurrently
    final_system_prompt, final_mcp_settings = await asyncio.gather(
        load_system_prompt_async(config),
        load_mcp_settings_async(config),
    )
    logger.info(f"init_agent config loaded, elapsed: {time.time() - create_start:.3f}s")

    mcp_settings = final_mcp_settings if final_mcp_settings else []
    system_prompt = final_system_prompt if final_system_prompt else read_system_prompt()
    config.system_prompt = system_prompt
    config.mcp_settings = mcp_settings

    workspace_root = str(Path.cwd() / "projects" / "robot" / config.bot_id)
    local_workspace_root = workspace_root

    # Run high-cost I/O in parallel: MCP tools loading + Daytona sandbox initialization
    mcp_tools_task = asyncio.create_task(get_tools_from_mcp(mcp_settings))
    sandbox_task = asyncio.create_task(
        asyncio.to_thread(init_daytona_sandbox, config.bot_id, local_workspace_root)
    )

    # Detect the provider or use the explicitly specified one
    model_provider, base_url = detect_provider(config.model_name, config.model_server)
    model_kwargs, dropped_params, default_temperature_applied = sanitize_model_kwargs(
        model_name=config.model_name,
        model_provider=model_provider,
        base_url=base_url,
        api_key=config.api_key,
        generate_cfg=config.generate_cfg,
        source="init_agent"
    )
    if dropped_params:
        logger.info(
            "init_agent dropped_params=%s model=%s provider=%s default_temperature_applied=%s",
            dropped_params,
            config.model_name,
            model_provider,
            default_temperature_applied
        )
    llm_instance = init_chat_model(**model_kwargs)

    # Create a new agent instance without caching
    logger.info(f"Creating new agent for session: {getattr(config, 'session_id', 'no-session')}")

    checkpointer = None
    # Get the checkpointer from the connection pool
    # prepare_checkpoint_message has already been called in from_v1/from_v2_request
    if config.session_id:
        try:
            manager = get_checkpointer_manager()
            checkpointer = manager.checkpointer
        except Exception as e:
            # Best effort: the agent still works without persisted state.
            logger.warning(f"Failed to load checkpointer: {e}")

    # Build the middleware list
    middleware = []
    middleware.append(EmptyResponseRetryMiddleware())
    middleware.append(ToolMetricsMiddleware(config))
    middleware.append(ToolUseCleanupMiddleware())
    # tool_output_middleware = ToolOutputLengthMiddleware(
    #     max_length=(getattr(config.generate_cfg, 'tool_output_max_length', None) if config.generate_cfg else None) or TOOL_OUTPUT_MAX_LENGTH,
    #     truncation_strategy=getattr(config.generate_cfg, 'tool_output_truncation_strategy', 'smart') if config.generate_cfg else 'smart',
    #     tool_filters=getattr(config.generate_cfg, 'tool_output_filters', None) if config.generate_cfg else None,
    #     exclude_tools=getattr(config.generate_cfg, 'tool_output_exclude', []) if config.generate_cfg else [],
    #     preserve_code_blocks=getattr(config.generate_cfg, 'preserve_code_blocks', True) if config.generate_cfg else True,
    #     preserve_json=getattr(config.generate_cfg, 'preserve_json', True) if config.generate_cfg else True
    # )
    # middleware.append(tool_output_middleware)

    if config.enable_memori:
        try:
            if not config.user_identifier:
                logger.warning("Mem0 enabled but user_identifier is missing, skipping Mem0")
            else:
                mem0_manager = get_mem0_manager()
                mem0_middleware = create_mem0_middleware(
                    bot_id=config.bot_id,
                    user_identifier=config.user_identifier,
                    session_id=config.session_id or "default",
                    agent_config=config,
                    enabled=config.enable_memori,
                    semantic_search_top_k=config.memori_semantic_search_top_k,
                    mem0_manager=mem0_manager,
                    llm_instance=llm_instance,
                )
                if mem0_middleware:
                    middleware.append(mem0_middleware)
                    logger.info("Mem0 middleware added to agent")
        except Exception as e:
            # Mem0 is optional: log and continue without it.
            logger.error(f"Failed to create Mem0 middleware: {e}, continuing without Mem0")

    if config.enable_thinking:
        middleware.append(GuidelineMiddleware(llm_instance, config, system_prompt))

    summarization_middleware = SummarizationMiddleware(
        model=llm_instance,
        trigger=('tokens', SUMMARIZATION_MAX_TOKENS),
        trim_tokens_to_summarize=DEFAULT_TRIM_TOKEN_LIMIT,
        keep=('tokens', SUMMARIZATION_TOKENS_TO_KEEP),
        token_counter=create_token_counter(config.model_name)
    )
    middleware.append(summarization_middleware)
    logger.info(f"init_agent middleware ready, elapsed: {time.time() - create_start:.3f}s")

    mcp_tools = await mcp_tools_task
    logger.info(f"Loaded {len(mcp_tools)} MCP tools")
    logger.info(f"init_agent mcp tools ready, elapsed: {time.time() - create_start:.3f}s")

    # Build the main agent's tool list by hiding tools blacklisted in plugin.json.
    # Sub-agents still receive the full mcp_tools set, so hidden tools remain usable by them.
    hidden_tools = collect_main_agent_hidden_tools(config.bot_id)
    main_tools = [t for t in mcp_tools if t.name not in hidden_tools] if hidden_tools else mcp_tools
    if hidden_tools:
        logger.info(
            f"Main agent hides {len(mcp_tools) - len(main_tools)} tools: {sorted(hidden_tools)}"
        )

    # The sandbox task also yields the workspace root to use from here on.
    sandbox, sandbox_type, workspace_root = await sandbox_task
    logger.info(f"init_agent sandbox ready, elapsed: {time.time() - create_start:.3f}s")

    # Built once and shared by the Daytona BASH_ENV injection and the agent
    # factory so the two can never drift apart. Entries from config.shell_env
    # override the built-in ones.
    shell_env = {
        "ASSISTANT_ID": config.bot_id,
        "USER_IDENTIFIER": config.user_identifier,
        "TRACE_ID": config.trace_id,
        "ENABLE_SELF_KNOWLEDGE": str(config.enable_self_knowledge).lower(),
        **(config.shell_env or {}),
    }

    # Inject shell_env into Daytona sandbox via BASH_ENV file
    if sandbox is not None and sandbox_type == "daytona":
        exported_env = {k: v for k, v in shell_env.items() if v is not None}
        # shlex.quote keeps every value literal: quotes, `$` or backticks in a
        # value can neither corrupt the file nor be executed when it is
        # sourced. This matches local mode, where values are passed verbatim.
        env_lines = "\n".join(
            f"export {k}={shlex.quote(str(v))}" for k, v in exported_env.items()
        )
        if env_lines:
            from utils.daytona_sync import REMOTE_BASH_ENV_PATH, REMOTE_WORKSPACE_ROOT
            bash_env_content = f"cd {REMOTE_WORKSPACE_ROOT}\n{env_lines}"
            # Quoted heredoc delimiter: the shell writes the content unexpanded.
            sandbox.execute(f"cat > {REMOTE_BASH_ENV_PATH} << 'ENVEOF'\n{bash_env_content}\nENVEOF")
            logger.info(f"Injected {len(exported_env)} env vars into Daytona BASH_ENV")

    # Load sub-agents from skill directories
    subagents = await load_subagents(
        bot_id=config.bot_id,
        tools=mcp_tools,
        model=llm_instance,
    )
    if subagents:
        logger.info(f"Loaded {len(subagents)} sub-agents: {[s['name'] for s in subagents]}")

    agent, composite_backend = create_custom_cli_agent(
        model=llm_instance,
        assistant_id=config.bot_id,
        system_prompt=system_prompt,
        tools=main_tools,
        auto_approve=True,
        workspace_root=workspace_root,
        middleware=middleware,
        checkpointer=checkpointer,
        sandbox=sandbox,
        sandbox_type=sandbox_type,
        subagents=subagents if subagents else None,
        shell_env=shell_env,
    )
    logger.info(f"create agent elapsed: {time.time() - create_start:.3f}s")
    return agent, checkpointer, sandbox
class CustomSkillsMiddleware(SkillsMiddleware):
    """SkillsMiddleware that drops an empty ``skills_metadata`` entry first.

    Both hooks remove ``state["skills_metadata"]`` when it is present but has
    length zero, then delegate to the parent implementation.
    NOTE(review): presumably this makes the parent reload the metadata
    instead of keeping the empty value - confirm against SkillsMiddleware.
    """

    @staticmethod
    def _discard_empty_skills_metadata(state):
        """Delete ``state["skills_metadata"]`` in place when it exists and is empty."""
        if "skills_metadata" not in state:
            return
        if len(state["skills_metadata"]) == 0:
            del state["skills_metadata"]

    def before_agent(self, state, runtime, config):
        """Synchronous hook run before agent execution.

        Args:
            state: Current agent state (mutated in place when the empty
                ``skills_metadata`` entry is removed).
            runtime: Runtime context, forwarded unchanged.
            config: Forwarded unchanged to the parent hook.

        Returns:
            Whatever the parent ``before_agent`` returns.
        """
        self._discard_empty_skills_metadata(state)
        return super().before_agent(state, runtime, config)

    async def abefore_agent(self, state, runtime, config):
        """Asynchronous counterpart of ``before_agent``.

        Args:
            state: Current agent state (mutated in place when the empty
                ``skills_metadata`` entry is removed).
            runtime: Runtime context, forwarded unchanged.
            config: Forwarded unchanged to the parent hook.

        Returns:
            Whatever the parent ``abefore_agent`` returns.
        """
        self._discard_empty_skills_metadata(state)
        return await super().abefore_agent(state, runtime, config)
def create_custom_cli_agent(
    model: str | BaseChatModel,
    assistant_id: str,
    *,
    tools: list[BaseTool] | None = None,
    sandbox: SandboxBackendProtocol | None = None,
    sandbox_type: str | None = None,
    system_prompt: str | None = None,
    auto_approve: bool = False,
    enable_skills: bool = True,
    enable_shell: bool = True,
    middleware: list[AgentMiddleware] | None = None,
    workspace_root: str | None = None,
    checkpointer: Checkpointer | None = None,
    store: BaseStore | None = None,
    shell_env: dict[str, str] | None = None,
    subagents: list[SubAgent] | None = None,
) -> tuple[Pregel, CompositeBackend]:
    """Create a CLI-configured agent with a custom workspace_root for shell commands.

    This is a custom version of create_cli_agent that allows specifying a custom
    workspace_root for shell commands instead of using Path.cwd().

    Args:
        model: LLM model to use (e.g., "anthropic:claude-sonnet-4-5-20250929")
        assistant_id: Agent identifier; used here only when a default system
            prompt has to be generated.
        tools: Additional tools to provide to agent (default: empty list)
        sandbox: Optional sandbox backend for remote execution (e.g., ModalBackend).
            If None, uses local filesystem + shell.
        sandbox_type: Type of sandbox provider ("modal", "runloop", "daytona").
            Used for system prompt generation.
        system_prompt: Override the default system prompt. If None, generates one
            based on sandbox_type and assistant_id. BASE_AGENT_PROMPT is
            appended to a non-empty prompt.
        auto_approve: If True, no tool call is configured to interrupt for
            human confirmation. Useful for automated workflows.
        enable_skills: Enable the skills middleware for custom agent skills
        enable_shell: Use a shell-capable local backend (only in local mode)
        middleware: Extra middleware placed after the built-in stack. The list
            is copied, so neither the caller's list nor a shared default is
            ever mutated by this function.
        workspace_root: Working directory for shell commands. If None, uses Path.cwd().
        checkpointer: Optional checkpointer for persisting conversation state
        store: Optional BaseStore for persisting user preferences and agent memory
        shell_env: Optional custom environment variables for the local shell
            backend. They are merged over os.environ; custom vars take
            precedence. Not used in remote sandbox mode.
        subagents: Optional sub-agent specs; when non-empty a
            SubAgentMiddleware is added to the stack.

    Returns:
        2-tuple of (agent_graph, composite_backend)
        - agent_graph: Configured LangGraph Pregel instance ready for execution
        - composite_backend: CompositeBackend for file operations
    """
    if tools is None:
        tools = []

    # Work on a private copy: the skills/local-context middleware appended
    # below must not leak into the caller's list or into later calls.
    agent_middleware = list(middleware) if middleware else []

    # Prepare workspace root
    if workspace_root is None:
        workspace_root = str(Path.cwd())

    # CONDITIONAL SETUP: Local vs Remote Sandbox
    if sandbox is None:
        # ========== LOCAL MODE ==========
        if enable_shell:
            # Create environment for shell commands
            final_shell_env = os.environ.copy()
            if shell_env:
                final_shell_env.update(shell_env)
            # Use LocalShellBackend for filesystem + shell execution
            backend = LocalShellBackend(
                root_dir=workspace_root,
                virtual_mode=False,
                inherit_env=True,
                env=final_shell_env,
            )
        else:
            # No shell access - use plain FilesystemBackend
            backend = FilesystemBackend(root_dir=workspace_root, virtual_mode=False)

        # Set up composite backend with routing based on the new implementation
        # NOTE: virtual_mode=True anchors all paths to root_dir. This is required for
        # these offload-only backends: CompositeBackend strips the route prefix and
        # forwards "/" to grep, so virtual_mode=False would resolve "/" to the real
        # filesystem root and scan the whole disk (hitting /usr, /var, other sessions'
        # temp dirs), causing 45-152s grep calls. virtual_mode=True confines grep to
        # the temp dir and filters out-of-root results.
        # NOTE(review): these mkdtemp directories are never removed by this
        # function; every call leaves two behind - confirm cleanup happens elsewhere.
        large_results_backend = FilesystemBackend(
            root_dir=tempfile.mkdtemp(prefix="deepagents_large_results_"),
            virtual_mode=True,
        )
        conversation_history_backend = FilesystemBackend(
            root_dir=tempfile.mkdtemp(prefix="deepagents_conversation_history_"),
            virtual_mode=True,
        )
        composite_backend = CompositeBackend(
            default=backend,
            routes={
                "/large_tool_results/": large_results_backend,
                "/conversation_history/": conversation_history_backend,
            },
        )

        # Add skills middleware (using new signature)
        if enable_skills:
            skills_sources = ["./skills"]
            agent_middleware.append(
                CustomSkillsMiddleware(
                    backend=FilesystemBackend(root_dir=workspace_root, virtual_mode=False),
                    sources=skills_sources,
                )
            )

        # Add LocalContextMiddleware (new in latest version)
        # Check if backend supports execute (is _ExecutableBackend)
        if enable_shell:
            from deepagents_cli.local_context import LocalContextMiddleware, _ExecutableBackend
            if isinstance(backend, _ExecutableBackend):
                agent_middleware.append(LocalContextMiddleware(backend=backend))
    else:
        # ========== REMOTE SANDBOX MODE ==========
        composite_backend = CompositeBackend(
            default=sandbox,  # Remote sandbox (ModalBackend, etc.)
            routes={},  # No virtualization
        )

        # Add skills middleware
        if enable_skills:
            skills_sources = ["/workspace/skills"]
            agent_middleware.append(
                CustomSkillsMiddleware(
                    backend=sandbox,
                    sources=skills_sources,
                )
            )
        # Note: Shell middleware not used in sandbox mode
        # File operations and execute tool are provided by the sandbox backend

    # Get or use custom system prompt
    if system_prompt is None:
        from deepagents_cli.agent import get_system_prompt as _get_system_prompt
        system_prompt = _get_system_prompt(assistant_id=assistant_id, sandbox_type=sandbox_type)

    # Configure interrupt_on based on auto_approve setting
    if auto_approve:
        # No interrupts - all tools run automatically
        interrupt_on = {}
    else:
        # Full HITL for destructive operations
        from deepagents_cli.agent import _add_interrupt_on
        interrupt_on = _add_interrupt_on()

    deepagent_middleware = [
        ToolErrorRecoveryMiddleware(),  # Outermost: turn any tool exception into a ToolMessage so the agent can recover
        TodoListMiddleware(),
        FilePathFixMiddleware(),  # Fix extra spaces in CJK file names within tool call arguments
        CustomFilesystemMiddleware(backend=composite_backend),  # Use the custom FilesystemMiddleware with full SKILL.md reading support
    ]

    # Insert SubAgentMiddleware after FilesystemMiddleware (matches create_deep_agent ordering)
    if subagents:
        subagent_middleware = SubAgentMiddleware(
            backend=composite_backend,
            subagents=subagents,
        )
        deepagent_middleware.append(subagent_middleware)
        logger.info(f"SubAgentMiddleware added with {len(subagents)} sub-agents: {[s['name'] for s in subagents]}")

    deepagent_middleware.extend([
        AnthropicPromptCachingMiddleware(unsupported_model_behavior="ignore"),
        PatchToolCallsMiddleware(),
    ])

    if agent_middleware:
        deepagent_middleware.extend(agent_middleware)

    # interrupt_on is always a dict at this point (possibly empty), so the
    # HITL middleware is appended in both the auto-approve and manual cases.
    if interrupt_on is not None:
        deepagent_middleware.append(HumanInTheLoopMiddleware(interrupt_on=interrupt_on))

    agent = create_agent(
        model,
        system_prompt=system_prompt + "\n\n" + BASE_AGENT_PROMPT if system_prompt else BASE_AGENT_PROMPT,
        tools=tools,
        middleware=deepagent_middleware,
        checkpointer=checkpointer,
        store=store,
    ).with_config({"recursion_limit": 1000})
    return agent, composite_backend