694 lines
22 KiB
Python
694 lines
22 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
文件管理API - WebDAV的HTTP API替代方案
|
||
提供RESTful接口来管理projects文件夹
|
||
"""
|
||
|
||
import os
|
||
import shutil
|
||
from pathlib import Path
|
||
from typing import List, Optional, Dict, Any
|
||
from fastapi import APIRouter, HTTPException, UploadFile, File, Form, Query
|
||
from fastapi.responses import FileResponse, StreamingResponse
|
||
import mimetypes
|
||
import json
|
||
import zipfile
|
||
import logging
|
||
|
||
logger = logging.getLogger('app')
|
||
import tempfile
|
||
import io
|
||
import math
|
||
|
||
router = APIRouter(prefix="/api/v1/file-manager", tags=["file_management"])
|
||
|
||
PROJECTS_DIR = Path("projects")
|
||
PROJECTS_DIR.mkdir(exist_ok=True)
|
||
|
||
|
||
@router.get("/list")
|
||
async def list_files(path: str = "", recursive: bool = False):
|
||
"""
|
||
列出目录内容
|
||
|
||
Args:
|
||
path: 相对路径,空字符串表示根目录
|
||
recursive: 是否递归列出所有子目录
|
||
"""
|
||
try:
|
||
target_path = PROJECTS_DIR / path
|
||
|
||
if not target_path.exists():
|
||
raise HTTPException(status_code=404, detail="路径不存在")
|
||
|
||
if not target_path.is_dir():
|
||
raise HTTPException(status_code=400, detail="路径不是目录")
|
||
|
||
def scan_directory(directory: Path, base_path: Path = PROJECTS_DIR) -> List[Dict[str, Any]]:
|
||
items = []
|
||
try:
|
||
for item in directory.iterdir():
|
||
# 跳过隐藏文件
|
||
if item.name.startswith('.'):
|
||
continue
|
||
|
||
relative_path = item.relative_to(base_path)
|
||
stat = item.stat()
|
||
|
||
item_info = {
|
||
"name": item.name,
|
||
"path": str(relative_path),
|
||
"type": "directory" if item.is_dir() else "file",
|
||
"size": stat.st_size if item.is_file() else 0,
|
||
"modified": stat.st_mtime,
|
||
"created": stat.st_ctime
|
||
}
|
||
|
||
items.append(item_info)
|
||
|
||
# 递归扫描子目录
|
||
if recursive and item.is_dir():
|
||
items.extend(scan_directory(item, base_path))
|
||
|
||
except PermissionError:
|
||
pass
|
||
return items
|
||
|
||
items = scan_directory(target_path)
|
||
|
||
return {
|
||
"success": True,
|
||
"path": path,
|
||
"items": items,
|
||
"total": len(items)
|
||
}
|
||
|
||
except Exception as e:
|
||
raise HTTPException(status_code=500, detail=f"列出目录失败: {str(e)}")
|
||
|
||
|
||
@router.post("/upload")
|
||
async def upload_file(file: UploadFile = File(...), path: str = Form("")):
|
||
"""
|
||
上传文件
|
||
|
||
Args:
|
||
file: 上传的文件
|
||
path: 目标路径(相对于projects目录)
|
||
"""
|
||
try:
|
||
target_dir = PROJECTS_DIR / path
|
||
target_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
file_path = target_dir / file.filename
|
||
|
||
# 如果文件已存在,检查是否覆盖
|
||
if file_path.exists():
|
||
# 可以添加版本控制或重命名逻辑
|
||
pass
|
||
|
||
with open(file_path, "wb") as buffer:
|
||
content = await file.read()
|
||
buffer.write(content)
|
||
|
||
return {
|
||
"success": True,
|
||
"message": "文件上传成功",
|
||
"filename": file.filename,
|
||
"path": str(Path(path) / file.filename),
|
||
"size": len(content)
|
||
}
|
||
|
||
except Exception as e:
|
||
raise HTTPException(status_code=500, detail=f"文件上传失败: {str(e)}")
|
||
|
||
|
||
@router.get("/download/{file_path:path}")
|
||
async def download_file(file_path: str):
|
||
"""
|
||
下载文件
|
||
|
||
Args:
|
||
file_path: 文件相对路径
|
||
"""
|
||
try:
|
||
target_path = PROJECTS_DIR / file_path
|
||
|
||
if not target_path.exists():
|
||
raise HTTPException(status_code=404, detail="文件不存在")
|
||
|
||
if not target_path.is_file():
|
||
raise HTTPException(status_code=400, detail="不是文件")
|
||
|
||
# 猜测MIME类型
|
||
mime_type, _ = mimetypes.guess_type(str(target_path))
|
||
if mime_type is None:
|
||
mime_type = "application/octet-stream"
|
||
|
||
return FileResponse(
|
||
path=str(target_path),
|
||
filename=target_path.name,
|
||
media_type=mime_type
|
||
)
|
||
|
||
except Exception as e:
|
||
raise HTTPException(status_code=500, detail=f"文件下载失败: {str(e)}")
|
||
|
||
|
||
@router.delete("/delete")
|
||
async def delete_item(path: str):
|
||
"""
|
||
删除文件或目录
|
||
|
||
Args:
|
||
path: 要删除的路径
|
||
"""
|
||
try:
|
||
target_path = PROJECTS_DIR / path
|
||
|
||
if not target_path.exists():
|
||
raise HTTPException(status_code=404, detail="路径不存在")
|
||
|
||
if target_path.is_file():
|
||
target_path.unlink()
|
||
elif target_path.is_dir():
|
||
shutil.rmtree(target_path)
|
||
|
||
return {
|
||
"success": True,
|
||
"message": f"{'文件' if target_path.is_file() else '目录'}删除成功",
|
||
"path": path
|
||
}
|
||
|
||
except Exception as e:
|
||
raise HTTPException(status_code=500, detail=f"删除失败: {str(e)}")
|
||
|
||
|
||
@router.post("/create-folder")
|
||
async def create_folder(path: str, name: str):
|
||
"""
|
||
创建文件夹
|
||
|
||
Args:
|
||
path: 父目录路径
|
||
name: 新文件夹名称
|
||
"""
|
||
try:
|
||
parent_path = PROJECTS_DIR / path
|
||
parent_path.mkdir(parents=True, exist_ok=True)
|
||
|
||
new_folder = parent_path / name
|
||
|
||
if new_folder.exists():
|
||
raise HTTPException(status_code=400, detail="文件夹已存在")
|
||
|
||
new_folder.mkdir()
|
||
|
||
return {
|
||
"success": True,
|
||
"message": "文件夹创建成功",
|
||
"path": str(Path(path) / name)
|
||
}
|
||
|
||
except Exception as e:
|
||
raise HTTPException(status_code=500, detail=f"创建文件夹失败: {str(e)}")
|
||
|
||
|
||
@router.post("/rename")
|
||
async def rename_item(old_path: str, new_name: str):
|
||
"""
|
||
重命名文件或文件夹
|
||
|
||
Args:
|
||
old_path: 原路径
|
||
new_name: 新名称
|
||
"""
|
||
try:
|
||
old_full_path = PROJECTS_DIR / old_path
|
||
|
||
if not old_full_path.exists():
|
||
raise HTTPException(status_code=404, detail="文件或目录不存在")
|
||
|
||
new_full_path = old_full_path.parent / new_name
|
||
|
||
if new_full_path.exists():
|
||
raise HTTPException(status_code=400, detail="目标名称已存在")
|
||
|
||
old_full_path.rename(new_full_path)
|
||
|
||
return {
|
||
"success": True,
|
||
"message": "重命名成功",
|
||
"old_path": old_path,
|
||
"new_path": str(new_full_path.relative_to(PROJECTS_DIR))
|
||
}
|
||
|
||
except Exception as e:
|
||
raise HTTPException(status_code=500, detail=f"重命名失败: {str(e)}")
|
||
|
||
|
||
@router.post("/move")
|
||
async def move_item(source_path: str, target_path: str):
|
||
"""
|
||
移动文件或文件夹
|
||
|
||
Args:
|
||
source_path: 源路径
|
||
target_path: 目标路径
|
||
"""
|
||
try:
|
||
source_full_path = PROJECTS_DIR / source_path
|
||
target_full_path = PROJECTS_DIR / target_path
|
||
|
||
if not source_full_path.exists():
|
||
raise HTTPException(status_code=404, detail="源文件或目录不存在")
|
||
|
||
# 确保目标目录存在
|
||
target_full_path.parent.mkdir(parents=True, exist_ok=True)
|
||
|
||
shutil.move(str(source_full_path), str(target_full_path))
|
||
|
||
return {
|
||
"success": True,
|
||
"message": "移动成功",
|
||
"source_path": source_path,
|
||
"target_path": target_path
|
||
}
|
||
|
||
except Exception as e:
|
||
raise HTTPException(status_code=500, detail=f"移动失败: {str(e)}")
|
||
|
||
|
||
@router.post("/copy")
|
||
async def copy_item(source_path: str, target_path: str):
|
||
"""
|
||
复制文件或文件夹
|
||
|
||
Args:
|
||
source_path: 源路径
|
||
target_path: 目标路径
|
||
"""
|
||
try:
|
||
source_full_path = PROJECTS_DIR / source_path
|
||
target_full_path = PROJECTS_DIR / target_path
|
||
|
||
if not source_full_path.exists():
|
||
raise HTTPException(status_code=404, detail="源文件或目录不存在")
|
||
|
||
# 确保目标目录存在
|
||
target_full_path.parent.mkdir(parents=True, exist_ok=True)
|
||
|
||
if source_full_path.is_file():
|
||
shutil.copy2(str(source_full_path), str(target_full_path))
|
||
else:
|
||
shutil.copytree(str(source_full_path), str(target_full_path))
|
||
|
||
return {
|
||
"success": True,
|
||
"message": "复制成功",
|
||
"source_path": source_path,
|
||
"target_path": target_path
|
||
}
|
||
|
||
except Exception as e:
|
||
raise HTTPException(status_code=500, detail=f"复制失败: {str(e)}")
|
||
|
||
|
||
@router.get("/search")
|
||
async def search_files(query: str, path: str = "", file_type: Optional[str] = None):
|
||
"""
|
||
搜索文件
|
||
|
||
Args:
|
||
query: 搜索关键词
|
||
path: 搜索路径
|
||
file_type: 文件类型过滤
|
||
"""
|
||
try:
|
||
search_path = PROJECTS_DIR / path
|
||
|
||
if not search_path.exists():
|
||
raise HTTPException(status_code=404, detail="搜索路径不存在")
|
||
|
||
results = []
|
||
|
||
def scan_for_files(directory: Path):
|
||
try:
|
||
for item in directory.iterdir():
|
||
if item.name.startswith('.'):
|
||
continue
|
||
|
||
# 检查文件名是否包含关键词
|
||
if query.lower() in item.name.lower():
|
||
# 检查文件类型过滤
|
||
if file_type:
|
||
if item.suffix.lower() == file_type.lower():
|
||
results.append({
|
||
"name": item.name,
|
||
"path": str(item.relative_to(PROJECTS_DIR)),
|
||
"type": "directory" if item.is_dir() else "file"
|
||
})
|
||
else:
|
||
results.append({
|
||
"name": item.name,
|
||
"path": str(item.relative_to(PROJECTS_DIR)),
|
||
"type": "directory" if item.is_dir() else "file"
|
||
})
|
||
|
||
# 递归搜索子目录
|
||
if item.is_dir():
|
||
scan_for_files(item)
|
||
|
||
except PermissionError:
|
||
pass
|
||
|
||
scan_for_files(search_path)
|
||
|
||
return {
|
||
"success": True,
|
||
"query": query,
|
||
"path": path,
|
||
"results": results,
|
||
"total": len(results)
|
||
}
|
||
|
||
except Exception as e:
|
||
raise HTTPException(status_code=500, detail=f"搜索失败: {str(e)}")
|
||
|
||
|
||
@router.get("/info/{file_path:path}")
|
||
async def get_file_info(file_path: str):
|
||
"""
|
||
获取文件或文件夹详细信息
|
||
|
||
Args:
|
||
file_path: 文件路径
|
||
"""
|
||
try:
|
||
target_path = PROJECTS_DIR / file_path
|
||
|
||
if not target_path.exists():
|
||
raise HTTPException(status_code=404, detail="路径不存在")
|
||
|
||
stat = target_path.stat()
|
||
|
||
info = {
|
||
"name": target_path.name,
|
||
"path": file_path,
|
||
"type": "directory" if target_path.is_dir() else "file",
|
||
"size": stat.st_size if target_path.is_file() else 0,
|
||
"modified": stat.st_mtime,
|
||
"created": stat.st_ctime,
|
||
"permissions": oct(stat.st_mode)[-3:]
|
||
}
|
||
|
||
# 如果是文件,添加额外信息
|
||
if target_path.is_file():
|
||
mime_type, _ = mimetypes.guess_type(str(target_path))
|
||
info["mime_type"] = mime_type or "unknown"
|
||
|
||
# 读取文件内容预览(仅对小文件)
|
||
if stat.st_size < 1024 * 1024: # 小于1MB
|
||
try:
|
||
with open(target_path, 'r', encoding='utf-8') as f:
|
||
content = f.read(1000) # 读取前1000字符
|
||
info["preview"] = content
|
||
except:
|
||
info["preview"] = "[二进制文件,无法预览]"
|
||
|
||
return {
|
||
"success": True,
|
||
"info": info
|
||
}
|
||
|
||
except Exception as e:
|
||
raise HTTPException(status_code=500, detail=f"获取文件信息失败: {str(e)}")
|
||
|
||
|
||
@router.post("/download-folder-zip")
|
||
async def download_folder_as_zip(request: Dict[str, str]):
|
||
"""
|
||
将文件夹压缩为ZIP并下载
|
||
|
||
Args:
|
||
request: 包含path字段的JSON对象
|
||
"""
|
||
try:
|
||
folder_path = request.get("path", "")
|
||
|
||
if not folder_path:
|
||
raise HTTPException(status_code=400, detail="路径不能为空")
|
||
|
||
target_path = PROJECTS_DIR / folder_path
|
||
|
||
if not target_path.exists():
|
||
raise HTTPException(status_code=404, detail="文件夹不存在")
|
||
|
||
if not target_path.is_dir():
|
||
raise HTTPException(status_code=400, detail="路径不是文件夹")
|
||
|
||
# 计算文件夹大小,检查是否过大
|
||
total_size = 0
|
||
file_count = 0
|
||
for file_path in target_path.rglob('*'):
|
||
if file_path.is_file():
|
||
total_size += file_path.stat().st_size
|
||
file_count += 1
|
||
|
||
# 限制最大500MB
|
||
max_size = 500 * 1024 * 1024
|
||
if total_size > max_size:
|
||
raise HTTPException(
|
||
status_code=413,
|
||
detail=f"文件夹过大 ({formatFileSize(total_size)}),最大支持 {formatFileSize(max_size)}"
|
||
)
|
||
|
||
# 限制文件数量
|
||
max_files = 10000
|
||
if file_count > max_files:
|
||
raise HTTPException(
|
||
status_code=413,
|
||
detail=f"文件数量过多 ({file_count}),最大支持 {max_files} 个文件"
|
||
)
|
||
|
||
# 创建ZIP文件名
|
||
folder_name = target_path.name
|
||
zip_filename = f"{folder_name}.zip"
|
||
|
||
# 创建ZIP文件
|
||
zip_buffer = io.BytesIO()
|
||
|
||
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED, compresslevel=6) as zipf:
|
||
# 添加文件夹中的所有文件
|
||
for file_path in target_path.rglob('*'):
|
||
if file_path.is_file():
|
||
try:
|
||
# 计算相对路径,保持文件夹结构
|
||
arcname = file_path.relative_to(target_path)
|
||
zipf.write(file_path, arcname)
|
||
except (OSError, IOError) as e:
|
||
# 跳过无法读取的文件,但记录警告
|
||
logger.warning(f"Warning: Skipping file {file_path}: {e}")
|
||
continue
|
||
|
||
zip_buffer.seek(0)
|
||
|
||
# 设置响应头
|
||
headers = {
|
||
'Content-Disposition': f'attachment; filename="{zip_filename}"',
|
||
'Content-Type': 'application/zip',
|
||
'Content-Length': str(len(zip_buffer.getvalue())),
|
||
'Cache-Control': 'no-cache'
|
||
}
|
||
|
||
# 创建流式响应
|
||
from fastapi.responses import StreamingResponse
|
||
|
||
async def generate_zip():
|
||
zip_buffer.seek(0)
|
||
yield zip_buffer.getvalue()
|
||
|
||
return StreamingResponse(
|
||
generate_zip(),
|
||
media_type="application/zip",
|
||
headers=headers
|
||
)
|
||
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
raise HTTPException(status_code=500, detail=f"压缩文件夹失败: {str(e)}")
|
||
|
||
|
||
@router.post("/download-multiple-zip")
|
||
async def download_multiple_items_as_zip(request: Dict[str, Any]):
|
||
"""
|
||
将多个文件和文件夹压缩为ZIP并下载
|
||
|
||
Args:
|
||
request: 包含paths和filename字段的JSON对象
|
||
"""
|
||
try:
|
||
paths = request.get("paths", [])
|
||
filename = request.get("filename", "batch_download.zip")
|
||
|
||
if not paths:
|
||
raise HTTPException(status_code=400, detail="请选择要下载的文件")
|
||
|
||
# 验证所有路径
|
||
valid_paths = []
|
||
total_size = 0
|
||
file_count = 0
|
||
|
||
for path in paths:
|
||
target_path = PROJECTS_DIR / path
|
||
|
||
if not target_path.exists():
|
||
continue # 跳过不存在的文件
|
||
|
||
if target_path.is_file():
|
||
total_size += target_path.stat().st_size
|
||
file_count += 1
|
||
valid_paths.append(path)
|
||
elif target_path.is_dir():
|
||
# 计算文件夹大小
|
||
for file_path in target_path.rglob('*'):
|
||
if file_path.is_file():
|
||
total_size += file_path.stat().st_size
|
||
file_count += 1
|
||
valid_paths.append(path)
|
||
|
||
if not valid_paths:
|
||
raise HTTPException(status_code=404, detail="没有找到有效的文件")
|
||
|
||
# 限制大小
|
||
max_size = 500 * 1024 * 1024
|
||
if total_size > max_size:
|
||
raise HTTPException(
|
||
status_code=413,
|
||
detail=f"选中文件过大 ({formatFileSize(total_size)}),最大支持 {formatFileSize(max_size)}"
|
||
)
|
||
|
||
# 限制文件数量
|
||
max_files = 10000
|
||
if file_count > max_files:
|
||
raise HTTPException(
|
||
status_code=413,
|
||
detail=f"文件数量过多 ({file_count}),最大支持 {max_files} 个文件"
|
||
)
|
||
|
||
# 创建ZIP文件
|
||
zip_buffer = io.BytesIO()
|
||
|
||
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED, compresslevel=6) as zipf:
|
||
for path in valid_paths:
|
||
target_path = PROJECTS_DIR / path
|
||
|
||
if target_path.is_file():
|
||
# 单个文件
|
||
try:
|
||
zipf.write(target_path, target_path.name)
|
||
except (OSError, IOError) as e:
|
||
logger.warning(f"Warning: Skipping file {target_path}: {e}")
|
||
continue
|
||
elif target_path.is_dir():
|
||
# 文件夹
|
||
for file_path in target_path.rglob('*'):
|
||
if file_path.is_file():
|
||
try:
|
||
# 保持相对路径结构
|
||
arcname = f"{target_path.name}/{file_path.relative_to(target_path)}"
|
||
zipf.write(file_path, arcname)
|
||
except (OSError, IOError) as e:
|
||
logger.warning(f"Warning: Skipping file {file_path}: {e}")
|
||
continue
|
||
|
||
zip_buffer.seek(0)
|
||
|
||
# 设置响应头
|
||
headers = {
|
||
'Content-Disposition': f'attachment; filename="{filename}"',
|
||
'Content-Type': 'application/zip',
|
||
'Content-Length': str(len(zip_buffer.getvalue())),
|
||
'Cache-Control': 'no-cache'
|
||
}
|
||
|
||
# 创建流式响应
|
||
from fastapi.responses import StreamingResponse
|
||
|
||
async def generate_zip():
|
||
zip_buffer.seek(0)
|
||
yield zip_buffer.getvalue()
|
||
|
||
return StreamingResponse(
|
||
generate_zip(),
|
||
media_type="application/zip",
|
||
headers=headers
|
||
)
|
||
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
raise HTTPException(status_code=500, detail=f"批量压缩失败: {str(e)}")
|
||
|
||
|
||
def formatFileSize(bytes_size: int) -> str:
|
||
"""格式化文件大小"""
|
||
if bytes_size == 0:
|
||
return "0 B"
|
||
|
||
k = 1024
|
||
sizes = ["B", "KB", "MB", "GB", "TB"]
|
||
i = int(math.floor(math.log(bytes_size, k)))
|
||
|
||
if i >= len(sizes):
|
||
i = len(sizes) - 1
|
||
|
||
return f"{bytes_size / math.pow(k, i):.1f} {sizes[i]}"
|
||
|
||
|
||
@router.post("/batch-operation")
|
||
async def batch_operation(operations: List[Dict[str, Any]]):
|
||
"""
|
||
批量操作多个文件
|
||
|
||
Args:
|
||
operations: 操作列表,每个操作包含type和相应的参数
|
||
"""
|
||
try:
|
||
results = []
|
||
|
||
for op in operations:
|
||
op_type = op.get("type")
|
||
op_result = {"type": op_type, "success": False}
|
||
|
||
try:
|
||
if op_type == "delete":
|
||
await delete_item(op["path"])
|
||
op_result["success"] = True
|
||
op_result["message"] = "删除成功"
|
||
elif op_type == "move":
|
||
await move_item(op["source_path"], op["target_path"])
|
||
op_result["success"] = True
|
||
op_result["message"] = "移动成功"
|
||
elif op_type == "copy":
|
||
await copy_item(op["source_path"], op["target_path"])
|
||
op_result["success"] = True
|
||
op_result["message"] = "复制成功"
|
||
else:
|
||
op_result["error"] = f"不支持的操作类型: {op_type}"
|
||
|
||
except Exception as e:
|
||
op_result["error"] = str(e)
|
||
|
||
results.append(op_result)
|
||
|
||
return {
|
||
"success": True,
|
||
"results": results,
|
||
"total": len(operations),
|
||
"successful": sum(1 for r in results if r["success"])
|
||
}
|
||
|
||
except Exception as e:
|
||
raise HTTPException(status_code=500, detail=f"批量操作失败: {str(e)}") |