qwen_agent/routes/file_manager.py
朱潮 425f3c5bb4 chore: replace Chinese comments and log messages with English
Convert all Chinese comments, docstrings, logger/print output,
HTTPException detail messages, and API response messages to English
across the entire codebase. Functional zh/ja localized strings
(e.g. prompt templates, timezone display names, date formats) are
preserved as-is.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-30 19:45:35 +08:00

943 lines
32 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
File management API - HTTP alternative to WebDAV
Provide RESTful endpoints to manage the projects folder
"""
import os
import shutil
from pathlib import Path
from typing import List, Optional, Dict, Any
from fastapi import APIRouter, HTTPException, UploadFile, File, Form, Query
from fastapi.responses import FileResponse, StreamingResponse
import mimetypes
import json
import zipfile
import logging
logger = logging.getLogger('app')
import tempfile
import io
import math
router = APIRouter(prefix="/api/v1/file-manager", tags=["file_management"])
# Support multiple directories: projects and prompt
PROJECTS_DIR = Path(".")
SUPPORTED_DIRECTORIES = ["projects", "prompt"]
def resolve_path(path: str) -> Path:
"""Resolve a path and ensure it stays within allowed directories"""
# If the path is empty, default to showing the supported directory list
if not path:
return PROJECTS_DIR
# Check whether the path starts with a supported directory
path_parts = Path(path).parts
if not path_parts or path_parts[0] not in SUPPORTED_DIRECTORIES:
raise HTTPException(
status_code=400,
detail=f"Path must start with one of the following directories: {', '.join(SUPPORTED_DIRECTORIES)}"
)
target_path = PROJECTS_DIR / path
# Normalize the path to prevent directory traversal attacks
try:
resolved_path = target_path.resolve()
except Exception:
raise HTTPException(status_code=400, detail="Invalid path")
# Check whether the path stays within the allowed directories
try:
allowed_root = PROJECTS_DIR.resolve()
# Check whether the resolved path still stays within the allowed directories
try:
resolved_path.relative_to(allowed_root)
except ValueError:
raise HTTPException(status_code=403, detail="Access denied")
except Exception as e:
raise HTTPException(status_code=403, detail="Access denied")
return resolved_path
@router.get("/list")
async def list_files(path: str = "", recursive: bool = False):
"""
List directory contents
Args:
path: relative path; empty string means the root directory
recursive: whether to recursively list all subdirectories
"""
try:
# If the path is empty, return the supported directory list
if not path:
items = []
for dir_name in SUPPORTED_DIRECTORIES:
dir_path = PROJECTS_DIR / dir_name
if dir_path.exists() and dir_path.is_dir():
stat = dir_path.stat()
items.append({
"name": dir_name,
"path": dir_name,
"type": "directory",
"size": 0,
"modified": stat.st_mtime,
"created": stat.st_ctime
})
return {
"success": True,
"path": "",
"items": items,
"total": len(items)
}
target_path = resolve_path(path)
if not target_path.exists():
raise HTTPException(status_code=404, detail="Path does not exist")
if not target_path.is_dir():
raise HTTPException(status_code=400, detail="Path is not a directory")
def scan_directory(directory: Path, base_path: Path = PROJECTS_DIR) -> List[Dict[str, Any]]:
items = []
# Use the resolved absolute path as the base
base_resolved = PROJECTS_DIR.resolve()
try:
for item in directory.iterdir():
# Skip hidden files
if item.name.startswith('.'):
continue
# Compute the path relative to the project root
try:
relative_path = item.relative_to(base_resolved)
except ValueError:
# If the relative path cannot be computed, try using the current directory as the base
try:
relative_path = item.relative_to(directory.resolve().parent)
except ValueError:
# As a final fallback, use the relative path directly
relative_path = item
stat = item.stat()
item_info = {
"name": item.name,
"path": str(relative_path),
"type": "directory" if item.is_dir() else "file",
"size": stat.st_size if item.is_file() else 0,
"modified": stat.st_mtime,
"created": stat.st_ctime
}
items.append(item_info)
# Recursively scan subdirectories
if recursive and item.is_dir():
items.extend(scan_directory(item, base_path))
except PermissionError:
pass
return items
items = scan_directory(target_path, Path("."))
return {
"success": True,
"path": path,
"items": items,
"total": len(items)
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to list directory: {str(e)}")
@router.post("/upload")
async def upload_file(file: UploadFile = File(...), path: str = Form("")):
"""
Upload a file
Args:
file: uploaded file
path: target path relative to the supported directories
"""
try:
target_path = resolve_path(path) if path else resolve_path("projects")
if not target_path.exists() or not target_path.is_dir():
target_path = resolve_path("projects")
target_path.mkdir(parents=True, exist_ok=True)
# Replace spaces in the file name with underscores
safe_filename = file.filename.replace(" ", "_") if file.filename else file.filename
file_path = target_path / safe_filename
# If the file already exists, check whether to overwrite it
if file_path.exists():
# Versioning or renaming logic can be added here
pass
with open(file_path, "wb") as buffer:
content = await file.read()
buffer.write(content)
return {
"success": True,
"message": "File uploaded successfully",
"filename": safe_filename,
"original_filename": file.filename,
"path": str(Path(path) / safe_filename),
"size": len(content)
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"File upload failed: {str(e)}")
@router.get("/download/{file_path:path}")
async def download_file(file_path: str):
"""
Download a file
Args:
file_path: relative file path
"""
try:
target_path = resolve_path(file_path)
if not target_path.exists():
raise HTTPException(status_code=404, detail="File does not exist")
if not target_path.is_file():
raise HTTPException(status_code=400, detail="Path is not a file")
# Guess the MIME type
mime_type, _ = mimetypes.guess_type(str(target_path))
if mime_type is None:
mime_type = "application/octet-stream"
return FileResponse(
path=str(target_path),
filename=target_path.name,
media_type=mime_type
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"File download failed: {str(e)}")
@router.delete("/delete")
async def delete_item(path: str):
"""
Delete a file or directory
Args:
path: path to delete
"""
try:
target_path = resolve_path(path)
if not target_path.exists():
raise HTTPException(status_code=404, detail="Path does not exist")
if target_path.is_file():
target_path.unlink()
elif target_path.is_dir():
shutil.rmtree(target_path)
return {
"success": True,
"message": f"{'File' if target_path.is_file() else 'Directory'} deleted successfully",
"path": path
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Delete failed: {str(e)}")
@router.post("/create-folder")
async def create_folder(request: Dict[str, str]):
"""
Create a folder
Args:
request: JSON object containing path and name fields
"""
try:
path = request.get("path", "")
name = request.get("name", "")
if not name:
raise HTTPException(status_code=400, detail="Folder name cannot be empty")
parent_path = resolve_path(path) if path else resolve_path("projects")
parent_path.mkdir(parents=True, exist_ok=True)
new_folder = parent_path / name
if new_folder.exists():
raise HTTPException(status_code=400, detail="Folder already exists")
new_folder.mkdir()
return {
"success": True,
"message": "Folder created successfully",
"path": str(Path(path) / name)
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to create folder: {str(e)}")
@router.post("/rename")
async def rename_item(old_path: str, new_name: str):
"""
Rename a file or folder
Args:
old_path: original path
new_name: new name
"""
try:
old_full_path = resolve_path(old_path)
if not old_full_path.exists():
raise HTTPException(status_code=404, detail="File or directory does not exist")
new_full_path = old_full_path.parent / new_name
if new_full_path.exists():
raise HTTPException(status_code=400, detail="Target name already exists")
old_full_path.rename(new_full_path)
try:
new_relative_path = new_full_path.relative_to(PROJECTS_DIR)
except ValueError:
new_relative_path = new_full_path
return {
"success": True,
"message": "Renamed successfully",
"old_path": old_path,
"new_path": str(new_relative_path)
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Rename failed: {str(e)}")
@router.post("/move")
async def move_item(source_path: str, target_path: str):
"""
Move a file or folder
Args:
source_path: source path
target_path: target path
"""
try:
source_full_path = resolve_path(source_path)
target_full_path = resolve_path(target_path)
if not source_full_path.exists():
raise HTTPException(status_code=404, detail="Source file or directory does not exist")
# Ensure the target directory exists
target_full_path.parent.mkdir(parents=True, exist_ok=True)
shutil.move(str(source_full_path), str(target_full_path))
return {
"success": True,
"message": "Moved successfully",
"source_path": source_path,
"target_path": target_path
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Move failed: {str(e)}")
@router.post("/copy")
async def copy_item(source_path: str, target_path: str):
"""
Copy a file or folder
Args:
source_path: source path
target_path: target path
"""
try:
source_full_path = resolve_path(source_path)
target_full_path = resolve_path(target_path)
if not source_full_path.exists():
raise HTTPException(status_code=404, detail="Source file or directory does not exist")
# Ensure the target directory exists
target_full_path.parent.mkdir(parents=True, exist_ok=True)
if source_full_path.is_file():
shutil.copy2(str(source_full_path), str(target_full_path))
else:
shutil.copytree(str(source_full_path), str(target_full_path))
return {
"success": True,
"message": "Copied successfully",
"source_path": source_path,
"target_path": target_path
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Copy failed: {str(e)}")
@router.get("/search")
async def search_files(query: str, path: str = "", file_type: Optional[str] = None):
"""
Search files
Args:
query: search keyword
path: search path
file_type: file type filter
"""
try:
search_path = resolve_path(path) if path else Path(".")
if not search_path.exists():
raise HTTPException(status_code=404, detail="Search path does not exist")
results = []
def scan_for_files(directory: Path):
try:
for item in directory.iterdir():
if item.name.startswith('.'):
continue
# Check whether the file name contains the keyword
if query.lower() in item.name.lower():
# Check the file type filter
# Compute the path relative to the project root
try:
relative_path = item.relative_to(PROJECTS_DIR)
except ValueError:
# If the relative path cannot be computed, use the absolute path
relative_path = item
if file_type:
if item.suffix.lower() == file_type.lower():
results.append({
"name": item.name,
"path": str(relative_path),
"type": "directory" if item.is_dir() else "file"
})
else:
results.append({
"name": item.name,
"path": str(relative_path),
"type": "directory" if item.is_dir() else "file"
})
# Recursively search subdirectories
if item.is_dir():
scan_for_files(item)
except PermissionError:
pass
scan_for_files(search_path)
return {
"success": True,
"query": query,
"path": path,
"results": results,
"total": len(results)
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Search failed: {str(e)}")
@router.get("/read")
async def read_file(path: str = Query(...)):
"""
Read file contents
Args:
path: relative file path
"""
try:
target_path = resolve_path(path)
if not target_path.exists():
raise HTTPException(status_code=404, detail="File does not exist")
if not target_path.is_file():
raise HTTPException(status_code=400, detail="Path is not a file")
# Check file size and limit reads to files under 10 MB
file_size = target_path.stat().st_size
max_size = 10 * 1024 * 1024 # 10MB
if file_size > max_size:
raise HTTPException(
status_code=413,
detail=f"File too large ({formatFileSize(file_size)})maximum supported size {formatFileSize(max_size)}"
)
# Try different encodings
content = None
encoding_used = None
for encoding in ['utf-8', 'gbk', 'gb2312', 'latin-1']:
try:
with open(target_path, 'r', encoding=encoding) as f:
content = f.read()
encoding_used = encoding
break
except UnicodeDecodeError:
continue
if content is None:
# If all encodings fail, try reading in binary mode and converting to printable characters
try:
with open(target_path, 'rb') as f:
binary_content = f.read()
content = binary_content.decode('utf-8', errors='replace')
encoding_used = 'utf-8 (with replacement)'
except Exception as e:
raise HTTPException(status_code=400, detail=f"Unable to read file contents: {str(e)}")
# Get file information
stat = target_path.stat()
mime_type, _ = mimetypes.guess_type(str(target_path))
return {
"success": True,
"content": content,
"path": path,
"size": file_size,
"modified": stat.st_mtime,
"encoding": encoding_used,
"mime_type": mime_type or "unknown"
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to read file: {str(e)}")
@router.post("/save")
async def save_file(request: Dict[str, str]):
"""
Save file contents
Args:
request: JSON object containing path and content fields
"""
try:
logger.info(f"Received save request: {request}")
path = request.get("path")
content = request.get("content")
logger.info(f"Parsed parameters: path={path}, content length={len(content) if content else 'None'}")
if not path or content is None:
raise HTTPException(status_code=400, detail="Missing required parameters: path and content")
target_path = resolve_path(path)
# Ensure the parent directory exists
target_path.parent.mkdir(parents=True, exist_ok=True)
# Check content size and limit saves to content under 5 MB
content_size = len(content.encode('utf-8'))
max_size = 5 * 1024 * 1024 # 5MB
if content_size > max_size:
raise HTTPException(
status_code=413,
detail=f"Content too large ({formatFileSize(content_size)})maximum supported size {formatFileSize(max_size)}"
)
# Create a backup if the file already exists
backup_path = None
if target_path.exists():
import datetime
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = target_path.with_suffix(f".{timestamp}.bak")
try:
shutil.copy2(target_path, backup_path)
except Exception as e:
logger.warning(f"Failed to create backup: {e}")
# Write the file
try:
with open(target_path, 'w', encoding='utf-8') as f:
f.write(content)
except Exception as e:
# Restore the backup if writing fails
if backup_path and backup_path.exists():
try:
shutil.copy2(backup_path, target_path)
backup_path.unlink() # Delete the backup file
except:
pass
raise HTTPException(status_code=500, detail=f"Failed to save file: {str(e)}")
# Delete the backup file if the save succeeds
if backup_path and backup_path.exists():
try:
backup_path.unlink()
except:
logger.warning(f"Failed to remove backup file: {backup_path}")
# Get file information after saving
stat = target_path.stat()
return {
"success": True,
"message": "File saved successfully",
"path": path,
"size": stat.st_size,
"modified": stat.st_mtime,
"encoding": "utf-8"
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to save file: {str(e)}")
@router.get("/info/{file_path:path}")
async def get_file_info(file_path: str):
"""
Get detailed file or folder information
Args:
file_path: file path
"""
try:
target_path = resolve_path(file_path)
if not target_path.exists():
raise HTTPException(status_code=404, detail="Path does not exist")
stat = target_path.stat()
info = {
"name": target_path.name,
"path": file_path,
"type": "directory" if target_path.is_dir() else "file",
"size": stat.st_size if target_path.is_file() else 0,
"modified": stat.st_mtime,
"created": stat.st_ctime,
"permissions": oct(stat.st_mode)[-3:]
}
# If this is a file, add extra information
if target_path.is_file():
mime_type, _ = mimetypes.guess_type(str(target_path))
info["mime_type"] = mime_type or "unknown"
# Read a content preview for small files only
if stat.st_size < 1024 * 1024: # smaller than 1 MB
try:
with open(target_path, 'r', encoding='utf-8') as f:
content = f.read(1000) # Read the first 1000 characters
info["preview"] = content
except:
info["preview"] = "[Binary file, preview unavailable]"
return {
"success": True,
"info": info
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to get file information: {str(e)}")
@router.post("/download-folder-zip")
async def download_folder_as_zip(request: Dict[str, str]):
"""
Compress a folder into a ZIP archive and download it
Args:
request: JSON object containing the path field
"""
try:
folder_path = request.get("path", "")
if not folder_path:
raise HTTPException(status_code=400, detail="Path cannot be empty")
target_path = resolve_path(folder_path)
if not target_path.exists():
raise HTTPException(status_code=404, detail="Folder does not exist")
if not target_path.is_dir():
raise HTTPException(status_code=400, detail="Path is not a folder")
# Calculate folder size and check whether it is too large
total_size = 0
file_count = 0
for file_path in target_path.rglob('*'):
if file_path.is_file():
total_size += file_path.stat().st_size
file_count += 1
# Limit the maximum size to 500 MB
max_size = 500 * 1024 * 1024
if total_size > max_size:
raise HTTPException(
status_code=413,
detail=f"Folder too large ({formatFileSize(total_size)})maximum supported size {formatFileSize(max_size)}"
)
# Limit the number of files
max_files = 10000
if file_count > max_files:
raise HTTPException(
status_code=413,
detail=f"Too many files ({file_count})maximum supported size {max_files} files"
)
# Create the ZIP file name
folder_name = target_path.name
zip_filename = f"{folder_name}.zip"
# Create the ZIP archive
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED, compresslevel=6) as zipf:
# Add all files in the folder
for file_path in target_path.rglob('*'):
if file_path.is_file():
try:
# Compute relative paths to preserve the folder structure
arcname = file_path.relative_to(target_path)
zipf.write(file_path, arcname)
except (OSError, IOError) as e:
# Skip unreadable files but log a warning
logger.warning(f"Warning: Skipping file {file_path}: {e}")
continue
zip_buffer.seek(0)
# Set response headers
headers = {
'Content-Disposition': f'attachment; filename="{zip_filename}"',
'Content-Type': 'application/zip',
'Content-Length': str(len(zip_buffer.getvalue())),
'Cache-Control': 'no-cache'
}
# Create a streaming response
from fastapi.responses import StreamingResponse
async def generate_zip():
zip_buffer.seek(0)
yield zip_buffer.getvalue()
return StreamingResponse(
generate_zip(),
media_type="application/zip",
headers=headers
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to compress folder: {str(e)}")
@router.post("/download-multiple-zip")
async def download_multiple_items_as_zip(request: Dict[str, Any]):
"""
Compress multiple files and folders into a ZIP archive and download it
Args:
request: JSON object containing paths and filename fields
"""
try:
paths = request.get("paths", [])
filename = request.get("filename", "batch_download.zip")
if not paths:
raise HTTPException(status_code=400, detail="Please select files to download")
# Validate all paths
valid_paths = []
total_size = 0
file_count = 0
for path in paths:
target_path = resolve_path(path)
if not target_path.exists():
continue # Skip files that do not exist
if target_path.is_file():
total_size += target_path.stat().st_size
file_count += 1
valid_paths.append(path)
elif target_path.is_dir():
# Calculate folder size
for file_path in target_path.rglob('*'):
if file_path.is_file():
total_size += file_path.stat().st_size
file_count += 1
valid_paths.append(path)
if not valid_paths:
raise HTTPException(status_code=404, detail="No valid files were found")
# Limit total size
max_size = 500 * 1024 * 1024
if total_size > max_size:
raise HTTPException(
status_code=413,
detail=f"Selected files are too large ({formatFileSize(total_size)})maximum supported size {formatFileSize(max_size)}"
)
# Limit the number of files
max_files = 10000
if file_count > max_files:
raise HTTPException(
status_code=413,
detail=f"Too many files ({file_count})maximum supported size {max_files} files"
)
# Create the ZIP archive
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED, compresslevel=6) as zipf:
for path in valid_paths:
target_path = PROJECTS_DIR / path
if target_path.is_file():
# Single file
try:
zipf.write(target_path, target_path.name)
except (OSError, IOError) as e:
logger.warning(f"Warning: Skipping file {target_path}: {e}")
continue
elif target_path.is_dir():
# Folder
for file_path in target_path.rglob('*'):
if file_path.is_file():
try:
# Preserve the relative path structure
arcname = f"{target_path.name}/{file_path.relative_to(target_path)}"
zipf.write(file_path, arcname)
except (OSError, IOError) as e:
logger.warning(f"Warning: Skipping file {file_path}: {e}")
continue
zip_buffer.seek(0)
# Set response headers
headers = {
'Content-Disposition': f'attachment; filename="{filename}"',
'Content-Type': 'application/zip',
'Content-Length': str(len(zip_buffer.getvalue())),
'Cache-Control': 'no-cache'
}
# Create a streaming response
from fastapi.responses import StreamingResponse
async def generate_zip():
zip_buffer.seek(0)
yield zip_buffer.getvalue()
return StreamingResponse(
generate_zip(),
media_type="application/zip",
headers=headers
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Batch compression failed: {str(e)}")
def formatFileSize(bytes_size: int) -> str:
"""Format file size"""
if bytes_size == 0:
return "0 B"
k = 1024
sizes = ["B", "KB", "MB", "GB", "TB"]
i = int(math.floor(math.log(bytes_size, k)))
if i >= len(sizes):
i = len(sizes) - 1
return f"{bytes_size / math.pow(k, i):.1f} {sizes[i]}"
@router.post("/batch-operation")
async def batch_operation(operations: List[Dict[str, Any]]):
"""
Run batch operations on multiple files
Args:
operations: list of operations, each containing type and corresponding parameters
"""
try:
results = []
for op in operations:
op_type = op.get("type")
op_result = {"type": op_type, "success": False}
try:
if op_type == "delete":
await delete_item(op["path"])
op_result["success"] = True
op_result["message"] = "deleted successfully"
elif op_type == "move":
await move_item(op["source_path"], op["target_path"])
op_result["success"] = True
op_result["message"] = "Moved successfully"
elif op_type == "copy":
await copy_item(op["source_path"], op["target_path"])
op_result["success"] = True
op_result["message"] = "Copied successfully"
else:
op_result["error"] = f"Unsupported operation type: {op_type}"
except Exception as e:
op_result["error"] = str(e)
results.append(op_result)
return {
"success": True,
"results": results,
"total": len(operations),
"successful": sum(1 for r in results if r["success"])
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Batch operation failed: {str(e)}")