qwen_agent/routes/webdav.py
2026-03-09 11:56:17 +08:00

366 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
WebDAV 文件管理路由
为 projects/robot/{bot_id} 目录提供 WebDAV 协议支持
"""
import os
import shutil
import mimetypes
from pathlib import Path
from datetime import datetime, timezone
from xml.etree import ElementTree as ET
from fastapi import APIRouter, Request, HTTPException, Response
from fastapi.responses import FileResponse, Response as FastAPIResponse
import logging
# Module logger; obtained under the fixed name 'app'.
logger = logging.getLogger('app')
# Every route declared in this module is registered under the /webdav prefix.
router = APIRouter(prefix="/webdav", tags=["webdav"])
# WebDAV XML namespace
DAV_NS = "DAV:"
# The same namespace in ElementTree's "{uri}tag" notation, used as a tag prefix.
DAV_PREFIX = "{DAV:}"
# Base directory; get_bot_root() appends the bot_id to obtain a bot's root.
ROBOT_BASE_DIR = Path("projects/robot")
def get_bot_root(bot_id: str) -> Path:
    """Return the root directory of ``bot_id``, creating it when missing.

    Args:
        bot_id: Identifier taken from the request URL. It must be a single
            path component.

    Returns:
        ``ROBOT_BASE_DIR / bot_id`` (not resolved).

    Raises:
        HTTPException: 400 when ``bot_id`` is empty, is ``"."``, or contains
            ``".."``, a path separator, or a NUL byte.
    """
    # Guard against path traversal. Besides separators and "..", an empty id
    # or "." would map onto ROBOT_BASE_DIR itself and expose every bot's
    # files; a NUL byte would make the filesystem call fail with ValueError.
    if (
        bot_id in ("", ".")
        or ".." in bot_id
        or "/" in bot_id
        or "\\" in bot_id
        or "\x00" in bot_id
    ):
        raise HTTPException(status_code=400, detail="无效的 bot_id")
    root = ROBOT_BASE_DIR / bot_id
    root.mkdir(parents=True, exist_ok=True)
    return root
def resolve_safe_path(bot_root: Path, rel_path: str) -> Path:
    """Translate a client-supplied relative path into a path under ``bot_root``.

    Leading and trailing slashes are ignored. An empty path yields
    ``bot_root`` unchanged; anything else is joined to ``bot_root`` and
    resolved.

    Raises:
        HTTPException: 403 when the resolved location is not inside the
            resolved ``bot_root``.
    """
    trimmed = rel_path.strip("/")
    if trimmed == "":
        return bot_root

    candidate = (bot_root / trimmed).resolve()
    try:
        # relative_to() raises ValueError when candidate lies outside the root.
        candidate.relative_to(bot_root.resolve())
    except ValueError:
        raise HTTPException(status_code=403, detail="访问被拒绝")
    else:
        return candidate
def format_http_date(dt: datetime) -> str:
    """Format ``dt`` as an HTTP date, e.g. ``Mon, 09 Mar 2026 03:56:17 GMT``.

    Aware datetimes are converted to UTC first, so the ``GMT`` suffix is
    always truthful. Naive datetimes are taken to already be in UTC. Day and
    month names are always English, independent of the process locale
    (``strftime('%a')`` / ``'%b'`` are locale-dependent).
    """
    from email.utils import format_datetime

    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    else:
        dt = dt.astimezone(timezone.utc)
    # usegmt=True requires tzinfo to be exactly timezone.utc, ensured above.
    return format_datetime(dt, usegmt=True)
def get_file_props(file_path: Path, href: str) -> ET.Element:
    """Build the ``DAV:response`` element describing one file or directory.

    The element holds the given ``href`` and a single ``propstat`` with
    status 200 containing ``displayname``, ``getlastmodified`` and
    ``resourcetype``. Non-directories additionally get ``getcontentlength``
    and ``getcontenttype``.
    """
    info = file_path.stat()
    modified = datetime.fromtimestamp(info.st_mtime, tz=timezone.utc)

    def dav(tag: str) -> str:
        # Qualify a local tag name with the DAV: namespace.
        return f"{DAV_PREFIX}{tag}"

    response_el = ET.Element(dav("response"))
    ET.SubElement(response_el, dav("href")).text = href

    propstat = ET.SubElement(response_el, dav("propstat"))
    props = ET.SubElement(propstat, dav("prop"))
    ET.SubElement(propstat, dav("status")).text = "HTTP/1.1 200 OK"

    ET.SubElement(props, dav("displayname")).text = file_path.name
    ET.SubElement(props, dav("getlastmodified")).text = format_http_date(modified)

    resource_type = ET.SubElement(props, dav("resourcetype"))
    if file_path.is_dir():
        # A collection is marked by a <collection/> child of resourcetype.
        ET.SubElement(resource_type, dav("collection"))
        return response_el

    # Plain resource: resourcetype stays empty; add size and media type.
    ET.SubElement(props, dav("getcontentlength")).text = str(info.st_size)
    guessed, _ = mimetypes.guess_type(str(file_path))
    ET.SubElement(props, dav("getcontenttype")).text = (
        guessed or "application/octet-stream"
    )
    return response_el
def build_multistatus(responses: list[ET.Element]) -> str:
    """Serialize ``responses`` as the children of a ``DAV:multistatus`` document.

    Returns the XML text, preceded by an XML declaration line.
    """
    # Have ElementTree emit the DAV: namespace with the "D" prefix.
    ET.register_namespace("D", DAV_NS)
    root = ET.Element(f"{DAV_PREFIX}multistatus")
    root.extend(responses)
    body = ET.tostring(root, encoding="unicode")
    return f'<?xml version="1.0" encoding="utf-8"?>\n{body}'
# ===== PROPFIND =====
@router.api_route("/{bot_id}/{path:path}", methods=["PROPFIND"])
@router.api_route("/{bot_id}", methods=["PROPFIND"])
async def propfind(bot_id: str, request: Request, path: str = ""):
    """PROPFIND - return the properties of a resource (directory listing).

    The response always describes the target itself. When the target is a
    directory and the ``Depth`` header is anything other than ``"0"``
    (default ``"1"``), its direct children are listed too, skipping names
    that start with a dot.

    Returns:
        A 207 response whose body is a ``DAV:multistatus`` XML document.

    Raises:
        HTTPException: 404 when the target does not exist (plus whatever
            ``get_bot_root`` / ``resolve_safe_path`` raise).
    """
    from urllib.parse import quote

    bot_root = get_bot_root(bot_id)
    target = resolve_safe_path(bot_root, path)
    if not target.exists():
        raise HTTPException(status_code=404, detail="资源不存在")

    depth = request.headers.get("Depth", "1")
    # hrefs must be valid URI references, so names containing spaces,
    # non-ASCII characters, '#', '?', ... are percent-encoded ('/' is kept).
    base_href = f"/webdav/{quote(bot_id)}"
    responses = []

    # The requested resource itself.
    rel = path.strip("/")
    href = f"{base_href}/{quote(rel)}" if rel else f"{base_href}/"
    if not href.endswith("/") and target.is_dir():
        href += "/"
    responses.append(get_file_props(target, href))

    # Directory with Depth != 0: list the direct children.
    if target.is_dir() and depth != "0":
        try:
            for item in sorted(target.iterdir()):
                if item.name.startswith("."):
                    continue
                item_rel = f"{rel}/{item.name}" if rel else item.name
                item_href = f"{base_href}/{quote(item_rel)}"
                if item.is_dir():
                    item_href += "/"
                responses.append(get_file_props(item, item_href))
        except PermissionError as exc:
            # Best effort: still answer with whatever was collected, but
            # leave a trace instead of dropping the error silently.
            logger.warning(
                "WebDAV PROPFIND: listing of %s/%s incomplete: %s",
                bot_id, path, exc,
            )

    xml_body = build_multistatus(responses)
    return Response(
        content=xml_body,
        status_code=207,
        media_type="application/xml; charset=utf-8",
    )
# ===== OPTIONS =====
@router.api_route("/{bot_id}/{path:path}", methods=["OPTIONS"])
@router.api_route("/{bot_id}", methods=["OPTIONS"])
async def options(bot_id: str, path: str = ""):
    """OPTIONS - advertise the supported WebDAV methods.

    Answers 200 with fixed ``Allow`` and ``DAV`` headers; neither ``bot_id``
    nor ``path`` is inspected.
    """
    # NOTE(review): "DAV: 1, 2" advertises class 2, but no LOCK/UNLOCK route
    # is visible in this part of the file - confirm this is intended.
    capability_headers = {
        "Allow": "OPTIONS, GET, HEAD, PUT, DELETE, MKCOL, COPY, MOVE, PROPFIND",
        "DAV": "1, 2",
    }
    return Response(status_code=200, headers=capability_headers)
# ===== GET =====
@router.get("/{bot_id}/{path:path}")
async def get_file(bot_id: str, path: str):
    """GET - download a file, or show a simple HTML index for a directory.

    Directory indexes skip entries whose name starts with a dot. File names
    and the requested path are user-controlled (anything can be uploaded via
    PUT), so they are HTML-escaped in text and percent-encoded in links.

    Raises:
        HTTPException: 404 when the target does not exist (plus whatever
            ``get_bot_root`` / ``resolve_safe_path`` raise).
    """
    from html import escape
    from urllib.parse import quote

    bot_root = get_bot_root(bot_id)
    target = resolve_safe_path(bot_root, path)
    if not target.exists():
        raise HTTPException(status_code=404, detail="资源不存在")

    if target.is_dir():
        # Directory: answer with a minimal HTML listing.
        rel = path.strip("/")
        base_href = f"/webdav/{quote(bot_id)}"
        if rel:
            base_href = f"{base_href}/{quote(rel)}"
        items = []
        for item in sorted(target.iterdir()):
            if item.name.startswith("."):
                continue
            label = item.name + ("/" if item.is_dir() else "")
            link = f"{base_href}/{quote(item.name)}"
            items.append(
                f'<li><a href="{escape(link, quote=True)}">{escape(label)}</a></li>'
            )
        page = (
            f"<html><body><h1>Index of /{escape(path)}</h1>"
            f"<ul>{''.join(items)}</ul></body></html>"
        )
        return Response(content=page, media_type="text/html")

    mime, _ = mimetypes.guess_type(str(target))
    return FileResponse(path=str(target), media_type=mime or "application/octet-stream")
# ===== HEAD =====
@router.head("/{bot_id}/{path:path}")
async def head_file(bot_id: str, path: str):
    """HEAD - report metadata for a resource without a body.

    Regular files get ``Content-Type``, ``Content-Length`` and
    ``Last-Modified`` headers; any other existing resource gets a bare 200.

    Raises:
        HTTPException: 404 when the target does not exist.
    """
    target = resolve_safe_path(get_bot_root(bot_id), path)
    if not target.exists():
        raise HTTPException(status_code=404, detail="资源不存在")

    if not target.is_file():
        return Response(status_code=200, headers={})

    info = target.stat()
    guessed, _ = mimetypes.guess_type(str(target))
    modified = datetime.fromtimestamp(info.st_mtime, tz=timezone.utc)
    file_headers = {
        "Content-Type": guessed or "application/octet-stream",
        "Content-Length": str(info.st_size),
        "Last-Modified": format_http_date(modified),
    }
    return Response(status_code=200, headers=file_headers)
# ===== PUT =====
@router.put("/{bot_id}/{path:path}")
async def put_file(bot_id: str, path: str, request: Request):
    """PUT - upload a file, replacing any existing file at that path.

    Missing parent directories are created. The whole request body is read
    into memory before it is written.

    Returns:
        201 when the file was created, 204 when an existing file was
        overwritten.

    Raises:
        HTTPException: 405 when the target is a directory (this includes the
            bot root); 409 when a parent of the target exists but is not a
            directory.
    """
    bot_root = get_bot_root(bot_id)
    target = resolve_safe_path(bot_root, path)

    # Writing to a directory would otherwise surface as an unhandled
    # IsADirectoryError (HTTP 500).
    if target.is_dir():
        raise HTTPException(status_code=405, detail="不能对目录执行 PUT")

    # Make sure the parent directory exists.
    try:
        target.parent.mkdir(parents=True, exist_ok=True)
    except (FileExistsError, NotADirectoryError):
        # Some ancestor of the target is a regular file.
        raise HTTPException(status_code=409, detail="父路径不是目录") from None

    is_new = not target.exists()
    body = await request.body()
    target.write_bytes(body)
    logger.info(f"WebDAV PUT: {bot_id}/{path} ({len(body)} bytes)")
    return Response(status_code=201 if is_new else 204)
# ===== DELETE =====
@router.delete("/{bot_id}/{path:path}")
async def delete_resource(bot_id: str, path: str):
    """DELETE - remove a file, or a directory together with its contents.

    Returns:
        204 on success.

    Raises:
        HTTPException: 403 when the request addresses the bot root itself
            (directly or through a path such as ``.`` or ``sub/..``);
            404 when the target does not exist.
    """
    if not path.strip("/"):
        raise HTTPException(status_code=403, detail="不能删除 bot 根目录")
    bot_root = get_bot_root(bot_id)
    target = resolve_safe_path(bot_root, path)
    # The textual check above misses paths that only resolve to the root
    # ("." or "sub/.."); compare the resolved locations as well so the
    # whole bot directory can never be removed.
    if target == bot_root.resolve():
        raise HTTPException(status_code=403, detail="不能删除 bot 根目录")
    if not target.exists():
        raise HTTPException(status_code=404, detail="资源不存在")
    if target.is_file():
        target.unlink()
    else:
        shutil.rmtree(target)
    logger.info(f"WebDAV DELETE: {bot_id}/{path}")
    return Response(status_code=204)
# ===== MKCOL =====
@router.api_route("/{bot_id}/{path:path}", methods=["MKCOL"])
async def mkcol(bot_id: str, path: str):
    """MKCOL - create a single new directory.

    Returns:
        201 once the directory has been created.

    Raises:
        HTTPException: 405 when something already exists at the path;
            409 when the parent directory does not exist.
    """
    new_dir = resolve_safe_path(get_bot_root(bot_id), path)
    if new_dir.exists():
        raise HTTPException(status_code=405, detail="资源已存在")

    parent_dir = new_dir.parent
    if not parent_dir.exists():
        raise HTTPException(status_code=409, detail="父目录不存在")

    new_dir.mkdir()
    logger.info(f"WebDAV MKCOL: {bot_id}/{path}")
    return Response(status_code=201)
# ===== COPY =====
@router.api_route("/{bot_id}/{path:path}", methods=["COPY"])
async def copy_resource(bot_id: str, path: str, request: Request):
    """COPY - copy a file or a directory tree to the ``Destination`` header.

    An existing destination is replaced unless the ``Overwrite`` header is
    set to something other than ``T`` (default ``T``). Missing parent
    directories of the destination are created.

    Returns:
        201 when the destination was created, 204 when it was replaced.

    Raises:
        HTTPException: 404 when the source does not exist; 400 when the
            ``Destination`` header is missing or invalid; 403 when source
            and destination are the same resource or one contains the other;
            412 when the destination exists and overwriting is not allowed.
    """
    bot_root = get_bot_root(bot_id)
    source = resolve_safe_path(bot_root, path)
    if not source.exists():
        raise HTTPException(status_code=404, detail="源资源不存在")

    destination = request.headers.get("Destination")
    if not destination:
        raise HTTPException(status_code=400, detail="缺少 Destination 头")

    # Extract the bot-relative path from the Destination header.
    dest_path = _parse_destination(destination, bot_id)
    dest_target = resolve_safe_path(bot_root, dest_path)

    # Replacing the destination deletes it first. If it is the source itself
    # or one of its ancestors (e.g. the bot root), that would destroy the
    # source before anything is copied; copying a directory into its own
    # descendant is equally ill-defined. Refuse all three.
    src_real = source.resolve()
    dst_real = dest_target.resolve()
    if (
        dst_real == src_real
        or dst_real in src_real.parents
        or src_real in dst_real.parents
    ):
        raise HTTPException(status_code=403, detail="源和目标不能相同或互相包含")

    overwrite = request.headers.get("Overwrite", "T").upper() == "T"
    existed = dest_target.exists()
    if existed and not overwrite:
        raise HTTPException(status_code=412, detail="目标已存在且不允许覆盖")

    if existed:
        if dest_target.is_file():
            dest_target.unlink()
        else:
            shutil.rmtree(dest_target)

    dest_target.parent.mkdir(parents=True, exist_ok=True)
    if source.is_file():
        shutil.copy2(str(source), str(dest_target))
    else:
        shutil.copytree(str(source), str(dest_target))
    logger.info(f"WebDAV COPY: {bot_id}/{path} -> {dest_path}")
    return Response(status_code=204 if existed else 201)
# ===== MOVE =====
@router.api_route("/{bot_id}/{path:path}", methods=["MOVE"])
async def move_resource(bot_id: str, path: str, request: Request):
    """MOVE - move or rename a file or directory to the ``Destination`` header.

    An existing destination is replaced unless the ``Overwrite`` header is
    set to something other than ``T`` (default ``T``). Missing parent
    directories of the destination are created.

    Returns:
        201 when the destination was created, 204 when it was replaced.

    Raises:
        HTTPException: 404 when the source does not exist; 400 when the
            ``Destination`` header is missing or invalid; 403 when source
            and destination are the same resource or one contains the other;
            412 when the destination exists and overwriting is not allowed.
    """
    bot_root = get_bot_root(bot_id)
    source = resolve_safe_path(bot_root, path)
    if not source.exists():
        raise HTTPException(status_code=404, detail="源资源不存在")

    destination = request.headers.get("Destination")
    if not destination:
        raise HTTPException(status_code=400, detail="缺少 Destination 头")

    dest_path = _parse_destination(destination, bot_id)
    dest_target = resolve_safe_path(bot_root, dest_path)

    # Replacing the destination deletes it first. If it is the source itself
    # or one of its ancestors (e.g. the bot root), that would destroy the
    # source before it is moved; a directory cannot be moved into its own
    # descendant either. Refuse all three.
    src_real = source.resolve()
    dst_real = dest_target.resolve()
    if (
        dst_real == src_real
        or dst_real in src_real.parents
        or src_real in dst_real.parents
    ):
        raise HTTPException(status_code=403, detail="源和目标不能相同或互相包含")

    overwrite = request.headers.get("Overwrite", "T").upper() == "T"
    existed = dest_target.exists()
    if existed and not overwrite:
        raise HTTPException(status_code=412, detail="目标已存在且不允许覆盖")

    if existed:
        if dest_target.is_file():
            dest_target.unlink()
        else:
            shutil.rmtree(dest_target)

    dest_target.parent.mkdir(parents=True, exist_ok=True)
    shutil.move(str(source), str(dest_target))
    logger.info(f"WebDAV MOVE: {bot_id}/{path} -> {dest_path}")
    return Response(status_code=204 if existed else 201)
def _parse_destination(destination: str, bot_id: str) -> str:
    """Return the bot-relative path named by a ``Destination`` header value.

    The header is expected to look like ``http://host/webdav/{bot_id}/some/path``;
    only its percent-decoded path component is examined. A destination that
    is exactly ``/webdav/{bot_id}`` maps to the empty string.

    Raises:
        HTTPException: 400 when the path is not located under
            ``/webdav/{bot_id}``.
    """
    from urllib.parse import urlparse, unquote

    decoded = unquote(urlparse(destination).path)
    bot_base = f"/webdav/{bot_id}"
    if decoded == bot_base:
        return ""
    if decoded.startswith(bot_base + "/"):
        # Drop the base and the slash that follows it.
        return decoded[len(bot_base) + 1:]
    raise HTTPException(status_code=400, detail="无效的 Destination 路径")