#!/usr/bin/env python3
"""
ZIP project handler.

Downloads a ZIP archive from a URL and extracts it into a per-URL project
directory, caching both the downloaded archive and the extracted tree.
"""

import os
import hashlib
import shutil
import zipfile
import requests
import tempfile
from typing import Optional
from urllib.parse import urlparse
from pathlib import Path


class ZipProjectHandler:
    """Download ZIP archives by URL and extract them into project directories.

    Layout on disk (all under ``projects_dir``):

    * ``_cache/<hash>.zip`` -- the downloaded archive for a URL.
    * ``<hash>/``           -- the extracted contents for that URL.

    ``<hash>`` is the value returned by :meth:`_get_url_hash`.
    """

    def __init__(self, projects_dir: str = "./projects"):
        """Create the projects directory and its ``_cache`` subdirectory.

        Args:
            projects_dir: Root directory for extracted projects. It is
                resolved to an absolute path and created if missing.
        """
        self.projects_dir = Path(projects_dir).resolve()
        # parents=True so a nested, not-yet-existing root does not fail.
        self.projects_dir.mkdir(parents=True, exist_ok=True)
        self.cache_dir = self.projects_dir / "_cache"
        self.cache_dir.mkdir(exist_ok=True)

    def _get_url_hash(self, url: str) -> str:
        """Return the cache key for *url*.

        The key is the first 16 hex characters of the MD5 digest of the
        UTF-8 encoded URL. It is used only to name cache entries.
        """
        return hashlib.md5(url.encode('utf-8')).hexdigest()[:16]

    def _is_valid_url(self, url: str) -> bool:
        """Return True when *url* is a string with both a scheme and a netloc."""
        # Later steps call ``url.encode``; reject non-str input here instead
        # of letting it fail further down.
        if not isinstance(url, str):
            return False
        try:
            parsed = urlparse(url)
        except ValueError:
            # urlparse raises ValueError for malformed input
            # (e.g. an unbalanced IPv6 bracket).
            return False
        return all([parsed.scheme, parsed.netloc])

    def _download_file(self, url: str, local_path: str) -> bool:
        """Stream *url* to *local_path*.

        The body is written to a sibling ``<name>.part`` file first and
        moved over *local_path* only after the transfer completes, so a
        failed or interrupted download never leaves a truncated file at
        *local_path*.

        Args:
            url: Address to fetch with an HTTP GET (30 second timeout).
            local_path: Destination file path.

        Returns:
            True on success; False if anything went wrong (the error is
            printed, not raised).
        """
        partial = None
        try:
            destination = Path(local_path)
            partial = destination.with_name(destination.name + ".part")
            # The context manager releases the connection even on error.
            with requests.get(url, stream=True, timeout=30) as response:
                response.raise_for_status()
                with open(partial, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
            os.replace(partial, destination)
            return True
        except Exception as e:
            # Best-effort contract: callers rely on a bool, never an exception.
            print(f"下载文件失败: {e}")
            if partial is not None:
                try:
                    partial.unlink()
                except OSError:
                    pass  # nothing was written, or it is already gone
            return False

    def _extract_zip(self, zip_path: str, extract_to: str) -> bool:
        """Extract the archive at *zip_path* into *extract_to*.

        If extraction fails and *extract_to* did not exist beforehand, the
        partially populated directory is removed so it cannot be mistaken
        for a complete project later. A directory that already existed is
        left untouched.

        Args:
            zip_path: Path of the ZIP archive to read.
            extract_to: Directory to extract into.

        Returns:
            True on success; False on any failure (the error is printed,
            not raised).
        """
        target = Path(extract_to)
        existed_before = target.exists()
        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(extract_to)
            return True
        except Exception as e:
            # Best-effort contract: callers rely on a bool, never an exception.
            print(f"解压ZIP文件失败: {e}")
            if not existed_before:
                shutil.rmtree(target, ignore_errors=True)
            return False

    def get_project_from_zip(self, zip_url: str) -> Optional[str]:
        """Return a local project directory for the archive at *zip_url*.

        Steps: validate the URL, reuse the extracted directory if present,
        otherwise download the archive (or reuse the cached one) and
        extract it.

        Args:
            zip_url: URL of the ZIP file.

        Returns:
            Optional[str]: Path of the project directory on success,
            None on failure.
        """
        if not self._is_valid_url(zip_url):
            print(f"无效的URL: {zip_url}")
            return None

        # Reuse a previously extracted project if there is one.
        url_hash = self._get_url_hash(zip_url)
        cached_project_dir = self.projects_dir / url_hash

        if cached_project_dir.exists():
            print(f"使用缓存的项目目录: {cached_project_dir}")
            return str(cached_project_dir)

        # Download the archive unless a usable copy is already cached.
        zip_filename = f"{url_hash}.zip"
        zip_path = self.cache_dir / zip_filename

        # A cached file that is not a ZIP archive (truncated download, HTML
        # error page, ...) would fail extraction on every call; drop it so
        # the archive is fetched again.
        if zip_path.exists() and not zipfile.is_zipfile(zip_path):
            try:
                zip_path.unlink()
            except OSError:
                pass  # extraction below will report the problem

        if not zip_path.exists():
            print(f"下载ZIP文件: {zip_url}")
            if not self._download_file(zip_url, str(zip_path)):
                return None
        else:
            print(f"使用缓存的ZIP文件: {zip_path}")

        # Extract into the project directory.
        print(f"解压ZIP文件到: {cached_project_dir}")
        if not self._extract_zip(str(zip_path), str(cached_project_dir)):
            return None

        print(f"项目准备完成: {cached_project_dir}")
        return str(cached_project_dir)

    def cleanup_cache(self):
        """Delete every cached archive and recreate an empty cache directory.

        Extracted project directories are not touched. Errors are printed,
        not raised.
        """
        try:
            if self.cache_dir.exists():
                shutil.rmtree(self.cache_dir)
                self.cache_dir.mkdir(exist_ok=True)
            print("缓存清理完成")
        except Exception as e:
            print(f"清理缓存失败: {e}")


# Global ZIP project handler instance (creates ./projects on import).
zip_handler = ZipProjectHandler()