#!/usr/bin/env python3 """ 文件管理API - WebDAV的HTTP API替代方案 提供RESTful接口来管理projects文件夹 """ import os import shutil from pathlib import Path from typing import List, Optional, Dict, Any from fastapi import APIRouter, HTTPException, UploadFile, File, Form, Query from fastapi.responses import FileResponse, StreamingResponse import mimetypes import json import zipfile import tempfile import io import math router = APIRouter(prefix="/api/v1/files", tags=["file_management"]) PROJECTS_DIR = Path("projects") PROJECTS_DIR.mkdir(exist_ok=True) @router.get("/list") async def list_files(path: str = "", recursive: bool = False): """ 列出目录内容 Args: path: 相对路径,空字符串表示根目录 recursive: 是否递归列出所有子目录 """ try: target_path = PROJECTS_DIR / path if not target_path.exists(): raise HTTPException(status_code=404, detail="路径不存在") if not target_path.is_dir(): raise HTTPException(status_code=400, detail="路径不是目录") def scan_directory(directory: Path, base_path: Path = PROJECTS_DIR) -> List[Dict[str, Any]]: items = [] try: for item in directory.iterdir(): # 跳过隐藏文件 if item.name.startswith('.'): continue relative_path = item.relative_to(base_path) stat = item.stat() item_info = { "name": item.name, "path": str(relative_path), "type": "directory" if item.is_dir() else "file", "size": stat.st_size if item.is_file() else 0, "modified": stat.st_mtime, "created": stat.st_ctime } items.append(item_info) # 递归扫描子目录 if recursive and item.is_dir(): items.extend(scan_directory(item, base_path)) except PermissionError: pass return items items = scan_directory(target_path) return { "success": True, "path": path, "items": items, "total": len(items) } except Exception as e: raise HTTPException(status_code=500, detail=f"列出目录失败: {str(e)}") @router.post("/upload") async def upload_file(file: UploadFile = File(...), path: str = Form("")): """ 上传文件 Args: file: 上传的文件 path: 目标路径(相对于projects目录) """ try: target_dir = PROJECTS_DIR / path target_dir.mkdir(parents=True, exist_ok=True) file_path = target_dir / file.filename # 如果文件已存在,检查是否覆盖 if file_path.exists(): # 可以添加版本控制或重命名逻辑 pass with open(file_path, "wb") as buffer: content = await file.read() buffer.write(content) return { "success": True, "message": "文件上传成功", "filename": file.filename, "path": str(Path(path) / file.filename), "size": len(content) } except Exception as e: raise HTTPException(status_code=500, detail=f"文件上传失败: {str(e)}") @router.get("/download/{file_path:path}") async def download_file(file_path: str): """ 下载文件 Args: file_path: 文件相对路径 """ try: target_path = PROJECTS_DIR / file_path if not target_path.exists(): raise HTTPException(status_code=404, detail="文件不存在") if not target_path.is_file(): raise HTTPException(status_code=400, detail="不是文件") # 猜测MIME类型 mime_type, _ = mimetypes.guess_type(str(target_path)) if mime_type is None: mime_type = "application/octet-stream" return FileResponse( path=str(target_path), filename=target_path.name, media_type=mime_type ) except Exception as e: raise HTTPException(status_code=500, detail=f"文件下载失败: {str(e)}") @router.delete("/delete") async def delete_item(path: str): """ 删除文件或目录 Args: path: 要删除的路径 """ try: target_path = PROJECTS_DIR / path if not target_path.exists(): raise HTTPException(status_code=404, detail="路径不存在") if target_path.is_file(): target_path.unlink() elif target_path.is_dir(): shutil.rmtree(target_path) return { "success": True, "message": f"{'文件' if target_path.is_file() else '目录'}删除成功", "path": path } except Exception as e: raise HTTPException(status_code=500, detail=f"删除失败: {str(e)}") @router.post("/create-folder") async def create_folder(path: str, name: str): """ 创建文件夹 Args: path: 父目录路径 name: 新文件夹名称 """ try: parent_path = PROJECTS_DIR / path parent_path.mkdir(parents=True, exist_ok=True) new_folder = parent_path / name if new_folder.exists(): raise HTTPException(status_code=400, detail="文件夹已存在") new_folder.mkdir() return { "success": True, "message": "文件夹创建成功", "path": str(Path(path) / name) } except Exception as e: raise HTTPException(status_code=500, detail=f"创建文件夹失败: {str(e)}") @router.post("/rename") async def rename_item(old_path: str, new_name: str): """ 重命名文件或文件夹 Args: old_path: 原路径 new_name: 新名称 """ try: old_full_path = PROJECTS_DIR / old_path if not old_full_path.exists(): raise HTTPException(status_code=404, detail="文件或目录不存在") new_full_path = old_full_path.parent / new_name if new_full_path.exists(): raise HTTPException(status_code=400, detail="目标名称已存在") old_full_path.rename(new_full_path) return { "success": True, "message": "重命名成功", "old_path": old_path, "new_path": str(new_full_path.relative_to(PROJECTS_DIR)) } except Exception as e: raise HTTPException(status_code=500, detail=f"重命名失败: {str(e)}") @router.post("/move") async def move_item(source_path: str, target_path: str): """ 移动文件或文件夹 Args: source_path: 源路径 target_path: 目标路径 """ try: source_full_path = PROJECTS_DIR / source_path target_full_path = PROJECTS_DIR / target_path if not source_full_path.exists(): raise HTTPException(status_code=404, detail="源文件或目录不存在") # 确保目标目录存在 target_full_path.parent.mkdir(parents=True, exist_ok=True) shutil.move(str(source_full_path), str(target_full_path)) return { "success": True, "message": "移动成功", "source_path": source_path, "target_path": target_path } except Exception as e: raise HTTPException(status_code=500, detail=f"移动失败: {str(e)}") @router.post("/copy") async def copy_item(source_path: str, target_path: str): """ 复制文件或文件夹 Args: source_path: 源路径 target_path: 目标路径 """ try: source_full_path = PROJECTS_DIR / source_path target_full_path = PROJECTS_DIR / target_path if not source_full_path.exists(): raise HTTPException(status_code=404, detail="源文件或目录不存在") # 确保目标目录存在 target_full_path.parent.mkdir(parents=True, exist_ok=True) if source_full_path.is_file(): shutil.copy2(str(source_full_path), str(target_full_path)) else: shutil.copytree(str(source_full_path), str(target_full_path)) return { "success": True, "message": "复制成功", "source_path": source_path, "target_path": target_path } except Exception as e: raise HTTPException(status_code=500, detail=f"复制失败: {str(e)}") @router.get("/search") async def search_files(query: str, path: str = "", file_type: Optional[str] = None): """ 搜索文件 Args: query: 搜索关键词 path: 搜索路径 file_type: 文件类型过滤 """ try: search_path = PROJECTS_DIR / path if not search_path.exists(): raise HTTPException(status_code=404, detail="搜索路径不存在") results = [] def scan_for_files(directory: Path): try: for item in directory.iterdir(): if item.name.startswith('.'): continue # 检查文件名是否包含关键词 if query.lower() in item.name.lower(): # 检查文件类型过滤 if file_type: if item.suffix.lower() == file_type.lower(): results.append({ "name": item.name, "path": str(item.relative_to(PROJECTS_DIR)), "type": "directory" if item.is_dir() else "file" }) else: results.append({ "name": item.name, "path": str(item.relative_to(PROJECTS_DIR)), "type": "directory" if item.is_dir() else "file" }) # 递归搜索子目录 if item.is_dir(): scan_for_files(item) except PermissionError: pass scan_for_files(search_path) return { "success": True, "query": query, "path": path, "results": results, "total": len(results) } except Exception as e: raise HTTPException(status_code=500, detail=f"搜索失败: {str(e)}") @router.get("/info/{file_path:path}") async def get_file_info(file_path: str): """ 获取文件或文件夹详细信息 Args: file_path: 文件路径 """ try: target_path = PROJECTS_DIR / file_path if not target_path.exists(): raise HTTPException(status_code=404, detail="路径不存在") stat = target_path.stat() info = { "name": target_path.name, "path": file_path, "type": "directory" if target_path.is_dir() else "file", "size": stat.st_size if target_path.is_file() else 0, "modified": stat.st_mtime, "created": stat.st_ctime, "permissions": oct(stat.st_mode)[-3:] } # 如果是文件,添加额外信息 if target_path.is_file(): mime_type, _ = mimetypes.guess_type(str(target_path)) info["mime_type"] = mime_type or "unknown" # 读取文件内容预览(仅对小文件) if stat.st_size < 1024 * 1024: # 小于1MB try: with open(target_path, 'r', encoding='utf-8') as f: content = f.read(1000) # 读取前1000字符 info["preview"] = content except: info["preview"] = "[二进制文件,无法预览]" return { "success": True, "info": info } except Exception as e: raise HTTPException(status_code=500, detail=f"获取文件信息失败: {str(e)}") @router.post("/download-folder-zip") async def download_folder_as_zip(request: Dict[str, str]): """ 将文件夹压缩为ZIP并下载 Args: request: 包含path字段的JSON对象 """ try: folder_path = request.get("path", "") if not folder_path: raise HTTPException(status_code=400, detail="路径不能为空") target_path = PROJECTS_DIR / folder_path if not target_path.exists(): raise HTTPException(status_code=404, detail="文件夹不存在") if not target_path.is_dir(): raise HTTPException(status_code=400, detail="路径不是文件夹") # 计算文件夹大小,检查是否过大 total_size = 0 file_count = 0 for file_path in target_path.rglob('*'): if file_path.is_file(): total_size += file_path.stat().st_size file_count += 1 # 限制最大500MB max_size = 500 * 1024 * 1024 if total_size > max_size: raise HTTPException( status_code=413, detail=f"文件夹过大 ({formatFileSize(total_size)}),最大支持 {formatFileSize(max_size)}" ) # 限制文件数量 max_files = 10000 if file_count > max_files: raise HTTPException( status_code=413, detail=f"文件数量过多 ({file_count}),最大支持 {max_files} 个文件" ) # 创建ZIP文件名 folder_name = target_path.name zip_filename = f"{folder_name}.zip" # 创建ZIP文件 zip_buffer = io.BytesIO() with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED, compresslevel=6) as zipf: # 添加文件夹中的所有文件 for file_path in target_path.rglob('*'): if file_path.is_file(): try: # 计算相对路径,保持文件夹结构 arcname = file_path.relative_to(target_path) zipf.write(file_path, arcname) except (OSError, IOError) as e: # 跳过无法读取的文件,但记录警告 print(f"Warning: Skipping file {file_path}: {e}") continue zip_buffer.seek(0) # 设置响应头 headers = { 'Content-Disposition': f'attachment; filename="{zip_filename}"', 'Content-Type': 'application/zip', 'Content-Length': str(len(zip_buffer.getvalue())), 'Cache-Control': 'no-cache' } # 创建流式响应 from fastapi.responses import StreamingResponse async def generate_zip(): zip_buffer.seek(0) yield zip_buffer.getvalue() return StreamingResponse( generate_zip(), media_type="application/zip", headers=headers ) except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=f"压缩文件夹失败: {str(e)}") @router.post("/download-multiple-zip") async def download_multiple_items_as_zip(request: Dict[str, Any]): """ 将多个文件和文件夹压缩为ZIP并下载 Args: request: 包含paths和filename字段的JSON对象 """ try: paths = request.get("paths", []) filename = request.get("filename", "batch_download.zip") if not paths: raise HTTPException(status_code=400, detail="请选择要下载的文件") # 验证所有路径 valid_paths = [] total_size = 0 file_count = 0 for path in paths: target_path = PROJECTS_DIR / path if not target_path.exists(): continue # 跳过不存在的文件 if target_path.is_file(): total_size += target_path.stat().st_size file_count += 1 valid_paths.append(path) elif target_path.is_dir(): # 计算文件夹大小 for file_path in target_path.rglob('*'): if file_path.is_file(): total_size += file_path.stat().st_size file_count += 1 valid_paths.append(path) if not valid_paths: raise HTTPException(status_code=404, detail="没有找到有效的文件") # 限制大小 max_size = 500 * 1024 * 1024 if total_size > max_size: raise HTTPException( status_code=413, detail=f"选中文件过大 ({formatFileSize(total_size)}),最大支持 {formatFileSize(max_size)}" ) # 限制文件数量 max_files = 10000 if file_count > max_files: raise HTTPException( status_code=413, detail=f"文件数量过多 ({file_count}),最大支持 {max_files} 个文件" ) # 创建ZIP文件 zip_buffer = io.BytesIO() with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED, compresslevel=6) as zipf: for path in valid_paths: target_path = PROJECTS_DIR / path if target_path.is_file(): # 单个文件 try: zipf.write(target_path, target_path.name) except (OSError, IOError) as e: print(f"Warning: Skipping file {target_path}: {e}") continue elif target_path.is_dir(): # 文件夹 for file_path in target_path.rglob('*'): if file_path.is_file(): try: # 保持相对路径结构 arcname = f"{target_path.name}/{file_path.relative_to(target_path)}" zipf.write(file_path, arcname) except (OSError, IOError) as e: print(f"Warning: Skipping file {file_path}: {e}") continue zip_buffer.seek(0) # 设置响应头 headers = { 'Content-Disposition': f'attachment; filename="{filename}"', 'Content-Type': 'application/zip', 'Content-Length': str(len(zip_buffer.getvalue())), 'Cache-Control': 'no-cache' } # 创建流式响应 from fastapi.responses import StreamingResponse async def generate_zip(): zip_buffer.seek(0) yield zip_buffer.getvalue() return StreamingResponse( generate_zip(), media_type="application/zip", headers=headers ) except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=f"批量压缩失败: {str(e)}") def formatFileSize(bytes_size: int) -> str: """格式化文件大小""" if bytes_size == 0: return "0 B" k = 1024 sizes = ["B", "KB", "MB", "GB", "TB"] i = int(math.floor(math.log(bytes_size, k))) if i >= len(sizes): i = len(sizes) - 1 return f"{bytes_size / math.pow(k, i):.1f} {sizes[i]}" @router.post("/batch-operation") async def batch_operation(operations: List[Dict[str, Any]]): """ 批量操作多个文件 Args: operations: 操作列表,每个操作包含type和相应的参数 """ try: results = [] for op in operations: op_type = op.get("type") op_result = {"type": op_type, "success": False} try: if op_type == "delete": await delete_item(op["path"]) op_result["success"] = True op_result["message"] = "删除成功" elif op_type == "move": await move_item(op["source_path"], op["target_path"]) op_result["success"] = True op_result["message"] = "移动成功" elif op_type == "copy": await copy_item(op["source_path"], op["target_path"]) op_result["success"] = True op_result["message"] = "复制成功" else: op_result["error"] = f"不支持的操作类型: {op_type}" except Exception as e: op_result["error"] = str(e) results.append(op_result) return { "success": True, "results": results, "total": len(operations), "successful": sum(1 for r in results if r["success"]) } except Exception as e: raise HTTPException(status_code=500, detail=f"批量操作失败: {str(e)}")