#!/usr/bin/env python3 """ 文件传输服务 - 命令行工具 支持文件上传、文本分享和下载功能 """ import os import sys import json from pathlib import Path from typing import Optional import click import httpx from rich.console import Console from rich.table import Table from rich.progress import Progress, SpinnerColumn, TextColumn from rich.panel import Panel from rich.prompt import Prompt console = Console() # 默认服务器配置 DEFAULT_SERVER = "http://localhost:8000" SERVER_URL = os.getenv("FILESHARE_SERVER", DEFAULT_SERVER) class FileShareClient: def __init__(self, server_url: str = SERVER_URL): self.server_url = server_url.rstrip("/") self.client = httpx.Client(timeout=60.0) def upload_file(self, file_path: Path) -> dict: """上传文件""" if not file_path.exists(): raise FileNotFoundError(f"文件不存在: {file_path}") with open(file_path, "rb") as f: files = {"file": (file_path.name, f, "application/octet-stream")} response = self.client.post(f"{self.server_url}/api/upload", files=files) if response.status_code == 200: return response.json() else: raise Exception(f"上传失败: {response.status_code} - {response.text}") def share_text(self, content: str, filename: str = "shared_text.txt") -> dict: """分享文本""" data = {"content": content, "filename": filename} response = self.client.post(f"{self.server_url}/api/share-text", json=data) if response.status_code == 200: return response.json() else: raise Exception(f"分享失败: {response.status_code} - {response.text}") def download_file(self, code: str, output_dir: Path = Path(".")) -> Path: """下载文件""" # 先获取文件信息 info_response = self.client.get(f"{self.server_url}/api/info/{code}") if info_response.status_code != 200: raise Exception(f"获取文件信息失败: {info_response.status_code}") file_info = info_response.json() filename = file_info["filename"] # 下载文件 response = self.client.get(f"{self.server_url}/api/download/{code}") if response.status_code != 200: raise Exception(f"下载失败: {response.status_code} - {response.text}") # 保存文件 output_path = output_dir / filename # 如果文件存在,添加序号 counter = 1 original_path = output_path while output_path.exists(): stem = original_path.stem suffix = original_path.suffix output_path = output_dir / f"{stem}_{counter}{suffix}" counter += 1 with open(output_path, "wb") as f: f.write(response.content) return output_path def get_info(self, code: str) -> dict: """获取分享信息""" response = self.client.get(f"{self.server_url}/api/info/{code}") if response.status_code == 200: return response.json() else: raise Exception(f"获取信息失败: {response.status_code} - {response.text}") def list_shares(self) -> dict: """列出所有分享""" response = self.client.get(f"{self.server_url}/api/shares") if response.status_code == 200: return response.json() else: raise Exception(f"获取列表失败: {response.status_code} - {response.text}") @click.group() @click.option("--server", default=SERVER_URL, help="服务器地址") @click.pass_context def cli(ctx, server): """文件传输服务命令行工具""" ctx.ensure_object(dict) ctx.obj['client'] = FileShareClient(server) ctx.obj['server'] = server @cli.command() @click.argument("file_path", type=click.Path(exists=True, path_type=Path)) @click.pass_context def upload(ctx, file_path: Path): """上传文件并获取分享码""" client = ctx.obj['client'] try: with Progress( SpinnerColumn(), TextColumn("[progress.description]{task.description}"), console=console, ) as progress: task = progress.add_task(f"上传文件 {file_path.name}...", total=None) result = client.upload_file(file_path) # 显示结果 panel = Panel.fit( f"[green]✓ 文件上传成功![/green]\n\n" f"[bold]分享码:[/bold] [yellow]{result['code']}[/yellow]\n" f"[bold]过期时间:[/bold] {result['expires_at']}\n" f"[bold]下载链接:[/bold] {ctx.obj['server']}{result['download_url']}", title="上传成功", border_style="green" ) console.print(panel) console.print(f"\n[dim]使用命令下载: [bold]python cli.py download {result['code']}[/bold][/dim]") except Exception as e: console.print(f"[red]❌ 上传失败: {e}[/red]") sys.exit(1) @cli.command() @click.option("--text", "-t", help="要分享的文本内容") @click.option("--file", "-f", "text_file", type=click.Path(exists=True, path_type=Path), help="要分享的文本文件") @click.option("--filename", default="shared_text.txt", help="分享文件名") @click.pass_context def share_text(ctx, text: Optional[str], text_file: Optional[Path], filename: str): """分享文本内容""" client = ctx.obj['client'] # 确定文本内容 if text_file: try: with open(text_file, "r", encoding="utf-8") as f: content = f.read() if not filename.endswith(".txt") and text_file.suffix: filename = f"shared_{text_file.name}" except Exception as e: console.print(f"[red]❌ 读取文件失败: {e}[/red]") sys.exit(1) elif text: content = text else: # 交互式输入 console.print("[blue]请输入要分享的文本内容(按Ctrl+D或Ctrl+Z结束):[/blue]") lines = [] try: while True: line = input() lines.append(line) except EOFError: content = "\n".join(lines) if not content.strip(): console.print("[red]❌ 文本内容不能为空[/red]") sys.exit(1) try: with Progress( SpinnerColumn(), TextColumn("[progress.description]{task.description}"), console=console, ) as progress: task = progress.add_task("分享文本...", total=None) result = client.share_text(content, filename) # 显示结果 panel = Panel.fit( f"[green]✓ 文本分享成功![/green]\n\n" f"[bold]分享码:[/bold] [yellow]{result['code']}[/yellow]\n" f"[bold]文件名:[/bold] {filename}\n" f"[bold]过期时间:[/bold] {result['expires_at']}\n" f"[bold]下载链接:[/bold] {ctx.obj['server']}{result['download_url']}", title="分享成功", border_style="green" ) console.print(panel) console.print(f"\n[dim]使用命令下载: [bold]python cli.py download {result['code']}[/bold][/dim]") except Exception as e: console.print(f"[red]❌ 分享失败: {e}[/red]") sys.exit(1) @cli.command() @click.argument("code") @click.option("--output", "-o", type=click.Path(path_type=Path), default=".", help="输出目录") @click.pass_context def download(ctx, code: str, output: Path): """通过分享码下载文件""" client = ctx.obj['client'] try: with Progress( SpinnerColumn(), TextColumn("[progress.description]{task.description}"), console=console, ) as progress: task = progress.add_task(f"下载分享码 {code}...", total=None) output_path = client.download_file(code, output) # 显示结果 file_size = output_path.stat().st_size panel = Panel.fit( f"[green]✓ 文件下载成功![/green]\n\n" f"[bold]文件路径:[/bold] {output_path.absolute()}\n" f"[bold]文件大小:[/bold] {file_size:,} 字节", title="下载成功", border_style="green" ) console.print(panel) except Exception as e: console.print(f"[red]❌ 下载失败: {e}[/red]") sys.exit(1) @cli.command() @click.argument("code") @click.pass_context def info(ctx, code: str): """获取分享信息""" client = ctx.obj['client'] try: result = client.get_info(code) # 计算文件大小显示 size = result['size'] if size < 1024: size_str = f"{size} B" elif size < 1024 * 1024: size_str = f"{size / 1024:.1f} KB" else: size_str = f"{size / (1024 * 1024):.1f} MB" # 显示信息 status = "[red]已过期[/red]" if result['is_expired'] else "[green]有效[/green]" panel = Panel.fit( f"[bold]分享码:[/bold] [yellow]{result['code']}[/yellow]\n" f"[bold]文件名:[/bold] {result['filename']}\n" f"[bold]文件类型:[/bold] {result['file_type']}\n" f"[bold]文件大小:[/bold] {size_str}\n" f"[bold]创建时间:[/bold] {result['created_at']}\n" f"[bold]过期时间:[/bold] {result['expires_at']}\n" f"[bold]状态:[/bold] {status}", title="分享信息", border_style="blue" ) console.print(panel) except Exception as e: console.print(f"[red]❌ 获取信息失败: {e}[/red]") sys.exit(1) @cli.command() @click.pass_context def list(ctx): """列出所有分享""" client = ctx.obj['client'] try: result = client.list_shares() if result['total'] == 0: console.print("[yellow]没有找到任何分享[/yellow]") return # 创建表格 table = Table(title=f"所有分享 (共 {result['total']} 个)") table.add_column("分享码", style="yellow", no_wrap=True) table.add_column("文件名", style="blue") table.add_column("类型", style="green") table.add_column("大小", justify="right") table.add_column("剩余时间", justify="center") table.add_column("状态", justify="center") for share in result['shares']: # 计算文件大小显示 size = share['size'] if size < 1024: size_str = f"{size}B" elif size < 1024 * 1024: size_str = f"{size / 1024:.1f}K" else: size_str = f"{size / (1024 * 1024):.1f}M" # 剩余时间 remaining = share.get('remaining_minutes', 0) if remaining > 0: remaining_str = f"{remaining}分钟" else: remaining_str = "已过期" # 状态 status = "[red]过期[/red]" if share['is_expired'] else "[green]有效[/green]" table.add_row( share['code'], share['filename'][:30] + ("..." if len(share['filename']) > 30 else ""), share['file_type'].split('/')[-1] if '/' in share['file_type'] else share['file_type'], size_str, remaining_str, status ) console.print(table) console.print(f"\n[dim]使用 'python cli.py info <分享码>' 查看详细信息[/dim]") except Exception as e: console.print(f"[red]❌ 获取列表失败: {e}[/red]") sys.exit(1) @cli.command() @click.pass_context def server_info(ctx): """获取服务器信息""" client = ctx.obj['client'] try: response = client.client.get(f"{client.server_url}/") if response.status_code == 200: info = response.json() panel = Panel.fit( f"[bold]服务名称:[/bold] {info['service']}\n" f"[bold]版本:[/bold] {info['version']}\n" f"[bold]服务器地址:[/bold] {client.server_url}\n\n" f"[bold]功能特性:[/bold]\n" + "\n".join([f" • {feature}" for feature in info['features']]), title="服务器信息", border_style="blue" ) console.print(panel) else: console.print(f"[red]❌ 无法连接到服务器: {response.status_code}[/red]") except Exception as e: console.print(f"[red]❌ 连接服务器失败: {e}[/red]") console.print(f"[dim]请确保服务器正在运行: {client.server_url}[/dim]") sys.exit(1) if __name__ == "__main__": cli()