#!/usr/bin/env python3 """ File management API - HTTP alternative to WebDAV Provide RESTful endpoints to manage the projects folder """ import os import shutil from pathlib import Path from typing import List, Optional, Dict, Any from fastapi import APIRouter, HTTPException, UploadFile, File, Form, Query from fastapi.responses import FileResponse, StreamingResponse import mimetypes import json import zipfile import logging logger = logging.getLogger('app') import tempfile import io import math router = APIRouter(prefix="/api/v1/file-manager", tags=["file_management"]) # Support multiple directories: projects and prompt PROJECTS_DIR = Path(".") SUPPORTED_DIRECTORIES = ["projects", "prompt"] def resolve_path(path: str) -> Path: """Resolve a path and ensure it stays within allowed directories""" # If the path is empty, default to showing the supported directory list if not path: return PROJECTS_DIR # Check whether the path starts with a supported directory path_parts = Path(path).parts if not path_parts or path_parts[0] not in SUPPORTED_DIRECTORIES: raise HTTPException( status_code=400, detail=f"Path must start with one of the following directories: {', '.join(SUPPORTED_DIRECTORIES)}" ) target_path = PROJECTS_DIR / path # Normalize the path to prevent directory traversal attacks try: resolved_path = target_path.resolve() except Exception: raise HTTPException(status_code=400, detail="Invalid path") # Check whether the path stays within the allowed directories try: allowed_root = PROJECTS_DIR.resolve() # Check whether the resolved path still stays within the allowed directories try: resolved_path.relative_to(allowed_root) except ValueError: raise HTTPException(status_code=403, detail="Access denied") except Exception as e: raise HTTPException(status_code=403, detail="Access denied") return resolved_path @router.get("/list") async def list_files(path: str = "", recursive: bool = False): """ List directory contents Args: path: relative path; empty string means the root directory recursive: whether to recursively list all subdirectories """ try: # If the path is empty, return the supported directory list if not path: items = [] for dir_name in SUPPORTED_DIRECTORIES: dir_path = PROJECTS_DIR / dir_name if dir_path.exists() and dir_path.is_dir(): stat = dir_path.stat() items.append({ "name": dir_name, "path": dir_name, "type": "directory", "size": 0, "modified": stat.st_mtime, "created": stat.st_ctime }) return { "success": True, "path": "", "items": items, "total": len(items) } target_path = resolve_path(path) if not target_path.exists(): raise HTTPException(status_code=404, detail="Path does not exist") if not target_path.is_dir(): raise HTTPException(status_code=400, detail="Path is not a directory") def scan_directory(directory: Path, base_path: Path = PROJECTS_DIR) -> List[Dict[str, Any]]: items = [] # Use the resolved absolute path as the base base_resolved = PROJECTS_DIR.resolve() try: for item in directory.iterdir(): # Skip hidden files if item.name.startswith('.'): continue # Compute the path relative to the project root try: relative_path = item.relative_to(base_resolved) except ValueError: # If the relative path cannot be computed, try using the current directory as the base try: relative_path = item.relative_to(directory.resolve().parent) except ValueError: # As a final fallback, use the relative path directly relative_path = item stat = item.stat() item_info = { "name": item.name, "path": str(relative_path), "type": "directory" if item.is_dir() else "file", "size": stat.st_size if item.is_file() else 0, "modified": stat.st_mtime, "created": stat.st_ctime } items.append(item_info) # Recursively scan subdirectories if recursive and item.is_dir(): items.extend(scan_directory(item, base_path)) except PermissionError: pass return items items = scan_directory(target_path, Path(".")) return { "success": True, "path": path, "items": items, "total": len(items) } except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to list directory: {str(e)}") @router.post("/upload") async def upload_file(file: UploadFile = File(...), path: str = Form("")): """ Upload a file Args: file: uploaded file path: target path relative to the supported directories """ try: target_path = resolve_path(path) if path else resolve_path("projects") if not target_path.exists() or not target_path.is_dir(): target_path = resolve_path("projects") target_path.mkdir(parents=True, exist_ok=True) # Replace spaces in the file name with underscores safe_filename = file.filename.replace(" ", "_") if file.filename else file.filename file_path = target_path / safe_filename # If the file already exists, check whether to overwrite it if file_path.exists(): # Versioning or renaming logic can be added here pass with open(file_path, "wb") as buffer: content = await file.read() buffer.write(content) return { "success": True, "message": "File uploaded successfully", "filename": safe_filename, "original_filename": file.filename, "path": str(Path(path) / safe_filename), "size": len(content) } except Exception as e: raise HTTPException(status_code=500, detail=f"File upload failed: {str(e)}") @router.get("/download/{file_path:path}") async def download_file(file_path: str): """ Download a file Args: file_path: relative file path """ try: target_path = resolve_path(file_path) if not target_path.exists(): raise HTTPException(status_code=404, detail="File does not exist") if not target_path.is_file(): raise HTTPException(status_code=400, detail="Path is not a file") # Guess the MIME type mime_type, _ = mimetypes.guess_type(str(target_path)) if mime_type is None: mime_type = "application/octet-stream" return FileResponse( path=str(target_path), filename=target_path.name, media_type=mime_type ) except Exception as e: raise HTTPException(status_code=500, detail=f"File download failed: {str(e)}") @router.delete("/delete") async def delete_item(path: str): """ Delete a file or directory Args: path: path to delete """ try: target_path = resolve_path(path) if not target_path.exists(): raise HTTPException(status_code=404, detail="Path does not exist") if target_path.is_file(): target_path.unlink() elif target_path.is_dir(): shutil.rmtree(target_path) return { "success": True, "message": f"{'File' if target_path.is_file() else 'Directory'} deleted successfully", "path": path } except Exception as e: raise HTTPException(status_code=500, detail=f"Delete failed: {str(e)}") @router.post("/create-folder") async def create_folder(request: Dict[str, str]): """ Create a folder Args: request: JSON object containing path and name fields """ try: path = request.get("path", "") name = request.get("name", "") if not name: raise HTTPException(status_code=400, detail="Folder name cannot be empty") parent_path = resolve_path(path) if path else resolve_path("projects") parent_path.mkdir(parents=True, exist_ok=True) new_folder = parent_path / name if new_folder.exists(): raise HTTPException(status_code=400, detail="Folder already exists") new_folder.mkdir() return { "success": True, "message": "Folder created successfully", "path": str(Path(path) / name) } except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to create folder: {str(e)}") @router.post("/rename") async def rename_item(old_path: str, new_name: str): """ Rename a file or folder Args: old_path: original path new_name: new name """ try: old_full_path = resolve_path(old_path) if not old_full_path.exists(): raise HTTPException(status_code=404, detail="File or directory does not exist") new_full_path = old_full_path.parent / new_name if new_full_path.exists(): raise HTTPException(status_code=400, detail="Target name already exists") old_full_path.rename(new_full_path) try: new_relative_path = new_full_path.relative_to(PROJECTS_DIR) except ValueError: new_relative_path = new_full_path return { "success": True, "message": "Renamed successfully", "old_path": old_path, "new_path": str(new_relative_path) } except Exception as e: raise HTTPException(status_code=500, detail=f"Rename failed: {str(e)}") @router.post("/move") async def move_item(source_path: str, target_path: str): """ Move a file or folder Args: source_path: source path target_path: target path """ try: source_full_path = resolve_path(source_path) target_full_path = resolve_path(target_path) if not source_full_path.exists(): raise HTTPException(status_code=404, detail="Source file or directory does not exist") # Ensure the target directory exists target_full_path.parent.mkdir(parents=True, exist_ok=True) shutil.move(str(source_full_path), str(target_full_path)) return { "success": True, "message": "Moved successfully", "source_path": source_path, "target_path": target_path } except Exception as e: raise HTTPException(status_code=500, detail=f"Move failed: {str(e)}") @router.post("/copy") async def copy_item(source_path: str, target_path: str): """ Copy a file or folder Args: source_path: source path target_path: target path """ try: source_full_path = resolve_path(source_path) target_full_path = resolve_path(target_path) if not source_full_path.exists(): raise HTTPException(status_code=404, detail="Source file or directory does not exist") # Ensure the target directory exists target_full_path.parent.mkdir(parents=True, exist_ok=True) if source_full_path.is_file(): shutil.copy2(str(source_full_path), str(target_full_path)) else: shutil.copytree(str(source_full_path), str(target_full_path)) return { "success": True, "message": "Copied successfully", "source_path": source_path, "target_path": target_path } except Exception as e: raise HTTPException(status_code=500, detail=f"Copy failed: {str(e)}") @router.get("/search") async def search_files(query: str, path: str = "", file_type: Optional[str] = None): """ Search files Args: query: search keyword path: search path file_type: file type filter """ try: search_path = resolve_path(path) if path else Path(".") if not search_path.exists(): raise HTTPException(status_code=404, detail="Search path does not exist") results = [] def scan_for_files(directory: Path): try: for item in directory.iterdir(): if item.name.startswith('.'): continue # Check whether the file name contains the keyword if query.lower() in item.name.lower(): # Check the file type filter # Compute the path relative to the project root try: relative_path = item.relative_to(PROJECTS_DIR) except ValueError: # If the relative path cannot be computed, use the absolute path relative_path = item if file_type: if item.suffix.lower() == file_type.lower(): results.append({ "name": item.name, "path": str(relative_path), "type": "directory" if item.is_dir() else "file" }) else: results.append({ "name": item.name, "path": str(relative_path), "type": "directory" if item.is_dir() else "file" }) # Recursively search subdirectories if item.is_dir(): scan_for_files(item) except PermissionError: pass scan_for_files(search_path) return { "success": True, "query": query, "path": path, "results": results, "total": len(results) } except Exception as e: raise HTTPException(status_code=500, detail=f"Search failed: {str(e)}") @router.get("/read") async def read_file(path: str = Query(...)): """ Read file contents Args: path: relative file path """ try: target_path = resolve_path(path) if not target_path.exists(): raise HTTPException(status_code=404, detail="File does not exist") if not target_path.is_file(): raise HTTPException(status_code=400, detail="Path is not a file") # Check file size and limit reads to files under 10 MB file_size = target_path.stat().st_size max_size = 10 * 1024 * 1024 # 10MB if file_size > max_size: raise HTTPException( status_code=413, detail=f"File too large ({formatFileSize(file_size)}),maximum supported size {formatFileSize(max_size)}" ) # Try different encodings content = None encoding_used = None for encoding in ['utf-8', 'gbk', 'gb2312', 'latin-1']: try: with open(target_path, 'r', encoding=encoding) as f: content = f.read() encoding_used = encoding break except UnicodeDecodeError: continue if content is None: # If all encodings fail, try reading in binary mode and converting to printable characters try: with open(target_path, 'rb') as f: binary_content = f.read() content = binary_content.decode('utf-8', errors='replace') encoding_used = 'utf-8 (with replacement)' except Exception as e: raise HTTPException(status_code=400, detail=f"Unable to read file contents: {str(e)}") # Get file information stat = target_path.stat() mime_type, _ = mimetypes.guess_type(str(target_path)) return { "success": True, "content": content, "path": path, "size": file_size, "modified": stat.st_mtime, "encoding": encoding_used, "mime_type": mime_type or "unknown" } except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to read file: {str(e)}") @router.post("/save") async def save_file(request: Dict[str, str]): """ Save file contents Args: request: JSON object containing path and content fields """ try: logger.info(f"Received save request: {request}") path = request.get("path") content = request.get("content") logger.info(f"Parsed parameters: path={path}, content length={len(content) if content else 'None'}") if not path or content is None: raise HTTPException(status_code=400, detail="Missing required parameters: path and content") target_path = resolve_path(path) # Ensure the parent directory exists target_path.parent.mkdir(parents=True, exist_ok=True) # Check content size and limit saves to content under 5 MB content_size = len(content.encode('utf-8')) max_size = 5 * 1024 * 1024 # 5MB if content_size > max_size: raise HTTPException( status_code=413, detail=f"Content too large ({formatFileSize(content_size)}),maximum supported size {formatFileSize(max_size)}" ) # Create a backup if the file already exists backup_path = None if target_path.exists(): import datetime timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") backup_path = target_path.with_suffix(f".{timestamp}.bak") try: shutil.copy2(target_path, backup_path) except Exception as e: logger.warning(f"Failed to create backup: {e}") # Write the file try: with open(target_path, 'w', encoding='utf-8') as f: f.write(content) except Exception as e: # Restore the backup if writing fails if backup_path and backup_path.exists(): try: shutil.copy2(backup_path, target_path) backup_path.unlink() # Delete the backup file except: pass raise HTTPException(status_code=500, detail=f"Failed to save file: {str(e)}") # Delete the backup file if the save succeeds if backup_path and backup_path.exists(): try: backup_path.unlink() except: logger.warning(f"Failed to remove backup file: {backup_path}") # Get file information after saving stat = target_path.stat() return { "success": True, "message": "File saved successfully", "path": path, "size": stat.st_size, "modified": stat.st_mtime, "encoding": "utf-8" } except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to save file: {str(e)}") @router.get("/info/{file_path:path}") async def get_file_info(file_path: str): """ Get detailed file or folder information Args: file_path: file path """ try: target_path = resolve_path(file_path) if not target_path.exists(): raise HTTPException(status_code=404, detail="Path does not exist") stat = target_path.stat() info = { "name": target_path.name, "path": file_path, "type": "directory" if target_path.is_dir() else "file", "size": stat.st_size if target_path.is_file() else 0, "modified": stat.st_mtime, "created": stat.st_ctime, "permissions": oct(stat.st_mode)[-3:] } # If this is a file, add extra information if target_path.is_file(): mime_type, _ = mimetypes.guess_type(str(target_path)) info["mime_type"] = mime_type or "unknown" # Read a content preview for small files only if stat.st_size < 1024 * 1024: # smaller than 1 MB try: with open(target_path, 'r', encoding='utf-8') as f: content = f.read(1000) # Read the first 1000 characters info["preview"] = content except: info["preview"] = "[Binary file, preview unavailable]" return { "success": True, "info": info } except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to get file information: {str(e)}") @router.post("/download-folder-zip") async def download_folder_as_zip(request: Dict[str, str]): """ Compress a folder into a ZIP archive and download it Args: request: JSON object containing the path field """ try: folder_path = request.get("path", "") if not folder_path: raise HTTPException(status_code=400, detail="Path cannot be empty") target_path = resolve_path(folder_path) if not target_path.exists(): raise HTTPException(status_code=404, detail="Folder does not exist") if not target_path.is_dir(): raise HTTPException(status_code=400, detail="Path is not a folder") # Calculate folder size and check whether it is too large total_size = 0 file_count = 0 for file_path in target_path.rglob('*'): if file_path.is_file(): total_size += file_path.stat().st_size file_count += 1 # Limit the maximum size to 500 MB max_size = 500 * 1024 * 1024 if total_size > max_size: raise HTTPException( status_code=413, detail=f"Folder too large ({formatFileSize(total_size)}),maximum supported size {formatFileSize(max_size)}" ) # Limit the number of files max_files = 10000 if file_count > max_files: raise HTTPException( status_code=413, detail=f"Too many files ({file_count}),maximum supported size {max_files} files" ) # Create the ZIP file name folder_name = target_path.name zip_filename = f"{folder_name}.zip" # Create the ZIP archive zip_buffer = io.BytesIO() with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED, compresslevel=6) as zipf: # Add all files in the folder for file_path in target_path.rglob('*'): if file_path.is_file(): try: # Compute relative paths to preserve the folder structure arcname = file_path.relative_to(target_path) zipf.write(file_path, arcname) except (OSError, IOError) as e: # Skip unreadable files but log a warning logger.warning(f"Warning: Skipping file {file_path}: {e}") continue zip_buffer.seek(0) # Set response headers headers = { 'Content-Disposition': f'attachment; filename="{zip_filename}"', 'Content-Type': 'application/zip', 'Content-Length': str(len(zip_buffer.getvalue())), 'Cache-Control': 'no-cache' } # Create a streaming response from fastapi.responses import StreamingResponse async def generate_zip(): zip_buffer.seek(0) yield zip_buffer.getvalue() return StreamingResponse( generate_zip(), media_type="application/zip", headers=headers ) except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to compress folder: {str(e)}") @router.post("/download-multiple-zip") async def download_multiple_items_as_zip(request: Dict[str, Any]): """ Compress multiple files and folders into a ZIP archive and download it Args: request: JSON object containing paths and filename fields """ try: paths = request.get("paths", []) filename = request.get("filename", "batch_download.zip") if not paths: raise HTTPException(status_code=400, detail="Please select files to download") # Validate all paths valid_paths = [] total_size = 0 file_count = 0 for path in paths: target_path = resolve_path(path) if not target_path.exists(): continue # Skip files that do not exist if target_path.is_file(): total_size += target_path.stat().st_size file_count += 1 valid_paths.append(path) elif target_path.is_dir(): # Calculate folder size for file_path in target_path.rglob('*'): if file_path.is_file(): total_size += file_path.stat().st_size file_count += 1 valid_paths.append(path) if not valid_paths: raise HTTPException(status_code=404, detail="No valid files were found") # Limit total size max_size = 500 * 1024 * 1024 if total_size > max_size: raise HTTPException( status_code=413, detail=f"Selected files are too large ({formatFileSize(total_size)}),maximum supported size {formatFileSize(max_size)}" ) # Limit the number of files max_files = 10000 if file_count > max_files: raise HTTPException( status_code=413, detail=f"Too many files ({file_count}),maximum supported size {max_files} files" ) # Create the ZIP archive zip_buffer = io.BytesIO() with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED, compresslevel=6) as zipf: for path in valid_paths: target_path = PROJECTS_DIR / path if target_path.is_file(): # Single file try: zipf.write(target_path, target_path.name) except (OSError, IOError) as e: logger.warning(f"Warning: Skipping file {target_path}: {e}") continue elif target_path.is_dir(): # Folder for file_path in target_path.rglob('*'): if file_path.is_file(): try: # Preserve the relative path structure arcname = f"{target_path.name}/{file_path.relative_to(target_path)}" zipf.write(file_path, arcname) except (OSError, IOError) as e: logger.warning(f"Warning: Skipping file {file_path}: {e}") continue zip_buffer.seek(0) # Set response headers headers = { 'Content-Disposition': f'attachment; filename="{filename}"', 'Content-Type': 'application/zip', 'Content-Length': str(len(zip_buffer.getvalue())), 'Cache-Control': 'no-cache' } # Create a streaming response from fastapi.responses import StreamingResponse async def generate_zip(): zip_buffer.seek(0) yield zip_buffer.getvalue() return StreamingResponse( generate_zip(), media_type="application/zip", headers=headers ) except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=f"Batch compression failed: {str(e)}") def formatFileSize(bytes_size: int) -> str: """Format file size""" if bytes_size == 0: return "0 B" k = 1024 sizes = ["B", "KB", "MB", "GB", "TB"] i = int(math.floor(math.log(bytes_size, k))) if i >= len(sizes): i = len(sizes) - 1 return f"{bytes_size / math.pow(k, i):.1f} {sizes[i]}" @router.post("/batch-operation") async def batch_operation(operations: List[Dict[str, Any]]): """ Run batch operations on multiple files Args: operations: list of operations, each containing type and corresponding parameters """ try: results = [] for op in operations: op_type = op.get("type") op_result = {"type": op_type, "success": False} try: if op_type == "delete": await delete_item(op["path"]) op_result["success"] = True op_result["message"] = "deleted successfully" elif op_type == "move": await move_item(op["source_path"], op["target_path"]) op_result["success"] = True op_result["message"] = "Moved successfully" elif op_type == "copy": await copy_item(op["source_path"], op["target_path"]) op_result["success"] = True op_result["message"] = "Copied successfully" else: op_result["error"] = f"Unsupported operation type: {op_type}" except Exception as e: op_result["error"] = str(e) results.append(op_result) return { "success": True, "results": results, "total": len(operations), "successful": sum(1 for r in results if r["success"]) } except Exception as e: raise HTTPException(status_code=500, detail=f"Batch operation failed: {str(e)}")