""" WebDAV 文件管理路由 为 projects/{resource_type}/ 目录提供 WebDAV 协议支持 支持的资源类型: robot, dataset """ import secrets import shutil import mimetypes from pathlib import Path from datetime import datetime, timezone from xml.etree import ElementTree as ET from fastapi import APIRouter, Request, HTTPException, Response, Depends from fastapi.responses import FileResponse from fastapi.security import HTTPBasic, HTTPBasicCredentials import logging from utils.settings import WEBDAV_USERNAME, WEBDAV_PASSWORD logger = logging.getLogger('app') security = HTTPBasic(realm="WebDAV") def verify_credentials(credentials: HTTPBasicCredentials = Depends(security)): """验证 WebDAV Basic Auth 账号密码""" username_correct = secrets.compare_digest(credentials.username, WEBDAV_USERNAME) password_correct = secrets.compare_digest(credentials.password, WEBDAV_PASSWORD) if not (username_correct and password_correct): raise HTTPException( status_code=401, detail="认证失败", headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}, ) return credentials # 不在 router 级别加认证,OPTIONS 需要免认证(macOS Finder 等客户端预检不带凭据) router = APIRouter(prefix="/webdav", tags=["webdav"]) # 需要认证的依赖,用于非 OPTIONS 的端点 auth_dep = [Depends(verify_credentials)] # WebDAV XML 命名空间 DAV_NS = "DAV:" DAV_PREFIX = "{DAV:}" PROJECTS_BASE_DIR = Path("projects") ALLOWED_RESOURCE_TYPES = {"robot", "dataset"} # 确保基础目录存在 for resource_type in ALLOWED_RESOURCE_TYPES: (PROJECTS_BASE_DIR / resource_type).mkdir(parents=True, exist_ok=True) def get_resource_type_root(resource_type: str) -> Path: """获取资源类型的根目录""" if resource_type not in ALLOWED_RESOURCE_TYPES: raise HTTPException( status_code=400, detail=f"无效的资源类型,仅支持: {', '.join(ALLOWED_RESOURCE_TYPES)}" ) return PROJECTS_BASE_DIR / resource_type def resolve_safe_path(root: Path, rel_path: str) -> Path: """安全地解析相对路径,防止目录遍历""" rel_path = rel_path.strip("/") if not rel_path: return root target = (root / rel_path).resolve() try: target.relative_to(root.resolve()) except ValueError: raise HTTPException(status_code=403, detail="访问被拒绝") return target def format_http_date(dt: datetime) -> str: return dt.strftime("%a, %d %b %Y %H:%M:%S GMT") def get_file_props(file_path: Path, href: str) -> ET.Element: """生成单个资源的 DAV:response XML 元素""" response_el = ET.SubElement(ET.Element("dummy"), f"{DAV_PREFIX}response") href_el = ET.SubElement(response_el, f"{DAV_PREFIX}href") href_el.text = href propstat_el = ET.SubElement(response_el, f"{DAV_PREFIX}propstat") prop_el = ET.SubElement(propstat_el, f"{DAV_PREFIX}prop") status_el = ET.SubElement(propstat_el, f"{DAV_PREFIX}status") status_el.text = "HTTP/1.1 200 OK" stat = file_path.stat() mtime = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc) dn = ET.SubElement(prop_el, f"{DAV_PREFIX}displayname") dn.text = file_path.name lm = ET.SubElement(prop_el, f"{DAV_PREFIX}getlastmodified") lm.text = format_http_date(mtime) if file_path.is_dir(): rt = ET.SubElement(prop_el, f"{DAV_PREFIX}resourcetype") ET.SubElement(rt, f"{DAV_PREFIX}collection") else: ET.SubElement(prop_el, f"{DAV_PREFIX}resourcetype") cl = ET.SubElement(prop_el, f"{DAV_PREFIX}getcontentlength") cl.text = str(stat.st_size) ct = ET.SubElement(prop_el, f"{DAV_PREFIX}getcontenttype") mime, _ = mimetypes.guess_type(str(file_path)) ct.text = mime or "application/octet-stream" return response_el def build_multistatus(responses: list[ET.Element]) -> str: """构建 DAV:multistatus XML""" ET.register_namespace("D", DAV_NS) multistatus = ET.Element(f"{DAV_PREFIX}multistatus") for resp in responses: multistatus.append(resp) return '\n' + ET.tostring( multistatus, encoding="unicode" ) def _collect_propfind_responses(root: Path, target: Path, base_href: str, rel: str, depth: str) -> list[ET.Element]: """收集 PROPFIND 响应""" responses = [] href = f"{base_href}/{rel}" if rel else f"{base_href}/" if not href.endswith("/") and target.is_dir(): href += "/" responses.append(get_file_props(target, href)) if target.is_dir() and depth != "0": try: for item in sorted(target.iterdir()): if item.name.startswith("."): continue item_rel = f"{rel}/{item.name}" if rel else item.name item_href = f"{base_href}/{item_rel}" if item.is_dir(): item_href += "/" responses.append(get_file_props(item, item_href)) except PermissionError: pass return responses # ===== PROPFIND ===== @router.api_route("/{resource_type}/{path:path}", methods=["PROPFIND"], dependencies=auth_dep) @router.api_route("/{resource_type}", methods=["PROPFIND"], dependencies=auth_dep) async def propfind(resource_type: str, request: Request, path: str = ""): """PROPFIND - 获取资源属性(列出目录)""" root = get_resource_type_root(resource_type) target = resolve_safe_path(root, path) if not target.exists(): raise HTTPException(status_code=404, detail="资源不存在") depth = request.headers.get("Depth", "1") base_href = f"/webdav/{resource_type}" rel = path.strip("/") responses = _collect_propfind_responses(root, target, base_href, rel, depth) xml_body = build_multistatus(responses) return Response( content=xml_body, status_code=207, media_type="application/xml; charset=utf-8", ) # ===== OPTIONS ===== @router.api_route("/{resource_type}/{path:path}", methods=["OPTIONS"]) @router.api_route("/{resource_type}", methods=["OPTIONS"]) async def options(resource_type: str, path: str = ""): """OPTIONS - 返回支持的 WebDAV 方法""" return Response( status_code=200, headers={ "Allow": "OPTIONS, GET, HEAD, PUT, DELETE, MKCOL, COPY, MOVE, PROPFIND", "DAV": "1, 2", }, ) # ===== GET ===== @router.get("/{resource_type}/{path:path}", dependencies=auth_dep) @router.get("/{resource_type}", dependencies=auth_dep) async def get_file(resource_type: str, path: str = ""): """GET - 下载文件或浏览目录""" root = get_resource_type_root(resource_type) target = resolve_safe_path(root, path) if not target.exists(): raise HTTPException(status_code=404, detail="资源不存在") if target.is_dir(): items = [] for item in sorted(target.iterdir()): if item.name.startswith("."): continue name = item.name + ("/" if item.is_dir() else "") rel = path.strip("/") item_href = f"/webdav/{resource_type}/{rel}/{item.name}" if rel else f"/webdav/{resource_type}/{item.name}" items.append(f'