"""Daytona sandbox bidirectional file sync tool.

The local workspace and the sandbox workspace each carry a ``.last_sync``
marker file. Files whose mtime is newer than the marker on one side are
packed into a gzip tarball and unpacked on the other side, after which both
markers are refreshed.
"""

import io
import logging
import shlex
import subprocess
import tarfile
import time
from pathlib import Path
from typing import Any

from utils.settings import DAYTONA_API_KEY, DAYTONA_SERVER_URL, DAYTONA_ENABLED

logger = logging.getLogger('app')

LOCAL_MARKER_NAME = ".last_sync"
REMOTE_WORKSPACE_ROOT = "/workspace"
REMOTE_MARKER_PATH = f"{REMOTE_WORKSPACE_ROOT}/{LOCAL_MARKER_NAME}"
REMOTE_BASH_ENV_PATH = "/home/daytona/.bash_env"
EXCLUDED_FILE_NAMES = {".DS_Store", LOCAL_MARKER_NAME}

# Shell snippets executed inside the sandbox.
CHECK_REMOTE_MARKER_CMD = f"test -f {REMOTE_MARKER_PATH} && echo yes || echo no"
UPDATE_REMOTE_MARKER_CMD = f"date +%Y%m%d%H%M.%S > {REMOTE_MARKER_PATH}"
ENSURE_BASH_ENV_CMD = (
    f"test -f {REMOTE_BASH_ENV_PATH} || echo 'cd {REMOTE_WORKSPACE_ROOT}' > {REMOTE_BASH_ENV_PATH}"
)
WRITE_BASH_ENV_CMD = f"echo 'cd {REMOTE_WORKSPACE_ROOT}' > {REMOTE_BASH_ENV_PATH}"


def _local_marker_path(workspace_path: Path) -> Path:
    """Return the path of the sync marker file inside *workspace_path*."""
    return workspace_path / LOCAL_MARKER_NAME


def _touch_local_marker(workspace_path: Path) -> None:
    """Create the local sync marker, or bump its mtime if it already exists."""
    _local_marker_path(workspace_path).touch()


def _list_local_changed_files(workspace_path: Path) -> tuple[bool, list[str]]:
    """Return whether a first sync is needed, and the list of locally changed files.

    When the local marker is missing the result is ``(True, [])``. Otherwise
    ``find`` is used to list regular files and symlinks newer than the marker
    (the marker itself and ``.DS_Store`` excluded) and the result is
    ``(False, paths)``, where each path is as printed by ``find`` (rooted at
    ``str(workspace_path)``).

    Raises:
        subprocess.TimeoutExpired: if ``find`` runs longer than 30 seconds.
    """
    marker_local = _local_marker_path(workspace_path)
    if not marker_local.exists():
        return True, []

    result = subprocess.run(
        [
            "find", str(workspace_path),
            "-newer", str(marker_local),
            "(", "-type", "f", "-o", "-type", "l", ")",
            "-not", "-name", LOCAL_MARKER_NAME,
            "-not", "-name", ".DS_Store",
            # NUL-separated output: file names may legally contain newlines.
            "-print0",
        ],
        capture_output=True,
        text=True,
        timeout=30,
    )
    if result.returncode != 0:
        # find still prints whatever it could reach; surface the problem
        # instead of silently working with a possibly partial list.
        logger.warning(f"find exited with code {result.returncode}: {result.stderr.strip()}")

    changed_files = [f for f in result.stdout.split("\0") if f]
    return False, changed_files


def _tar_workspace_entries(workspace_path: Path, entries: list[Path]) -> bytes:
    """Pack *entries* into an in-memory gzip tarball and return its bytes.

    Entries may be absolute (they must then live under *workspace_path*) or
    relative to *workspace_path*; archive member names are always relative
    POSIX paths. Symlinks are dereferenced, so their targets' contents are
    stored.

    Top-level entries that no longer exist (deleted since they were listed,
    or dangling symlinks) are skipped with a warning rather than aborting
    the whole archive.
    """
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w:gz") as tar:
        for entry in entries:
            if entry.is_absolute():
                source = entry
                arcname = entry.relative_to(workspace_path).as_posix()
            else:
                source = workspace_path / entry
                arcname = entry.as_posix()

            # Path.exists() follows symlinks, so it is False for dangling
            # links, which tar.add(dereference=True) cannot archive anyway.
            if not source.exists():
                logger.warning(f"Skipping missing or dangling entry during sync: {arcname}")
                continue

            tar.add(str(source), arcname=arcname, dereference=True)
    # The tarfile is closed here (gzip trailer flushed); buf stays open
    # because it was supplied by us.
    return buf.getvalue()


def _workspace_items_for_full_sync(workspace_path: Path) -> list[Path]:
    """Return the top-level workspace items to upload, minus excluded names."""
    return [item for item in workspace_path.iterdir() if item.name not in EXCLUDED_FILE_NAMES]


def _extract_tar_to_path(tar_data: bytes, workspace_path: Path) -> None:
    """Unpack a gzip tarball held in memory into *workspace_path*.

    Extraction uses tarfile's ``"data"`` filter.
    """
    buf = io.BytesIO(tar_data)
    with tarfile.open(fileobj=buf, mode="r:gz") as tar:
        tar.extractall(path=str(workspace_path), filter="data")


def init_daytona_sandbox(bot_id: str, local_workspace_root: str) -> tuple[Any, str | None, str]:
    """Initialize Daytona sandbox, falling back to local mode on failure.

    Args:
        bot_id: Identifier used to derive the sandbox and volume name
            (``bot-<bot_id>``).
        local_workspace_root: Local workspace directory to sync into the
            sandbox.

    Returns:
        ``(sandbox, sandbox_type, workspace_root)``. On success this is
        ``(DaytonaSandbox, "daytona", REMOTE_WORKSPACE_ROOT)``. When Daytona
        is disabled, not configured, or any step fails, it is
        ``(None, None, local_workspace_root)``.
    """
    sandbox = None
    sandbox_type = None
    workspace_root = local_workspace_root

    if not (DAYTONA_ENABLED and DAYTONA_API_KEY and DAYTONA_SERVER_URL):
        return sandbox, sandbox_type, workspace_root

    try:
        # Imported lazily, so the packages are only needed when Daytona is enabled.
        from daytona import Daytona, DaytonaConfig, VolumeMount, CreateSandboxFromSnapshotParams
        from langchain_daytona import DaytonaSandbox

        start_time = time.time()
        daytona_config = DaytonaConfig(
            api_key=DAYTONA_API_KEY,
            api_url=DAYTONA_SERVER_URL,
        )
        daytona_client = Daytona(daytona_config)
        sandbox_name = f"bot-{bot_id}"
        sandbox_instance = None
        created_new_sandbox = False

        try:
            existing = daytona_client.get(sandbox_name)
            if existing.state in ("Started", "Creating"):
                sandbox_instance = existing
                logger.info(f"Reusing existing sandbox: {sandbox_instance.id} (state={existing.state})")
            else:
                existing.start()
                sandbox_instance = existing
                logger.info(f"Restarted existing sandbox: {sandbox_instance.id}")
        except Exception:
            # Lookup or restart failed: create a fresh sandbox backed by a
            # per-bot volume mounted at the remote workspace root.
            volume_name = f"bot-{bot_id}"
            volume = daytona_client.volume.get(volume_name, create=True)
            # Poll up to 30 times, one second apart, until the volume is ready.
            for _ in range(30):
                volume = daytona_client.volume.get(volume_name)
                if "READY" in str(volume.state).upper():
                    break
                time.sleep(1)
            else:
                raise RuntimeError(f"Volume {volume_name} not ready after 30s, state: {volume.state}")

            sandbox_params = CreateSandboxFromSnapshotParams(
                name=sandbox_name,
                volumes=[VolumeMount(volume_id=volume.id, mount_path=REMOTE_WORKSPACE_ROOT)],
                env_vars={"BASH_ENV": REMOTE_BASH_ENV_PATH},
            )
            sandbox_instance = daytona_client.create(sandbox_params)
            created_new_sandbox = True
            logger.info(f"Created new sandbox: {sandbox_instance.id}, volume: {volume.id}")

        logger.info(f"daytona get/start done, elapsed: {time.time() - start_time:.3f}s")

        sandbox = DaytonaSandbox(sandbox=sandbox_instance)
        sandbox_type = "daytona"
        workspace_root = REMOTE_WORKSPACE_ROOT

        sync_workspace_to_sandbox(sandbox, local_workspace_root)
        logger.info(f"daytona sync done, elapsed: {time.time() - start_time:.3f}s")

        if created_new_sandbox:
            sandbox.execute(ENSURE_BASH_ENV_CMD)
            logger.info(f"daytona bash_env done, elapsed: {time.time() - start_time:.3f}s")
    except Exception as e:
        # Top-level boundary: any failure degrades to local mode.
        logger.error(f"Failed to create Daytona sandbox: {e}, falling back to local mode")
        sandbox = None
        sandbox_type = None
        workspace_root = local_workspace_root

    return sandbox, sandbox_type, workspace_root


def sync_workspace_to_sandbox(sandbox: Any, workspace_root: str) -> None:
    """Incrementally sync local workspace to Daytona sandbox.

    Does nothing when the local workspace is missing or empty, or when no
    local file is newer than the local marker. When the local marker is
    missing but the sandbox already has one, only the local marker is
    recreated. Otherwise the whole workspace (first sync) or just the changed
    files (incremental sync) are uploaded as a tarball and unpacked under
    ``REMOTE_WORKSPACE_ROOT``; afterwards both markers are refreshed.
    """
    workspace_path = Path(workspace_root)
    if not workspace_path.exists() or not any(workspace_path.iterdir()):
        return

    is_first_sync, changed_files = _list_local_changed_files(workspace_path)
    if not is_first_sync and not changed_files:
        logger.info("No local file changes to sync")
        return

    if is_first_sync:
        check = sandbox.execute(CHECK_REMOTE_MARKER_CMD)
        if "yes" in check.output:
            logger.info("Local marker missing but sandbox already synced, refreshing local marker")
            _touch_local_marker(workspace_path)
            return

        logger.info("First sync: uploading all workspace files...")
        tar_data = _tar_workspace_entries(workspace_path, _workspace_items_for_full_sync(workspace_path))
        sandbox._sandbox.fs.upload_file(tar_data, "/tmp/workspace.tar.gz")
        sandbox.execute(
            f"cd {REMOTE_WORKSPACE_ROOT} && tar -xzf /tmp/workspace.tar.gz && rm /tmp/workspace.tar.gz"
        )
        sandbox.execute(WRITE_BASH_ENV_CMD)
        logger.info("Full sync complete")
    else:
        logger.info(f"Incremental sync: {len(changed_files)} changed files")
        rel_paths = [Path(fpath).relative_to(workspace_path) for fpath in changed_files]
        tar_data = _tar_workspace_entries(workspace_path, rel_paths)
        sandbox._sandbox.fs.upload_file(tar_data, "/tmp/workspace_inc.tar.gz")
        sandbox.execute(
            f"cd {REMOTE_WORKSPACE_ROOT} && tar -xzf /tmp/workspace_inc.tar.gz && rm /tmp/workspace_inc.tar.gz"
        )
        logger.info(f"Incremental sync complete: {len(changed_files)} files")

    # NOTE(review): the result of the remote tar extraction is not inspected
    # before the markers are advanced; confirm what the execute() response
    # exposes before adding a check.
    sandbox.execute(UPDATE_REMOTE_MARKER_CMD)
    _touch_local_marker(workspace_path)


def sync_sandbox_to_local(sandbox: Any, workspace_root: str) -> None:
    """After agent execution, sync changed files from sandbox back to local.

    Regular files under ``REMOTE_WORKSPACE_ROOT`` that are newer than the
    remote marker (excluding the marker, ``.DS_Store`` and ``.daytona*``
    paths) are archived in the sandbox, downloaded, and extracted into
    *workspace_root*, which is created if missing. Both markers are refreshed
    afterwards. Nothing is done when the sandbox has no marker or no file
    changed.
    """
    workspace_path = Path(workspace_root)
    workspace_path.mkdir(parents=True, exist_ok=True)

    check = sandbox.execute(CHECK_REMOTE_MARKER_CMD)
    # Same test as the forward sync: anything other than an explicit "yes"
    # (including empty or error output) means there is no usable marker.
    if "yes" not in check.output:
        logger.info("No .last_sync in sandbox, skipping reverse sync")
        return

    result = sandbox.execute(
        f"find {REMOTE_WORKSPACE_ROOT} -newer {REMOTE_MARKER_PATH} -type f "
        f"-not -name '{LOCAL_MARKER_NAME}' -not -name '.DS_Store' "
        f"-not -path '{REMOTE_WORKSPACE_ROOT}/.daytona*' 2>/dev/null"
    )
    # NOTE: output is newline-separated, so remote file names that themselves
    # contain a newline are not supported here.
    changed_files = [f for f in result.output.strip().split("\n") if f and f != REMOTE_WORKSPACE_ROOT]
    if not changed_files:
        logger.info("No sandbox file changes to sync back")
        return

    logger.info(f"Reverse sync: {len(changed_files)} changed files from sandbox")

    rel_files = [f.removeprefix(f"{REMOTE_WORKSPACE_ROOT}/") for f in changed_files]
    # File names come from inside the sandbox and may contain quotes, spaces
    # or shell metacharacters: quote each one, and use "--" so a name that
    # starts with "-" is never parsed as a tar option.
    file_list = " ".join(shlex.quote(f) for f in rel_files)
    sandbox.execute(f"cd {REMOTE_WORKSPACE_ROOT} && tar -czf /tmp/sync_back.tar.gz -- {file_list}")
    try:
        tar_data = sandbox._sandbox.fs.download_file("/tmp/sync_back.tar.gz")
    finally:
        # Remove the temporary archive even when the download fails.
        sandbox.execute("rm -f /tmp/sync_back.tar.gz")

    _extract_tar_to_path(tar_data, workspace_path)

    sandbox.execute(UPDATE_REMOTE_MARKER_CMD)
    _touch_local_marker(workspace_path)
    logger.info(f"Reverse sync complete: {len(changed_files)} files downloaded")