419 lines
13 KiB
Python
419 lines
13 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
文件传输服务 - 主应用程序
|
||
支持文件和文本分享,生成分享口令,15分钟过期
|
||
"""
|
||
import asyncio
import hashlib
import mimetypes
import os
import secrets
import uuid
from contextlib import asynccontextmanager
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, Any

import uvicorn
from fastapi import FastAPI, UploadFile, File, HTTPException, Request, Form
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
|
||
|
||
# Configuration
EXPIRE_MINUTES = 15
MAX_FILE_SIZE = 100 * 1024 * 1024  # 100MB
HOST = os.getenv("HOST", "0.0.0.0")
PORT = int(os.getenv("PORT", 8000))

# Working directories; created at import time if they are missing
UPLOAD_DIR = Path("uploads")
STATIC_DIR = Path("static")
UPLOAD_DIR.mkdir(exist_ok=True)
STATIC_DIR.mkdir(exist_ok=True)

# In-memory registry of share records, keyed by share code
shares: Dict[str, Dict[str, Any]] = {}
|
||
|
||
class TextShare(BaseModel):
    """JSON request body accepted by the text-sharing endpoint."""

    # Text to be shared
    content: str
    # Name reported for the shared text when it is downloaded
    filename: str = "shared_text.txt"
|
||
|
||
class ShareResponse(BaseModel):
    """Response payload returned after a share has been created."""

    # Share code identifying the new share
    code: str
    # Expiry timestamp as an ISO-8601 string
    expires_at: str
    # Relative URL from which the share can be downloaded
    download_url: str
|
||
|
||
class ShareInfo(BaseModel):
    """Metadata describing a single share."""

    # Share code
    code: str
    # File name reported to the downloader
    filename: str
    # Recorded content type of the share
    file_type: str
    # Payload size in bytes
    size: int
    # Creation timestamp as an ISO-8601 string
    created_at: str
    # Expiry timestamp as an ISO-8601 string
    expires_at: str
    # Whether the share had expired when this info was produced
    is_expired: bool
|
||
|
||
def generate_share_code() -> str:
    """Generate a random share code.

    The code is the only thing protecting a share, so it is drawn from the
    ``secrets`` module (the standard-library source for security tokens)
    instead of being derived from a truncated MD5 digest.

    Returns:
        An 8-character string of uppercase hexadecimal digits.
    """
    # 4 random bytes -> 8 hex characters, same format as before.
    return secrets.token_hex(4).upper()
|
||
|
||
def is_expired(expires_at: datetime) -> bool:
    """Return True when ``expires_at`` lies strictly before the current local time."""
    return expires_at < datetime.now()
|
||
|
||
def cleanup_expired_files():
    """Delete the files of expired shares and drop their records.

    File removal is best-effort: a failure is printed and the record is
    still removed from ``shares`` afterwards.
    """
    stale_codes = []

    for share_code, record in shares.items():
        if not is_expired(record["expires_at"]):
            continue

        stale_codes.append(share_code)

        # Remove the stored file, if there is one on disk
        stored_path = record["file_path"]
        if not stored_path or not Path(stored_path).exists():
            continue
        try:
            os.remove(stored_path)
            print(f"删除过期文件: {stored_path}")
        except Exception as e:
            print(f"删除文件失败 {stored_path}: {e}")

    # Records are dropped only after the scan so the dict is never
    # modified while it is being iterated.
    for share_code in stale_codes:
        del shares[share_code]
        print(f"删除过期分享记录: {share_code}")
|
||
|
||
async def cleanup_task():
    """Run the expired-share cleanup in an endless loop, once per minute."""
    pause_seconds = 60  # check once per minute
    while True:
        try:
            cleanup_expired_files()
            await asyncio.sleep(pause_seconds)
        except Exception as e:
            # Report the failure and keep the loop alive
            print(f"清理任务错误: {e}")
            await asyncio.sleep(pause_seconds)
|
||
|
||
@asynccontextmanager
async def lifespan(_: FastAPI):
    """Application lifespan: run the periodic cleanup while the app is up."""
    # Startup: schedule the background cleanup loop
    janitor = asyncio.create_task(cleanup_task())
    try:
        yield
    finally:
        # Shutdown: request cancellation of the cleanup loop
        janitor.cancel()
|
||
|
||
# Create the FastAPI application
app = FastAPI(
    title="文件传输服务",
    description="支持文件和文本分享的临时传输服务",
    version="1.0.0",
    lifespan=lifespan,
)

# Enable CORS
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive — confirm this is intended for the deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Serve the static directory under /static
app.mount("/static", StaticFiles(directory="static"), name="static")
|
||
|
||
@app.get("/", response_class=HTMLResponse)
|
||
async def root():
|
||
"""根路径 - 返回前端页面"""
|
||
try:
|
||
with open("static/index.html", "r", encoding="utf-8") as f:
|
||
return HTMLResponse(content=f.read())
|
||
except FileNotFoundError:
|
||
return {
|
||
"service": "文件传输服务",
|
||
"version": "1.0.0",
|
||
"message": "前端页面不存在,请检查static/index.html文件",
|
||
"features": [
|
||
"文件上传分享",
|
||
"文本分享",
|
||
"口令下载",
|
||
"15分钟过期"
|
||
],
|
||
"endpoints": {
|
||
"upload": "POST /api/upload - 上传文件",
|
||
"share_text": "POST /api/share-text - 分享文本",
|
||
"download": "GET /api/download/{code} - 下载文件",
|
||
"info": "GET /api/info/{code} - 获取分享信息",
|
||
"list": "GET /api/shares - 列出所有分享"
|
||
}
|
||
}
|
||
|
||
@app.get("/api")
|
||
async def api_info():
|
||
"""API信息"""
|
||
return {
|
||
"service": "文件传输服务 API",
|
||
"version": "1.0.0",
|
||
"features": [
|
||
"文件上传分享",
|
||
"文本分享",
|
||
"口令下载",
|
||
"15分钟过期"
|
||
],
|
||
"endpoints": {
|
||
"upload": "POST /api/upload - 上传文件",
|
||
"share_text": "POST /api/share-text - 分享文本(JSON)",
|
||
"share_text_simple": "POST /api/text - 分享文本(简化版) ⭐",
|
||
"share_text_form": "POST /api/share-text-form - 分享文本(表单)",
|
||
"download": "GET /api/download/{code} - 下载文件",
|
||
"info": "GET /api/info/{code} - 获取分享信息",
|
||
"list": "GET /api/shares - 列出所有分享"
|
||
}
|
||
}
|
||
|
||
@app.get("/curl")
|
||
async def curl_guide():
|
||
"""curl命令使用教程页面"""
|
||
try:
|
||
with open("static/curl-guide.html", "r", encoding="utf-8") as f:
|
||
return HTMLResponse(content=f.read())
|
||
except FileNotFoundError:
|
||
return HTMLResponse(content="<h1>curl教程页面不存在</h1><p>请检查static/curl-guide.html文件</p>", status_code=404)
|
||
|
||
@app.post("/api/upload", response_model=ShareResponse)
|
||
async def upload_file(file: UploadFile = File(...)):
|
||
"""上传文件并生成分享口令"""
|
||
|
||
# 检查文件大小
|
||
if file.size and file.size > MAX_FILE_SIZE:
|
||
raise HTTPException(status_code=413, detail=f"文件太大,最大支持 {MAX_FILE_SIZE // 1024 // 1024}MB")
|
||
|
||
# 生成分享码和文件路径
|
||
share_code = generate_share_code()
|
||
file_extension = Path(file.filename or "").suffix
|
||
safe_filename = f"{share_code}{file_extension}"
|
||
file_path = UPLOAD_DIR / safe_filename
|
||
|
||
try:
|
||
# 保存文件
|
||
content = await file.read()
|
||
|
||
# 再次检查文件大小
|
||
if len(content) > MAX_FILE_SIZE:
|
||
raise HTTPException(status_code=413, detail=f"文件太大,最大支持 {MAX_FILE_SIZE // 1024 // 1024}MB")
|
||
|
||
with open(file_path, "wb") as f:
|
||
f.write(content)
|
||
|
||
# 保存分享信息
|
||
now = datetime.now()
|
||
expires_at = now + timedelta(minutes=EXPIRE_MINUTES)
|
||
|
||
shares[share_code] = {
|
||
"filename": file.filename or "unknown",
|
||
"file_path": str(file_path),
|
||
"file_type": file.content_type or "application/octet-stream",
|
||
"size": len(content),
|
||
"created_at": now,
|
||
"expires_at": expires_at,
|
||
"is_text": False
|
||
}
|
||
|
||
return ShareResponse(
|
||
code=share_code,
|
||
expires_at=expires_at.isoformat(),
|
||
download_url=f"/api/download/{share_code}"
|
||
)
|
||
|
||
except Exception as e:
|
||
# 清理可能创建的文件
|
||
if file_path.exists():
|
||
file_path.unlink()
|
||
raise HTTPException(status_code=500, detail=f"上传失败: {str(e)}")
|
||
|
||
@app.post("/api/share-text", response_model=ShareResponse)
|
||
async def share_text(text_share: TextShare):
|
||
"""分享文本内容(JSON格式)"""
|
||
return await _create_text_share(text_share.content, text_share.filename)
|
||
|
||
@app.post("/api/text", response_model=ShareResponse)
|
||
async def share_text_simple(request: Request):
|
||
"""分享文本内容(简化版 - 直接发送文本)"""
|
||
try:
|
||
# 读取请求体作为纯文本
|
||
content = (await request.body()).decode('utf-8')
|
||
|
||
if not content.strip():
|
||
raise HTTPException(status_code=400, detail="文本内容不能为空")
|
||
|
||
return await _create_text_share(content, "shared_text.txt")
|
||
|
||
except UnicodeDecodeError:
|
||
raise HTTPException(status_code=400, detail="文本编码错误,请使用UTF-8编码")
|
||
except Exception as e:
|
||
raise HTTPException(status_code=500, detail=f"分享失败: {str(e)}")
|
||
|
||
@app.post("/api/share-text-form", response_model=ShareResponse)
|
||
async def share_text_form(content: str = Form(...), filename: str = Form("shared_text.txt")):
|
||
"""分享文本内容(表单格式)"""
|
||
if not content or not content.strip():
|
||
raise HTTPException(status_code=400, detail="文本内容不能为空")
|
||
|
||
return await _create_text_share(content, filename)
|
||
|
||
async def _create_text_share(content: str, filename: str) -> ShareResponse:
    """Shared implementation behind the text-sharing endpoints.

    Writes the text to a ``.txt`` file in the upload directory, registers a
    share record and returns the share details. Raises HTTPException 400 for
    empty/whitespace text and 500 when storing the share fails.
    """
    if not content.strip():
        raise HTTPException(status_code=400, detail="文本内容不能为空")

    # Share code and the file the text is stored in
    share_code = generate_share_code()
    file_path = UPLOAD_DIR / f"{share_code}.txt"

    try:
        # Persist the text
        file_path.write_text(content, encoding="utf-8")

        # Register the share
        created = datetime.now()
        expiry = created + timedelta(minutes=EXPIRE_MINUTES)
        record = {
            "filename": filename,
            "file_path": str(file_path),
            "file_type": "text/plain",
            "size": len(content.encode("utf-8")),
            "created_at": created,
            "expires_at": expiry,
            "is_text": True,
            "content": content,
        }
        shares[share_code] = record

        return ShareResponse(
            code=share_code,
            expires_at=expiry.isoformat(),
            download_url=f"/api/download/{share_code}",
        )

    except Exception as e:
        # Remove the file if it was created before the failure
        if file_path.exists():
            file_path.unlink()
        raise HTTPException(status_code=500, detail=f"分享失败: {str(e)}")
|
||
|
||
@app.get("/api/download/{code}")
|
||
async def download_file(code: str):
|
||
"""通过分享码下载文件"""
|
||
|
||
share_info = shares.get(code)
|
||
if not share_info:
|
||
raise HTTPException(status_code=404, detail="分享码不存在")
|
||
|
||
if is_expired(share_info["expires_at"]):
|
||
# 清理过期文件
|
||
cleanup_expired_files()
|
||
raise HTTPException(status_code=410, detail="分享已过期")
|
||
|
||
file_path = Path(share_info["file_path"])
|
||
if not file_path.exists():
|
||
raise HTTPException(status_code=404, detail="文件不存在")
|
||
|
||
# 获取MIME类型
|
||
mime_type, _ = mimetypes.guess_type(str(file_path))
|
||
if not mime_type:
|
||
mime_type = share_info["file_type"]
|
||
|
||
return FileResponse(
|
||
path=file_path,
|
||
filename=share_info["filename"],
|
||
media_type=mime_type
|
||
)
|
||
|
||
@app.get("/api/info/{code}", response_model=ShareInfo)
|
||
async def get_share_info(code: str):
|
||
"""获取分享信息"""
|
||
|
||
share_info = shares.get(code)
|
||
if not share_info:
|
||
raise HTTPException(status_code=404, detail="分享码不存在")
|
||
|
||
return ShareInfo(
|
||
code=code,
|
||
filename=share_info["filename"],
|
||
file_type=share_info["file_type"],
|
||
size=share_info["size"],
|
||
created_at=share_info["created_at"].isoformat(),
|
||
expires_at=share_info["expires_at"].isoformat(),
|
||
is_expired=is_expired(share_info["expires_at"])
|
||
)
|
||
|
||
@app.get("/api/shares")
|
||
async def list_shares():
|
||
"""列出所有分享(管理用)"""
|
||
result = []
|
||
current_time = datetime.now()
|
||
|
||
for code, share_info in shares.items():
|
||
result.append({
|
||
"code": code,
|
||
"filename": share_info["filename"],
|
||
"file_type": share_info["file_type"],
|
||
"size": share_info["size"],
|
||
"created_at": share_info["created_at"].isoformat(),
|
||
"expires_at": share_info["expires_at"].isoformat(),
|
||
"is_expired": is_expired(share_info["expires_at"]),
|
||
"remaining_minutes": max(0, int((share_info["expires_at"] - current_time).total_seconds() / 60))
|
||
})
|
||
|
||
return {
|
||
"total": len(result),
|
||
"shares": result
|
||
}
|
||
|
||
@app.delete("/api/shares/{code}")
|
||
async def delete_share(code: str):
|
||
"""删除分享"""
|
||
share_info = shares.get(code)
|
||
if not share_info:
|
||
raise HTTPException(status_code=404, detail="分享码不存在")
|
||
|
||
# 删除文件
|
||
file_path = Path(share_info["file_path"])
|
||
if file_path.exists():
|
||
try:
|
||
file_path.unlink()
|
||
except Exception as e:
|
||
print(f"删除文件失败: {e}")
|
||
|
||
# 删除记录
|
||
del shares[code]
|
||
|
||
return {"message": "删除成功", "code": code}
|
||
|
||
@app.post("/api/cleanup")
|
||
async def manual_cleanup():
|
||
"""手动清理过期文件"""
|
||
before_count = len(shares)
|
||
cleanup_expired_files()
|
||
after_count = len(shares)
|
||
|
||
return {
|
||
"message": "清理完成",
|
||
"removed": before_count - after_count,
|
||
"remaining": after_count
|
||
}
|
||
|
||
if __name__ == "__main__":
|
||
print(f"启动文件传输服务...")
|
||
print(f"地址: http://{HOST}:{PORT}")
|
||
print(f"上传目录: {UPLOAD_DIR.absolute()}")
|
||
print(f"最大文件大小: {MAX_FILE_SIZE // 1024 // 1024}MB")
|
||
print(f"过期时间: {EXPIRE_MINUTES}分钟")
|
||
|
||
uvicorn.run(
|
||
"app:app",
|
||
host=HOST,
|
||
port=PORT,
|
||
reload=True,
|
||
log_level="info"
|
||
) |