From ac32ef8d0b1a786cd0c70f14c2e1c66b002a4c36 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E6=BD=AE?= Date: Thu, 23 Apr 2026 16:38:44 +0800 Subject: [PATCH] update utils/daytona_sync.py --- utils/daytona_sync.py | 151 ++++++++++++++++++++++++------------------ 1 file changed, 87 insertions(+), 64 deletions(-) diff --git a/utils/daytona_sync.py b/utils/daytona_sync.py index 753ef03..829808a 100644 --- a/utils/daytona_sync.py +++ b/utils/daytona_sync.py @@ -6,32 +6,83 @@ import subprocess import tarfile import time from pathlib import Path +from typing import Any from utils.settings import DAYTONA_API_KEY, DAYTONA_SERVER_URL, DAYTONA_ENABLED logger = logging.getLogger('app') +LOCAL_MARKER_NAME = ".last_sync" +REMOTE_WORKSPACE_ROOT = "/workspace" +REMOTE_MARKER_PATH = f"{REMOTE_WORKSPACE_ROOT}/{LOCAL_MARKER_NAME}" +REMOTE_BASH_ENV_PATH = "/home/daytona/.bash_env" +EXCLUDED_FILE_NAMES = {".DS_Store", LOCAL_MARKER_NAME} +CHECK_REMOTE_MARKER_CMD = f"test -f {REMOTE_MARKER_PATH} && echo yes || echo no" +UPDATE_REMOTE_MARKER_CMD = f"date +%Y%m%d%H%M.%S > {REMOTE_MARKER_PATH}" +ENSURE_BASH_ENV_CMD = f"test -f {REMOTE_BASH_ENV_PATH} || echo 'cd {REMOTE_WORKSPACE_ROOT}' > {REMOTE_BASH_ENV_PATH}" +WRITE_BASH_ENV_CMD = f"echo 'cd {REMOTE_WORKSPACE_ROOT}' > {REMOTE_BASH_ENV_PATH}" + + +def _local_marker_path(workspace_path: Path) -> Path: + return workspace_path / LOCAL_MARKER_NAME + + +def _touch_local_marker(workspace_path: Path) -> None: + _local_marker_path(workspace_path).touch() + def _list_local_changed_files(workspace_path: Path) -> tuple[bool, list[str]]: """返回是否需要首次同步,以及本地增量变更文件列表。""" - marker_local = workspace_path / ".last_sync" + marker_local = _local_marker_path(workspace_path) if not marker_local.exists(): return True, [] result = subprocess.run( [ - "find", str(workspace_path), "-newer", str(marker_local), "-type", "f", - "-not", "-name", ".last_sync", "-not", "-name", ".DS_Store", + "find", + str(workspace_path), + "-newer", + str(marker_local), + "-type", + "f", + "-not", + "-name", + LOCAL_MARKER_NAME, + "-not", + "-name", + ".DS_Store", ], capture_output=True, text=True, timeout=30, ) - changed_files = [f for f in result.stdout.strip().split('\n') if f] + changed_files = [f for f in result.stdout.strip().split("\n") if f] return False, changed_files -def init_daytona_sandbox(bot_id: str, local_workspace_root: str): +def _tar_workspace_entries(workspace_path: Path, entries: list[Path]) -> bytes: + buf = io.BytesIO() + with tarfile.open(fileobj=buf, mode="w:gz") as tar: + for entry in entries: + if entry.is_absolute(): + tar.add(str(entry), arcname=entry.relative_to(workspace_path).as_posix()) + else: + tar.add(str(workspace_path / entry), arcname=entry.as_posix()) + buf.seek(0) + return buf.read() + + +def _workspace_items_for_full_sync(workspace_path: Path) -> list[Path]: + return [item for item in workspace_path.iterdir() if item.name not in EXCLUDED_FILE_NAMES] + + +def _extract_tar_to_path(tar_data: bytes, workspace_path: Path) -> None: + buf = io.BytesIO(tar_data) + with tarfile.open(fileobj=buf, mode="r:gz") as tar: + tar.extractall(path=str(workspace_path), filter="data") + + +def init_daytona_sandbox(bot_id: str, local_workspace_root: str) -> tuple[Any, str | None, str]: """初始化 Daytona sandbox,失败时回退到本地模式。""" sandbox = None sandbox_type = None @@ -54,6 +105,7 @@ def init_daytona_sandbox(bot_id: str, local_workspace_root: str): sandbox_name = f"bot-{bot_id}" sandbox_instance = None created_new_sandbox = False + try: existing = daytona_client.get(sandbox_name) if existing.state in ("Started", "Creating"): @@ -77,8 +129,8 @@ def init_daytona_sandbox(bot_id: str, local_workspace_root: str): sandbox_params = CreateSandboxFromSnapshotParams( name=sandbox_name, - volumes=[VolumeMount(volume_id=volume.id, mount_path="/workspace")], - env_vars={"BASH_ENV": "/home/daytona/.bash_env"}, + volumes=[VolumeMount(volume_id=volume.id, mount_path=REMOTE_WORKSPACE_ROOT)], + env_vars={"BASH_ENV": REMOTE_BASH_ENV_PATH}, ) sandbox_instance = daytona_client.create(sandbox_params) created_new_sandbox = True @@ -88,13 +140,13 @@ def init_daytona_sandbox(bot_id: str, local_workspace_root: str): sandbox = DaytonaSandbox(sandbox=sandbox_instance) sandbox_type = "daytona" - workspace_root = "/workspace" + workspace_root = REMOTE_WORKSPACE_ROOT sync_workspace_to_sandbox(sandbox, local_workspace_root) logger.info(f"daytona sync done, elapsed: {time.time() - start_time:.3f}s") if created_new_sandbox: - sandbox.execute("test -f /home/daytona/.bash_env || echo 'cd /workspace' > /home/daytona/.bash_env") + sandbox.execute(ENSURE_BASH_ENV_CMD) logger.info(f"daytona bash_env done, elapsed: {time.time() - start_time:.3f}s") except Exception as e: logger.error(f"Failed to create Daytona sandbox: {e}, falling back to local mode") @@ -105,17 +157,8 @@ def init_daytona_sandbox(bot_id: str, local_workspace_root: str): return sandbox, sandbox_type, workspace_root -def sync_workspace_to_sandbox(sandbox, workspace_root: str) -> None: - """增量同步本地 workspace 到 Daytona sandbox。 - - 基于 .last_sync 时间戳标记: - - 首次(无标记文件):全量同步 - - 后续:只同步比标记更新的文件 - - Args: - sandbox: DaytonaSandbox 实例 - workspace_root: 本地 workspace 目录路径 - """ +def sync_workspace_to_sandbox(sandbox: Any, workspace_root: str) -> None: + """增量同步本地 workspace 到 Daytona sandbox。""" workspace_path = Path(workspace_root) if not workspace_path.exists() or not any(workspace_path.iterdir()): return @@ -126,79 +169,59 @@ def sync_workspace_to_sandbox(sandbox, workspace_root: str) -> None: return if is_first_sync: - check = sandbox.execute("test -f /workspace/.last_sync && echo yes || echo no") + check = sandbox.execute(CHECK_REMOTE_MARKER_CMD) if "yes" in check.output: logger.info("Local marker missing but sandbox already synced, refreshing local marker") - (workspace_path / ".last_sync").touch() + _touch_local_marker(workspace_path) return logger.info("First sync: uploading all workspace files...") - buf = io.BytesIO() - with tarfile.open(fileobj=buf, mode='w:gz') as tar: - for item in workspace_path.iterdir(): - if item.name in ('.DS_Store', '.last_sync'): - continue - tar.add(str(item), arcname=item.name) - buf.seek(0) - sandbox._sandbox.fs.upload_file(buf.read(), "/tmp/workspace.tar.gz") - sandbox.execute("cd /workspace && tar -xzf /tmp/workspace.tar.gz && rm /tmp/workspace.tar.gz") - sandbox.execute("echo 'cd /workspace' > /home/daytona/.bash_env") + tar_data = _tar_workspace_entries(workspace_path, _workspace_items_for_full_sync(workspace_path)) + sandbox._sandbox.fs.upload_file(tar_data, "/tmp/workspace.tar.gz") + sandbox.execute(f"cd {REMOTE_WORKSPACE_ROOT} && tar -xzf /tmp/workspace.tar.gz && rm /tmp/workspace.tar.gz") + sandbox.execute(WRITE_BASH_ENV_CMD) logger.info("Full sync complete") else: logger.info(f"Incremental sync: {len(changed_files)} changed files") - buf = io.BytesIO() - with tarfile.open(fileobj=buf, mode='w:gz') as tar: - for fpath in changed_files: - arcname = str(Path(fpath).relative_to(workspace_path)) - tar.add(fpath, arcname=arcname) - buf.seek(0) - sandbox._sandbox.fs.upload_file(buf.read(), "/tmp/workspace_inc.tar.gz") - sandbox.execute("cd /workspace && tar -xzf /tmp/workspace_inc.tar.gz && rm /tmp/workspace_inc.tar.gz") + rel_paths = [Path(fpath).relative_to(workspace_path) for fpath in changed_files] + tar_data = _tar_workspace_entries(workspace_path, rel_paths) + sandbox._sandbox.fs.upload_file(tar_data, "/tmp/workspace_inc.tar.gz") + sandbox.execute(f"cd {REMOTE_WORKSPACE_ROOT} && tar -xzf /tmp/workspace_inc.tar.gz && rm /tmp/workspace_inc.tar.gz") logger.info(f"Incremental sync complete: {len(changed_files)} files") - sandbox.execute("date +%Y%m%d%H%M.%S > /workspace/.last_sync") - (workspace_path / ".last_sync").touch() + sandbox.execute(UPDATE_REMOTE_MARKER_CMD) + _touch_local_marker(workspace_path) -def sync_sandbox_to_local(sandbox, workspace_root: str) -> None: - """Agent 执行完成后,将 sandbox 中的变更文件同步回本地。 - - 基于 /workspace/.last_sync 时间戳,找 sandbox 中更新的文件并下载。 - - Args: - sandbox: DaytonaSandbox 实例 - workspace_root: 本地 workspace 目录路径 - """ +def sync_sandbox_to_local(sandbox: Any, workspace_root: str) -> None: + """Agent 执行完成后,将 sandbox 中的变更文件同步回本地。""" workspace_path = Path(workspace_root) workspace_path.mkdir(parents=True, exist_ok=True) - check = sandbox.execute("test -f /workspace/.last_sync && echo yes || echo no") + check = sandbox.execute(CHECK_REMOTE_MARKER_CMD) if "no" in check.output: logger.info("No .last_sync in sandbox, skipping reverse sync") return result = sandbox.execute( - "find /workspace -newer /workspace/.last_sync -type f " - "-not -name '.last_sync' -not -name '.DS_Store' " - "-not -path '/workspace/.daytona*' 2>/dev/null" + f"find {REMOTE_WORKSPACE_ROOT} -newer {REMOTE_MARKER_PATH} -type f " + f"-not -name '{LOCAL_MARKER_NAME}' -not -name '.DS_Store' " + f"-not -path '{REMOTE_WORKSPACE_ROOT}/.daytona*' 2>/dev/null" ) - changed_files = [f for f in result.output.strip().split('\n') if f and f != '/workspace'] + changed_files = [f for f in result.output.strip().split("\n") if f and f != REMOTE_WORKSPACE_ROOT] if not changed_files: logger.info("No sandbox file changes to sync back") return logger.info(f"Reverse sync: {len(changed_files)} changed files from sandbox") - rel_files = [f.removeprefix("/workspace/") for f in changed_files] + rel_files = [f.removeprefix(f"{REMOTE_WORKSPACE_ROOT}/") for f in changed_files] file_list = " ".join(f"'{f}'" for f in rel_files) - sandbox.execute(f"cd /workspace && tar -czf /tmp/sync_back.tar.gz {file_list}") + sandbox.execute(f"cd {REMOTE_WORKSPACE_ROOT} && tar -czf /tmp/sync_back.tar.gz {file_list}") tar_data = sandbox._sandbox.fs.download_file("/tmp/sync_back.tar.gz") sandbox.execute("rm -f /tmp/sync_back.tar.gz") + _extract_tar_to_path(tar_data, workspace_path) - buf = io.BytesIO(tar_data) - with tarfile.open(fileobj=buf, mode='r:gz') as tar: - tar.extractall(path=str(workspace_path)) - - sandbox.execute("date +%Y%m%d%H%M.%S > /workspace/.last_sync") - (workspace_path / ".last_sync").touch() + sandbox.execute(UPDATE_REMOTE_MARKER_CMD) + _touch_local_marker(workspace_path) logger.info(f"Reverse sync complete: {len(changed_files)} files downloaded")