qwen_agent/file_manager_api.py
2025-11-08 20:23:04 +08:00

691 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
文件管理API - WebDAV的HTTP API替代方案
提供RESTful接口来管理projects文件夹
"""
import os
import shutil
from pathlib import Path
from typing import List, Optional, Dict, Any
from fastapi import APIRouter, HTTPException, UploadFile, File, Form, Query
from fastapi.responses import FileResponse, StreamingResponse
import mimetypes
import json
import zipfile
import tempfile
import io
import math
router = APIRouter(prefix="/api/v1/files", tags=["file_management"])
PROJECTS_DIR = Path("projects")
PROJECTS_DIR.mkdir(exist_ok=True)
@router.get("/list")
async def list_files(path: str = "", recursive: bool = False):
"""
列出目录内容
Args:
path: 相对路径,空字符串表示根目录
recursive: 是否递归列出所有子目录
"""
try:
target_path = PROJECTS_DIR / path
if not target_path.exists():
raise HTTPException(status_code=404, detail="路径不存在")
if not target_path.is_dir():
raise HTTPException(status_code=400, detail="路径不是目录")
def scan_directory(directory: Path, base_path: Path = PROJECTS_DIR) -> List[Dict[str, Any]]:
items = []
try:
for item in directory.iterdir():
# 跳过隐藏文件
if item.name.startswith('.'):
continue
relative_path = item.relative_to(base_path)
stat = item.stat()
item_info = {
"name": item.name,
"path": str(relative_path),
"type": "directory" if item.is_dir() else "file",
"size": stat.st_size if item.is_file() else 0,
"modified": stat.st_mtime,
"created": stat.st_ctime
}
items.append(item_info)
# 递归扫描子目录
if recursive and item.is_dir():
items.extend(scan_directory(item, base_path))
except PermissionError:
pass
return items
items = scan_directory(target_path)
return {
"success": True,
"path": path,
"items": items,
"total": len(items)
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"列出目录失败: {str(e)}")
@router.post("/upload")
async def upload_file(file: UploadFile = File(...), path: str = Form("")):
"""
上传文件
Args:
file: 上传的文件
path: 目标路径相对于projects目录
"""
try:
target_dir = PROJECTS_DIR / path
target_dir.mkdir(parents=True, exist_ok=True)
file_path = target_dir / file.filename
# 如果文件已存在,检查是否覆盖
if file_path.exists():
# 可以添加版本控制或重命名逻辑
pass
with open(file_path, "wb") as buffer:
content = await file.read()
buffer.write(content)
return {
"success": True,
"message": "文件上传成功",
"filename": file.filename,
"path": str(Path(path) / file.filename),
"size": len(content)
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"文件上传失败: {str(e)}")
@router.get("/download/{file_path:path}")
async def download_file(file_path: str):
"""
下载文件
Args:
file_path: 文件相对路径
"""
try:
target_path = PROJECTS_DIR / file_path
if not target_path.exists():
raise HTTPException(status_code=404, detail="文件不存在")
if not target_path.is_file():
raise HTTPException(status_code=400, detail="不是文件")
# 猜测MIME类型
mime_type, _ = mimetypes.guess_type(str(target_path))
if mime_type is None:
mime_type = "application/octet-stream"
return FileResponse(
path=str(target_path),
filename=target_path.name,
media_type=mime_type
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"文件下载失败: {str(e)}")
@router.delete("/delete")
async def delete_item(path: str):
"""
删除文件或目录
Args:
path: 要删除的路径
"""
try:
target_path = PROJECTS_DIR / path
if not target_path.exists():
raise HTTPException(status_code=404, detail="路径不存在")
if target_path.is_file():
target_path.unlink()
elif target_path.is_dir():
shutil.rmtree(target_path)
return {
"success": True,
"message": f"{'文件' if target_path.is_file() else '目录'}删除成功",
"path": path
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"删除失败: {str(e)}")
@router.post("/create-folder")
async def create_folder(path: str, name: str):
"""
创建文件夹
Args:
path: 父目录路径
name: 新文件夹名称
"""
try:
parent_path = PROJECTS_DIR / path
parent_path.mkdir(parents=True, exist_ok=True)
new_folder = parent_path / name
if new_folder.exists():
raise HTTPException(status_code=400, detail="文件夹已存在")
new_folder.mkdir()
return {
"success": True,
"message": "文件夹创建成功",
"path": str(Path(path) / name)
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"创建文件夹失败: {str(e)}")
@router.post("/rename")
async def rename_item(old_path: str, new_name: str):
"""
重命名文件或文件夹
Args:
old_path: 原路径
new_name: 新名称
"""
try:
old_full_path = PROJECTS_DIR / old_path
if not old_full_path.exists():
raise HTTPException(status_code=404, detail="文件或目录不存在")
new_full_path = old_full_path.parent / new_name
if new_full_path.exists():
raise HTTPException(status_code=400, detail="目标名称已存在")
old_full_path.rename(new_full_path)
return {
"success": True,
"message": "重命名成功",
"old_path": old_path,
"new_path": str(new_full_path.relative_to(PROJECTS_DIR))
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"重命名失败: {str(e)}")
@router.post("/move")
async def move_item(source_path: str, target_path: str):
"""
移动文件或文件夹
Args:
source_path: 源路径
target_path: 目标路径
"""
try:
source_full_path = PROJECTS_DIR / source_path
target_full_path = PROJECTS_DIR / target_path
if not source_full_path.exists():
raise HTTPException(status_code=404, detail="源文件或目录不存在")
# 确保目标目录存在
target_full_path.parent.mkdir(parents=True, exist_ok=True)
shutil.move(str(source_full_path), str(target_full_path))
return {
"success": True,
"message": "移动成功",
"source_path": source_path,
"target_path": target_path
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"移动失败: {str(e)}")
@router.post("/copy")
async def copy_item(source_path: str, target_path: str):
"""
复制文件或文件夹
Args:
source_path: 源路径
target_path: 目标路径
"""
try:
source_full_path = PROJECTS_DIR / source_path
target_full_path = PROJECTS_DIR / target_path
if not source_full_path.exists():
raise HTTPException(status_code=404, detail="源文件或目录不存在")
# 确保目标目录存在
target_full_path.parent.mkdir(parents=True, exist_ok=True)
if source_full_path.is_file():
shutil.copy2(str(source_full_path), str(target_full_path))
else:
shutil.copytree(str(source_full_path), str(target_full_path))
return {
"success": True,
"message": "复制成功",
"source_path": source_path,
"target_path": target_path
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"复制失败: {str(e)}")
@router.get("/search")
async def search_files(query: str, path: str = "", file_type: Optional[str] = None):
"""
搜索文件
Args:
query: 搜索关键词
path: 搜索路径
file_type: 文件类型过滤
"""
try:
search_path = PROJECTS_DIR / path
if not search_path.exists():
raise HTTPException(status_code=404, detail="搜索路径不存在")
results = []
def scan_for_files(directory: Path):
try:
for item in directory.iterdir():
if item.name.startswith('.'):
continue
# 检查文件名是否包含关键词
if query.lower() in item.name.lower():
# 检查文件类型过滤
if file_type:
if item.suffix.lower() == file_type.lower():
results.append({
"name": item.name,
"path": str(item.relative_to(PROJECTS_DIR)),
"type": "directory" if item.is_dir() else "file"
})
else:
results.append({
"name": item.name,
"path": str(item.relative_to(PROJECTS_DIR)),
"type": "directory" if item.is_dir() else "file"
})
# 递归搜索子目录
if item.is_dir():
scan_for_files(item)
except PermissionError:
pass
scan_for_files(search_path)
return {
"success": True,
"query": query,
"path": path,
"results": results,
"total": len(results)
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"搜索失败: {str(e)}")
@router.get("/info/{file_path:path}")
async def get_file_info(file_path: str):
"""
获取文件或文件夹详细信息
Args:
file_path: 文件路径
"""
try:
target_path = PROJECTS_DIR / file_path
if not target_path.exists():
raise HTTPException(status_code=404, detail="路径不存在")
stat = target_path.stat()
info = {
"name": target_path.name,
"path": file_path,
"type": "directory" if target_path.is_dir() else "file",
"size": stat.st_size if target_path.is_file() else 0,
"modified": stat.st_mtime,
"created": stat.st_ctime,
"permissions": oct(stat.st_mode)[-3:]
}
# 如果是文件,添加额外信息
if target_path.is_file():
mime_type, _ = mimetypes.guess_type(str(target_path))
info["mime_type"] = mime_type or "unknown"
# 读取文件内容预览(仅对小文件)
if stat.st_size < 1024 * 1024: # 小于1MB
try:
with open(target_path, 'r', encoding='utf-8') as f:
content = f.read(1000) # 读取前1000字符
info["preview"] = content
except:
info["preview"] = "[二进制文件,无法预览]"
return {
"success": True,
"info": info
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"获取文件信息失败: {str(e)}")
@router.post("/download-folder-zip")
async def download_folder_as_zip(request: Dict[str, str]):
"""
将文件夹压缩为ZIP并下载
Args:
request: 包含path字段的JSON对象
"""
try:
folder_path = request.get("path", "")
if not folder_path:
raise HTTPException(status_code=400, detail="路径不能为空")
target_path = PROJECTS_DIR / folder_path
if not target_path.exists():
raise HTTPException(status_code=404, detail="文件夹不存在")
if not target_path.is_dir():
raise HTTPException(status_code=400, detail="路径不是文件夹")
# 计算文件夹大小,检查是否过大
total_size = 0
file_count = 0
for file_path in target_path.rglob('*'):
if file_path.is_file():
total_size += file_path.stat().st_size
file_count += 1
# 限制最大500MB
max_size = 500 * 1024 * 1024
if total_size > max_size:
raise HTTPException(
status_code=413,
detail=f"文件夹过大 ({formatFileSize(total_size)}),最大支持 {formatFileSize(max_size)}"
)
# 限制文件数量
max_files = 10000
if file_count > max_files:
raise HTTPException(
status_code=413,
detail=f"文件数量过多 ({file_count}),最大支持 {max_files} 个文件"
)
# 创建ZIP文件名
folder_name = target_path.name
zip_filename = f"{folder_name}.zip"
# 创建ZIP文件
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED, compresslevel=6) as zipf:
# 添加文件夹中的所有文件
for file_path in target_path.rglob('*'):
if file_path.is_file():
try:
# 计算相对路径,保持文件夹结构
arcname = file_path.relative_to(target_path)
zipf.write(file_path, arcname)
except (OSError, IOError) as e:
# 跳过无法读取的文件,但记录警告
print(f"Warning: Skipping file {file_path}: {e}")
continue
zip_buffer.seek(0)
# 设置响应头
headers = {
'Content-Disposition': f'attachment; filename="{zip_filename}"',
'Content-Type': 'application/zip',
'Content-Length': str(len(zip_buffer.getvalue())),
'Cache-Control': 'no-cache'
}
# 创建流式响应
from fastapi.responses import StreamingResponse
async def generate_zip():
zip_buffer.seek(0)
yield zip_buffer.getvalue()
return StreamingResponse(
generate_zip(),
media_type="application/zip",
headers=headers
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"压缩文件夹失败: {str(e)}")
@router.post("/download-multiple-zip")
async def download_multiple_items_as_zip(request: Dict[str, Any]):
"""
将多个文件和文件夹压缩为ZIP并下载
Args:
request: 包含paths和filename字段的JSON对象
"""
try:
paths = request.get("paths", [])
filename = request.get("filename", "batch_download.zip")
if not paths:
raise HTTPException(status_code=400, detail="请选择要下载的文件")
# 验证所有路径
valid_paths = []
total_size = 0
file_count = 0
for path in paths:
target_path = PROJECTS_DIR / path
if not target_path.exists():
continue # 跳过不存在的文件
if target_path.is_file():
total_size += target_path.stat().st_size
file_count += 1
valid_paths.append(path)
elif target_path.is_dir():
# 计算文件夹大小
for file_path in target_path.rglob('*'):
if file_path.is_file():
total_size += file_path.stat().st_size
file_count += 1
valid_paths.append(path)
if not valid_paths:
raise HTTPException(status_code=404, detail="没有找到有效的文件")
# 限制大小
max_size = 500 * 1024 * 1024
if total_size > max_size:
raise HTTPException(
status_code=413,
detail=f"选中文件过大 ({formatFileSize(total_size)}),最大支持 {formatFileSize(max_size)}"
)
# 限制文件数量
max_files = 10000
if file_count > max_files:
raise HTTPException(
status_code=413,
detail=f"文件数量过多 ({file_count}),最大支持 {max_files} 个文件"
)
# 创建ZIP文件
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED, compresslevel=6) as zipf:
for path in valid_paths:
target_path = PROJECTS_DIR / path
if target_path.is_file():
# 单个文件
try:
zipf.write(target_path, target_path.name)
except (OSError, IOError) as e:
print(f"Warning: Skipping file {target_path}: {e}")
continue
elif target_path.is_dir():
# 文件夹
for file_path in target_path.rglob('*'):
if file_path.is_file():
try:
# 保持相对路径结构
arcname = f"{target_path.name}/{file_path.relative_to(target_path)}"
zipf.write(file_path, arcname)
except (OSError, IOError) as e:
print(f"Warning: Skipping file {file_path}: {e}")
continue
zip_buffer.seek(0)
# 设置响应头
headers = {
'Content-Disposition': f'attachment; filename="{filename}"',
'Content-Type': 'application/zip',
'Content-Length': str(len(zip_buffer.getvalue())),
'Cache-Control': 'no-cache'
}
# 创建流式响应
from fastapi.responses import StreamingResponse
async def generate_zip():
zip_buffer.seek(0)
yield zip_buffer.getvalue()
return StreamingResponse(
generate_zip(),
media_type="application/zip",
headers=headers
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"批量压缩失败: {str(e)}")
def formatFileSize(bytes_size: int) -> str:
"""格式化文件大小"""
if bytes_size == 0:
return "0 B"
k = 1024
sizes = ["B", "KB", "MB", "GB", "TB"]
i = int(math.floor(math.log(bytes_size, k)))
if i >= len(sizes):
i = len(sizes) - 1
return f"{bytes_size / math.pow(k, i):.1f} {sizes[i]}"
@router.post("/batch-operation")
async def batch_operation(operations: List[Dict[str, Any]]):
"""
批量操作多个文件
Args:
operations: 操作列表每个操作包含type和相应的参数
"""
try:
results = []
for op in operations:
op_type = op.get("type")
op_result = {"type": op_type, "success": False}
try:
if op_type == "delete":
await delete_item(op["path"])
op_result["success"] = True
op_result["message"] = "删除成功"
elif op_type == "move":
await move_item(op["source_path"], op["target_path"])
op_result["success"] = True
op_result["message"] = "移动成功"
elif op_type == "copy":
await copy_item(op["source_path"], op["target_path"])
op_result["success"] = True
op_result["message"] = "复制成功"
else:
op_result["error"] = f"不支持的操作类型: {op_type}"
except Exception as e:
op_result["error"] = str(e)
results.append(op_result)
return {
"success": True,
"results": results,
"total": len(operations),
"successful": sum(1 for r in results if r["success"])
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"批量操作失败: {str(e)}")