diff --git a/agent/deep_assistant.py b/agent/deep_assistant.py index 888a15d..ac6688b 100644 --- a/agent/deep_assistant.py +++ b/agent/deep_assistant.py @@ -4,6 +4,7 @@ import time import copy import os import tempfile +import asyncio from pathlib import Path from typing import Any, Dict from langchain.chat_models import init_chat_model @@ -173,6 +174,10 @@ async def get_tools_from_mcp(mcp): logger.error(f"get_tools_from_mcp: error {e}, elapsed: {time.time() - start_time:.3f}s") return [] + +from utils.daytona_sync import init_daytona_sandbox, sync_sandbox_to_local + + async def init_agent(config: AgentConfig): """ 初始化 Agent,支持持久化内存和对话摘要 @@ -187,19 +192,27 @@ async def init_agent(config: AgentConfig): (agent, checkpointer) 元组 """ - # 加载配置 - final_system_prompt = await load_system_prompt_async(config) - final_mcp_settings = await load_mcp_settings_async(config) + create_start = time.time() + + # 并行加载配置 + final_system_prompt, final_mcp_settings = await asyncio.gather( + load_system_prompt_async(config), + load_mcp_settings_async(config), + ) + logger.info(f"init_agent config loaded, elapsed: {time.time() - create_start:.3f}s") mcp_settings = final_mcp_settings if final_mcp_settings else [] system_prompt = final_system_prompt if final_system_prompt else read_system_prompt() - config.system_prompt = mcp_settings - config.mcp_settings = system_prompt + config.system_prompt = system_prompt + config.mcp_settings = mcp_settings - # 获取 mcp_tools(缓存逻辑已内置到 get_tools_from_mcp 中) - mcp_tools = await get_tools_from_mcp(mcp_settings) - logger.info(f"Loaded {len(mcp_tools)} MCP tools") + workspace_root = str(Path.cwd() / "projects" /"robot"/ config.bot_id) + local_workspace_root = workspace_root + + # 并行执行高耗时 IO:MCP tools 加载 + Daytona sandbox 初始化 + mcp_tools_task = asyncio.create_task(get_tools_from_mcp(mcp_settings)) + sandbox_task = asyncio.create_task(asyncio.to_thread(init_daytona_sandbox, config.bot_id, local_workspace_root)) # 检测或使用指定的提供商 model_provider, base_url = 
detect_provider(config.model_name, config.model_server) @@ -225,7 +238,6 @@ async def init_agent(config: AgentConfig): logger.info(f"Creating new agent for session: {getattr(config, 'session_id', 'no-session')}") checkpointer = None - create_start = time.time() # 从连接池获取 checkpointer(prepare_checkpoint_message 已在 from_v1/from_v2_request 中调用) if config.session_id: @@ -237,11 +249,8 @@ async def init_agent(config: AgentConfig): # 构建中间件列表 middleware = [] - # 添加空响应重试中间件(最先执行,最外层包裹) middleware.append(EmptyResponseRetryMiddleware()) - # 首先添加 ToolUseCleanupMiddleware 来清理孤立的 tool_use middleware.append(ToolUseCleanupMiddleware()) - # 添加工具输出长度控制中间件 tool_output_middleware = ToolOutputLengthMiddleware( max_length=(getattr(config.generate_cfg, 'tool_output_max_length', None) if config.generate_cfg else None) or TOOL_OUTPUT_MAX_LENGTH, truncation_strategy=getattr(config.generate_cfg, 'tool_output_truncation_strategy', 'smart') if config.generate_cfg else 'smart', @@ -252,26 +261,21 @@ async def init_agent(config: AgentConfig): ) middleware.append(tool_output_middleware) - # 添加 Mem0 记忆中间件(如果启用) if config.enable_memori: try: - # 确保有 user_identifier if not config.user_identifier: logger.warning("Mem0 enabled but user_identifier is missing, skipping Mem0") else: - # 获取全局 Mem0Manager(已在 fastapi_app.py 中初始化) mem0_manager = get_mem0_manager() - - # 创建 Mem0 中间件,传入现有的 llm_instance 和 config mem0_middleware = create_mem0_middleware( bot_id=config.bot_id, user_identifier=config.user_identifier, session_id=config.session_id or "default", - agent_config=config, # 传入 AgentConfig 用于中间件间传递数据 + agent_config=config, enabled=config.enable_memori, semantic_search_top_k=config.memori_semantic_search_top_k, mem0_manager=mem0_manager, - llm_instance=llm_instance, # 传入现有 LLM 实例 + llm_instance=llm_instance, ) if mem0_middleware: @@ -281,7 +285,6 @@ async def init_agent(config: AgentConfig): except Exception as e: logger.error(f"Failed to create Mem0 middleware: {e}, continuing without Mem0") - # 只有在 
enable_thinking 为 True 时才添加 GuidelineMiddleware if config.enable_thinking: middleware.append(GuidelineMiddleware(llm_instance, config, system_prompt)) @@ -293,30 +296,14 @@ async def init_agent(config: AgentConfig): token_counter=create_token_counter(config.model_name) ) middleware.append(summarization_middleware) - workspace_root = str(Path.cwd() / "projects" /"robot"/ config.bot_id) - # workspace_root = str(Path.home() / ".deepagents" / config.bot_id) + logger.info(f"init_agent middleware ready, elapsed: {time.time() - create_start:.3f}s") - # Daytona Sandbox 初始化 - sandbox = None - sandbox_type = None + mcp_tools = await mcp_tools_task + logger.info(f"Loaded {len(mcp_tools)} MCP tools") + logger.info(f"init_agent mcp tools ready, elapsed: {time.time() - create_start:.3f}s") - if DAYTONA_ENABLED and DAYTONA_API_KEY and DAYTONA_SERVER_URL: - try: - from daytona import Daytona, DaytonaConfig - from langchain_daytona import DaytonaSandbox - - daytona_config = DaytonaConfig( - api_key=DAYTONA_API_KEY, - api_url=DAYTONA_SERVER_URL, - ) - daytona_client = Daytona(daytona_config) - sandbox_instance = daytona_client.create() - sandbox = DaytonaSandbox(sandbox=sandbox_instance) - sandbox_type = "daytona" - logger.info(f"Daytona sandbox created: {sandbox_instance.id}") - except Exception as e: - logger.error(f"Failed to create Daytona sandbox: {e}, falling back to local mode") - sandbox = None + sandbox, sandbox_type, workspace_root = await sandbox_task + logger.info(f"init_agent sandbox ready, elapsed: {time.time() - create_start:.3f}s") agent, composite_backend = create_custom_cli_agent( model=llm_instance, @@ -339,7 +326,7 @@ async def init_agent(config: AgentConfig): ) logger.info(f"create agent elapsed: {time.time() - create_start:.3f}s") - return agent, checkpointer + return agent, checkpointer, sandbox class CustomSkillsMiddleware(SkillsMiddleware): @@ -501,7 +488,7 @@ def create_custom_cli_agent( # Add skills middleware if enable_skills: - skills_sources = 
["/skills"] + skills_sources = ["/workspace/skills"] agent_middleware.append( CustomSkillsMiddleware( diff --git a/agent/prompt_loader.py b/agent/prompt_loader.py index 5e5407d..b5bde4d 100644 --- a/agent/prompt_loader.py +++ b/agent/prompt_loader.py @@ -8,7 +8,7 @@ import asyncio from typing import List, Dict, Optional, Any from datetime import datetime, timezone, timedelta import logging -from utils.settings import BACKEND_HOST, MASTERKEY +from utils.settings import BACKEND_HOST, MASTERKEY, DAYTONA_ENABLED logger = logging.getLogger('app') from .plugin_hook_loader import execute_hooks, merge_skill_mcp_configs from pathlib import Path @@ -119,13 +119,14 @@ async def load_system_prompt_async(config) -> str: readme = await config_cache.get_text_file(readme_path) or "" # agent_dir_path = f"~/.deepagents/{bot_id}" #agent_dir_path 其实映射的就是 project_dir目录,只是给ai看的目录路径 + agent_dir_path = "/workspace" if DAYTONA_ENABLED else f"{Path.cwd()}/projects/robot/{config.bot_id}" prompt = system_prompt_default.format( readme=str(readme), extra_prompt=system_prompt or "", language=language_display, user_identifier=user_identifier, datetime=datetime_str, - agent_dir_path=f"{Path.cwd()}/projects/robot/{config.bot_id}", + agent_dir_path=agent_dir_path, trace_id=trace_id or "" ) diff --git a/routes/chat.py b/routes/chat.py index 87e8584..c84b51c 100644 --- a/routes/chat.py +++ b/routes/chat.py @@ -23,6 +23,8 @@ from langchain_core.messages import AIMessageChunk, ToolMessage, AIMessage, Huma from utils.settings import MAX_OUTPUT_TOKENS from agent.agent_config import AgentConfig from agent.deep_assistant import init_agent +from utils.daytona_sync import sync_sandbox_to_local +from utils.settings import DAYTONA_ENABLED router = APIRouter() @@ -87,7 +89,7 @@ async def enhanced_generate_stream_response( logger.info(f"Starting agent stream response") chunk_id = 0 message_tag = "" - agent, checkpointer = await init_agent(config) + agent, checkpointer, sandbox = await init_agent(config) async for 
msg, metadata in agent.astream({"messages": config.messages}, stream_mode="messages", config=config.invoke_config(), max_tokens=MAX_OUTPUT_TOKENS): # 检查是否收到取消信号 if cancel_event and cancel_event.is_set(): @@ -145,7 +147,7 @@ async def enhanced_generate_stream_response( # ============ 执行 PostAgent hooks ============ # 注意:这里在单独的异步任务中执行,不阻塞流式输出 full_response = "".join(full_response_content) - asyncio.create_task(_execute_post_agent_hooks(config, full_response)) + asyncio.create_task(_execute_post_agent_hooks(config, full_response, sandbox)) # =========================================== await output_queue.put(("agent_done", None)) @@ -261,13 +263,13 @@ async def create_agent_and_generate_response( headers={"Cache-Control": "no-cache", "Connection": "keep-alive"} ) - agent, checkpointer = await init_agent(config) + agent, checkpointer, sandbox = await init_agent(config) # 使用更新后的 messages agent_responses = await agent.ainvoke({"messages": config.messages}, config=config.invoke_config(), max_tokens=MAX_OUTPUT_TOKENS) # ============ 执行 PostAgent hooks ============ # 注意:这里在非流式模式下同步执行hooks - await _execute_post_agent_hooks(config, "") + await _execute_post_agent_hooks(config, "", sandbox) # =========================================== # 从后往前找第一个 HumanMessage,之后的内容都给 append_messages @@ -409,13 +411,14 @@ async def _save_assistant_response(config: AgentConfig, assistant_response: str) logger.error(f"Failed to save assistant response: {e}") -async def _execute_post_agent_hooks(config: AgentConfig, response: str) -> None: +async def _execute_post_agent_hooks(config: AgentConfig, response: str, sandbox=None) -> None: """ 执行 PostAgent hooks(在agent执行后) Args: config: AgentConfig 对象 response: Agent 的完整响应内容 + sandbox: DaytonaSandbox 实例(可选),用于反向同步文件 """ try: from agent.plugin_hook_loader import execute_hooks @@ -436,6 +439,15 @@ async def _execute_post_agent_hooks(config: AgentConfig, response: str) -> None: # 清理 executable_code/tmp 文件夹 await _cleanup_tmp_folder(config) + # Daytona: 反向同步 
sandbox 文件到本地 + if sandbox is not None and DAYTONA_ENABLED: + try: + from pathlib import Path + local_workspace = str(Path.cwd() / "projects" / "robot" / config.bot_id) + await asyncio.to_thread(sync_sandbox_to_local, sandbox, local_workspace)  # blocking Daytona I/O: run it in a worker thread so the event loop is not blocked + except Exception as e: + logger.error(f"Failed to sync sandbox to local: {e}") + async def _cleanup_tmp_folder(config: AgentConfig) -> None: """ diff --git a/utils/daytona_sync.py b/utils/daytona_sync.py new file mode 100644 index 0000000..753ef03 --- /dev/null +++ b/utils/daytona_sync.py @@ -0,0 +1,204 @@ +"""Bidirectional file sync utilities for Daytona sandboxes.""" + +import io +import logging +import subprocess +import tarfile +import time +from pathlib import Path + +from utils.settings import DAYTONA_API_KEY, DAYTONA_SERVER_URL, DAYTONA_ENABLED + +logger = logging.getLogger('app') + + +def _list_local_changed_files(workspace_path: Path) -> tuple[bool, list[str]]: + """Return whether a first (full) sync is needed, plus the list of locally changed files.""" + marker_local = workspace_path / ".last_sync" + if not marker_local.exists(): + return True, [] + + result = subprocess.run( + [ + "find", str(workspace_path), "-newer", str(marker_local), "-type", "f", + "-not", "-name", ".last_sync", "-not", "-name", ".DS_Store", + ], + capture_output=True, + text=True, + timeout=30, + ) + changed_files = [f for f in result.stdout.strip().split('\n') if f] + return False, changed_files + + +def init_daytona_sandbox(bot_id: str, local_workspace_root: str): + """Initialize the Daytona sandbox, falling back to local mode on failure.""" + sandbox = None + sandbox_type = None + workspace_root = local_workspace_root + + if not (DAYTONA_ENABLED and DAYTONA_API_KEY and DAYTONA_SERVER_URL): + return sandbox, sandbox_type, workspace_root + + try: + from daytona import Daytona, DaytonaConfig, VolumeMount, CreateSandboxFromSnapshotParams + from langchain_daytona import DaytonaSandbox + + start_time = time.time() + daytona_config = DaytonaConfig( + api_key=DAYTONA_API_KEY, + api_url=DAYTONA_SERVER_URL, + ) + daytona_client = Daytona(daytona_config) + + sandbox_name = f"bot-{bot_id}" + 
sandbox_instance = None + created_new_sandbox = False + try: + existing = daytona_client.get(sandbox_name) + if existing.state in ("Started", "Creating"): + sandbox_instance = existing + logger.info(f"Reusing existing sandbox: {sandbox_instance.id} (state={existing.state})") + else: + existing.start() + sandbox_instance = existing + logger.info(f"Restarted existing sandbox: {sandbox_instance.id}") + except Exception: + volume_name = f"bot-{bot_id}" + volume = daytona_client.volume.get(volume_name, create=True) + + for _ in range(30): + volume = daytona_client.volume.get(volume_name) + if "READY" in str(volume.state).upper(): + break + time.sleep(1) + else: + raise RuntimeError(f"Volume {volume_name} not ready after 30s, state: {volume.state}") + + sandbox_params = CreateSandboxFromSnapshotParams( + name=sandbox_name, + volumes=[VolumeMount(volume_id=volume.id, mount_path="/workspace")], + env_vars={"BASH_ENV": "/home/daytona/.bash_env"}, + ) + sandbox_instance = daytona_client.create(sandbox_params) + created_new_sandbox = True + logger.info(f"Created new sandbox: {sandbox_instance.id}, volume: {volume.id}") + + logger.info(f"daytona get/start done, elapsed: {time.time() - start_time:.3f}s") + + sandbox = DaytonaSandbox(sandbox=sandbox_instance) + sandbox_type = "daytona" + workspace_root = "/workspace" + + sync_workspace_to_sandbox(sandbox, local_workspace_root) + logger.info(f"daytona sync done, elapsed: {time.time() - start_time:.3f}s") + + if created_new_sandbox: + sandbox.execute("test -f /home/daytona/.bash_env || echo 'cd /workspace' > /home/daytona/.bash_env") + logger.info(f"daytona bash_env done, elapsed: {time.time() - start_time:.3f}s") + except Exception as e: + logger.error(f"Failed to create Daytona sandbox: {e}, falling back to local mode") + sandbox = None + sandbox_type = None + workspace_root = local_workspace_root + + return sandbox, sandbox_type, workspace_root + + +def sync_workspace_to_sandbox(sandbox, workspace_root: str) -> None: + 
"""增量同步本地 workspace 到 Daytona sandbox。 + + 基于 .last_sync 时间戳标记: + - 首次(无标记文件):全量同步 + - 后续:只同步比标记更新的文件 + + Args: + sandbox: DaytonaSandbox 实例 + workspace_root: 本地 workspace 目录路径 + """ + workspace_path = Path(workspace_root) + if not workspace_path.exists() or not any(workspace_path.iterdir()): + return + + is_first_sync, changed_files = _list_local_changed_files(workspace_path) + if not is_first_sync and not changed_files: + logger.info("No local file changes to sync") + return + + if is_first_sync: + check = sandbox.execute("test -f /workspace/.last_sync && echo yes || echo no") + if "yes" in check.output: + logger.info("Local marker missing but sandbox already synced, refreshing local marker") + (workspace_path / ".last_sync").touch() + return + + logger.info("First sync: uploading all workspace files...") + buf = io.BytesIO() + with tarfile.open(fileobj=buf, mode='w:gz') as tar: + for item in workspace_path.iterdir(): + if item.name in ('.DS_Store', '.last_sync'): + continue + tar.add(str(item), arcname=item.name) + buf.seek(0) + sandbox._sandbox.fs.upload_file(buf.read(), "/tmp/workspace.tar.gz") + sandbox.execute("cd /workspace && tar -xzf /tmp/workspace.tar.gz && rm /tmp/workspace.tar.gz") + sandbox.execute("echo 'cd /workspace' > /home/daytona/.bash_env") + logger.info("Full sync complete") + else: + logger.info(f"Incremental sync: {len(changed_files)} changed files") + buf = io.BytesIO() + with tarfile.open(fileobj=buf, mode='w:gz') as tar: + for fpath in changed_files: + arcname = str(Path(fpath).relative_to(workspace_path)) + tar.add(fpath, arcname=arcname) + buf.seek(0) + sandbox._sandbox.fs.upload_file(buf.read(), "/tmp/workspace_inc.tar.gz") + sandbox.execute("cd /workspace && tar -xzf /tmp/workspace_inc.tar.gz && rm /tmp/workspace_inc.tar.gz") + logger.info(f"Incremental sync complete: {len(changed_files)} files") + + sandbox.execute("date +%Y%m%d%H%M.%S > /workspace/.last_sync") + (workspace_path / ".last_sync").touch() + + +def 
sync_sandbox_to_local(sandbox, workspace_root: str) -> None: + """Sync files changed inside the sandbox back to the local workspace after the agent finishes. + + Uses the /workspace/.last_sync timestamp to find files in the sandbox that are newer than the last sync, then downloads them. + + Args: + sandbox: DaytonaSandbox instance + workspace_root: local workspace directory path + """ + workspace_path = Path(workspace_root) + workspace_path.mkdir(parents=True, exist_ok=True) + + check = sandbox.execute("test -f /workspace/.last_sync && echo yes || echo no") + if "no" in check.output: + logger.info("No .last_sync in sandbox, skipping reverse sync") + return + + result = sandbox.execute( + "find /workspace -newer /workspace/.last_sync -type f " + "-not -name '.last_sync' -not -name '.DS_Store' " + "-not -path '/workspace/.daytona*' 2>/dev/null" + ) + changed_files = [f for f in result.output.strip().split('\n') if f and f != '/workspace'] + if not changed_files: + logger.info("No sandbox file changes to sync back") + return + + logger.info(f"Reverse sync: {len(changed_files)} changed files from sandbox") + rel_files = [f.removeprefix("/workspace/") for f in changed_files] + file_list = " ".join("'{}'".format(f.replace("'", "'\\''")) for f in rel_files)  # POSIX-safe quoting: names containing a single quote must not break out of the shell word + sandbox.execute(f"cd /workspace && tar -czf /tmp/sync_back.tar.gz -- {file_list}")  # "--" stops names that start with "-" from being parsed as tar options + + tar_data = sandbox._sandbox.fs.download_file("/tmp/sync_back.tar.gz") + sandbox.execute("rm -f /tmp/sync_back.tar.gz") + + buf = io.BytesIO(tar_data) + with tarfile.open(fileobj=buf, mode='r:gz') as tar: + tar.extractall(path=str(workspace_path), **({"filter": "data"} if hasattr(tarfile, "data_filter") else {}))  # archive is built inside the sandbox: use the "data" extraction filter when this Python provides it + + sandbox.execute("date +%Y%m%d%H%M.%S > /workspace/.last_sync") + (workspace_path / ".last_sync").touch() + logger.info(f"Reverse sync complete: {len(changed_files)} files downloaded") diff --git a/utils/settings.py b/utils/settings.py index dc84165..5594dba 100644 --- a/utils/settings.py +++ b/utils/settings.py @@ -104,7 +104,7 @@ SCHEDULE_MAX_CONCURRENT = int(os.getenv("SCHEDULE_MAX_CONCURRENT", "5")) # DAYTONA_API_KEY = os.getenv("DAYTONA_API_KEY", "dtn_1c888ed16ec448b965e2e07afd75d69f8e0dd38efbad47744f9de49fcf7b7e2a") # DAYTONA_SERVER_URL = 
os.getenv("DAYTONA_SERVER_URL", "https://daytona.45.66.216.154.nip.io/api") -DAYTONA_API_KEY = os.getenv("DAYTONA_API_KEY", "dtn_fd3ad4bf548de9fb61d3aa24b5d73bf4c0cafc3de568233d88698821b3e687c0") +DAYTONA_API_KEY = os.getenv("DAYTONA_API_KEY", "")  # no built-in default: the key must come from the environment, never from source control DAYTONA_SERVER_URL = os.getenv("DAYTONA_SERVER_URL", "https://app.daytona.io/api") DAYTONA_ENABLED = os.getenv("DAYTONA_ENABLED", "false") == "true"