#!/usr/bin/env python3 """ 文件传输服务 - 主应用程序 支持文件和文本分享,生成分享口令,15分钟过期 """ import os import uuid import hashlib import mimetypes from datetime import datetime, timedelta from pathlib import Path from typing import Dict, Any import uvicorn from fastapi import FastAPI, UploadFile, File, HTTPException, Request, Form from fastapi.responses import FileResponse, HTMLResponse from fastapi.staticfiles import StaticFiles from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel import asyncio from contextlib import asynccontextmanager # 配置 UPLOAD_DIR = Path("uploads") MAX_FILE_SIZE = 100 * 1024 * 1024 # 100MB EXPIRE_MINUTES = 15 HOST = os.getenv("HOST", "0.0.0.0") PORT = int(os.getenv("PORT", 8000)) # 确保目录存在 UPLOAD_DIR.mkdir(exist_ok=True) STATIC_DIR = Path("static") STATIC_DIR.mkdir(exist_ok=True) # 内存存储分享记录 shares: Dict[str, Dict[str, Any]] = {} class TextShare(BaseModel): content: str filename: str = "shared_text.txt" class ShareResponse(BaseModel): code: str expires_at: str download_url: str class ShareInfo(BaseModel): code: str filename: str file_type: str size: int created_at: str expires_at: str is_expired: bool def generate_share_code() -> str: """生成分享口令""" return hashlib.md5(str(uuid.uuid4()).encode()).hexdigest()[:8].upper() def is_expired(expires_at: datetime) -> bool: """检查是否过期""" return datetime.now() > expires_at def cleanup_expired_files(): """清理过期文件""" expired_codes = [] for code, share_info in shares.items(): if is_expired(share_info["expires_at"]): expired_codes.append(code) # 删除文件 if share_info["file_path"] and Path(share_info["file_path"]).exists(): try: os.remove(share_info["file_path"]) print(f"删除过期文件: {share_info['file_path']}") except Exception as e: print(f"删除文件失败 {share_info['file_path']}: {e}") # 从内存中删除过期记录 for code in expired_codes: del shares[code] print(f"删除过期分享记录: {code}") async def cleanup_task(): """定期清理任务""" while True: try: cleanup_expired_files() await asyncio.sleep(60) # 每分钟检查一次 except Exception as e: print(f"清理任务错误: {e}") await asyncio.sleep(60) @asynccontextmanager async def lifespan(_: FastAPI): # 启动时创建清理任务 cleanup_task_handle = asyncio.create_task(cleanup_task()) try: yield finally: # 关闭时取消清理任务 cleanup_task_handle.cancel() # 创建FastAPI应用 app = FastAPI( title="文件传输服务", description="支持文件和文本分享的临时传输服务", version="1.0.0", lifespan=lifespan ) # 添加CORS支持 app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # 挂载静态文件目录 app.mount("/static", StaticFiles(directory="static"), name="static") @app.get("/", response_class=HTMLResponse) async def root(): """根路径 - 返回前端页面""" try: with open("static/index.html", "r", encoding="utf-8") as f: return HTMLResponse(content=f.read()) except FileNotFoundError: return { "service": "文件传输服务", "version": "1.0.0", "message": "前端页面不存在,请检查static/index.html文件", "features": [ "文件上传分享", "文本分享", "口令下载", "15分钟过期" ], "endpoints": { "upload": "POST /api/upload - 上传文件", "share_text": "POST /api/share-text - 分享文本", "download": "GET /api/download/{code} - 下载文件", "info": "GET /api/info/{code} - 获取分享信息", "list": "GET /api/shares - 列出所有分享" } } @app.get("/api") async def api_info(): """API信息""" return { "service": "文件传输服务 API", "version": "1.0.0", "features": [ "文件上传分享", "文本分享", "口令下载", "15分钟过期" ], "endpoints": { "upload": "POST /api/upload - 上传文件", "share_text": "POST /api/share-text - 分享文本(JSON)", "share_text_simple": "POST /api/text - 分享文本(简化版) ⭐", "share_text_form": "POST /api/share-text-form - 分享文本(表单)", "download": "GET /api/download/{code} - 下载文件", "info": "GET /api/info/{code} - 获取分享信息", "list": "GET /api/shares - 列出所有分享" } } @app.get("/curl") async def curl_guide(): """curl命令使用教程页面""" try: with open("static/curl-guide.html", "r", encoding="utf-8") as f: return HTMLResponse(content=f.read()) except FileNotFoundError: return HTMLResponse(content="
请检查static/curl-guide.html文件
", status_code=404) @app.post("/api/upload", response_model=ShareResponse) async def upload_file(file: UploadFile = File(...)): """上传文件并生成分享口令""" # 检查文件大小 if file.size and file.size > MAX_FILE_SIZE: raise HTTPException(status_code=413, detail=f"文件太大,最大支持 {MAX_FILE_SIZE // 1024 // 1024}MB") # 生成分享码和文件路径 share_code = generate_share_code() file_extension = Path(file.filename or "").suffix safe_filename = f"{share_code}{file_extension}" file_path = UPLOAD_DIR / safe_filename try: # 保存文件 content = await file.read() # 再次检查文件大小 if len(content) > MAX_FILE_SIZE: raise HTTPException(status_code=413, detail=f"文件太大,最大支持 {MAX_FILE_SIZE // 1024 // 1024}MB") with open(file_path, "wb") as f: f.write(content) # 保存分享信息 now = datetime.now() expires_at = now + timedelta(minutes=EXPIRE_MINUTES) shares[share_code] = { "filename": file.filename or "unknown", "file_path": str(file_path), "file_type": file.content_type or "application/octet-stream", "size": len(content), "created_at": now, "expires_at": expires_at, "is_text": False } return ShareResponse( code=share_code, expires_at=expires_at.isoformat(), download_url=f"/api/download/{share_code}" ) except Exception as e: # 清理可能创建的文件 if file_path.exists(): file_path.unlink() raise HTTPException(status_code=500, detail=f"上传失败: {str(e)}") @app.post("/api/share-text", response_model=ShareResponse) async def share_text(text_share: TextShare): """分享文本内容(JSON格式)""" return await _create_text_share(text_share.content, text_share.filename) @app.post("/api/text", response_model=ShareResponse) async def share_text_simple(request: Request): """分享文本内容(简化版 - 直接发送文本)""" try: # 读取请求体作为纯文本 content = (await request.body()).decode('utf-8') if not content.strip(): raise HTTPException(status_code=400, detail="文本内容不能为空") return await _create_text_share(content, "shared_text.txt") except UnicodeDecodeError: raise HTTPException(status_code=400, detail="文本编码错误,请使用UTF-8编码") except Exception as e: raise HTTPException(status_code=500, detail=f"分享失败: {str(e)}") @app.post("/api/share-text-form", response_model=ShareResponse) async def share_text_form(content: str = Form(...), filename: str = Form("shared_text.txt")): """分享文本内容(表单格式)""" if not content or not content.strip(): raise HTTPException(status_code=400, detail="文本内容不能为空") return await _create_text_share(content, filename) async def _create_text_share(content: str, filename: str) -> ShareResponse: """创建文本分享的通用函数""" if not content.strip(): raise HTTPException(status_code=400, detail="文本内容不能为空") # 生成分享码 share_code = generate_share_code() file_path = UPLOAD_DIR / f"{share_code}.txt" try: # 保存文本到文件 with open(file_path, "w", encoding="utf-8") as f: f.write(content) # 保存分享信息 now = datetime.now() expires_at = now + timedelta(minutes=EXPIRE_MINUTES) shares[share_code] = { "filename": filename, "file_path": str(file_path), "file_type": "text/plain", "size": len(content.encode("utf-8")), "created_at": now, "expires_at": expires_at, "is_text": True, "content": content } return ShareResponse( code=share_code, expires_at=expires_at.isoformat(), download_url=f"/api/download/{share_code}" ) except Exception as e: # 清理可能创建的文件 if file_path.exists(): file_path.unlink() raise HTTPException(status_code=500, detail=f"分享失败: {str(e)}") @app.get("/api/download/{code}") async def download_file(code: str): """通过分享码下载文件""" share_info = shares.get(code) if not share_info: raise HTTPException(status_code=404, detail="分享码不存在") if is_expired(share_info["expires_at"]): # 清理过期文件 cleanup_expired_files() raise HTTPException(status_code=410, detail="分享已过期") file_path = Path(share_info["file_path"]) if not file_path.exists(): raise HTTPException(status_code=404, detail="文件不存在") # 获取MIME类型 mime_type, _ = mimetypes.guess_type(str(file_path)) if not mime_type: mime_type = share_info["file_type"] return FileResponse( path=file_path, filename=share_info["filename"], media_type=mime_type ) @app.get("/api/info/{code}", response_model=ShareInfo) async def get_share_info(code: str): """获取分享信息""" share_info = shares.get(code) if not share_info: raise HTTPException(status_code=404, detail="分享码不存在") return ShareInfo( code=code, filename=share_info["filename"], file_type=share_info["file_type"], size=share_info["size"], created_at=share_info["created_at"].isoformat(), expires_at=share_info["expires_at"].isoformat(), is_expired=is_expired(share_info["expires_at"]) ) @app.get("/api/shares") async def list_shares(): """列出所有分享(管理用)""" result = [] current_time = datetime.now() for code, share_info in shares.items(): result.append({ "code": code, "filename": share_info["filename"], "file_type": share_info["file_type"], "size": share_info["size"], "created_at": share_info["created_at"].isoformat(), "expires_at": share_info["expires_at"].isoformat(), "is_expired": is_expired(share_info["expires_at"]), "remaining_minutes": max(0, int((share_info["expires_at"] - current_time).total_seconds() / 60)) }) return { "total": len(result), "shares": result } @app.delete("/api/shares/{code}") async def delete_share(code: str): """删除分享""" share_info = shares.get(code) if not share_info: raise HTTPException(status_code=404, detail="分享码不存在") # 删除文件 file_path = Path(share_info["file_path"]) if file_path.exists(): try: file_path.unlink() except Exception as e: print(f"删除文件失败: {e}") # 删除记录 del shares[code] return {"message": "删除成功", "code": code} @app.post("/api/cleanup") async def manual_cleanup(): """手动清理过期文件""" before_count = len(shares) cleanup_expired_files() after_count = len(shares) return { "message": "清理完成", "removed": before_count - after_count, "remaining": after_count } if __name__ == "__main__": print(f"启动文件传输服务...") print(f"地址: http://{HOST}:{PORT}") print(f"上传目录: {UPLOAD_DIR.absolute()}") print(f"最大文件大小: {MAX_FILE_SIZE // 1024 // 1024}MB") print(f"过期时间: {EXPIRE_MINUTES}分钟") uvicorn.run( "app:app", host=HOST, port=PORT, reload=True, log_level="info" )