daytona support

This commit is contained in:
朱潮 2026-04-23 15:09:09 +08:00
parent c9e07898fc
commit 8446dab1e4
5 changed files with 256 additions and 52 deletions

View File

@ -4,6 +4,7 @@ import time
import copy
import os
import tempfile
import asyncio
from pathlib import Path
from typing import Any, Dict
from langchain.chat_models import init_chat_model
@ -173,6 +174,10 @@ async def get_tools_from_mcp(mcp):
logger.error(f"get_tools_from_mcp: error {e}, elapsed: {time.time() - start_time:.3f}s")
return []
from utils.daytona_sync import init_daytona_sandbox, sync_sandbox_to_local
async def init_agent(config: AgentConfig):
"""
初始化 Agent支持持久化内存和对话摘要
@ -187,19 +192,27 @@ async def init_agent(config: AgentConfig):
(agent, checkpointer) 元组
"""
# 加载配置
final_system_prompt = await load_system_prompt_async(config)
final_mcp_settings = await load_mcp_settings_async(config)
create_start = time.time()
# 并行加载配置
final_system_prompt, final_mcp_settings = await asyncio.gather(
load_system_prompt_async(config),
load_mcp_settings_async(config),
)
logger.info(f"init_agent config loaded, elapsed: {time.time() - create_start:.3f}s")
mcp_settings = final_mcp_settings if final_mcp_settings else []
system_prompt = final_system_prompt if final_system_prompt else read_system_prompt()
config.system_prompt = mcp_settings
config.mcp_settings = system_prompt
config.system_prompt = system_prompt
config.mcp_settings = mcp_settings
# 获取 mcp_tools缓存逻辑已内置到 get_tools_from_mcp 中)
mcp_tools = await get_tools_from_mcp(mcp_settings)
logger.info(f"Loaded {len(mcp_tools)} MCP tools")
workspace_root = str(Path.cwd() / "projects" /"robot"/ config.bot_id)
local_workspace_root = workspace_root
# 并行执行高耗时 IOMCP tools 加载 + Daytona sandbox 初始化
mcp_tools_task = asyncio.create_task(get_tools_from_mcp(mcp_settings))
sandbox_task = asyncio.create_task(asyncio.to_thread(init_daytona_sandbox, config.bot_id, local_workspace_root))
# 检测或使用指定的提供商
model_provider, base_url = detect_provider(config.model_name, config.model_server)
@ -225,7 +238,6 @@ async def init_agent(config: AgentConfig):
logger.info(f"Creating new agent for session: {getattr(config, 'session_id', 'no-session')}")
checkpointer = None
create_start = time.time()
# 从连接池获取 checkpointerprepare_checkpoint_message 已在 from_v1/from_v2_request 中调用)
if config.session_id:
@ -237,11 +249,8 @@ async def init_agent(config: AgentConfig):
# 构建中间件列表
middleware = []
# 添加空响应重试中间件(最先执行,最外层包裹)
middleware.append(EmptyResponseRetryMiddleware())
# 首先添加 ToolUseCleanupMiddleware 来清理孤立的 tool_use
middleware.append(ToolUseCleanupMiddleware())
# 添加工具输出长度控制中间件
tool_output_middleware = ToolOutputLengthMiddleware(
max_length=(getattr(config.generate_cfg, 'tool_output_max_length', None) if config.generate_cfg else None) or TOOL_OUTPUT_MAX_LENGTH,
truncation_strategy=getattr(config.generate_cfg, 'tool_output_truncation_strategy', 'smart') if config.generate_cfg else 'smart',
@ -252,26 +261,21 @@ async def init_agent(config: AgentConfig):
)
middleware.append(tool_output_middleware)
# 添加 Mem0 记忆中间件(如果启用)
if config.enable_memori:
try:
# 确保有 user_identifier
if not config.user_identifier:
logger.warning("Mem0 enabled but user_identifier is missing, skipping Mem0")
else:
# 获取全局 Mem0Manager已在 fastapi_app.py 中初始化)
mem0_manager = get_mem0_manager()
# 创建 Mem0 中间件,传入现有的 llm_instance 和 config
mem0_middleware = create_mem0_middleware(
bot_id=config.bot_id,
user_identifier=config.user_identifier,
session_id=config.session_id or "default",
agent_config=config, # 传入 AgentConfig 用于中间件间传递数据
agent_config=config,
enabled=config.enable_memori,
semantic_search_top_k=config.memori_semantic_search_top_k,
mem0_manager=mem0_manager,
llm_instance=llm_instance, # 传入现有 LLM 实例
llm_instance=llm_instance,
)
if mem0_middleware:
@ -281,7 +285,6 @@ async def init_agent(config: AgentConfig):
except Exception as e:
logger.error(f"Failed to create Mem0 middleware: {e}, continuing without Mem0")
# 只有在 enable_thinking 为 True 时才添加 GuidelineMiddleware
if config.enable_thinking:
middleware.append(GuidelineMiddleware(llm_instance, config, system_prompt))
@ -293,30 +296,14 @@ async def init_agent(config: AgentConfig):
token_counter=create_token_counter(config.model_name)
)
middleware.append(summarization_middleware)
workspace_root = str(Path.cwd() / "projects" /"robot"/ config.bot_id)
# workspace_root = str(Path.home() / ".deepagents" / config.bot_id)
logger.info(f"init_agent middleware ready, elapsed: {time.time() - create_start:.3f}s")
# Daytona Sandbox 初始化
sandbox = None
sandbox_type = None
mcp_tools = await mcp_tools_task
logger.info(f"Loaded {len(mcp_tools)} MCP tools")
logger.info(f"init_agent mcp tools ready, elapsed: {time.time() - create_start:.3f}s")
if DAYTONA_ENABLED and DAYTONA_API_KEY and DAYTONA_SERVER_URL:
try:
from daytona import Daytona, DaytonaConfig
from langchain_daytona import DaytonaSandbox
daytona_config = DaytonaConfig(
api_key=DAYTONA_API_KEY,
api_url=DAYTONA_SERVER_URL,
)
daytona_client = Daytona(daytona_config)
sandbox_instance = daytona_client.create()
sandbox = DaytonaSandbox(sandbox=sandbox_instance)
sandbox_type = "daytona"
logger.info(f"Daytona sandbox created: {sandbox_instance.id}")
except Exception as e:
logger.error(f"Failed to create Daytona sandbox: {e}, falling back to local mode")
sandbox = None
sandbox, sandbox_type, workspace_root = await sandbox_task
logger.info(f"init_agent sandbox ready, elapsed: {time.time() - create_start:.3f}s")
agent, composite_backend = create_custom_cli_agent(
model=llm_instance,
@ -339,7 +326,7 @@ async def init_agent(config: AgentConfig):
)
logger.info(f"create agent elapsed: {time.time() - create_start:.3f}s")
return agent, checkpointer
return agent, checkpointer, sandbox
class CustomSkillsMiddleware(SkillsMiddleware):
@ -501,7 +488,7 @@ def create_custom_cli_agent(
# Add skills middleware
if enable_skills:
skills_sources = ["/skills"]
skills_sources = ["/workspace/skills"]
agent_middleware.append(
CustomSkillsMiddleware(

View File

@ -8,7 +8,7 @@ import asyncio
from typing import List, Dict, Optional, Any
from datetime import datetime, timezone, timedelta
import logging
from utils.settings import BACKEND_HOST, MASTERKEY
from utils.settings import BACKEND_HOST, MASTERKEY, DAYTONA_ENABLED
logger = logging.getLogger('app')
from .plugin_hook_loader import execute_hooks, merge_skill_mcp_configs
from pathlib import Path
@ -119,13 +119,14 @@ async def load_system_prompt_async(config) -> str:
readme = await config_cache.get_text_file(readme_path) or ""
# agent_dir_path = f"~/.deepagents/{bot_id}" #agent_dir_path 其实映射的就是 project_dir目录只是给ai看的目录路径
agent_dir_path = "/workspace" if DAYTONA_ENABLED else f"{Path.cwd()}/projects/robot/{config.bot_id}"
prompt = system_prompt_default.format(
readme=str(readme),
extra_prompt=system_prompt or "",
language=language_display,
user_identifier=user_identifier,
datetime=datetime_str,
agent_dir_path=f"{Path.cwd()}/projects/robot/{config.bot_id}",
agent_dir_path=agent_dir_path,
trace_id=trace_id or ""
)

View File

@ -23,6 +23,8 @@ from langchain_core.messages import AIMessageChunk, ToolMessage, AIMessage, Huma
from utils.settings import MAX_OUTPUT_TOKENS
from agent.agent_config import AgentConfig
from agent.deep_assistant import init_agent
from utils.daytona_sync import sync_sandbox_to_local
from utils.settings import DAYTONA_ENABLED
router = APIRouter()
@ -87,7 +89,7 @@ async def enhanced_generate_stream_response(
logger.info(f"Starting agent stream response")
chunk_id = 0
message_tag = ""
agent, checkpointer = await init_agent(config)
agent, checkpointer, sandbox = await init_agent(config)
async for msg, metadata in agent.astream({"messages": config.messages}, stream_mode="messages", config=config.invoke_config(), max_tokens=MAX_OUTPUT_TOKENS):
# 检查是否收到取消信号
if cancel_event and cancel_event.is_set():
@ -145,7 +147,7 @@ async def enhanced_generate_stream_response(
# ============ 执行 PostAgent hooks ============
# 注意:这里在单独的异步任务中执行,不阻塞流式输出
full_response = "".join(full_response_content)
asyncio.create_task(_execute_post_agent_hooks(config, full_response))
asyncio.create_task(_execute_post_agent_hooks(config, full_response, sandbox))
# ===========================================
await output_queue.put(("agent_done", None))
@ -261,13 +263,13 @@ async def create_agent_and_generate_response(
headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}
)
agent, checkpointer = await init_agent(config)
agent, checkpointer, sandbox = await init_agent(config)
# 使用更新后的 messages
agent_responses = await agent.ainvoke({"messages": config.messages}, config=config.invoke_config(), max_tokens=MAX_OUTPUT_TOKENS)
# ============ 执行 PostAgent hooks ============
# 注意这里在非流式模式下同步执行hooks
await _execute_post_agent_hooks(config, "")
await _execute_post_agent_hooks(config, "", sandbox)
# ===========================================
# 从后往前找第一个 HumanMessage之后的内容都给 append_messages
@ -409,13 +411,14 @@ async def _save_assistant_response(config: AgentConfig, assistant_response: str)
logger.error(f"Failed to save assistant response: {e}")
async def _execute_post_agent_hooks(config: AgentConfig, response: str) -> None:
async def _execute_post_agent_hooks(config: AgentConfig, response: str, sandbox=None) -> None:
"""
执行 PostAgent hooks在agent执行后
Args:
config: AgentConfig 对象
response: Agent 的完整响应内容
sandbox: DaytonaSandbox 实例可选用于反向同步文件
"""
try:
from agent.plugin_hook_loader import execute_hooks
@ -436,6 +439,15 @@ async def _execute_post_agent_hooks(config: AgentConfig, response: str) -> None:
# 清理 executable_code/tmp 文件夹
await _cleanup_tmp_folder(config)
# Daytona: 反向同步 sandbox 文件到本地
if sandbox is not None and DAYTONA_ENABLED:
try:
from pathlib import Path
local_workspace = str(Path.cwd() / "projects" / "robot" / config.bot_id)
sync_sandbox_to_local(sandbox, local_workspace)
except Exception as e:
logger.error(f"Failed to sync sandbox to local: {e}")
async def _cleanup_tmp_folder(config: AgentConfig) -> None:
"""

204
utils/daytona_sync.py Normal file
View File

@ -0,0 +1,204 @@
"""Daytona sandbox 双向文件同步工具。"""
import io
import logging
import subprocess
import tarfile
import time
from pathlib import Path
from utils.settings import DAYTONA_API_KEY, DAYTONA_SERVER_URL, DAYTONA_ENABLED
logger = logging.getLogger('app')
def _list_local_changed_files(workspace_path: Path) -> tuple[bool, list[str]]:
"""返回是否需要首次同步,以及本地增量变更文件列表。"""
marker_local = workspace_path / ".last_sync"
if not marker_local.exists():
return True, []
result = subprocess.run(
[
"find", str(workspace_path), "-newer", str(marker_local), "-type", "f",
"-not", "-name", ".last_sync", "-not", "-name", ".DS_Store",
],
capture_output=True,
text=True,
timeout=30,
)
changed_files = [f for f in result.stdout.strip().split('\n') if f]
return False, changed_files
def init_daytona_sandbox(bot_id: str, local_workspace_root: str):
"""初始化 Daytona sandbox失败时回退到本地模式。"""
sandbox = None
sandbox_type = None
workspace_root = local_workspace_root
if not (DAYTONA_ENABLED and DAYTONA_API_KEY and DAYTONA_SERVER_URL):
return sandbox, sandbox_type, workspace_root
try:
from daytona import Daytona, DaytonaConfig, VolumeMount, CreateSandboxFromSnapshotParams
from langchain_daytona import DaytonaSandbox
start_time = time.time()
daytona_config = DaytonaConfig(
api_key=DAYTONA_API_KEY,
api_url=DAYTONA_SERVER_URL,
)
daytona_client = Daytona(daytona_config)
sandbox_name = f"bot-{bot_id}"
sandbox_instance = None
created_new_sandbox = False
try:
existing = daytona_client.get(sandbox_name)
if existing.state in ("Started", "Creating"):
sandbox_instance = existing
logger.info(f"Reusing existing sandbox: {sandbox_instance.id} (state={existing.state})")
else:
existing.start()
sandbox_instance = existing
logger.info(f"Restarted existing sandbox: {sandbox_instance.id}")
except Exception:
volume_name = f"bot-{bot_id}"
volume = daytona_client.volume.get(volume_name, create=True)
for _ in range(30):
volume = daytona_client.volume.get(volume_name)
if "READY" in str(volume.state).upper():
break
time.sleep(1)
else:
raise RuntimeError(f"Volume {volume_name} not ready after 30s, state: {volume.state}")
sandbox_params = CreateSandboxFromSnapshotParams(
name=sandbox_name,
volumes=[VolumeMount(volume_id=volume.id, mount_path="/workspace")],
env_vars={"BASH_ENV": "/home/daytona/.bash_env"},
)
sandbox_instance = daytona_client.create(sandbox_params)
created_new_sandbox = True
logger.info(f"Created new sandbox: {sandbox_instance.id}, volume: {volume.id}")
logger.info(f"daytona get/start done, elapsed: {time.time() - start_time:.3f}s")
sandbox = DaytonaSandbox(sandbox=sandbox_instance)
sandbox_type = "daytona"
workspace_root = "/workspace"
sync_workspace_to_sandbox(sandbox, local_workspace_root)
logger.info(f"daytona sync done, elapsed: {time.time() - start_time:.3f}s")
if created_new_sandbox:
sandbox.execute("test -f /home/daytona/.bash_env || echo 'cd /workspace' > /home/daytona/.bash_env")
logger.info(f"daytona bash_env done, elapsed: {time.time() - start_time:.3f}s")
except Exception as e:
logger.error(f"Failed to create Daytona sandbox: {e}, falling back to local mode")
sandbox = None
sandbox_type = None
workspace_root = local_workspace_root
return sandbox, sandbox_type, workspace_root
def sync_workspace_to_sandbox(sandbox, workspace_root: str) -> None:
"""增量同步本地 workspace 到 Daytona sandbox。
基于 .last_sync 时间戳标记
- 首次无标记文件全量同步
- 后续只同步比标记更新的文件
Args:
sandbox: DaytonaSandbox 实例
workspace_root: 本地 workspace 目录路径
"""
workspace_path = Path(workspace_root)
if not workspace_path.exists() or not any(workspace_path.iterdir()):
return
is_first_sync, changed_files = _list_local_changed_files(workspace_path)
if not is_first_sync and not changed_files:
logger.info("No local file changes to sync")
return
if is_first_sync:
check = sandbox.execute("test -f /workspace/.last_sync && echo yes || echo no")
if "yes" in check.output:
logger.info("Local marker missing but sandbox already synced, refreshing local marker")
(workspace_path / ".last_sync").touch()
return
logger.info("First sync: uploading all workspace files...")
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode='w:gz') as tar:
for item in workspace_path.iterdir():
if item.name in ('.DS_Store', '.last_sync'):
continue
tar.add(str(item), arcname=item.name)
buf.seek(0)
sandbox._sandbox.fs.upload_file(buf.read(), "/tmp/workspace.tar.gz")
sandbox.execute("cd /workspace && tar -xzf /tmp/workspace.tar.gz && rm /tmp/workspace.tar.gz")
sandbox.execute("echo 'cd /workspace' > /home/daytona/.bash_env")
logger.info("Full sync complete")
else:
logger.info(f"Incremental sync: {len(changed_files)} changed files")
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode='w:gz') as tar:
for fpath in changed_files:
arcname = str(Path(fpath).relative_to(workspace_path))
tar.add(fpath, arcname=arcname)
buf.seek(0)
sandbox._sandbox.fs.upload_file(buf.read(), "/tmp/workspace_inc.tar.gz")
sandbox.execute("cd /workspace && tar -xzf /tmp/workspace_inc.tar.gz && rm /tmp/workspace_inc.tar.gz")
logger.info(f"Incremental sync complete: {len(changed_files)} files")
sandbox.execute("date +%Y%m%d%H%M.%S > /workspace/.last_sync")
(workspace_path / ".last_sync").touch()
def sync_sandbox_to_local(sandbox, workspace_root: str) -> None:
"""Agent 执行完成后,将 sandbox 中的变更文件同步回本地。
基于 /workspace/.last_sync 时间戳 sandbox 中更新的文件并下载
Args:
sandbox: DaytonaSandbox 实例
workspace_root: 本地 workspace 目录路径
"""
workspace_path = Path(workspace_root)
workspace_path.mkdir(parents=True, exist_ok=True)
check = sandbox.execute("test -f /workspace/.last_sync && echo yes || echo no")
if "no" in check.output:
logger.info("No .last_sync in sandbox, skipping reverse sync")
return
result = sandbox.execute(
"find /workspace -newer /workspace/.last_sync -type f "
"-not -name '.last_sync' -not -name '.DS_Store' "
"-not -path '/workspace/.daytona*' 2>/dev/null"
)
changed_files = [f for f in result.output.strip().split('\n') if f and f != '/workspace']
if not changed_files:
logger.info("No sandbox file changes to sync back")
return
logger.info(f"Reverse sync: {len(changed_files)} changed files from sandbox")
rel_files = [f.removeprefix("/workspace/") for f in changed_files]
file_list = " ".join(f"'{f}'" for f in rel_files)
sandbox.execute(f"cd /workspace && tar -czf /tmp/sync_back.tar.gz {file_list}")
tar_data = sandbox._sandbox.fs.download_file("/tmp/sync_back.tar.gz")
sandbox.execute("rm -f /tmp/sync_back.tar.gz")
buf = io.BytesIO(tar_data)
with tarfile.open(fileobj=buf, mode='r:gz') as tar:
tar.extractall(path=str(workspace_path))
sandbox.execute("date +%Y%m%d%H%M.%S > /workspace/.last_sync")
(workspace_path / ".last_sync").touch()
logger.info(f"Reverse sync complete: {len(changed_files)} files downloaded")

View File

@ -104,7 +104,7 @@ SCHEDULE_MAX_CONCURRENT = int(os.getenv("SCHEDULE_MAX_CONCURRENT", "5"))
# DAYTONA_API_KEY = os.getenv("DAYTONA_API_KEY", "dtn_1c888ed16ec448b965e2e07afd75d69f8e0dd38efbad47744f9de49fcf7b7e2a")
# DAYTONA_SERVER_URL = os.getenv("DAYTONA_SERVER_URL", "https://daytona.45.66.216.154.nip.io/api")
DAYTONA_API_KEY = os.getenv("DAYTONA_API_KEY", "dtn_fd3ad4bf548de9fb61d3aa24b5d73bf4c0cafc3de568233d88698821b3e687c0")
DAYTONA_API_KEY = os.getenv("DAYTONA_API_KEY", "dtn_696a914ff54e45bb97132c32fba10995a4cab8ebef0cd8dea18129d447f805a3")
DAYTONA_SERVER_URL = os.getenv("DAYTONA_SERVER_URL", "https://app.daytona.io/api")
DAYTONA_ENABLED = os.getenv("DAYTONA_ENABLED", "false") == "true"