diff --git a/routes/webdav.py b/routes/webdav.py
index b6b1454..6c712e9 100644
--- a/routes/webdav.py
+++ b/routes/webdav.py
@@ -1,9 +1,9 @@
"""
WebDAV 文件管理路由
-为 projects/robot/{bot_id} 目录提供 WebDAV 协议支持
+为 projects/{resource_type}/ 目录提供 WebDAV 协议支持
+支持的资源类型: robot, dataset
"""
-import os
import secrets
import shutil
import mimetypes
@@ -12,7 +12,7 @@ from datetime import datetime, timezone
from xml.etree import ElementTree as ET
from fastapi import APIRouter, Request, HTTPException, Response, Depends
-from fastapi.responses import FileResponse, Response as FastAPIResponse
+from fastapi.responses import FileResponse
from fastapi.security import HTTPBasic, HTTPBasicCredentials
import logging
@@ -53,34 +53,24 @@ for resource_type in ALLOWED_RESOURCE_TYPES:
(PROJECTS_BASE_DIR / resource_type).mkdir(parents=True, exist_ok=True)
-def get_resource_root(resource_type: str, resource_id: str) -> Path:
- """获取资源的根目录,确保安全"""
- # 验证资源类型
+def get_resource_type_root(resource_type: str) -> Path:
+ """获取资源类型的根目录"""
if resource_type not in ALLOWED_RESOURCE_TYPES:
raise HTTPException(
status_code=400,
detail=f"无效的资源类型,仅支持: {', '.join(ALLOWED_RESOURCE_TYPES)}"
)
-
- # 防止路径遍历
- if ".." in resource_id or "/" in resource_id or "\\" in resource_id:
- raise HTTPException(status_code=400, detail="无效的资源 ID")
-
- root = PROJECTS_BASE_DIR / resource_type / resource_id
- root.mkdir(parents=True, exist_ok=True)
- return root
+ return PROJECTS_BASE_DIR / resource_type
-def resolve_safe_path(resource_root: Path, rel_path: str) -> Path:
+def resolve_safe_path(root: Path, rel_path: str) -> Path:
"""安全地解析相对路径,防止目录遍历"""
- # 规范化,去掉开头的 /
rel_path = rel_path.strip("/")
if not rel_path:
- return resource_root
- target = (resource_root / rel_path).resolve()
- # 确保在 resource_root 内
+ return root
+ target = (root / rel_path).resolve()
try:
- target.relative_to(resource_root.resolve())
+ target.relative_to(root.resolve())
except ValueError:
raise HTTPException(status_code=403, detail="访问被拒绝")
return target
@@ -93,7 +83,6 @@ def format_http_date(dt: datetime) -> str:
def get_file_props(file_path: Path, href: str) -> ET.Element:
"""生成单个资源的 DAV:response XML 元素"""
response_el = ET.SubElement(ET.Element("dummy"), f"{DAV_PREFIX}response")
- # href
href_el = ET.SubElement(response_el, f"{DAV_PREFIX}href")
href_el.text = href
@@ -105,25 +94,19 @@ def get_file_props(file_path: Path, href: str) -> ET.Element:
stat = file_path.stat()
mtime = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc)
- # displayname
dn = ET.SubElement(prop_el, f"{DAV_PREFIX}displayname")
dn.text = file_path.name
- # getlastmodified
lm = ET.SubElement(prop_el, f"{DAV_PREFIX}getlastmodified")
lm.text = format_http_date(mtime)
if file_path.is_dir():
- # resourcetype = collection
rt = ET.SubElement(prop_el, f"{DAV_PREFIX}resourcetype")
ET.SubElement(rt, f"{DAV_PREFIX}collection")
else:
- # resourcetype empty
ET.SubElement(prop_el, f"{DAV_PREFIX}resourcetype")
- # getcontentlength
cl = ET.SubElement(prop_el, f"{DAV_PREFIX}getcontentlength")
cl.text = str(stat.st_size)
- # getcontenttype
ct = ET.SubElement(prop_el, f"{DAV_PREFIX}getcontenttype")
mime, _ = mimetypes.guess_type(str(file_path))
ct.text = mime or "application/octet-stream"
@@ -142,30 +125,15 @@ def build_multistatus(responses: list[ET.Element]) -> str:
)
-# ===== PROPFIND =====
-@router.api_route("/{resource_type}/{resource_id}/{path:path}", methods=["PROPFIND"], dependencies=auth_dep)
-@router.api_route("/{resource_type}/{resource_id}", methods=["PROPFIND"], dependencies=auth_dep)
-async def propfind(resource_type: str, resource_id: str, request: Request, path: str = ""):
- """PROPFIND - 获取资源属性(列出目录)"""
- resource_root = get_resource_root(resource_type, resource_id)
- target = resolve_safe_path(resource_root, path)
-
- if not target.exists():
- raise HTTPException(status_code=404, detail="资源不存在")
-
- depth = request.headers.get("Depth", "1")
- base_href = f"/webdav/{resource_type}/{resource_id}"
-
+def _collect_propfind_responses(root: Path, target: Path, base_href: str, rel: str, depth: str) -> list[ET.Element]:
+ """收集 PROPFIND 响应"""
responses = []
- # 当前资源
- rel = path.strip("/")
href = f"{base_href}/{rel}" if rel else f"{base_href}/"
if not href.endswith("/") and target.is_dir():
href += "/"
responses.append(get_file_props(target, href))
- # 如果是目录且 Depth != 0,列出子项
if target.is_dir() and depth != "0":
try:
for item in sorted(target.iterdir()):
@@ -179,6 +147,26 @@ async def propfind(resource_type: str, resource_id: str, request: Request, path:
except PermissionError:
pass
+ return responses
+
+
+# ===== PROPFIND =====
+@router.api_route("/{resource_type}/{path:path}", methods=["PROPFIND"], dependencies=auth_dep)
+@router.api_route("/{resource_type}", methods=["PROPFIND"], dependencies=auth_dep)
+async def propfind(resource_type: str, request: Request, path: str = ""):
+ """PROPFIND - 获取资源属性(列出目录)"""
+ root = get_resource_type_root(resource_type)
+ target = resolve_safe_path(root, path)
+
+ if not target.exists():
+ raise HTTPException(status_code=404, detail="资源不存在")
+
+ depth = request.headers.get("Depth", "1")
+ base_href = f"/webdav/{resource_type}"
+ rel = path.strip("/")
+
+ responses = _collect_propfind_responses(root, target, base_href, rel, depth)
+
xml_body = build_multistatus(responses)
return Response(
content=xml_body,
@@ -188,9 +176,9 @@ async def propfind(resource_type: str, resource_id: str, request: Request, path:
# ===== OPTIONS =====
-@router.api_route("/{resource_type}/{resource_id}/{path:path}", methods=["OPTIONS"])
-@router.api_route("/{resource_type}/{resource_id}", methods=["OPTIONS"])
-async def options(resource_type: str, resource_id: str, path: str = ""):
+@router.api_route("/{resource_type}/{path:path}", methods=["OPTIONS"])
+@router.api_route("/{resource_type}", methods=["OPTIONS"])
+async def options(resource_type: str, path: str = ""):
"""OPTIONS - 返回支持的 WebDAV 方法"""
return Response(
status_code=200,
@@ -202,25 +190,27 @@ async def options(resource_type: str, resource_id: str, path: str = ""):
# ===== GET =====
-@router.get("/{resource_type}/{resource_id}/{path:path}", dependencies=auth_dep)
-async def get_file(resource_type: str, resource_id: str, path: str):
- """GET - 下载文件"""
- resource_root = get_resource_root(resource_type, resource_id)
- target = resolve_safe_path(resource_root, path)
+@router.get("/{resource_type}/{path:path}", dependencies=auth_dep)
+@router.get("/{resource_type}", dependencies=auth_dep)
+async def get_file(resource_type: str, path: str = ""):
+ """GET - 下载文件或浏览目录"""
+ root = get_resource_type_root(resource_type)
+ target = resolve_safe_path(root, path)
if not target.exists():
raise HTTPException(status_code=404, detail="资源不存在")
if target.is_dir():
- # 对目录返回简单的 HTML 列表
items = []
for item in sorted(target.iterdir()):
if item.name.startswith("."):
continue
name = item.name + ("/" if item.is_dir() else "")
- rel_href = f"/webdav/{resource_type}/{resource_id}/{path.strip('/')}/{item.name}" if path.strip("/") else f"/webdav/{resource_type}/{resource_id}/{item.name}"
- items.append(f'
{name}')
- html = f"Index of /{path}
"
+ rel = path.strip("/")
+ item_href = f"/webdav/{resource_type}/{rel}/{item.name}" if rel else f"/webdav/{resource_type}/{item.name}"
+ items.append(f'{name}')
+ display_path = path.strip("/") or resource_type
+ html = f"Index of /{display_path}/
"
return Response(content=html, media_type="text/html")
mime, _ = mimetypes.guess_type(str(target))
@@ -228,11 +218,12 @@ async def get_file(resource_type: str, resource_id: str, path: str):
# ===== HEAD =====
-@router.head("/{resource_type}/{resource_id}/{path:path}", dependencies=auth_dep)
-async def head_file(resource_type: str, resource_id: str, path: str):
+@router.head("/{resource_type}/{path:path}", dependencies=auth_dep)
+@router.head("/{resource_type}", dependencies=auth_dep)
+async def head_file(resource_type: str, path: str = ""):
"""HEAD - 获取文件元数据"""
- resource_root = get_resource_root(resource_type, resource_id)
- target = resolve_safe_path(resource_root, path)
+ root = get_resource_type_root(resource_type)
+ target = resolve_safe_path(root, path)
if not target.exists():
raise HTTPException(status_code=404, detail="资源不存在")
@@ -250,13 +241,12 @@ async def head_file(resource_type: str, resource_id: str, path: str):
# ===== PUT =====
-@router.put("/{resource_type}/{resource_id}/{path:path}", dependencies=auth_dep)
-async def put_file(resource_type: str, resource_id: str, path: str, request: Request):
+@router.put("/{resource_type}/{path:path}", dependencies=auth_dep)
+async def put_file(resource_type: str, path: str, request: Request):
"""PUT - 上传/覆盖文件"""
- resource_root = get_resource_root(resource_type, resource_id)
- target = resolve_safe_path(resource_root, path)
+ root = get_resource_type_root(resource_type)
+ target = resolve_safe_path(root, path)
- # 确保父目录存在
target.parent.mkdir(parents=True, exist_ok=True)
is_new = not target.exists()
@@ -265,19 +255,19 @@ async def put_file(resource_type: str, resource_id: str, path: str, request: Req
with open(target, "wb") as f:
f.write(body)
- logger.info(f"WebDAV PUT: {resource_type}/{resource_id}/{path} ({len(body)} bytes)")
+ logger.info(f"WebDAV PUT: {resource_type}/{path} ({len(body)} bytes)")
return Response(status_code=201 if is_new else 204)
# ===== DELETE =====
-@router.delete("/{resource_type}/{resource_id}/{path:path}", dependencies=auth_dep)
-async def delete_resource(resource_type: str, resource_id: str, path: str):
+@router.delete("/{resource_type}/{path:path}", dependencies=auth_dep)
+async def delete_resource(resource_type: str, path: str):
"""DELETE - 删除文件或目录"""
if not path.strip("/"):
raise HTTPException(status_code=403, detail="不能删除资源根目录")
- resource_root = get_resource_root(resource_type, resource_id)
- target = resolve_safe_path(resource_root, path)
+ root = get_resource_type_root(resource_type)
+ target = resolve_safe_path(root, path)
if not target.exists():
raise HTTPException(status_code=404, detail="资源不存在")
@@ -287,16 +277,16 @@ async def delete_resource(resource_type: str, resource_id: str, path: str):
else:
shutil.rmtree(target)
- logger.info(f"WebDAV DELETE: {resource_type}/{resource_id}/{path}")
+ logger.info(f"WebDAV DELETE: {resource_type}/{path}")
return Response(status_code=204)
# ===== MKCOL =====
-@router.api_route("/{resource_type}/{resource_id}/{path:path}", methods=["MKCOL"], dependencies=auth_dep)
-async def mkcol(resource_type: str, resource_id: str, path: str):
+@router.api_route("/{resource_type}/{path:path}", methods=["MKCOL"], dependencies=auth_dep)
+async def mkcol(resource_type: str, path: str):
"""MKCOL - 创建目录"""
- resource_root = get_resource_root(resource_type, resource_id)
- target = resolve_safe_path(resource_root, path)
+ root = get_resource_type_root(resource_type)
+ target = resolve_safe_path(root, path)
if target.exists():
raise HTTPException(status_code=405, detail="资源已存在")
@@ -305,16 +295,16 @@ async def mkcol(resource_type: str, resource_id: str, path: str):
raise HTTPException(status_code=409, detail="父目录不存在")
target.mkdir()
- logger.info(f"WebDAV MKCOL: {resource_type}/{resource_id}/{path}")
+ logger.info(f"WebDAV MKCOL: {resource_type}/{path}")
return Response(status_code=201)
# ===== COPY =====
-@router.api_route("/{resource_type}/{resource_id}/{path:path}", methods=["COPY"], dependencies=auth_dep)
-async def copy_resource(resource_type: str, resource_id: str, path: str, request: Request):
+@router.api_route("/{resource_type}/{path:path}", methods=["COPY"], dependencies=auth_dep)
+async def copy_resource(resource_type: str, path: str, request: Request):
"""COPY - 复制文件或目录"""
- resource_root = get_resource_root(resource_type, resource_id)
- source = resolve_safe_path(resource_root, path)
+ root = get_resource_type_root(resource_type)
+ source = resolve_safe_path(root, path)
if not source.exists():
raise HTTPException(status_code=404, detail="源资源不存在")
@@ -323,9 +313,8 @@ async def copy_resource(resource_type: str, resource_id: str, path: str, request
if not destination:
raise HTTPException(status_code=400, detail="缺少 Destination 头")
- # 解析 Destination,提取相对路径
- dest_path = _parse_destination(destination, resource_type, resource_id)
- dest_target = resolve_safe_path(resource_root, dest_path)
+ dest_path = _parse_destination(destination, resource_type)
+ dest_target = resolve_safe_path(root, dest_path)
overwrite = request.headers.get("Overwrite", "T").upper() == "T"
existed = dest_target.exists()
@@ -346,16 +335,16 @@ async def copy_resource(resource_type: str, resource_id: str, path: str, request
else:
shutil.copytree(str(source), str(dest_target))
- logger.info(f"WebDAV COPY: {resource_type}/{resource_id}/{path} -> {dest_path}")
+ logger.info(f"WebDAV COPY: {resource_type}/{path} -> {dest_path}")
return Response(status_code=204 if existed else 201)
# ===== MOVE =====
-@router.api_route("/{resource_type}/{resource_id}/{path:path}", methods=["MOVE"], dependencies=auth_dep)
-async def move_resource(resource_type: str, resource_id: str, path: str, request: Request):
+@router.api_route("/{resource_type}/{path:path}", methods=["MOVE"], dependencies=auth_dep)
+async def move_resource(resource_type: str, path: str, request: Request):
"""MOVE - 移动/重命名文件或目录"""
- resource_root = get_resource_root(resource_type, resource_id)
- source = resolve_safe_path(resource_root, path)
+ root = get_resource_type_root(resource_type)
+ source = resolve_safe_path(root, path)
if not source.exists():
raise HTTPException(status_code=404, detail="源资源不存在")
@@ -364,8 +353,8 @@ async def move_resource(resource_type: str, resource_id: str, path: str, request
if not destination:
raise HTTPException(status_code=400, detail="缺少 Destination 头")
- dest_path = _parse_destination(destination, resource_type, resource_id)
- dest_target = resolve_safe_path(resource_root, dest_path)
+ dest_path = _parse_destination(destination, resource_type)
+ dest_target = resolve_safe_path(root, dest_path)
overwrite = request.headers.get("Overwrite", "T").upper() == "T"
existed = dest_target.exists()
@@ -382,20 +371,19 @@ async def move_resource(resource_type: str, resource_id: str, path: str, request
dest_target.parent.mkdir(parents=True, exist_ok=True)
shutil.move(str(source), str(dest_target))
- logger.info(f"WebDAV MOVE: {resource_type}/{resource_id}/{path} -> {dest_path}")
+ logger.info(f"WebDAV MOVE: {resource_type}/{path} -> {dest_path}")
return Response(status_code=204 if existed else 201)
-def _parse_destination(destination: str, resource_type: str, resource_id: str) -> str:
+def _parse_destination(destination: str, resource_type: str) -> str:
"""从 Destination 头中提取相对路径"""
- # Destination 格式: http://host/webdav/{resource_type}/{resource_id}/some/path
from urllib.parse import urlparse, unquote
parsed = urlparse(destination)
path = unquote(parsed.path)
- prefix = f"/webdav/{resource_type}/{resource_id}/"
+ prefix = f"/webdav/{resource_type}/"
if path.startswith(prefix):
return path[len(prefix):]
- prefix_no_slash = f"/webdav/{resource_type}/{resource_id}"
+ prefix_no_slash = f"/webdav/{resource_type}"
if path == prefix_no_slash:
return ""
raise HTTPException(status_code=400, detail="无效的 Destination 路径")