fileshare/app.py
2025-08-10 12:57:17 +08:00

419 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
文件传输服务 - 主应用程序
支持文件和文本分享生成分享口令15分钟过期
"""
import asyncio
import hashlib
import mimetypes
import os
import secrets
import uuid
from contextlib import asynccontextmanager
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict

import uvicorn
from fastapi import FastAPI, File, Form, HTTPException, Request, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
# Configuration
UPLOAD_DIR = Path("uploads")
STATIC_DIR = Path("static")
MAX_FILE_SIZE = 100 * 1024 * 1024  # 100MB, expressed in bytes
EXPIRE_MINUTES = 15  # lifetime of a share
HOST = os.environ.get("HOST", "0.0.0.0")
PORT = int(os.environ.get("PORT", 8000))

# Make sure both working directories exist before anything uses them.
UPLOAD_DIR.mkdir(exist_ok=True)
STATIC_DIR.mkdir(exist_ok=True)

# In-memory registry of shares: share code -> metadata dict.
shares: Dict[str, Dict[str, Any]] = dict()
# JSON request body accepted by POST /api/share-text.
class TextShare(BaseModel):
    # The text to be shared.
    content: str
    # Name recorded for the share; a generic .txt name is used when omitted.
    filename: str = "shared_text.txt"
# Response returned by the upload and text-sharing endpoints.
class ShareResponse(BaseModel):
    # Share code that identifies the new share.
    code: str
    # Expiry moment as an ISO-8601 string.
    expires_at: str
    # Relative URL of the download endpoint for this share.
    download_url: str
# Metadata about one share, as returned by GET /api/info/{code}.
class ShareInfo(BaseModel):
    # Share code the information belongs to.
    code: str
    # File name recorded when the share was created.
    filename: str
    # MIME type recorded for the share.
    file_type: str
    # Size of the stored content in bytes.
    size: int
    # Creation moment as an ISO-8601 string.
    created_at: str
    # Expiry moment as an ISO-8601 string.
    expires_at: str
    # True once the expiry moment has passed.
    is_expired: bool
def generate_share_code() -> str:
    """Generate a new share code.

    The code is the only secret protecting a share, so it is drawn directly
    from the operating system's cryptographically secure random source.

    Returns:
        An 8-character string of uppercase hexadecimal digits.
    """
    # 4 random bytes -> 8 hex digits: the same format as before, without the
    # detour through uuid4 + MD5 + truncation, which added no randomness.
    return secrets.token_hex(4).upper()
def is_expired(expires_at: datetime) -> bool:
    """Tell whether an expiry moment already lies in the past.

    Args:
        expires_at: The moment at which the share stops being valid.

    Returns:
        True when the current local time is later than ``expires_at``.
    """
    current_moment = datetime.now()
    return current_moment > expires_at
def cleanup_expired_files():
    """Remove every expired share: its file on disk and its in-memory record.

    File removal is best effort; a failure is printed and the record is
    dropped regardless.
    """
    stale_codes = []
    for share_code, record in shares.items():
        if not is_expired(record["expires_at"]):
            continue
        stale_codes.append(share_code)

        # Only try to delete a file that is recorded and still present.
        stored_path = record["file_path"]
        if not stored_path or not Path(stored_path).exists():
            continue
        try:
            os.remove(stored_path)
            print(f"删除过期文件: {stored_path}")
        except Exception as exc:
            print(f"删除文件失败 {stored_path}: {exc}")

    # Records are dropped in a second pass so the dict is never mutated
    # while it is being iterated.
    for share_code in stale_codes:
        del shares[share_code]
        print(f"删除过期分享记录: {share_code}")
async def cleanup_task():
    """Background loop that purges expired shares once a minute.

    Errors are printed and the loop keeps running; the loop only ends when
    the task is cancelled.
    """
    interval_seconds = 60  # pause between two cleanup passes
    while True:
        try:
            cleanup_expired_files()
            await asyncio.sleep(interval_seconds)
        except Exception as exc:
            print(f"清理任务错误: {exc}")
            await asyncio.sleep(interval_seconds)
@asynccontextmanager
async def lifespan(_: FastAPI):
    """Application lifespan: run the periodic cleanup while the app is up.

    The cleanup task is started on startup and, on shutdown, cancelled and
    awaited so that it has really finished before shutdown continues.
    """
    # Startup: launch the background cleanup loop.
    cleanup_task_handle = asyncio.create_task(cleanup_task())
    try:
        yield
    finally:
        # Shutdown: cancel() only requests cancellation, so wait for the task
        # to actually end. return_exceptions=True collects the task's
        # CancelledError as a result instead of raising it here.
        cleanup_task_handle.cancel()
        await asyncio.gather(cleanup_task_handle, return_exceptions=True)
# Create the FastAPI application.
app = FastAPI(
    title="文件传输服务",
    description="支持文件和文本分享的临时传输服务",
    version="1.0.0",
    lifespan=lifespan
)

# CORS: the API is open to any origin. Credentials are disabled because a
# wildcard origin must not be combined with credentialed requests, and this
# service does not rely on cookies or HTTP authentication.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Serve the front-end assets from the static directory.
app.mount("/static", StaticFiles(directory="static"), name="static")
@app.get("/", response_class=HTMLResponse)
async def root():
    """Serve the front-end page, or a JSON service summary if it is missing.

    Returns:
        An HTMLResponse with the contents of static/index.html when the file
        exists; otherwise a JSONResponse describing the service.
    """
    try:
        with open("static/index.html", "r", encoding="utf-8") as f:
            page = f.read()
    except FileNotFoundError:
        # The route's response_class is HTMLResponse, which renders only
        # str/bytes, so the fallback must be an explicit JSONResponse rather
        # than a bare dict.
        return JSONResponse(content={
            "service": "文件传输服务",
            "version": "1.0.0",
            "message": "前端页面不存在请检查static/index.html文件",
            "features": [
                "文件上传分享",
                "文本分享",
                "口令下载",
                "15分钟过期"
            ],
            "endpoints": {
                "upload": "POST /api/upload - 上传文件",
                "share_text": "POST /api/share-text - 分享文本",
                "download": "GET /api/download/{code} - 下载文件",
                "info": "GET /api/info/{code} - 获取分享信息",
                "list": "GET /api/shares - 列出所有分享"
            }
        })
    return HTMLResponse(content=page)
@app.get("/api")
async def api_info():
    """Describe the API: service name, version, features and endpoints."""
    feature_list = [
        "文件上传分享",
        "文本分享",
        "口令下载",
        "15分钟过期"
    ]
    endpoint_map = {
        "upload": "POST /api/upload - 上传文件",
        "share_text": "POST /api/share-text - 分享文本(JSON)",
        "share_text_simple": "POST /api/text - 分享文本(简化版) ⭐",
        "share_text_form": "POST /api/share-text-form - 分享文本(表单)",
        "download": "GET /api/download/{code} - 下载文件",
        "info": "GET /api/info/{code} - 获取分享信息",
        "list": "GET /api/shares - 列出所有分享"
    }
    return {
        "service": "文件传输服务 API",
        "version": "1.0.0",
        "features": feature_list,
        "endpoints": endpoint_map
    }
@app.get("/curl")
async def curl_guide():
    """Serve the curl usage guide page, or a 404 HTML notice if it is missing."""
    guide_page = Path("static/curl-guide.html")
    try:
        markup = guide_page.read_text(encoding="utf-8")
    except FileNotFoundError:
        return HTMLResponse(content="<h1>curl教程页面不存在</h1><p>请检查static/curl-guide.html文件</p>", status_code=404)
    return HTMLResponse(content=markup)
@app.post("/api/upload", response_model=ShareResponse)
async def upload_file(file: UploadFile = File(...)):
    """Store an uploaded file and create a share code for it.

    The upload is streamed to disk in chunks, so an oversized upload is
    rejected as soon as the limit is crossed instead of after it has been
    read into memory in full.

    Args:
        file: The uploaded file.

    Returns:
        ShareResponse with the share code, expiry time and download URL.

    Raises:
        HTTPException: 413 when the file exceeds MAX_FILE_SIZE, 500 when the
            file cannot be stored.
    """
    too_large_detail = f"文件太大,最大支持 {MAX_FILE_SIZE // 1024 // 1024}MB"

    # Cheap early rejection when the size is already known.
    if file.size and file.size > MAX_FILE_SIZE:
        raise HTTPException(status_code=413, detail=too_large_detail)

    # The stored name is derived from the share code; only the extension of
    # the client-supplied name is kept.
    share_code = generate_share_code()
    file_extension = Path(file.filename or "").suffix
    safe_filename = f"{share_code}{file_extension}"
    file_path = UPLOAD_DIR / safe_filename
    chunk_size = 1024 * 1024

    try:
        # Stream to disk, enforcing the limit on the bytes actually received.
        total_size = 0
        with open(file_path, "wb") as f:
            while True:
                chunk = await file.read(chunk_size)
                if not chunk:
                    break
                total_size += len(chunk)
                if total_size > MAX_FILE_SIZE:
                    raise HTTPException(status_code=413, detail=too_large_detail)
                f.write(chunk)

        # Register the share.
        now = datetime.now()
        expires_at = now + timedelta(minutes=EXPIRE_MINUTES)
        shares[share_code] = {
            "filename": file.filename or "unknown",
            "file_path": str(file_path),
            "file_type": file.content_type or "application/octet-stream",
            "size": total_size,
            "created_at": now,
            "expires_at": expires_at,
            "is_text": False
        }
        return ShareResponse(
            code=share_code,
            expires_at=expires_at.isoformat(),
            download_url=f"/api/download/{share_code}"
        )
    except HTTPException:
        # Deliberate HTTP errors (e.g. 413) keep their status code; only the
        # partially written file is removed.
        if file_path.exists():
            file_path.unlink()
        raise
    except Exception as e:
        # Remove whatever was written before reporting the failure.
        if file_path.exists():
            file_path.unlink()
        raise HTTPException(status_code=500, detail=f"上传失败: {str(e)}") from e
@app.post("/api/share-text", response_model=ShareResponse)
async def share_text(text_share: TextShare):
    """Share text submitted as a JSON body."""
    body_text = text_share.content
    download_name = text_share.filename
    created_share = await _create_text_share(body_text, download_name)
    return created_share
@app.post("/api/text", response_model=ShareResponse)
async def share_text_simple(request: Request):
    """Share text sent as the raw request body (simplified endpoint).

    Args:
        request: Incoming request whose body is the UTF-8 text to share.

    Returns:
        ShareResponse for the newly created text share.

    Raises:
        HTTPException: 400 when the body is empty or not valid UTF-8, 500 on
            any other failure.
    """
    try:
        # Read the request body as plain text.
        content = (await request.body()).decode("utf-8")
        if not content.strip():
            raise HTTPException(status_code=400, detail="文本内容不能为空")
        return await _create_text_share(content, "shared_text.txt")
    except HTTPException:
        # Keep deliberate HTTP errors (such as the 400 above) intact instead
        # of letting the generic handler below rewrap them as 500.
        raise
    except UnicodeDecodeError:
        raise HTTPException(status_code=400, detail="文本编码错误请使用UTF-8编码")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"分享失败: {str(e)}") from e
@app.post("/api/share-text-form", response_model=ShareResponse)
async def share_text_form(content: str = Form(...), filename: str = Form("shared_text.txt")):
    """Share text submitted as form fields."""
    has_text = bool(content) and bool(content.strip())
    if not has_text:
        raise HTTPException(status_code=400, detail="文本内容不能为空")
    created_share = await _create_text_share(content, filename)
    return created_share
async def _create_text_share(content: str, filename: str) -> ShareResponse:
    """Create a text share; shared implementation of the text endpoints.

    Args:
        content: Text to share; must contain non-whitespace characters.
        filename: Name offered to the downloader. A blank name falls back to
            "shared_text.txt".

    Returns:
        ShareResponse with the share code, expiry time and download URL.

    Raises:
        HTTPException: 400 when the content is blank, 500 when the text
            cannot be stored.
    """
    if not content.strip():
        raise HTTPException(status_code=400, detail="文本内容不能为空")

    # A blank name would leave the download without a usable file name.
    if not filename or not filename.strip():
        filename = "shared_text.txt"

    share_code = generate_share_code()
    file_path = UPLOAD_DIR / f"{share_code}.txt"
    try:
        # Encode once and write the bytes verbatim: a text-mode write would
        # translate newlines on some platforms, so the stored file would no
        # longer match the recorded size or the submitted text.
        encoded = content.encode("utf-8")
        with open(file_path, "wb") as f:
            f.write(encoded)

        # Register the share.
        now = datetime.now()
        expires_at = now + timedelta(minutes=EXPIRE_MINUTES)
        shares[share_code] = {
            "filename": filename,
            "file_path": str(file_path),
            "file_type": "text/plain",
            "size": len(encoded),
            "created_at": now,
            "expires_at": expires_at,
            "is_text": True,
            "content": content
        }
        return ShareResponse(
            code=share_code,
            expires_at=expires_at.isoformat(),
            download_url=f"/api/download/{share_code}"
        )
    except Exception as e:
        # Remove whatever was written before reporting the failure.
        if file_path.exists():
            file_path.unlink()
        raise HTTPException(status_code=500, detail=f"分享失败: {str(e)}") from e
@app.get("/api/download/{code}")
async def download_file(code: str):
    """Return the file behind a share code.

    Raises:
        HTTPException: 404 when the code or its file is unknown, 410 when the
            share has expired.
    """
    record = shares.get(code)
    if not record:
        raise HTTPException(status_code=404, detail="分享码不存在")

    if is_expired(record["expires_at"]):
        # Take the opportunity to purge everything that has expired.
        cleanup_expired_files()
        raise HTTPException(status_code=410, detail="分享已过期")

    stored_file = Path(record["file_path"])
    if not stored_file.exists():
        raise HTTPException(status_code=404, detail="文件不存在")

    # Prefer the type guessed from the stored name; otherwise use the type
    # recorded when the share was created.
    guessed_type = mimetypes.guess_type(str(stored_file))[0]
    media_type = guessed_type if guessed_type else record["file_type"]

    return FileResponse(
        path=stored_file,
        filename=record["filename"],
        media_type=media_type
    )
@app.get("/api/info/{code}", response_model=ShareInfo)
async def get_share_info(code: str):
    """Return the metadata of a share, including whether it has expired.

    Raises:
        HTTPException: 404 when the share code is unknown.
    """
    record = shares.get(code)
    if not record:
        raise HTTPException(status_code=404, detail="分享码不存在")

    created_moment = record["created_at"]
    expiry_moment = record["expires_at"]
    return ShareInfo(
        code=code,
        filename=record["filename"],
        file_type=record["file_type"],
        size=record["size"],
        created_at=created_moment.isoformat(),
        expires_at=expiry_moment.isoformat(),
        is_expired=is_expired(expiry_moment)
    )
@app.get("/api/shares")
async def list_shares():
    """List every share currently held in memory (administrative use)."""
    current_time = datetime.now()
    entries = [
        {
            "code": share_code,
            "filename": record["filename"],
            "file_type": record["file_type"],
            "size": record["size"],
            "created_at": record["created_at"].isoformat(),
            "expires_at": record["expires_at"].isoformat(),
            "is_expired": is_expired(record["expires_at"]),
            # Whole minutes left, never negative.
            "remaining_minutes": max(0, int((record["expires_at"] - current_time).total_seconds() / 60))
        }
        for share_code, record in shares.items()
    ]
    return {
        "total": len(entries),
        "shares": entries
    }
@app.delete("/api/shares/{code}")
async def delete_share(code: str):
    """Delete a share: its file on disk and its in-memory record.

    Raises:
        HTTPException: 404 when the share code is unknown.
    """
    record = shares.get(code)
    if not record:
        raise HTTPException(status_code=404, detail="分享码不存在")

    # Removing the file is best effort; the record goes away either way.
    target = Path(record["file_path"])
    if target.exists():
        try:
            target.unlink()
        except Exception as exc:
            print(f"删除文件失败: {exc}")

    shares.pop(code)
    return {"message": "删除成功", "code": code}
@app.post("/api/cleanup")
async def manual_cleanup():
    """Purge expired shares on demand and report how many were removed."""
    count_before = len(shares)
    cleanup_expired_files()
    count_after = len(shares)
    removed_count = count_before - count_after
    return {
        "message": "清理完成",
        "removed": removed_count,
        "remaining": count_after
    }
if __name__ == "__main__":
    # Print a short start-up banner, then hand control to uvicorn.
    size_limit_mb = MAX_FILE_SIZE // 1024 // 1024
    banner_lines = (
        "启动文件传输服务...",
        f"地址: http://{HOST}:{PORT}",
        f"上传目录: {UPLOAD_DIR.absolute()}",
        f"最大文件大小: {size_limit_mb}MB",
        f"过期时间: {EXPIRE_MINUTES}分钟",
    )
    for banner_line in banner_lines:
        print(banner_line)

    server_options = {
        "host": HOST,
        "port": PORT,
        "reload": True,
        "log_level": "info",
    }
    uvicorn.run("app:app", **server_options)