update file_manager
This commit is contained in:
parent
fe6c4f77d7
commit
e2b4ed64ef
@ -9,7 +9,7 @@ import uvicorn
|
||||
from fastapi import FastAPI
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from file_manager_api import router as file_manager_router
|
||||
from routes.file_manager import router as file_manager_router
|
||||
from utils.logger import logger
|
||||
|
||||
# Import route modules
|
||||
|
||||
@ -817,7 +817,7 @@
|
||||
<script>
|
||||
// 全局变量
|
||||
const API_BASE = `${window.location.protocol}//${window.location.host}/api/v1`;
|
||||
const FILE_API_BASE = `${window.location.protocol}//${window.location.host}/api/v1/files`;
|
||||
const FILE_API_BASE = `${window.location.protocol}//${window.location.host}/api/v1/file-manager`;
|
||||
let isLoggedIn = false;
|
||||
let currentPage = 'dashboard';
|
||||
let refreshInterval = null;
|
||||
|
||||
691
routes/file_manager.py
Normal file
691
routes/file_manager.py
Normal file
@ -0,0 +1,691 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
文件管理API - WebDAV的HTTP API替代方案
|
||||
提供RESTful接口来管理projects文件夹
|
||||
"""
|
||||
|
||||
import os
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from typing import List, Optional, Dict, Any
|
||||
from fastapi import APIRouter, HTTPException, UploadFile, File, Form, Query
|
||||
from fastapi.responses import FileResponse, StreamingResponse
|
||||
import mimetypes
|
||||
import json
|
||||
import zipfile
|
||||
import tempfile
|
||||
import io
|
||||
import math
|
||||
|
||||
router = APIRouter(prefix="/api/v1/file-manager", tags=["file_management"])
|
||||
|
||||
PROJECTS_DIR = Path("projects")
|
||||
PROJECTS_DIR.mkdir(exist_ok=True)
|
||||
|
||||
|
||||
@router.get("/list")
|
||||
async def list_files(path: str = "", recursive: bool = False):
|
||||
"""
|
||||
列出目录内容
|
||||
|
||||
Args:
|
||||
path: 相对路径,空字符串表示根目录
|
||||
recursive: 是否递归列出所有子目录
|
||||
"""
|
||||
try:
|
||||
target_path = PROJECTS_DIR / path
|
||||
|
||||
if not target_path.exists():
|
||||
raise HTTPException(status_code=404, detail="路径不存在")
|
||||
|
||||
if not target_path.is_dir():
|
||||
raise HTTPException(status_code=400, detail="路径不是目录")
|
||||
|
||||
def scan_directory(directory: Path, base_path: Path = PROJECTS_DIR) -> List[Dict[str, Any]]:
|
||||
items = []
|
||||
try:
|
||||
for item in directory.iterdir():
|
||||
# 跳过隐藏文件
|
||||
if item.name.startswith('.'):
|
||||
continue
|
||||
|
||||
relative_path = item.relative_to(base_path)
|
||||
stat = item.stat()
|
||||
|
||||
item_info = {
|
||||
"name": item.name,
|
||||
"path": str(relative_path),
|
||||
"type": "directory" if item.is_dir() else "file",
|
||||
"size": stat.st_size if item.is_file() else 0,
|
||||
"modified": stat.st_mtime,
|
||||
"created": stat.st_ctime
|
||||
}
|
||||
|
||||
items.append(item_info)
|
||||
|
||||
# 递归扫描子目录
|
||||
if recursive and item.is_dir():
|
||||
items.extend(scan_directory(item, base_path))
|
||||
|
||||
except PermissionError:
|
||||
pass
|
||||
return items
|
||||
|
||||
items = scan_directory(target_path)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"path": path,
|
||||
"items": items,
|
||||
"total": len(items)
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"列出目录失败: {str(e)}")
|
||||
|
||||
|
||||
@router.post("/upload")
|
||||
async def upload_file(file: UploadFile = File(...), path: str = Form("")):
|
||||
"""
|
||||
上传文件
|
||||
|
||||
Args:
|
||||
file: 上传的文件
|
||||
path: 目标路径(相对于projects目录)
|
||||
"""
|
||||
try:
|
||||
target_dir = PROJECTS_DIR / path
|
||||
target_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
file_path = target_dir / file.filename
|
||||
|
||||
# 如果文件已存在,检查是否覆盖
|
||||
if file_path.exists():
|
||||
# 可以添加版本控制或重命名逻辑
|
||||
pass
|
||||
|
||||
with open(file_path, "wb") as buffer:
|
||||
content = await file.read()
|
||||
buffer.write(content)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"message": "文件上传成功",
|
||||
"filename": file.filename,
|
||||
"path": str(Path(path) / file.filename),
|
||||
"size": len(content)
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"文件上传失败: {str(e)}")
|
||||
|
||||
|
||||
@router.get("/download/{file_path:path}")
|
||||
async def download_file(file_path: str):
|
||||
"""
|
||||
下载文件
|
||||
|
||||
Args:
|
||||
file_path: 文件相对路径
|
||||
"""
|
||||
try:
|
||||
target_path = PROJECTS_DIR / file_path
|
||||
|
||||
if not target_path.exists():
|
||||
raise HTTPException(status_code=404, detail="文件不存在")
|
||||
|
||||
if not target_path.is_file():
|
||||
raise HTTPException(status_code=400, detail="不是文件")
|
||||
|
||||
# 猜测MIME类型
|
||||
mime_type, _ = mimetypes.guess_type(str(target_path))
|
||||
if mime_type is None:
|
||||
mime_type = "application/octet-stream"
|
||||
|
||||
return FileResponse(
|
||||
path=str(target_path),
|
||||
filename=target_path.name,
|
||||
media_type=mime_type
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"文件下载失败: {str(e)}")
|
||||
|
||||
|
||||
@router.delete("/delete")
|
||||
async def delete_item(path: str):
|
||||
"""
|
||||
删除文件或目录
|
||||
|
||||
Args:
|
||||
path: 要删除的路径
|
||||
"""
|
||||
try:
|
||||
target_path = PROJECTS_DIR / path
|
||||
|
||||
if not target_path.exists():
|
||||
raise HTTPException(status_code=404, detail="路径不存在")
|
||||
|
||||
if target_path.is_file():
|
||||
target_path.unlink()
|
||||
elif target_path.is_dir():
|
||||
shutil.rmtree(target_path)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"message": f"{'文件' if target_path.is_file() else '目录'}删除成功",
|
||||
"path": path
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"删除失败: {str(e)}")
|
||||
|
||||
|
||||
@router.post("/create-folder")
|
||||
async def create_folder(path: str, name: str):
|
||||
"""
|
||||
创建文件夹
|
||||
|
||||
Args:
|
||||
path: 父目录路径
|
||||
name: 新文件夹名称
|
||||
"""
|
||||
try:
|
||||
parent_path = PROJECTS_DIR / path
|
||||
parent_path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
new_folder = parent_path / name
|
||||
|
||||
if new_folder.exists():
|
||||
raise HTTPException(status_code=400, detail="文件夹已存在")
|
||||
|
||||
new_folder.mkdir()
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"message": "文件夹创建成功",
|
||||
"path": str(Path(path) / name)
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"创建文件夹失败: {str(e)}")
|
||||
|
||||
|
||||
@router.post("/rename")
|
||||
async def rename_item(old_path: str, new_name: str):
|
||||
"""
|
||||
重命名文件或文件夹
|
||||
|
||||
Args:
|
||||
old_path: 原路径
|
||||
new_name: 新名称
|
||||
"""
|
||||
try:
|
||||
old_full_path = PROJECTS_DIR / old_path
|
||||
|
||||
if not old_full_path.exists():
|
||||
raise HTTPException(status_code=404, detail="文件或目录不存在")
|
||||
|
||||
new_full_path = old_full_path.parent / new_name
|
||||
|
||||
if new_full_path.exists():
|
||||
raise HTTPException(status_code=400, detail="目标名称已存在")
|
||||
|
||||
old_full_path.rename(new_full_path)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"message": "重命名成功",
|
||||
"old_path": old_path,
|
||||
"new_path": str(new_full_path.relative_to(PROJECTS_DIR))
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"重命名失败: {str(e)}")
|
||||
|
||||
|
||||
@router.post("/move")
|
||||
async def move_item(source_path: str, target_path: str):
|
||||
"""
|
||||
移动文件或文件夹
|
||||
|
||||
Args:
|
||||
source_path: 源路径
|
||||
target_path: 目标路径
|
||||
"""
|
||||
try:
|
||||
source_full_path = PROJECTS_DIR / source_path
|
||||
target_full_path = PROJECTS_DIR / target_path
|
||||
|
||||
if not source_full_path.exists():
|
||||
raise HTTPException(status_code=404, detail="源文件或目录不存在")
|
||||
|
||||
# 确保目标目录存在
|
||||
target_full_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
shutil.move(str(source_full_path), str(target_full_path))
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"message": "移动成功",
|
||||
"source_path": source_path,
|
||||
"target_path": target_path
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"移动失败: {str(e)}")
|
||||
|
||||
|
||||
@router.post("/copy")
|
||||
async def copy_item(source_path: str, target_path: str):
|
||||
"""
|
||||
复制文件或文件夹
|
||||
|
||||
Args:
|
||||
source_path: 源路径
|
||||
target_path: 目标路径
|
||||
"""
|
||||
try:
|
||||
source_full_path = PROJECTS_DIR / source_path
|
||||
target_full_path = PROJECTS_DIR / target_path
|
||||
|
||||
if not source_full_path.exists():
|
||||
raise HTTPException(status_code=404, detail="源文件或目录不存在")
|
||||
|
||||
# 确保目标目录存在
|
||||
target_full_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
if source_full_path.is_file():
|
||||
shutil.copy2(str(source_full_path), str(target_full_path))
|
||||
else:
|
||||
shutil.copytree(str(source_full_path), str(target_full_path))
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"message": "复制成功",
|
||||
"source_path": source_path,
|
||||
"target_path": target_path
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"复制失败: {str(e)}")
|
||||
|
||||
|
||||
@router.get("/search")
|
||||
async def search_files(query: str, path: str = "", file_type: Optional[str] = None):
|
||||
"""
|
||||
搜索文件
|
||||
|
||||
Args:
|
||||
query: 搜索关键词
|
||||
path: 搜索路径
|
||||
file_type: 文件类型过滤
|
||||
"""
|
||||
try:
|
||||
search_path = PROJECTS_DIR / path
|
||||
|
||||
if not search_path.exists():
|
||||
raise HTTPException(status_code=404, detail="搜索路径不存在")
|
||||
|
||||
results = []
|
||||
|
||||
def scan_for_files(directory: Path):
|
||||
try:
|
||||
for item in directory.iterdir():
|
||||
if item.name.startswith('.'):
|
||||
continue
|
||||
|
||||
# 检查文件名是否包含关键词
|
||||
if query.lower() in item.name.lower():
|
||||
# 检查文件类型过滤
|
||||
if file_type:
|
||||
if item.suffix.lower() == file_type.lower():
|
||||
results.append({
|
||||
"name": item.name,
|
||||
"path": str(item.relative_to(PROJECTS_DIR)),
|
||||
"type": "directory" if item.is_dir() else "file"
|
||||
})
|
||||
else:
|
||||
results.append({
|
||||
"name": item.name,
|
||||
"path": str(item.relative_to(PROJECTS_DIR)),
|
||||
"type": "directory" if item.is_dir() else "file"
|
||||
})
|
||||
|
||||
# 递归搜索子目录
|
||||
if item.is_dir():
|
||||
scan_for_files(item)
|
||||
|
||||
except PermissionError:
|
||||
pass
|
||||
|
||||
scan_for_files(search_path)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"query": query,
|
||||
"path": path,
|
||||
"results": results,
|
||||
"total": len(results)
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"搜索失败: {str(e)}")
|
||||
|
||||
|
||||
@router.get("/info/{file_path:path}")
|
||||
async def get_file_info(file_path: str):
|
||||
"""
|
||||
获取文件或文件夹详细信息
|
||||
|
||||
Args:
|
||||
file_path: 文件路径
|
||||
"""
|
||||
try:
|
||||
target_path = PROJECTS_DIR / file_path
|
||||
|
||||
if not target_path.exists():
|
||||
raise HTTPException(status_code=404, detail="路径不存在")
|
||||
|
||||
stat = target_path.stat()
|
||||
|
||||
info = {
|
||||
"name": target_path.name,
|
||||
"path": file_path,
|
||||
"type": "directory" if target_path.is_dir() else "file",
|
||||
"size": stat.st_size if target_path.is_file() else 0,
|
||||
"modified": stat.st_mtime,
|
||||
"created": stat.st_ctime,
|
||||
"permissions": oct(stat.st_mode)[-3:]
|
||||
}
|
||||
|
||||
# 如果是文件,添加额外信息
|
||||
if target_path.is_file():
|
||||
mime_type, _ = mimetypes.guess_type(str(target_path))
|
||||
info["mime_type"] = mime_type or "unknown"
|
||||
|
||||
# 读取文件内容预览(仅对小文件)
|
||||
if stat.st_size < 1024 * 1024: # 小于1MB
|
||||
try:
|
||||
with open(target_path, 'r', encoding='utf-8') as f:
|
||||
content = f.read(1000) # 读取前1000字符
|
||||
info["preview"] = content
|
||||
except:
|
||||
info["preview"] = "[二进制文件,无法预览]"
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"info": info
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"获取文件信息失败: {str(e)}")
|
||||
|
||||
|
||||
@router.post("/download-folder-zip")
|
||||
async def download_folder_as_zip(request: Dict[str, str]):
|
||||
"""
|
||||
将文件夹压缩为ZIP并下载
|
||||
|
||||
Args:
|
||||
request: 包含path字段的JSON对象
|
||||
"""
|
||||
try:
|
||||
folder_path = request.get("path", "")
|
||||
|
||||
if not folder_path:
|
||||
raise HTTPException(status_code=400, detail="路径不能为空")
|
||||
|
||||
target_path = PROJECTS_DIR / folder_path
|
||||
|
||||
if not target_path.exists():
|
||||
raise HTTPException(status_code=404, detail="文件夹不存在")
|
||||
|
||||
if not target_path.is_dir():
|
||||
raise HTTPException(status_code=400, detail="路径不是文件夹")
|
||||
|
||||
# 计算文件夹大小,检查是否过大
|
||||
total_size = 0
|
||||
file_count = 0
|
||||
for file_path in target_path.rglob('*'):
|
||||
if file_path.is_file():
|
||||
total_size += file_path.stat().st_size
|
||||
file_count += 1
|
||||
|
||||
# 限制最大500MB
|
||||
max_size = 500 * 1024 * 1024
|
||||
if total_size > max_size:
|
||||
raise HTTPException(
|
||||
status_code=413,
|
||||
detail=f"文件夹过大 ({formatFileSize(total_size)}),最大支持 {formatFileSize(max_size)}"
|
||||
)
|
||||
|
||||
# 限制文件数量
|
||||
max_files = 10000
|
||||
if file_count > max_files:
|
||||
raise HTTPException(
|
||||
status_code=413,
|
||||
detail=f"文件数量过多 ({file_count}),最大支持 {max_files} 个文件"
|
||||
)
|
||||
|
||||
# 创建ZIP文件名
|
||||
folder_name = target_path.name
|
||||
zip_filename = f"{folder_name}.zip"
|
||||
|
||||
# 创建ZIP文件
|
||||
zip_buffer = io.BytesIO()
|
||||
|
||||
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED, compresslevel=6) as zipf:
|
||||
# 添加文件夹中的所有文件
|
||||
for file_path in target_path.rglob('*'):
|
||||
if file_path.is_file():
|
||||
try:
|
||||
# 计算相对路径,保持文件夹结构
|
||||
arcname = file_path.relative_to(target_path)
|
||||
zipf.write(file_path, arcname)
|
||||
except (OSError, IOError) as e:
|
||||
# 跳过无法读取的文件,但记录警告
|
||||
print(f"Warning: Skipping file {file_path}: {e}")
|
||||
continue
|
||||
|
||||
zip_buffer.seek(0)
|
||||
|
||||
# 设置响应头
|
||||
headers = {
|
||||
'Content-Disposition': f'attachment; filename="{zip_filename}"',
|
||||
'Content-Type': 'application/zip',
|
||||
'Content-Length': str(len(zip_buffer.getvalue())),
|
||||
'Cache-Control': 'no-cache'
|
||||
}
|
||||
|
||||
# 创建流式响应
|
||||
from fastapi.responses import StreamingResponse
|
||||
|
||||
async def generate_zip():
|
||||
zip_buffer.seek(0)
|
||||
yield zip_buffer.getvalue()
|
||||
|
||||
return StreamingResponse(
|
||||
generate_zip(),
|
||||
media_type="application/zip",
|
||||
headers=headers
|
||||
)
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"压缩文件夹失败: {str(e)}")
|
||||
|
||||
|
||||
@router.post("/download-multiple-zip")
|
||||
async def download_multiple_items_as_zip(request: Dict[str, Any]):
|
||||
"""
|
||||
将多个文件和文件夹压缩为ZIP并下载
|
||||
|
||||
Args:
|
||||
request: 包含paths和filename字段的JSON对象
|
||||
"""
|
||||
try:
|
||||
paths = request.get("paths", [])
|
||||
filename = request.get("filename", "batch_download.zip")
|
||||
|
||||
if not paths:
|
||||
raise HTTPException(status_code=400, detail="请选择要下载的文件")
|
||||
|
||||
# 验证所有路径
|
||||
valid_paths = []
|
||||
total_size = 0
|
||||
file_count = 0
|
||||
|
||||
for path in paths:
|
||||
target_path = PROJECTS_DIR / path
|
||||
|
||||
if not target_path.exists():
|
||||
continue # 跳过不存在的文件
|
||||
|
||||
if target_path.is_file():
|
||||
total_size += target_path.stat().st_size
|
||||
file_count += 1
|
||||
valid_paths.append(path)
|
||||
elif target_path.is_dir():
|
||||
# 计算文件夹大小
|
||||
for file_path in target_path.rglob('*'):
|
||||
if file_path.is_file():
|
||||
total_size += file_path.stat().st_size
|
||||
file_count += 1
|
||||
valid_paths.append(path)
|
||||
|
||||
if not valid_paths:
|
||||
raise HTTPException(status_code=404, detail="没有找到有效的文件")
|
||||
|
||||
# 限制大小
|
||||
max_size = 500 * 1024 * 1024
|
||||
if total_size > max_size:
|
||||
raise HTTPException(
|
||||
status_code=413,
|
||||
detail=f"选中文件过大 ({formatFileSize(total_size)}),最大支持 {formatFileSize(max_size)}"
|
||||
)
|
||||
|
||||
# 限制文件数量
|
||||
max_files = 10000
|
||||
if file_count > max_files:
|
||||
raise HTTPException(
|
||||
status_code=413,
|
||||
detail=f"文件数量过多 ({file_count}),最大支持 {max_files} 个文件"
|
||||
)
|
||||
|
||||
# 创建ZIP文件
|
||||
zip_buffer = io.BytesIO()
|
||||
|
||||
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED, compresslevel=6) as zipf:
|
||||
for path in valid_paths:
|
||||
target_path = PROJECTS_DIR / path
|
||||
|
||||
if target_path.is_file():
|
||||
# 单个文件
|
||||
try:
|
||||
zipf.write(target_path, target_path.name)
|
||||
except (OSError, IOError) as e:
|
||||
print(f"Warning: Skipping file {target_path}: {e}")
|
||||
continue
|
||||
elif target_path.is_dir():
|
||||
# 文件夹
|
||||
for file_path in target_path.rglob('*'):
|
||||
if file_path.is_file():
|
||||
try:
|
||||
# 保持相对路径结构
|
||||
arcname = f"{target_path.name}/{file_path.relative_to(target_path)}"
|
||||
zipf.write(file_path, arcname)
|
||||
except (OSError, IOError) as e:
|
||||
print(f"Warning: Skipping file {file_path}: {e}")
|
||||
continue
|
||||
|
||||
zip_buffer.seek(0)
|
||||
|
||||
# 设置响应头
|
||||
headers = {
|
||||
'Content-Disposition': f'attachment; filename="{filename}"',
|
||||
'Content-Type': 'application/zip',
|
||||
'Content-Length': str(len(zip_buffer.getvalue())),
|
||||
'Cache-Control': 'no-cache'
|
||||
}
|
||||
|
||||
# 创建流式响应
|
||||
from fastapi.responses import StreamingResponse
|
||||
|
||||
async def generate_zip():
|
||||
zip_buffer.seek(0)
|
||||
yield zip_buffer.getvalue()
|
||||
|
||||
return StreamingResponse(
|
||||
generate_zip(),
|
||||
media_type="application/zip",
|
||||
headers=headers
|
||||
)
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"批量压缩失败: {str(e)}")
|
||||
|
||||
|
||||
def formatFileSize(bytes_size: int) -> str:
|
||||
"""格式化文件大小"""
|
||||
if bytes_size == 0:
|
||||
return "0 B"
|
||||
|
||||
k = 1024
|
||||
sizes = ["B", "KB", "MB", "GB", "TB"]
|
||||
i = int(math.floor(math.log(bytes_size, k)))
|
||||
|
||||
if i >= len(sizes):
|
||||
i = len(sizes) - 1
|
||||
|
||||
return f"{bytes_size / math.pow(k, i):.1f} {sizes[i]}"
|
||||
|
||||
|
||||
@router.post("/batch-operation")
|
||||
async def batch_operation(operations: List[Dict[str, Any]]):
|
||||
"""
|
||||
批量操作多个文件
|
||||
|
||||
Args:
|
||||
operations: 操作列表,每个操作包含type和相应的参数
|
||||
"""
|
||||
try:
|
||||
results = []
|
||||
|
||||
for op in operations:
|
||||
op_type = op.get("type")
|
||||
op_result = {"type": op_type, "success": False}
|
||||
|
||||
try:
|
||||
if op_type == "delete":
|
||||
await delete_item(op["path"])
|
||||
op_result["success"] = True
|
||||
op_result["message"] = "删除成功"
|
||||
elif op_type == "move":
|
||||
await move_item(op["source_path"], op["target_path"])
|
||||
op_result["success"] = True
|
||||
op_result["message"] = "移动成功"
|
||||
elif op_type == "copy":
|
||||
await copy_item(op["source_path"], op["target_path"])
|
||||
op_result["success"] = True
|
||||
op_result["message"] = "复制成功"
|
||||
else:
|
||||
op_result["error"] = f"不支持的操作类型: {op_type}"
|
||||
|
||||
except Exception as e:
|
||||
op_result["error"] = str(e)
|
||||
|
||||
results.append(op_result)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"results": results,
|
||||
"total": len(operations),
|
||||
"successful": sum(1 for r in results if r["success"])
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"批量操作失败: {str(e)}")
|
||||
66
test_routes.py
Normal file
66
test_routes.py
Normal file
@ -0,0 +1,66 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
简单的路由配置测试脚本
|
||||
"""
|
||||
|
||||
def test_route_configurations():
|
||||
"""测试路由配置是否正确"""
|
||||
|
||||
# 测试文件内容
|
||||
test_files = {
|
||||
'fastapi_app.py': [
|
||||
'from routes.file_manager import router as file_manager_router',
|
||||
'app.include_router(file_manager_router)'
|
||||
],
|
||||
'routes/file_manager.py': [
|
||||
'router = APIRouter(prefix="/api/v1/file-manager"'
|
||||
],
|
||||
'public/admin.html': [
|
||||
"const FILE_API_BASE = `${window.location.protocol}//${window.location.host}/api/v1/file-manager`"
|
||||
]
|
||||
}
|
||||
|
||||
print("🔍 检查路由配置...")
|
||||
print("=" * 50)
|
||||
|
||||
all_good = True
|
||||
|
||||
for file_path, expected_lines in test_files.items():
|
||||
try:
|
||||
with open(file_path, 'r', encoding='utf-8') as f:
|
||||
content = f.read()
|
||||
|
||||
print(f"\n📄 {file_path}:")
|
||||
|
||||
for expected_line in expected_lines:
|
||||
if expected_line in content:
|
||||
print(f" ✅ {expected_line}")
|
||||
else:
|
||||
print(f" ❌ 缺失: {expected_line}")
|
||||
all_good = False
|
||||
|
||||
except FileNotFoundError:
|
||||
print(f" ❌ 文件不存在: {file_path}")
|
||||
all_good = False
|
||||
except Exception as e:
|
||||
print(f" ❌ 读取错误: {e}")
|
||||
all_good = False
|
||||
|
||||
print("\n" + "=" * 50)
|
||||
if all_good:
|
||||
print("✅ 所有路由配置检查通过!")
|
||||
print("📋 API 端点总结:")
|
||||
print(" • 聊天API: /api/v1/chat/completions, /api/v2/chat/completions")
|
||||
print(" • 文件处理: /api/v1/files/*")
|
||||
print(" • 项目管理: /api/v1/projects/*")
|
||||
print(" • 系统管理: /api/health, /api/v1/system/*")
|
||||
print(" • 文件管理器: /api/v1/file-manager/* (新)")
|
||||
print("\n📱 管理后台文件:")
|
||||
print(" • public/admin.html (已更新 FILE_API_BASE)")
|
||||
else:
|
||||
print("❌ 发现配置问题,需要修复")
|
||||
|
||||
return all_good
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_route_configurations()
|
||||
Loading…
Reference in New Issue
Block a user