import json import logging import time import copy from pathlib import Path from typing import Any, Dict from langchain.chat_models import init_chat_model from deepagents import create_deep_agent from deepagents.backends import CompositeBackend from deepagents.backends.filesystem import FilesystemBackend from deepagents.backends.sandbox import SandboxBackendProtocol from deepagents_cli.agent import create_cli_agent from langchain.agents import create_agent from langchain.agents.middleware import SummarizationMiddleware from langchain_mcp_adapters.client import MultiServerMCPClient from sympy.printing.cxx import none from utils.fastapi_utils import detect_provider from .guideline_middleware import GuidelineMiddleware from .tool_output_length_middleware import ToolOutputLengthMiddleware from .tool_use_cleanup_middleware import ToolUseCleanupMiddleware from utils.settings import SUMMARIZATION_MAX_TOKENS, SUMMARIZATION_MESSAGES_TO_KEEP, TOOL_OUTPUT_MAX_LENGTH, MCP_HTTP_TIMEOUT, MCP_SSE_READ_TIMEOUT from agent.agent_config import AgentConfig from agent.prompt_loader import load_system_prompt_async, load_mcp_settings_async from agent.agent_memory_cache import get_memory_cache_manager from .checkpoint_utils import prepare_checkpoint_message from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver from langgraph.checkpoint.memory import InMemorySaver from langchain.tools import BaseTool from langchain_core.language_models import BaseChatModel from langgraph.pregel import Pregel from deepagents_cli.shell import ShellMiddleware from deepagents_cli.agent_memory import AgentMemoryMiddleware from deepagents_cli.skills import SkillsMiddleware from deepagents_cli.config import settings, get_default_coding_instructions import os # 全局 MemorySaver 实例 # from langgraph.checkpoint.memory import MemorySaver # _global_checkpointer = MemorySaver() logger = logging.getLogger('app') # Utility functions def read_system_prompt(): """读取通用的无状态系统prompt""" with open("./prompt/system_prompt_default.md", "r", encoding="utf-8") as f: return f.read().strip() def read_mcp_settings(): """读取MCP工具配置""" with open("./mcp/mcp_settings.json", "r") as f: mcp_settings_json = json.load(f) return mcp_settings_json async def get_tools_from_mcp(mcp): """从MCP配置中提取工具(带缓存)""" start_time = time.time() # 防御式处理:确保 mcp 是列表且长度大于 0,且包含 mcpServers if not isinstance(mcp, list) or len(mcp) == 0 or "mcpServers" not in mcp[0]: logger.info(f"get_tools_from_mcp: invalid mcp config, elapsed: {time.time() - start_time:.3f}s") return [] # 尝试从缓存获取 cache_manager = get_memory_cache_manager() cached_tools = cache_manager.get_mcp_tools(mcp) if cached_tools is not None: logger.info(f"get_tools_from_mcp: cached {len(cached_tools)} tools, elapsed: {time.time() - start_time:.3f}s") return cached_tools # 深拷贝 mcp 配置,避免修改原始配置(影响缓存键) mcp_copy = copy.deepcopy(mcp) # 修改 mcp_copy[0]["mcpServers"] 列表,把 type 字段改成 transport # 如果没有 transport,则根据是否存在 url 默认 transport 为 http 或 stdio for cfg in mcp_copy[0]["mcpServers"].values(): if "type" in cfg: cfg.pop("type") if "transport" not in cfg: cfg["transport"] = "http" if "url" in cfg else "stdio" # 为 HTTP/ SSE 传输的 MCP 服务器添加超时配置 # 如果配置中未设置超时,使用全局默认值 if cfg.get("transport") in ("http", "sse"): if "timeout" not in cfg: cfg["timeout"] = MCP_HTTP_TIMEOUT if "sse_read_timeout" not in cfg: cfg["sse_read_timeout"] = MCP_SSE_READ_TIMEOUT # 确保 mcp_copy[0]["mcpServers"] 是字典类型 if not isinstance(mcp_copy[0]["mcpServers"], dict): logger.info(f"get_tools_from_mcp: mcpServers is not dict, elapsed: {time.time() - start_time:.3f}s") return [] try: mcp_client = MultiServerMCPClient(mcp_copy[0]["mcpServers"]) mcp_tools = await mcp_client.get_tools() # 缓存结果 cache_manager.set_mcp_tools(mcp, mcp_tools) logger.info(f"get_tools_from_mcp: loaded {len(mcp_tools)} tools, elapsed: {time.time() - start_time:.3f}s") return mcp_tools except Exception as e: import traceback error_details = traceback.format_exc() # 发生异常时返回空列表,避免上层调用报错 logger.error(f"get_tools_from_mcp: error {str(e)}, elapsed: {time.time() - start_time:.3f}s") logger.error(f"Full traceback: {error_details}") return [] async def init_agent(config: AgentConfig): """ 初始化 Agent,支持持久化内存和对话摘要 注意:不再缓存 agent,只缓存 mcp_tools 返回 (agent, checkpointer) 元组,调用后需要归还 checkpointer Args: config: AgentConfig 对象,包含所有初始化参数 Returns: (agent, checkpointer) 元组 """ # 加载配置 final_system_prompt = await load_system_prompt_async( config.project_dir, config.language, config.system_prompt, config.robot_type, config.bot_id, config.user_identifier ) final_mcp_settings = await load_mcp_settings_async( config.project_dir, config.mcp_settings, config.bot_id, config.robot_type ) # 如果没有提供mcp,使用config中的mcp_settings mcp_settings = final_mcp_settings if final_mcp_settings else read_mcp_settings() system_prompt = final_system_prompt if final_system_prompt else read_system_prompt() config.system_prompt = mcp_settings config.mcp_settings = system_prompt # 获取 mcp_tools(缓存逻辑已内置到 get_tools_from_mcp 中) try: mcp_tools = await get_tools_from_mcp(mcp_settings) logger.info(f"Successfully loaded {len(mcp_tools)} MCP tools") except Exception as e: logger.error(f"Failed to load MCP tools: {str(e)}, using empty tool list") mcp_tools = [] # 检测或使用指定的提供商 model_provider, base_url = detect_provider(config.model_name, config.model_server) # 构建模型参数 model_kwargs = { "model": config.model_name, "model_provider": model_provider, "temperature": 0.8, "base_url": base_url, "api_key": config.api_key } if config.generate_cfg: model_kwargs.update(config.generate_cfg) llm_instance = init_chat_model(**model_kwargs) # 创建新的 agent(不再缓存) logger.info(f"Creating new agent for session: {getattr(config, 'session_id', 'no-session')}") checkpointer = None create_start = time.time() if config.robot_type == "deep_agent": # 使用 DeepAgentX 创建 agent,自定义 workspace_root workspace_root = f"projects/robot/{config.bot_id}" # workspace_root = str(Path.home() / ".deepagents" / config.bot_id) agent, composite_backend = create_custom_cli_agent( model=llm_instance, assistant_id=config.bot_id, system_prompt=system_prompt, tools=mcp_tools, auto_approve=True, enable_memory=False, workspace_root=workspace_root ) else: # 构建中间件列表 middleware = [] # 首先添加 ToolUseCleanupMiddleware 来清理孤立的 tool_use middleware.append(ToolUseCleanupMiddleware()) # 只有在 enable_thinking 为 True 时才添加 GuidelineMiddleware if config.enable_thinking: middleware.append(GuidelineMiddleware(llm_instance, config, system_prompt)) # 添加工具输出长度控制中间件 tool_output_middleware = ToolOutputLengthMiddleware( max_length=getattr(config.generate_cfg, 'tool_output_max_length', None) if config.generate_cfg else None or TOOL_OUTPUT_MAX_LENGTH, truncation_strategy=getattr(config.generate_cfg, 'tool_output_truncation_strategy', 'smart') if config.generate_cfg else 'smart', tool_filters=getattr(config.generate_cfg, 'tool_output_filters', None) if config.generate_cfg else None, exclude_tools=getattr(config.generate_cfg, 'tool_output_exclude', []) if config.generate_cfg else [], preserve_code_blocks=getattr(config.generate_cfg, 'preserve_code_blocks', True) if config.generate_cfg else True, preserve_json=getattr(config.generate_cfg, 'preserve_json', True) if config.generate_cfg else True ) middleware.append(tool_output_middleware) # 从连接池获取 checkpointer if config.session_id: from .checkpoint_manager import get_checkpointer_manager manager = get_checkpointer_manager() checkpointer = manager.checkpointer await prepare_checkpoint_message(config, checkpointer) summarization_middleware = SummarizationMiddleware( model=llm_instance, max_tokens_before_summary=SUMMARIZATION_MAX_TOKENS, messages_to_keep=SUMMARIZATION_MESSAGES_TO_KEEP, summary_prompt="请简洁地总结以上对话的要点,包括重要的用户信息、讨论过的话题和关键结论。" ) middleware.append(summarization_middleware) agent = create_agent( model=llm_instance, system_prompt=system_prompt, tools=mcp_tools, middleware=middleware, checkpointer=checkpointer ) logger.info(f"create {config.robot_type} elapsed: {time.time() - create_start:.3f}s") return agent, checkpointer class CustomAgentMemoryMiddleware(AgentMemoryMiddleware): def __init__( self, *, settings, assistant_id: str, system_prompt_template: str | None = None, ) -> None: super().__init__( settings=settings, assistant_id=assistant_id, system_prompt_template=system_prompt_template ) self.agent_dir_display = f"." class CustomSkillsMiddleware(SkillsMiddleware): def __init__( self, *, skills_dir: str | Path, assistant_id: str, project_skills_dir: str | Path | None = None, ) -> None: super().__init__( skills_dir=skills_dir, assistant_id=assistant_id, project_skills_dir=project_skills_dir ) self.skills_dir = None self.project_skills_display = f"./skills" def _format_skills_locations(self) -> str: """Format skills locations for display in system prompt.""" return "**Project Skills**: `{self.project_skills_dir}`" def _format_skills_list(self, skills) -> str: """Format skills metadata for display in system prompt.""" if not skills: locations = [f"{self.user_skills_display}/"] if self.project_skills_dir: locations.append(f"{self.project_skills_display}/") return f"(No skills available yet. You can create skills in {' or '.join(locations)})" # Group skills by source user_skills = [s for s in skills if s["source"] == "user"] project_skills = [s for s in skills if s["source"] == "project"] lines = [] # Show user skills if user_skills: lines.append("**User Skills:**") for skill in user_skills: lines.append(f"- **{skill['name']}**: {skill['description']}") lines.append(f" → Read `{skill['path']}` for full instructions") lines.append("") # Show project skills if project_skills: lines.append("**Project Skills:**") for skill in project_skills: lines.append(f"- **{skill['name']}**: {skill['description']}") lines.append(f" → Read `{skill['path']}` for full instructions") return "\n".join(lines) def before_agent(self, state, runtime): """Load skills metadata before agent execution. This runs once at session start to discover available skills from both user-level and project-level directories. Args: state: Current agent state. runtime: Runtime context. Returns: Updated state with skills_metadata populated. """ state = super().before_agent(state, runtime) for item in state["skills_metadata"]: if item["source"] == "project": item["path"] = self.project_skills_display + item["path"].replace(str(self.project_skills_dir), "") else: item["path"] = self.user_skills_display + item["path"].replace(str(self.skills_dir), "") return state def create_custom_cli_agent( model: str | BaseChatModel, assistant_id: str, *, tools: list[BaseTool] | None = None, sandbox: SandboxBackendProtocol | None = None, sandbox_type: str | None = None, system_prompt: str | None = None, auto_approve: bool = False, enable_memory: bool = True, enable_skills: bool = True, enable_shell: bool = True, workspace_root: str | None = None, ) -> tuple[Pregel, CompositeBackend]: """Create a CLI-configured agent with custom workspace_root for shell commands. This is a custom version of create_cli_agent that allows specifying a custom workspace_root for shell commands instead of using Path.cwd(). Args: model: LLM model to use (e.g., "anthropic:claude-sonnet-4-5-20250929") assistant_id: Agent identifier for memory/state storage tools: Additional tools to provide to agent (default: empty list) sandbox: Optional sandbox backend for remote execution (e.g., ModalBackend). If None, uses local filesystem + shell. sandbox_type: Type of sandbox provider ("modal", "runloop", "daytona"). Used for system prompt generation. system_prompt: Override the default system prompt. If None, generates one based on sandbox_type and assistant_id. auto_approve: If True, automatically approves all tool calls without human confirmation. Useful for automated workflows. enable_memory: Enable AgentMemoryMiddleware for persistent memory enable_skills: Enable SkillsMiddleware for custom agent skills enable_shell: Enable ShellMiddleware for local shell execution (only in local mode) workspace_root: Working directory for shell commands. If None, uses Path.cwd(). Returns: 2-tuple of (agent_graph, composite_backend) - agent_graph: Configured LangGraph Pregel instance ready for execution - composite_backend: CompositeBackend for file operations """ if tools is None: tools = [] # Setup agent directory for persistent memory (if enabled) if enable_memory or enable_skills: agent_dir = settings.ensure_agent_dir(assistant_id) agent_md = agent_dir / "agent.md" if not agent_md.exists(): source_content = get_default_coding_instructions() agent_md.write_text(source_content) # Build middleware stack based on enabled features agent_middleware = [] # CONDITIONAL SETUP: Local vs Remote Sandbox if sandbox is None: # ========== LOCAL MODE ========== composite_backend = CompositeBackend( default=FilesystemBackend(root_dir=workspace_root, virtual_mode=True), # Current working directory routes={}, # No virtualization - use real paths ) # Add memory middleware if enable_memory: agent_middleware.append( CustomAgentMemoryMiddleware(settings=settings, assistant_id=assistant_id) ) # Add skills middleware if enable_skills: agent_middleware.append( CustomSkillsMiddleware( skills_dir=workspace_root, project_skills_dir=workspace_root+"/skills", assistant_id=assistant_id ) ) # Add shell middleware (only in local mode) if enable_shell: # Create environment for shell commands # Restore user's original LANGSMITH_PROJECT so their code traces separately shell_env = os.environ.copy() # Use custom workspace_root if provided, otherwise use current directory shell_workspace = workspace_root if workspace_root is not None else str(Path.cwd()) agent_middleware.append( ShellMiddleware( workspace_root=shell_workspace, env=shell_env, ) ) else: # ========== REMOTE SANDBOX MODE ========== composite_backend = CompositeBackend( default=sandbox, # Remote sandbox (ModalBackend, etc.) routes={}, # No virtualization ) # Add memory middleware if enable_memory: agent_middleware.append( AgentMemoryMiddleware(settings=settings, assistant_id=assistant_id) ) # Add skills middleware if enable_skills: agent_middleware.append( CustomSkillsMiddleware( skills_dir=workspace_root, project_skills_dir=workspace_root+"/skills", assistant_id=assistant_id ) ) # Note: Shell middleware not used in sandbox mode # File operations and execute tool are provided by the sandbox backend # Get or use custom system prompt if system_prompt is None: # Import get_system_prompt from deepagents_cli.agent from deepagents_cli.agent import get_system_prompt as _get_system_prompt system_prompt = _get_system_prompt(assistant_id=assistant_id, sandbox_type=sandbox_type) # Import InterruptOnConfig from langchain.agents.middleware import InterruptOnConfig # Configure interrupt_on based on auto_approve setting if auto_approve: # No interrupts - all tools run automatically interrupt_on = {} else: # Full HITL for destructive operations - import from deepagents_cli.agent from deepagents_cli.agent import _add_interrupt_on interrupt_on = _add_interrupt_on() # Import config from deepagents_cli.config import config # Create the agent agent = create_deep_agent( model=model, system_prompt=system_prompt, tools=tools, backend=composite_backend, middleware=agent_middleware, interrupt_on=interrupt_on, checkpointer=InMemorySaver(), ).with_config(config) return agent, composite_backend