update utils/daytona_sync.py

This commit is contained in:
朱潮 2026-04-23 16:38:44 +08:00
parent 8446dab1e4
commit ac32ef8d0b

View File

@ -6,32 +6,83 @@ import subprocess
import tarfile
import time
from pathlib import Path
from typing import Any
from utils.settings import DAYTONA_API_KEY, DAYTONA_SERVER_URL, DAYTONA_ENABLED
logger = logging.getLogger('app')
LOCAL_MARKER_NAME = ".last_sync"
REMOTE_WORKSPACE_ROOT = "/workspace"
REMOTE_MARKER_PATH = f"{REMOTE_WORKSPACE_ROOT}/{LOCAL_MARKER_NAME}"
REMOTE_BASH_ENV_PATH = "/home/daytona/.bash_env"
EXCLUDED_FILE_NAMES = {".DS_Store", LOCAL_MARKER_NAME}
CHECK_REMOTE_MARKER_CMD = f"test -f {REMOTE_MARKER_PATH} && echo yes || echo no"
UPDATE_REMOTE_MARKER_CMD = f"date +%Y%m%d%H%M.%S > {REMOTE_MARKER_PATH}"
ENSURE_BASH_ENV_CMD = f"test -f {REMOTE_BASH_ENV_PATH} || echo 'cd {REMOTE_WORKSPACE_ROOT}' > {REMOTE_BASH_ENV_PATH}"
WRITE_BASH_ENV_CMD = f"echo 'cd {REMOTE_WORKSPACE_ROOT}' > {REMOTE_BASH_ENV_PATH}"
def _local_marker_path(workspace_path: Path) -> Path:
return workspace_path / LOCAL_MARKER_NAME
def _touch_local_marker(workspace_path: Path) -> None:
_local_marker_path(workspace_path).touch()
def _list_local_changed_files(workspace_path: Path) -> tuple[bool, list[str]]:
"""返回是否需要首次同步,以及本地增量变更文件列表。"""
marker_local = workspace_path / ".last_sync"
marker_local = _local_marker_path(workspace_path)
if not marker_local.exists():
return True, []
result = subprocess.run(
[
"find", str(workspace_path), "-newer", str(marker_local), "-type", "f",
"-not", "-name", ".last_sync", "-not", "-name", ".DS_Store",
"find",
str(workspace_path),
"-newer",
str(marker_local),
"-type",
"f",
"-not",
"-name",
LOCAL_MARKER_NAME,
"-not",
"-name",
".DS_Store",
],
capture_output=True,
text=True,
timeout=30,
)
changed_files = [f for f in result.stdout.strip().split('\n') if f]
changed_files = [f for f in result.stdout.strip().split("\n") if f]
return False, changed_files
def init_daytona_sandbox(bot_id: str, local_workspace_root: str):
def _tar_workspace_entries(workspace_path: Path, entries: list[Path]) -> bytes:
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w:gz") as tar:
for entry in entries:
if entry.is_absolute():
tar.add(str(entry), arcname=entry.relative_to(workspace_path).as_posix())
else:
tar.add(str(workspace_path / entry), arcname=entry.as_posix())
buf.seek(0)
return buf.read()
def _workspace_items_for_full_sync(workspace_path: Path) -> list[Path]:
return [item for item in workspace_path.iterdir() if item.name not in EXCLUDED_FILE_NAMES]
def _extract_tar_to_path(tar_data: bytes, workspace_path: Path) -> None:
buf = io.BytesIO(tar_data)
with tarfile.open(fileobj=buf, mode="r:gz") as tar:
tar.extractall(path=str(workspace_path), filter="data")
def init_daytona_sandbox(bot_id: str, local_workspace_root: str) -> tuple[Any, str | None, str]:
"""初始化 Daytona sandbox失败时回退到本地模式。"""
sandbox = None
sandbox_type = None
@ -54,6 +105,7 @@ def init_daytona_sandbox(bot_id: str, local_workspace_root: str):
sandbox_name = f"bot-{bot_id}"
sandbox_instance = None
created_new_sandbox = False
try:
existing = daytona_client.get(sandbox_name)
if existing.state in ("Started", "Creating"):
@ -77,8 +129,8 @@ def init_daytona_sandbox(bot_id: str, local_workspace_root: str):
sandbox_params = CreateSandboxFromSnapshotParams(
name=sandbox_name,
volumes=[VolumeMount(volume_id=volume.id, mount_path="/workspace")],
env_vars={"BASH_ENV": "/home/daytona/.bash_env"},
volumes=[VolumeMount(volume_id=volume.id, mount_path=REMOTE_WORKSPACE_ROOT)],
env_vars={"BASH_ENV": REMOTE_BASH_ENV_PATH},
)
sandbox_instance = daytona_client.create(sandbox_params)
created_new_sandbox = True
@ -88,13 +140,13 @@ def init_daytona_sandbox(bot_id: str, local_workspace_root: str):
sandbox = DaytonaSandbox(sandbox=sandbox_instance)
sandbox_type = "daytona"
workspace_root = "/workspace"
workspace_root = REMOTE_WORKSPACE_ROOT
sync_workspace_to_sandbox(sandbox, local_workspace_root)
logger.info(f"daytona sync done, elapsed: {time.time() - start_time:.3f}s")
if created_new_sandbox:
sandbox.execute("test -f /home/daytona/.bash_env || echo 'cd /workspace' > /home/daytona/.bash_env")
sandbox.execute(ENSURE_BASH_ENV_CMD)
logger.info(f"daytona bash_env done, elapsed: {time.time() - start_time:.3f}s")
except Exception as e:
logger.error(f"Failed to create Daytona sandbox: {e}, falling back to local mode")
@ -105,17 +157,8 @@ def init_daytona_sandbox(bot_id: str, local_workspace_root: str):
return sandbox, sandbox_type, workspace_root
def sync_workspace_to_sandbox(sandbox, workspace_root: str) -> None:
"""增量同步本地 workspace 到 Daytona sandbox。
基于 .last_sync 时间戳标记
- 首次无标记文件全量同步
- 后续只同步比标记更新的文件
Args:
sandbox: DaytonaSandbox 实例
workspace_root: 本地 workspace 目录路径
"""
def sync_workspace_to_sandbox(sandbox: Any, workspace_root: str) -> None:
"""增量同步本地 workspace 到 Daytona sandbox。"""
workspace_path = Path(workspace_root)
if not workspace_path.exists() or not any(workspace_path.iterdir()):
return
@ -126,79 +169,59 @@ def sync_workspace_to_sandbox(sandbox, workspace_root: str) -> None:
return
if is_first_sync:
check = sandbox.execute("test -f /workspace/.last_sync && echo yes || echo no")
check = sandbox.execute(CHECK_REMOTE_MARKER_CMD)
if "yes" in check.output:
logger.info("Local marker missing but sandbox already synced, refreshing local marker")
(workspace_path / ".last_sync").touch()
_touch_local_marker(workspace_path)
return
logger.info("First sync: uploading all workspace files...")
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode='w:gz') as tar:
for item in workspace_path.iterdir():
if item.name in ('.DS_Store', '.last_sync'):
continue
tar.add(str(item), arcname=item.name)
buf.seek(0)
sandbox._sandbox.fs.upload_file(buf.read(), "/tmp/workspace.tar.gz")
sandbox.execute("cd /workspace && tar -xzf /tmp/workspace.tar.gz && rm /tmp/workspace.tar.gz")
sandbox.execute("echo 'cd /workspace' > /home/daytona/.bash_env")
tar_data = _tar_workspace_entries(workspace_path, _workspace_items_for_full_sync(workspace_path))
sandbox._sandbox.fs.upload_file(tar_data, "/tmp/workspace.tar.gz")
sandbox.execute(f"cd {REMOTE_WORKSPACE_ROOT} && tar -xzf /tmp/workspace.tar.gz && rm /tmp/workspace.tar.gz")
sandbox.execute(WRITE_BASH_ENV_CMD)
logger.info("Full sync complete")
else:
logger.info(f"Incremental sync: {len(changed_files)} changed files")
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode='w:gz') as tar:
for fpath in changed_files:
arcname = str(Path(fpath).relative_to(workspace_path))
tar.add(fpath, arcname=arcname)
buf.seek(0)
sandbox._sandbox.fs.upload_file(buf.read(), "/tmp/workspace_inc.tar.gz")
sandbox.execute("cd /workspace && tar -xzf /tmp/workspace_inc.tar.gz && rm /tmp/workspace_inc.tar.gz")
rel_paths = [Path(fpath).relative_to(workspace_path) for fpath in changed_files]
tar_data = _tar_workspace_entries(workspace_path, rel_paths)
sandbox._sandbox.fs.upload_file(tar_data, "/tmp/workspace_inc.tar.gz")
sandbox.execute(f"cd {REMOTE_WORKSPACE_ROOT} && tar -xzf /tmp/workspace_inc.tar.gz && rm /tmp/workspace_inc.tar.gz")
logger.info(f"Incremental sync complete: {len(changed_files)} files")
sandbox.execute("date +%Y%m%d%H%M.%S > /workspace/.last_sync")
(workspace_path / ".last_sync").touch()
sandbox.execute(UPDATE_REMOTE_MARKER_CMD)
_touch_local_marker(workspace_path)
def sync_sandbox_to_local(sandbox, workspace_root: str) -> None:
"""Agent 执行完成后,将 sandbox 中的变更文件同步回本地。
基于 /workspace/.last_sync 时间戳 sandbox 中更新的文件并下载
Args:
sandbox: DaytonaSandbox 实例
workspace_root: 本地 workspace 目录路径
"""
def sync_sandbox_to_local(sandbox: Any, workspace_root: str) -> None:
"""Agent 执行完成后,将 sandbox 中的变更文件同步回本地。"""
workspace_path = Path(workspace_root)
workspace_path.mkdir(parents=True, exist_ok=True)
check = sandbox.execute("test -f /workspace/.last_sync && echo yes || echo no")
check = sandbox.execute(CHECK_REMOTE_MARKER_CMD)
if "no" in check.output:
logger.info("No .last_sync in sandbox, skipping reverse sync")
return
result = sandbox.execute(
"find /workspace -newer /workspace/.last_sync -type f "
"-not -name '.last_sync' -not -name '.DS_Store' "
"-not -path '/workspace/.daytona*' 2>/dev/null"
f"find {REMOTE_WORKSPACE_ROOT} -newer {REMOTE_MARKER_PATH} -type f "
f"-not -name '{LOCAL_MARKER_NAME}' -not -name '.DS_Store' "
f"-not -path '{REMOTE_WORKSPACE_ROOT}/.daytona*' 2>/dev/null"
)
changed_files = [f for f in result.output.strip().split('\n') if f and f != '/workspace']
changed_files = [f for f in result.output.strip().split("\n") if f and f != REMOTE_WORKSPACE_ROOT]
if not changed_files:
logger.info("No sandbox file changes to sync back")
return
logger.info(f"Reverse sync: {len(changed_files)} changed files from sandbox")
rel_files = [f.removeprefix("/workspace/") for f in changed_files]
rel_files = [f.removeprefix(f"{REMOTE_WORKSPACE_ROOT}/") for f in changed_files]
file_list = " ".join(f"'{f}'" for f in rel_files)
sandbox.execute(f"cd /workspace && tar -czf /tmp/sync_back.tar.gz {file_list}")
sandbox.execute(f"cd {REMOTE_WORKSPACE_ROOT} && tar -czf /tmp/sync_back.tar.gz {file_list}")
tar_data = sandbox._sandbox.fs.download_file("/tmp/sync_back.tar.gz")
sandbox.execute("rm -f /tmp/sync_back.tar.gz")
_extract_tar_to_path(tar_data, workspace_path)
buf = io.BytesIO(tar_data)
with tarfile.open(fileobj=buf, mode='r:gz') as tar:
tar.extractall(path=str(workspace_path))
sandbox.execute("date +%Y%m%d%H%M.%S > /workspace/.last_sync")
(workspace_path / ".last_sync").touch()
sandbox.execute(UPDATE_REMOTE_MARKER_CMD)
_touch_local_marker(workspace_path)
logger.info(f"Reverse sync complete: {len(changed_files)} files downloaded")