import json
import logging
import time
import copy
import os
import shlex
import tempfile
import asyncio
from pathlib import Path
from typing import Any, Dict

from langchain.chat_models import init_chat_model
from deepagents import create_deep_agent
from deepagents.backends import CompositeBackend, LocalShellBackend
from deepagents.backends.filesystem import FilesystemBackend
from deepagents.backends.sandbox import SandboxBackendProtocol
from deepagents_cli.agent import create_cli_agent
from langchain.agents import create_agent
from langgraph.store.base import BaseStore
from langchain.agents.middleware import SummarizationMiddleware as LangchainSummarizationMiddleware
from .summarization_middleware import SummarizationMiddleware
from langchain_mcp_adapters.client import MultiServerMCPClient
from utils.fastapi_utils import detect_provider, sanitize_model_kwargs
from .guideline_middleware import GuidelineMiddleware
from .tool_output_length_middleware import ToolOutputLengthMiddleware
from .tool_use_cleanup_middleware import ToolUseCleanupMiddleware
from .tool_metrics_middleware import ToolMetricsMiddleware
from .filepath_fix_middleware import FilePathFixMiddleware
from .mcp_trace_meta import patch_mcp_client_session_trace_meta
from utils.settings import (
    SUMMARIZATION_MAX_TOKENS,
    SUMMARIZATION_TOKENS_TO_KEEP,
    TOOL_OUTPUT_MAX_LENGTH,
    MCP_HTTP_TIMEOUT,
    MCP_SSE_READ_TIMEOUT,
    DEFAULT_TRIM_TOKEN_LIMIT,
    DAYTONA_API_KEY,
    DAYTONA_SERVER_URL,
    DAYTONA_ENABLED,
)
from utils.token_counter import create_token_counter
from agent.agent_config import AgentConfig
from .mem0_manager import get_mem0_manager
from .mem0_middleware import create_mem0_middleware
from .mem0_config import Mem0Config
from agent.prompt_loader import load_system_prompt_async, load_mcp_settings_async
from agent.agent_memory_cache import get_memory_cache_manager
from .subagent_loader import load_subagents
from agent.plugin_hook_loader import collect_main_agent_hidden_tools
from .checkpoint_manager import get_checkpointer_manager
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langgraph.checkpoint.memory import InMemorySaver
from langchain.tools import BaseTool
from langchain_core.language_models import BaseChatModel
from langgraph.pregel import Pregel
# New version imports: MemoryMiddleware and SkillsMiddleware were moved to deepagents.middleware
from deepagents.middleware import MemoryMiddleware, SkillsMiddleware
from langchain.agents.middleware import AgentMiddleware, ModelRequest, ModelResponse
from langchain.agents.middleware import HumanInTheLoopMiddleware, InterruptOnConfig, TodoListMiddleware
from langchain_core.messages import AIMessage, HumanMessage
from langgraph.types import Checkpointer
from deepagents_cli.config import settings, get_default_coding_instructions
from deepagents.middleware.filesystem import FilesystemMiddleware
from deepagents.middleware.patch_tool_calls import PatchToolCallsMiddleware
from langchain_anthropic.middleware import AnthropicPromptCachingMiddleware
from deepagents.graph import BASE_AGENT_PROMPT
# Added: LocalContextMiddleware
from deepagents_cli.local_context import LocalContextMiddleware
# Custom: FilesystemMiddleware with full SKILL.md reading support
from .custom_filesystem_middleware import CustomFilesystemMiddleware
# Sub-agent support
from deepagents.middleware.subagents import SubAgent, SubAgentMiddleware

# Global MemorySaver instance
# from langgraph.checkpoint.memory import MemorySaver
# _global_checkpointer = MemorySaver()

logger = logging.getLogger('app')


class EmptyResponseRetryMiddleware(AgentMiddleware):
    """Retry a model call when it produces an empty response.

    A response counts as empty when it carries no messages at all, or when its
    first message is an ``AIMessage`` with neither text content nor tool calls.
    Each retry re-issues the request with a "Please continue your response."
    ``HumanMessage`` appended. The nudges accumulate: retry *k* sends the
    original messages plus *k* nudges. After ``MAX_RETRIES`` attempts the
    last result is returned as-is, even if it is still empty.
    """

    MAX_RETRIES = 5

    def _is_empty_response(self, result: ModelResponse) -> bool:
        """Return True if ``result`` has no messages, or its first message is
        an ``AIMessage`` with no text and no tool calls.

        Non-AI first messages are never treated as empty.
        """
        if not result.result:
            return True
        msg = result.result[0]
        if not isinstance(msg, AIMessage):
            return False
        content = msg.content or ""
        # String content must contain non-whitespace; list content (content
        # blocks) only needs to be non-empty.
        has_text = bool(content.strip()) if isinstance(content, str) else bool(content)
        return not has_text and not msg.tool_calls

    def _next_retry_request(self, request: ModelRequest, attempt: int) -> ModelRequest:
        """Log retry number ``attempt`` and return ``request`` with a continuation nudge appended."""
        logger.warning(f"Empty response detected, retrying ({attempt}/{self.MAX_RETRIES})")
        retry_messages = list(request.messages) + [
            HumanMessage(content="Please continue your response.")
        ]
        return request.override(messages=retry_messages)

    def wrap_model_call(self, request, handler):
        """Synchronous hook: call ``handler``, retrying on empty responses."""
        result = handler(request)
        retries = 0
        while self._is_empty_response(result) and retries < self.MAX_RETRIES:
            retries += 1
            request = self._next_retry_request(request, retries)
            result = handler(request)
        return result

    async def awrap_model_call(self, request, handler):
        """Asynchronous hook: await ``handler``, retrying on empty responses."""
        result = await handler(request)
        retries = 0
        while self._is_empty_response(result) and retries < self.MAX_RETRIES:
            retries += 1
            request = self._next_retry_request(request, retries)
            result = await handler(request)
        return result


# Utility functions
def read_system_prompt():
    """Read the shared stateless system prompt.

    Reads ``./prompt/system_prompt_default.md``, resolved against the current
    working directory, as UTF-8. Returns it with leading and trailing
    whitespace stripped. File errors propagate to the caller.
    """
    with open("./prompt/system_prompt_default.md", "r", encoding="utf-8") as f:
        return f.read().strip()


async def get_tools_from_mcp(mcp):
    """Load LangChain tools for the MCP servers described by ``mcp``, with caching.

    Args:
        mcp: MCP settings list. ``mcp[0]["mcpServers"]`` maps server names to
            server configs. The object itself is never mutated; a deep copy is
            normalized instead, so it stays usable as the cache key.

    Returns:
        A list of tools, taken from the memory cache when available. When the
        configuration is malformed or loading fails, the problem is logged and
        an empty list is returned. Errors are never propagated to the caller.
    """
    patch_mcp_client_session_trace_meta()
    start_time = time.time()

    # Defensive handling: mcp must be a non-empty list whose first entry is a
    # mapping containing "mcpServers".
    if (
        not isinstance(mcp, list)
        or len(mcp) == 0
        or not isinstance(mcp[0], dict)
        or "mcpServers" not in mcp[0]
    ):
        logger.info(f"get_tools_from_mcp: invalid mcp config, elapsed: {time.time() - start_time:.3f}s")
        return []

    # Try to load tools from cache
    cache_manager = get_memory_cache_manager()
    cached_tools = cache_manager.get_mcp_tools(mcp)
    if cached_tools is not None:
        logger.info(f"get_tools_from_mcp: cached {len(cached_tools)} tools, elapsed: {time.time() - start_time:.3f}s")
        return cached_tools

    # Validate the shape BEFORE normalizing: the loop below calls .values(),
    # which would raise for a non-dict mcpServers.
    if not isinstance(mcp[0]["mcpServers"], dict):
        logger.info(f"get_tools_from_mcp: mcpServers is not dict, elapsed: {time.time() - start_time:.3f}s")
        return []

    try:
        # Deep-copy so normalization never mutates the original configuration
        # (which is also the cache key).
        servers = copy.deepcopy(mcp[0]["mcpServers"])
        for cfg in servers.values():
            # The "type" field is removed and its value is NOT carried over.
            # When "transport" is absent, it defaults to "http" if a url exists,
            # otherwise "stdio".
            # NOTE(review): a server declared only as {"type": "sse", "url": ...}
            # is therefore connected over "http"; set "transport" explicitly
            # if SSE is intended.
            cfg.pop("type", None)
            if "transport" not in cfg:
                cfg["transport"] = "http" if "url" in cfg else "stdio"
            # HTTP/SSE servers get the global timeouts unless configured per server.
            if cfg.get("transport") in ("http", "sse"):
                cfg.setdefault("timeout", MCP_HTTP_TIMEOUT)
                cfg.setdefault("sse_read_timeout", MCP_SSE_READ_TIMEOUT)

        mcp_client = MultiServerMCPClient(servers)
        mcp_tools = await mcp_client.get_tools()
        # Cache the result
        cache_manager.set_mcp_tools(mcp, mcp_tools)
        logger.info(f"get_tools_from_mcp: loaded {len(mcp_tools)} tools, elapsed: {time.time() - start_time:.3f}s")
        return mcp_tools
    except Exception as e:
        # Return an empty list on exception to avoid propagating errors to callers.
        logger.error(f"get_tools_from_mcp: error {e}, elapsed: {time.time() - start_time:.3f}s")
        return []


from utils.daytona_sync import init_daytona_sandbox, sync_sandbox_to_local


def _build_shell_env(config: AgentConfig) -> dict[str, Any]:
    """Return the environment variables exposed to the agent's shell.

    Entries from ``config.shell_env`` override the built-in ones. Values may be
    None (e.g. a missing ``user_identifier``).
    """
    return {
        "ASSISTANT_ID": config.bot_id,
        "USER_IDENTIFIER": config.user_identifier,
        "TRACE_ID": config.trace_id,
        "ENABLE_SELF_KNOWLEDGE": str(config.enable_self_knowledge).lower(),
        **(config.shell_env or {}),
    }


def _build_export_lines(shell_env: dict[str, Any]) -> tuple[str, int]:
    """Render ``export NAME=value`` lines for a bash script.

    - None values are skipped.
    - Values are quoted with ``shlex.quote``. Quotes, ``$`` and backticks are
      therefore taken literally, just as they are when the same variables
      reach a local shell through the process environment, and a value cannot
      inject shell code.
    - Names that are not valid shell identifiers are skipped with a warning,
      because they would otherwise be spliced verbatim into the script.

    Returns:
        (newline-joined export lines, number of variables exported)
    """
    lines = []
    for key, value in shell_env.items():
        if value is None:
            continue
        name = str(key)
        if not (name.isascii() and name.isidentifier()):
            logger.warning(f"Skipping invalid shell env var name: {name!r}")
            continue
        lines.append(f"export {name}={shlex.quote(str(value))}")
    return "\n".join(lines), len(lines)


async def _inject_daytona_bash_env(sandbox, shell_env: dict[str, Any]) -> None:
    """Write ``shell_env`` into the Daytona sandbox's BASH_ENV file.

    The generated script first ``cd``s into the remote workspace root and then
    exports the variables. Nothing is written when no variable survives
    filtering.
    """
    env_lines, exported = _build_export_lines(shell_env)
    if not env_lines:
        return
    from utils.daytona_sync import REMOTE_BASH_ENV_PATH, REMOTE_WORKSPACE_ROOT
    bash_env_content = f"cd {REMOTE_WORKSPACE_ROOT}\n{env_lines}"
    # Quoted heredoc delimiter: the body is written verbatim, with no expansion.
    command = f"cat > {REMOTE_BASH_ENV_PATH} << 'ENVEOF'\n{bash_env_content}\nENVEOF"
    # sandbox.execute is a synchronous call. Run it in a worker thread so it
    # does not stall the event loop; init_daytona_sandbox is run the same way.
    await asyncio.to_thread(sandbox.execute, command)
    logger.info(f"Injected {exported} env vars into Daytona BASH_ENV")


def _build_middleware(
    config: AgentConfig,
    llm_instance: BaseChatModel,
    system_prompt: str,
) -> list[AgentMiddleware]:
    """Build the project-specific middleware list for the main agent.

    Order: empty-response retry, tool metrics, tool-use cleanup, then Mem0
    (if enabled and creatable), then Guideline (if ``config.enable_thinking``),
    then summarization.
    """
    middleware: list[AgentMiddleware] = [
        EmptyResponseRetryMiddleware(),
        ToolMetricsMiddleware(config),
        ToolUseCleanupMiddleware(),
    ]

    # tool_output_middleware = ToolOutputLengthMiddleware(
    #     max_length=(getattr(config.generate_cfg, 'tool_output_max_length', None) if config.generate_cfg else None) or TOOL_OUTPUT_MAX_LENGTH,
    #     truncation_strategy=getattr(config.generate_cfg, 'tool_output_truncation_strategy', 'smart') if config.generate_cfg else 'smart',
    #     tool_filters=getattr(config.generate_cfg, 'tool_output_filters', None) if config.generate_cfg else None,
    #     exclude_tools=getattr(config.generate_cfg, 'tool_output_exclude', []) if config.generate_cfg else [],
    #     preserve_code_blocks=getattr(config.generate_cfg, 'preserve_code_blocks', True) if config.generate_cfg else True,
    #     preserve_json=getattr(config.generate_cfg, 'preserve_json', True) if config.generate_cfg else True
    # )
    # middleware.append(tool_output_middleware)

    if config.enable_memori:
        # Mem0 is best-effort: any failure is logged and the agent is built without it.
        try:
            if not config.user_identifier:
                logger.warning("Mem0 enabled but user_identifier is missing, skipping Mem0")
            else:
                mem0_manager = get_mem0_manager()
                mem0_middleware = create_mem0_middleware(
                    bot_id=config.bot_id,
                    user_identifier=config.user_identifier,
                    session_id=config.session_id or "default",
                    agent_config=config,
                    enabled=config.enable_memori,
                    semantic_search_top_k=config.memori_semantic_search_top_k,
                    mem0_manager=mem0_manager,
                    llm_instance=llm_instance,
                )
                if mem0_middleware:
                    middleware.append(mem0_middleware)
                    logger.info("Mem0 middleware added to agent")
        except Exception as e:
            logger.error(f"Failed to create Mem0 middleware: {e}, continuing without Mem0")

    if config.enable_thinking:
        middleware.append(GuidelineMiddleware(llm_instance, config, system_prompt))

    middleware.append(
        SummarizationMiddleware(
            model=llm_instance,
            trigger=('tokens', SUMMARIZATION_MAX_TOKENS),
            trim_tokens_to_summarize=DEFAULT_TRIM_TOKEN_LIMIT,
            keep=('tokens', SUMMARIZATION_TOKENS_TO_KEEP),
            token_counter=create_token_counter(config.model_name),
        )
    )
    return middleware


async def init_agent(config: AgentConfig):
    """
    Initialize the agent with persistent memory and conversation summarization support.

    The agent itself is not cached; only the MCP tools are (see
    get_tools_from_mcp). The caller must release the checkpointer after use.

    Args:
        config: AgentConfig object containing all initialization parameters.
            Its ``system_prompt`` and ``mcp_settings`` attributes are
            overwritten with the resolved values.

    Returns:
        (agent, checkpointer, sandbox) tuple:
        - ``checkpointer`` is None when there is no session_id or it could
          not be loaded.
        - ``sandbox`` may be None, in which case the agent runs in local mode.
    """
    create_start = time.time()

    # Load configuration concurrently
    final_system_prompt, final_mcp_settings = await asyncio.gather(
        load_system_prompt_async(config),
        load_mcp_settings_async(config),
    )
    logger.info(f"init_agent config loaded, elapsed: {time.time() - create_start:.3f}s")

    mcp_settings = final_mcp_settings or []
    system_prompt = final_system_prompt or read_system_prompt()
    config.system_prompt = system_prompt
    config.mcp_settings = mcp_settings

    local_workspace_root = str(Path.cwd() / "projects" / "robot" / config.bot_id)

    # Run high-cost I/O in parallel: MCP tools loading + Daytona sandbox
    # initialization. Both are awaited further below.
    mcp_tools_task = asyncio.create_task(get_tools_from_mcp(mcp_settings))
    sandbox_task = asyncio.create_task(
        asyncio.to_thread(init_daytona_sandbox, config.bot_id, local_workspace_root)
    )

    # Detect the provider or use the explicitly specified one
    model_provider, base_url = detect_provider(config.model_name, config.model_server)
    model_kwargs, dropped_params, default_temperature_applied = sanitize_model_kwargs(
        model_name=config.model_name,
        model_provider=model_provider,
        base_url=base_url,
        api_key=config.api_key,
        generate_cfg=config.generate_cfg,
        source="init_agent",
    )
    if dropped_params:
        logger.info(
            "init_agent dropped_params=%s model=%s provider=%s default_temperature_applied=%s",
            dropped_params,
            config.model_name,
            model_provider,
            default_temperature_applied,
        )
    llm_instance = init_chat_model(**model_kwargs)

    # Create a new agent instance without caching
    logger.info(f"Creating new agent for session: {getattr(config, 'session_id', 'no-session')}")

    # Get the checkpointer from the connection pool.
    # prepare_checkpoint_message has already been called in from_v1/from_v2_request.
    checkpointer = None
    if config.session_id:
        try:
            checkpointer = get_checkpointer_manager().checkpointer
        except Exception as e:
            logger.warning(f"Failed to load checkpointer: {e}")

    middleware = _build_middleware(config, llm_instance, system_prompt)
    logger.info(f"init_agent middleware ready, elapsed: {time.time() - create_start:.3f}s")

    mcp_tools = await mcp_tools_task
    logger.info(f"Loaded {len(mcp_tools)} MCP tools")
    logger.info(f"init_agent mcp tools ready, elapsed: {time.time() - create_start:.3f}s")

    # Build the main agent's tool list by hiding tools blacklisted in plugin.json.
    # Sub-agents still receive the full mcp_tools set, so hidden tools remain usable by them.
    hidden_tools = collect_main_agent_hidden_tools(config.bot_id)
    if hidden_tools:
        main_tools = [t for t in mcp_tools if t.name not in hidden_tools]
        logger.info(
            f"Main agent hides {len(mcp_tools) - len(main_tools)} tools: {sorted(hidden_tools)}"
        )
    else:
        main_tools = mcp_tools

    # workspace_root comes from the sandbox initializer and is used as the
    # agent's workspace root.
    sandbox, sandbox_type, workspace_root = await sandbox_task
    logger.info(f"init_agent sandbox ready, elapsed: {time.time() - create_start:.3f}s")

    shell_env = _build_shell_env(config)

    # Inject shell_env into Daytona sandbox via BASH_ENV file
    if sandbox is not None and sandbox_type == "daytona":
        await _inject_daytona_bash_env(sandbox, shell_env)

    # Load sub-agents from skill directories
    subagents = await load_subagents(
        bot_id=config.bot_id,
        tools=mcp_tools,
        model=llm_instance,
    )
    if subagents:
        logger.info(f"Loaded {len(subagents)} sub-agents: {[s['name'] for s in subagents]}")

    agent, _composite_backend = create_custom_cli_agent(
        model=llm_instance,
        assistant_id=config.bot_id,
        system_prompt=system_prompt,
        tools=main_tools,
        auto_approve=True,
        workspace_root=workspace_root,
        middleware=middleware,
        checkpointer=checkpointer,
        sandbox=sandbox,
        sandbox_type=sandbox_type,
        subagents=subagents if subagents else None,
        shell_env=shell_env,
    )
    logger.info(f"create agent elapsed: {time.time() - create_start:.3f}s")
    return agent, checkpointer, sandbox


class CustomSkillsMiddleware(SkillsMiddleware):
    """SkillsMiddleware that discards an empty ``skills_metadata`` before loading.

    Before delegating to the parent hook, an empty ``skills_metadata`` entry is
    deleted from ``state``, which is mutated in place.

    NOTE(review): presumably the parent skips reloading when the key is already
    present, so an empty list carried over in state would suppress skill
    discovery. Confirm against deepagents' SkillsMiddleware.
    """

    @staticmethod
    def _drop_empty_skills_metadata(state) -> None:
        """Delete ``state["skills_metadata"]`` in place if it is present and empty."""
        if "skills_metadata" in state and len(state["skills_metadata"]) == 0:
            del state["skills_metadata"]

    def before_agent(self, state, runtime, config):
        """Load skills metadata before agent execution.

        Args:
            state: Current agent state (an empty ``skills_metadata`` is removed in place).
            runtime: Runtime context, forwarded to the parent.
            config: Runnable config, forwarded to the parent.

        Returns:
            The result of ``SkillsMiddleware.before_agent``.
        """
        self._drop_empty_skills_metadata(state)
        return super().before_agent(state, runtime, config)

    async def abefore_agent(self, state, runtime, config):
        """Async variant of :meth:`before_agent`.

        Args:
            state: Current agent state (an empty ``skills_metadata`` is removed in place).
            runtime: Runtime context, forwarded to the parent.
            config: Runnable config, forwarded to the parent.

        Returns:
            The result of ``SkillsMiddleware.abefore_agent``.
        """
        self._drop_empty_skills_metadata(state)
        return await super().abefore_agent(state, runtime, config)


def create_custom_cli_agent(
    model: str | BaseChatModel,
    assistant_id: str,
    *,
    tools: list[BaseTool] | None = None,
    sandbox: SandboxBackendProtocol | None = None,
    sandbox_type: str | None = None,
    system_prompt: str | None = None,
    auto_approve: bool = False,
    enable_skills: bool = True,
    enable_shell: bool = True,
    middleware: list[AgentMiddleware] | None = None,
    workspace_root: str | None = None,
    checkpointer: Checkpointer | None = None,
    store: BaseStore | None = None,
    shell_env: dict[str, str] | None = None,
    subagents: list[SubAgent] | None = None,
) -> tuple[Pregel, CompositeBackend]:
    """Create a CLI-configured agent with custom workspace_root for shell commands.

    This is a custom version of create_cli_agent that allows specifying a custom
    workspace_root for shell commands instead of using Path.cwd().

    Args:
        model: LLM model to use (e.g., "anthropic:claude-sonnet-4-5-20250929").
        assistant_id: Agent identifier. Used to generate the default system
            prompt when system_prompt is None.
        tools: Additional tools to provide to the agent (default: empty list).
        sandbox: Optional sandbox backend for remote execution. If None, uses
            the local filesystem + shell.
        sandbox_type: Type of sandbox provider (e.g. "daytona"). Used for
            system prompt generation.
        system_prompt: Override the default system prompt. If None, one is
            generated from sandbox_type and assistant_id. BASE_AGENT_PROMPT is
            always appended.
        auto_approve: If True, no tool call is configured to interrupt for
            human confirmation. Useful for automated workflows.
        enable_skills: Add CustomSkillsMiddleware for custom agent skills.
        enable_shell: Local mode only. Use LocalShellBackend (filesystem +
            shell) instead of a plain FilesystemBackend, and add
            LocalContextMiddleware when the backend supports execution.
        middleware: Extra middleware, placed after the built-in stack. The list
            is copied and never mutated.
        workspace_root: Root directory for the local backends. If None, uses
            Path.cwd(). Not used in sandbox mode.
        checkpointer: Optional checkpointer for persisting conversation state.
        store: Optional BaseStore, passed through to create_agent.
        shell_env: Local shell mode only. Custom environment variables merged
            over os.environ; custom vars take precedence.
        subagents: Optional sub-agent specs. When non-empty, a
            SubAgentMiddleware is added.

    Returns:
        2-tuple of (agent_graph, composite_backend):
        - agent_graph: Configured LangGraph Pregel instance (recursion_limit=1000)
        - composite_backend: CompositeBackend for file operations
    """
    if tools is None:
        tools = []

    # Copy the caller's list: skills / local-context middleware are appended
    # below, and appending to the argument would leak them into the caller's
    # list (or, with a shared default, into every later call).
    agent_middleware: list[AgentMiddleware] = list(middleware) if middleware else []

    # Prepare workspace root
    if workspace_root is None:
        workspace_root = str(Path.cwd())

    # CONDITIONAL SETUP: Local vs Remote Sandbox
    if sandbox is None:
        # ========== LOCAL MODE ==========
        if enable_shell:
            # Create environment for shell commands
            final_shell_env = os.environ.copy()
            if shell_env:
                final_shell_env.update(shell_env)
            # Use LocalShellBackend for filesystem + shell execution
            backend = LocalShellBackend(
                root_dir=workspace_root,
                virtual_mode=False,
                inherit_env=True,
                env=final_shell_env,
            )
        else:
            # No shell access - use plain FilesystemBackend
            backend = FilesystemBackend(root_dir=workspace_root, virtual_mode=False)

        # NOTE: virtual_mode=True anchors all paths to root_dir. This is required for
        # these offload-only backends: CompositeBackend strips the route prefix and
        # forwards "/" to grep, so virtual_mode=False would resolve "/" to the real
        # filesystem root and scan the whole disk (hitting /usr, /var, other sessions'
        # temp dirs), causing 45-152s grep calls. virtual_mode=True confines grep to
        # the temp dir and filters out-of-root results.
        # NOTE(review): these mkdtemp directories are created on every call and are
        # not removed here; confirm they are cleaned up elsewhere.
        large_results_backend = FilesystemBackend(
            root_dir=tempfile.mkdtemp(prefix="deepagents_large_results_"),
            virtual_mode=True,
        )
        conversation_history_backend = FilesystemBackend(
            root_dir=tempfile.mkdtemp(prefix="deepagents_conversation_history_"),
            virtual_mode=True,
        )
        composite_backend = CompositeBackend(
            default=backend,
            routes={
                "/large_tool_results/": large_results_backend,
                "/conversation_history/": conversation_history_backend,
            },
        )

        # Add skills middleware (using new signature)
        if enable_skills:
            agent_middleware.append(
                CustomSkillsMiddleware(
                    backend=FilesystemBackend(root_dir=workspace_root, virtual_mode=False),
                    sources=["./skills"],
                )
            )

        # Add LocalContextMiddleware only if the backend supports execute.
        if enable_shell:
            from deepagents_cli.local_context import LocalContextMiddleware, _ExecutableBackend
            if isinstance(backend, _ExecutableBackend):
                agent_middleware.append(LocalContextMiddleware(backend=backend))
    else:
        # ========== REMOTE SANDBOX MODE ==========
        composite_backend = CompositeBackend(
            default=sandbox,  # Remote sandbox (ModalBackend, etc.)
            routes={},  # No virtualization
        )

        # Add skills middleware
        if enable_skills:
            agent_middleware.append(
                CustomSkillsMiddleware(
                    backend=sandbox,
                    sources=["/workspace/skills"],
                )
            )
        # Note: Shell middleware not used in sandbox mode.
        # File operations and execute tool are provided by the sandbox backend.

    # Get or use custom system prompt
    if system_prompt is None:
        from deepagents_cli.agent import get_system_prompt as _get_system_prompt
        system_prompt = _get_system_prompt(assistant_id=assistant_id, sandbox_type=sandbox_type)

    # Configure interrupt_on based on auto_approve setting
    if auto_approve:
        # No interrupts - all tools run automatically
        interrupt_on = {}
    else:
        # Full HITL for destructive operations
        from deepagents_cli.agent import _add_interrupt_on
        interrupt_on = _add_interrupt_on()

    deepagent_middleware = [
        TodoListMiddleware(),
        FilePathFixMiddleware(),  # Fix extra spaces in CJK file names within tool call arguments
        CustomFilesystemMiddleware(backend=composite_backend),  # Full SKILL.md reading support
    ]

    # Insert SubAgentMiddleware after FilesystemMiddleware (matches create_deep_agent ordering)
    if subagents:
        deepagent_middleware.append(
            SubAgentMiddleware(
                backend=composite_backend,
                subagents=subagents,
            )
        )
        logger.info(f"SubAgentMiddleware added with {len(subagents)} sub-agents: {[s['name'] for s in subagents]}")

    deepagent_middleware.extend([
        AnthropicPromptCachingMiddleware(unsupported_model_behavior="ignore"),
        PatchToolCallsMiddleware(),
    ])
    if agent_middleware:
        deepagent_middleware.extend(agent_middleware)
    # NOTE(review): interrupt_on is never None here, so HumanInTheLoopMiddleware is
    # always added, even with an empty config when auto_approve=True.
    if interrupt_on is not None:
        deepagent_middleware.append(HumanInTheLoopMiddleware(interrupt_on=interrupt_on))

    full_system_prompt = (
        system_prompt + "\n\n" + BASE_AGENT_PROMPT if system_prompt else BASE_AGENT_PROMPT
    )
    agent = create_agent(
        model,
        system_prompt=full_system_prompt,
        tools=tools,
        middleware=deepagent_middleware,
        checkpointer=checkpointer,
        store=store,
    ).with_config({"recursion_limit": 1000})
    return agent, composite_backend