import json
import logging
import time
import copy
import os
import tempfile
from collections.abc import Mapping
from pathlib import Path
from typing import Any, Dict
from langchain.chat_models import init_chat_model
from deepagents import create_deep_agent
from deepagents.backends import CompositeBackend, LocalShellBackend
from deepagents.backends.filesystem import FilesystemBackend
from deepagents.backends.sandbox import SandboxBackendProtocol
from deepagents_cli.agent import create_cli_agent
from langchain.agents import create_agent
from langgraph.store.base import BaseStore
from langchain.agents.middleware import SummarizationMiddleware as LangchainSummarizationMiddleware
from .summarization_middleware import SummarizationMiddleware
from langchain_mcp_adapters.client import MultiServerMCPClient
# NOTE(review): this import looks like an accidental IDE auto-import and is
# not referenced in this file; kept only to avoid changing import side effects.
from sympy.printing.cxx import none
from utils.fastapi_utils import detect_provider
from .guideline_middleware import GuidelineMiddleware
from .tool_output_length_middleware import ToolOutputLengthMiddleware
from .tool_use_cleanup_middleware import ToolUseCleanupMiddleware
from utils.settings import (
    SUMMARIZATION_MAX_TOKENS,
    SUMMARIZATION_TOKENS_TO_KEEP,
    TOOL_OUTPUT_MAX_LENGTH,
    MCP_HTTP_TIMEOUT,
    MCP_SSE_READ_TIMEOUT,
    DEFAULT_TRIM_TOKEN_LIMIT
)
from utils.token_counter import create_token_counter
from agent.agent_config import AgentConfig
from .mem0_manager import get_mem0_manager
from .mem0_middleware import create_mem0_middleware
from .mem0_config import Mem0Config
from agent.prompt_loader import load_system_prompt_async, load_mcp_settings_async
from agent.agent_memory_cache import get_memory_cache_manager
from .checkpoint_manager import get_checkpointer_manager
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langgraph.checkpoint.memory import InMemorySaver
from langchain.tools import BaseTool
from langchain_core.language_models import BaseChatModel
from langgraph.pregel import Pregel
# New-version import: MemoryMiddleware and SkillsMiddleware moved to deepagents.middleware
from deepagents.middleware import MemoryMiddleware, SkillsMiddleware
from langchain.agents.middleware import AgentMiddleware
from langgraph.types import Checkpointer
from deepagents_cli.config import settings, get_default_coding_instructions
from langchain.agents.middleware import HumanInTheLoopMiddleware, InterruptOnConfig, TodoListMiddleware
from deepagents.middleware.filesystem import FilesystemMiddleware
from deepagents.middleware.patch_tool_calls import PatchToolCallsMiddleware
from langchain_anthropic.middleware import AnthropicPromptCachingMiddleware
from deepagents.graph import BASE_AGENT_PROMPT
# Added: LocalContextMiddleware
from deepagents_cli.local_context import LocalContextMiddleware
# Custom: FilesystemMiddleware that supports reading SKILL.md in full
from .custom_filesystem_middleware import CustomFilesystemMiddleware

# Global MemorySaver instance (currently disabled)
# from langgraph.checkpoint.memory import MemorySaver
# _global_checkpointer = MemorySaver()

logger = logging.getLogger('app')

# generate_cfg keys consumed by this module's own middleware; they must never
# be forwarded to the LLM constructor.
_INTERNAL_GENERATE_PARAMS = frozenset({
    'tool_output_max_length',
    'tool_output_truncation_strategy',
    'tool_output_filters',
    'tool_output_exclude',
    'preserve_code_blocks',
    'preserve_json',
})

# OpenAI-specific generation parameters that are stripped for Anthropic models.
_OPENAI_ONLY_PARAMS = frozenset({
    'n',  # number of responses to generate
    'presence_penalty',
    'frequency_penalty',
    'logprobs',
    'top_logprobs',
    'logit_bias',
    'seed',
    'suffix',
    'best_of',
    'echo',
    'user',
})


# Utility functions
def read_system_prompt() -> str:
    """Read the default system prompt from ./prompt/system_prompt_default.md.

    Returns:
        The file content with leading/trailing whitespace stripped.
    """
    with open("./prompt/system_prompt_default.md", "r", encoding="utf-8") as f:
        return f.read().strip()


def read_mcp_settings() -> Any:
    """Read the default MCP tool configuration from ./mcp/mcp_settings.json.

    Returns:
        The parsed JSON document.
    """
    # Explicit encoding so parsing does not depend on the platform locale.
    with open("./mcp/mcp_settings.json", "r", encoding="utf-8") as f:
        return json.load(f)


def _generate_cfg_value(generate_cfg: Any, key: str, default: Any = None) -> Any:
    """Look up ``key`` in ``generate_cfg``, whether it is a mapping or an object."""
    if not generate_cfg:
        return default
    if isinstance(generate_cfg, Mapping):
        return generate_cfg.get(key, default)
    return getattr(generate_cfg, key, default)


def _build_model_kwargs(config: AgentConfig, model_provider: str, base_url: Any) -> Dict[str, Any]:
    """Assemble the keyword arguments passed to ``init_chat_model``."""
    model_kwargs = {
        "model": config.model_name,
        "model_provider": model_provider,
        "temperature": 0.8,
        "base_url": base_url,
        "api_key": config.api_key,
    }
    if config.generate_cfg:
        # Choose which parameters to drop based on the provider.
        excluded = set(_INTERNAL_GENERATE_PARAMS)
        if model_provider == 'anthropic':
            excluded |= _OPENAI_ONLY_PARAMS
        model_kwargs.update(
            {k: v for k, v in config.generate_cfg.items() if k not in excluded}
        )
    return model_kwargs


async def get_tools_from_mcp(mcp):
    """Load tools from an MCP configuration, using the memory cache when possible.

    Args:
        mcp: Expected to be a non-empty list whose first element contains an
            ``"mcpServers"`` dict of per-server configurations.

    Returns:
        The list of tools, or an empty list when the configuration is invalid
        or loading the tools fails.
    """
    start_time = time.time()

    # Defensive check: mcp must be a non-empty list containing mcpServers.
    if not isinstance(mcp, list) or len(mcp) == 0 or "mcpServers" not in mcp[0]:
        logger.info(f"get_tools_from_mcp: invalid mcp config, elapsed: {time.time() - start_time:.3f}s")
        return []

    # Try the cache first.
    cache_manager = get_memory_cache_manager()
    cached_tools = cache_manager.get_mcp_tools(mcp)
    if cached_tools is not None:
        logger.info(f"get_tools_from_mcp: cached {len(cached_tools)} tools, elapsed: {time.time() - start_time:.3f}s")
        return cached_tools

    # Validate the type BEFORE iterating: calling .values() on a non-dict
    # would raise before any later check could run.
    if not isinstance(mcp[0]["mcpServers"], dict):
        logger.info(f"get_tools_from_mcp: mcpServers is not dict, elapsed: {time.time() - start_time:.3f}s")
        return []

    # Deep-copy so the original config (used as the cache key) is not modified.
    mcp_copy = copy.deepcopy(mcp)
    servers = mcp_copy[0]["mcpServers"]

    for cfg in servers.values():
        # The "type" field is dropped; when no transport is given it defaults
        # to http if a url is present, otherwise stdio.
        cfg.pop("type", None)
        if "transport" not in cfg:
            cfg["transport"] = "http" if "url" in cfg else "stdio"

        # Apply the global default timeouts to HTTP/SSE servers that do not
        # configure their own.
        if cfg.get("transport") in ("http", "sse"):
            cfg.setdefault("timeout", MCP_HTTP_TIMEOUT)
            cfg.setdefault("sse_read_timeout", MCP_SSE_READ_TIMEOUT)

    try:
        mcp_client = MultiServerMCPClient(servers)
        mcp_tools = await mcp_client.get_tools()
        # Cache under the ORIGINAL config so later lookups hit.
        cache_manager.set_mcp_tools(mcp, mcp_tools)
        logger.info(f"get_tools_from_mcp: loaded {len(mcp_tools)} tools, elapsed: {time.time() - start_time:.3f}s")
        return mcp_tools
    except Exception as e:
        # Return an empty list on failure so callers do not have to handle errors.
        logger.error(f"get_tools_from_mcp: error {e}, elapsed: {time.time() - start_time:.3f}s")
        return []


async def init_agent(config: AgentConfig):
    """Build a new agent for ``config`` together with its checkpointer.

    The agent itself is not cached; only the MCP tools are (inside
    ``get_tools_from_mcp``). The resolved system prompt and MCP settings are
    written back onto ``config``.

    Args:
        config: AgentConfig carrying all initialisation parameters.

    Returns:
        An ``(agent, checkpointer)`` tuple. ``checkpointer`` is None when the
        config has no session_id or the checkpointer could not be obtained.
        The original note states the caller should hand the checkpointer back
        after use; nothing in this function enforces that.
    """
    # Load configuration, falling back to the on-disk defaults.
    final_system_prompt = await load_system_prompt_async(config)
    final_mcp_settings = await load_mcp_settings_async(config)

    mcp_settings = final_mcp_settings if final_mcp_settings else read_mcp_settings()
    system_prompt = final_system_prompt if final_system_prompt else read_system_prompt()

    # Store each resolved value on its matching attribute (these two
    # assignments were previously swapped).
    config.system_prompt = system_prompt
    config.mcp_settings = mcp_settings

    # Fetch MCP tools (caching is handled inside get_tools_from_mcp).
    mcp_tools = await get_tools_from_mcp(mcp_settings)
    logger.info(f"Loaded {len(mcp_tools)} MCP tools")

    # Detect the provider, or use the one specified.
    model_provider, base_url = detect_provider(config.model_name, config.model_server)
    llm_instance = init_chat_model(**_build_model_kwargs(config, model_provider, base_url))

    # Create a new agent (no longer cached).
    logger.info(f"Creating new agent for session: {getattr(config, 'session_id', 'no-session')}")

    checkpointer = None
    create_start = time.time()

    # Only sessions get a checkpointer; failure degrades to no persistence.
    if config.session_id:
        try:
            manager = get_checkpointer_manager()
            checkpointer = manager.checkpointer
        except Exception as e:
            logger.warning(f"Failed to load checkpointer: {e}")

    # Build the middleware list. ToolUseCleanupMiddleware goes first so that
    # orphaned tool_use entries are cleaned up before anything else runs.
    middleware = [ToolUseCleanupMiddleware()]

    # Tool output length control. generate_cfg is read as a mapping (it is
    # iterated with .items() when building the model kwargs), so the values
    # are looked up by key rather than by attribute.
    generate_cfg = config.generate_cfg
    tool_output_middleware = ToolOutputLengthMiddleware(
        max_length=_generate_cfg_value(generate_cfg, 'tool_output_max_length') or TOOL_OUTPUT_MAX_LENGTH,
        truncation_strategy=_generate_cfg_value(generate_cfg, 'tool_output_truncation_strategy', 'smart'),
        tool_filters=_generate_cfg_value(generate_cfg, 'tool_output_filters', None),
        exclude_tools=_generate_cfg_value(generate_cfg, 'tool_output_exclude', []),
        preserve_code_blocks=_generate_cfg_value(generate_cfg, 'preserve_code_blocks', True),
        preserve_json=_generate_cfg_value(generate_cfg, 'preserve_json', True),
    )
    middleware.append(tool_output_middleware)

    # Mem0 memory middleware (if enabled). Any failure is logged and the agent
    # is built without it.
    if config.enable_memori:
        try:
            if not config.user_identifier:
                logger.warning("Mem0 enabled but user_identifier is missing, skipping Mem0")
            else:
                mem0_manager = get_mem0_manager()

                # Reuse the existing llm_instance and pass the config along.
                mem0_middleware = create_mem0_middleware(
                    bot_id=config.bot_id,
                    user_identifier=config.user_identifier,
                    session_id=config.session_id or "default",
                    agent_config=config,
                    enabled=config.enable_memori,
                    semantic_search_top_k=config.memori_semantic_search_top_k,
                    mem0_manager=mem0_manager,
                    llm_instance=llm_instance,
                )

                if mem0_middleware:
                    middleware.append(mem0_middleware)
                    logger.info("Mem0 middleware added to agent")
        except Exception as e:
            logger.error(f"Failed to create Mem0 middleware: {e}, continuing without Mem0")

    # GuidelineMiddleware is only added when enable_thinking is set.
    if config.enable_thinking:
        middleware.append(GuidelineMiddleware(llm_instance, config, system_prompt))

    summarization_middleware = SummarizationMiddleware(
        model=llm_instance,
        trigger=('tokens', SUMMARIZATION_MAX_TOKENS),
        trim_tokens_to_summarize=DEFAULT_TRIM_TOKEN_LIMIT,
        keep=('tokens', SUMMARIZATION_TOKENS_TO_KEEP),
        token_counter=create_token_counter(config.model_name)
    )
    middleware.append(summarization_middleware)

    workspace_root = str(Path.cwd() / "projects" / "robot" / config.bot_id)
    # workspace_root = str(Path.home() / ".deepagents" / config.bot_id)

    # Environment for shell commands; entries whose value is None are dropped,
    # and config.shell_env overrides the built-in keys.
    candidate_env = {
        "ASSISTANT_ID": str(config.bot_id),
        "USER_IDENTIFIER": str(config.user_identifier) if config.user_identifier else None,
        "TRACE_ID": str(config.trace_id) if config.trace_id else None,
        **(config.shell_env or {}),
    }
    shell_env = {k: v for k, v in candidate_env.items() if v is not None}

    agent, composite_backend = create_custom_cli_agent(
        model=llm_instance,
        assistant_id=config.bot_id,
        system_prompt=system_prompt,
        tools=mcp_tools,
        auto_approve=True,
        workspace_root=workspace_root,
        middleware=middleware,
        checkpointer=checkpointer,
        shell_env=shell_env,
    )

    logger.info(f"create agent elapsed: {time.time() - create_start:.3f}s")
    return agent, checkpointer


class CustomSkillsMiddleware(SkillsMiddleware):
    """SkillsMiddleware variant that discards an empty ``skills_metadata`` entry.

    Both hooks remove an empty ``skills_metadata`` value from the state before
    delegating to the parent implementation (presumably so the parent loads
    the metadata again instead of treating it as already populated - confirm
    against SkillsMiddleware).
    """

    @staticmethod
    def _drop_empty_skills_metadata(state) -> None:
        """Delete ``state["skills_metadata"]`` in place when it is present but empty."""
        if "skills_metadata" in state and not state["skills_metadata"]:
            del state["skills_metadata"]

    def before_agent(self, state, runtime, config):
        """Drop empty skills metadata, then run the parent hook.

        Args:
            state: Current agent state (modified in place when the entry is dropped).
            runtime: Runtime context.
            config: Passed through unchanged to the parent hook.

        Returns:
            Whatever the parent ``before_agent`` returns.
        """
        self._drop_empty_skills_metadata(state)
        return super().before_agent(state, runtime, config)

    async def abefore_agent(self, state, runtime, config):
        """Async counterpart of ``before_agent``.

        Args:
            state: Current agent state (modified in place when the entry is dropped).
            runtime: Runtime context.
            config: Passed through unchanged to the parent hook.

        Returns:
            Whatever the parent ``abefore_agent`` returns.
        """
        self._drop_empty_skills_metadata(state)
        return await super().abefore_agent(state, runtime, config)


def create_custom_cli_agent(
    model: str | BaseChatModel,
    assistant_id: str,
    *,
    tools: list[BaseTool] | None = None,
    sandbox: SandboxBackendProtocol | None = None,
    sandbox_type: str | None = None,
    system_prompt: str | None = None,
    auto_approve: bool = False,
    enable_skills: bool = True,
    enable_shell: bool = True,
    middleware: list[AgentMiddleware] | None = None,
    workspace_root: str | None = None,
    checkpointer: Checkpointer | None = None,
    store: BaseStore | None = None,
    shell_env: dict[str, str] | None = None,
) -> tuple[Pregel, CompositeBackend]:
    """Create a CLI-configured agent with a custom workspace_root for shell commands.

    This is a custom version of create_cli_agent that allows specifying a
    custom workspace_root instead of always using Path.cwd().

    Args:
        model: LLM model to use (e.g., "anthropic:claude-sonnet-4-5-20250929").
        assistant_id: Agent identifier; used here only to generate the default
            system prompt.
        tools: Additional tools to provide to the agent (default: empty list).
        sandbox: Optional sandbox backend for remote execution. If None, a
            local filesystem (and optionally shell) backend is used.
        sandbox_type: Type of sandbox provider; used for default system prompt
            generation.
        system_prompt: Override for the default system prompt. If None, one is
            generated from sandbox_type and assistant_id.
        auto_approve: If True, no tool calls are configured to interrupt for
            human confirmation.
        enable_skills: Add CustomSkillsMiddleware.
        enable_shell: In local mode, use LocalShellBackend (filesystem + shell
            execution) instead of a plain FilesystemBackend.
        middleware: Extra middleware placed after the built-in stack. The list
            is copied; the caller's list is never modified.
        workspace_root: Root directory for the local backend. If None, uses
            Path.cwd().
        checkpointer: Optional checkpointer for persisting conversation state.
        store: Optional BaseStore passed through to the agent.
        shell_env: Optional environment variables merged over os.environ for
            the local shell backend; these take precedence.

    Returns:
        2-tuple of (agent_graph, composite_backend):
        - agent_graph: the configured agent (recursion_limit set to 1000)
        - composite_backend: CompositeBackend used for file operations
    """
    if tools is None:
        tools = []

    # Work on a copy: the middleware appended below must not leak into the
    # caller's list or accumulate across calls.
    agent_middleware = list(middleware) if middleware else []

    # Prepare workspace root
    if workspace_root is None:
        workspace_root = str(Path.cwd())

    # CONDITIONAL SETUP: Local vs Remote Sandbox
    if sandbox is None:
        # ========== LOCAL MODE ==========
        if enable_shell:
            # Environment for shell commands: process env overlaid with shell_env.
            final_shell_env = os.environ.copy()
            if shell_env:
                final_shell_env.update(shell_env)

            # LocalShellBackend provides filesystem + shell execution.
            backend = LocalShellBackend(
                root_dir=workspace_root,
                virtual_mode=True,
                inherit_env=True,
                env=final_shell_env,
            )
        else:
            # No shell access - use plain FilesystemBackend
            backend = FilesystemBackend(root_dir=workspace_root, virtual_mode=True)

        # Composite backend with dedicated routes for large tool results and
        # conversation history.
        # NOTE(review): these temp directories are created on every call and
        # never removed here - confirm cleanup happens elsewhere.
        large_results_backend = FilesystemBackend(
            root_dir=tempfile.mkdtemp(prefix="deepagents_large_results_"),
            virtual_mode=True,
        )
        conversation_history_backend = FilesystemBackend(
            root_dir=tempfile.mkdtemp(prefix="deepagents_conversation_history_"),
            virtual_mode=True,
        )
        composite_backend = CompositeBackend(
            default=backend,
            routes={
                "/large_tool_results/": large_results_backend,
                "/conversation_history/": conversation_history_backend,
            },
        )

        # Skills middleware reads from ./skills under the workspace root.
        if enable_skills:
            agent_middleware.append(
                CustomSkillsMiddleware(
                    backend=FilesystemBackend(root_dir=workspace_root, virtual_mode=True),
                    sources=["./skills"],
                )
            )

        # LocalContextMiddleware is only added when the backend can execute commands.
        if enable_shell:
            from deepagents_cli.local_context import _ExecutableBackend

            if isinstance(backend, _ExecutableBackend):
                agent_middleware.append(LocalContextMiddleware(backend=backend))
    else:
        # ========== REMOTE SANDBOX MODE ==========
        composite_backend = CompositeBackend(
            default=sandbox,  # Remote sandbox (ModalBackend, etc.)
            routes={},  # No virtualization
        )

        # Skills middleware reads from /skills inside the sandbox.
        if enable_skills:
            agent_middleware.append(
                CustomSkillsMiddleware(
                    backend=sandbox,
                    sources=["/skills"],
                )
            )

        # Note: no local shell backend in sandbox mode; file operations and
        # the execute tool are provided by the sandbox backend.

    # Get or use custom system prompt
    if system_prompt is None:
        from deepagents_cli.agent import get_system_prompt as _get_system_prompt

        system_prompt = _get_system_prompt(assistant_id=assistant_id, sandbox_type=sandbox_type)

    # Configure interrupt_on based on auto_approve setting
    if auto_approve:
        # No interrupts - all tools run automatically
        interrupt_on = {}
    else:
        # Full HITL for destructive operations
        from deepagents_cli.agent import _add_interrupt_on

        interrupt_on = _add_interrupt_on()

    deepagent_middleware = [
        TodoListMiddleware(),
        # Custom FilesystemMiddleware that supports reading SKILL.md in full.
        CustomFilesystemMiddleware(backend=composite_backend),
        AnthropicPromptCachingMiddleware(unsupported_model_behavior="ignore"),
        PatchToolCallsMiddleware(),
    ]
    deepagent_middleware.extend(agent_middleware)

    # NOTE(review): interrupt_on is never None at this point, so the HITL
    # middleware is always added (with an empty mapping when auto_approve).
    if interrupt_on is not None:
        deepagent_middleware.append(HumanInTheLoopMiddleware(interrupt_on=interrupt_on))

    if system_prompt:
        full_system_prompt = system_prompt + "\n\n" + BASE_AGENT_PROMPT
    else:
        full_system_prompt = BASE_AGENT_PROMPT

    agent = create_agent(
        model,
        system_prompt=full_system_prompt,
        tools=tools,
        middleware=deepagent_middleware,
        checkpointer=checkpointer,
        store=store,
    ).with_config({"recursion_limit": 1000})
    return agent, composite_backend