#!/usr/bin/env python3
"""
ZIP project handler.

Downloads a ZIP archive from a URL and extracts it into a per-URL project
directory, caching both the downloaded archive and the extracted result.
"""

import contextlib
import hashlib
import os
import shutil
import tempfile
import zipfile
from pathlib import Path
from typing import List, Optional
from urllib.parse import urlparse

import requests


class ZipProjectHandler:
    """Download, cache and extract ZIP archives into project directories.

    Layout under ``projects_dir``:

    * ``<url_hash>/``           extracted project for one URL
    * ``_cache/<url_hash>.zip`` downloaded archive for that URL
    """

    def __init__(self, projects_dir: str = "./projects") -> None:
        """Create the projects directory and its ``_cache`` subdirectory.

        Args:
            projects_dir: Root directory for extracted projects. Missing
                parent directories are created as well.
        """
        self.projects_dir = Path(projects_dir).resolve()
        # parents=True so that a nested, not-yet-existing root also works.
        self.projects_dir.mkdir(parents=True, exist_ok=True)
        self.cache_dir = self.projects_dir / "_cache"
        self.cache_dir.mkdir(exist_ok=True)

    def _get_url_hash(self, url: str) -> str:
        """Return the 16-hex-character cache key derived from ``url``.

        MD5 is used only to build a short directory/file name here, not for
        any security purpose.
        """
        return hashlib.md5(url.encode('utf-8')).hexdigest()[:16]

    def _is_valid_url(self, url: str) -> bool:
        """Return True when ``url`` parses with both a scheme and a host."""
        try:
            parsed = urlparse(url)
            return all([parsed.scheme, parsed.netloc])
        except (ValueError, TypeError, AttributeError):
            # urlparse rejects malformed hosts with ValueError; non-string
            # input surfaces as TypeError/AttributeError.
            return False

    def _download_file(self, url: str, local_path: str) -> bool:
        """Stream ``url`` to ``local_path``.

        The body is first written to ``<local_path>.part`` and only renamed
        to ``local_path`` after the whole response has been received, so an
        interrupted download never leaves a truncated file at ``local_path``.

        Args:
            url: Address to fetch.
            local_path: Destination file path.

        Returns:
            True on success, False on any failure (the error is printed).
        """
        partial_path = f"{local_path}.part"
        try:
            # The context manager closes the streamed response even when the
            # transfer is aborted halfway.
            with requests.get(url, stream=True, timeout=30) as response:
                response.raise_for_status()
                with open(partial_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
            os.replace(partial_path, local_path)
            return True
        except Exception as e:
            # Boundary handler: callers rely on a bool result, never on an
            # exception escaping from here.
            print(f"下载文件失败: {e}")
            with contextlib.suppress(OSError):
                os.remove(partial_path)
            return False

    def _extract_zip(self, zip_path: str, extract_to: str) -> bool:
        """Extract every member of ``zip_path`` into ``extract_to``.

        Returns:
            True on success, False on any failure (the error is printed).
            On failure ``extract_to`` may contain partially extracted files;
            cleaning it up is the caller's responsibility.
        """
        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(extract_to)
            return True
        except Exception as e:
            # Boundary handler: callers rely on a bool result.
            print(f"解压ZIP文件失败: {e}")
            return False

    def get_project_from_zip(self, zip_url: str) -> Optional[str]:
        """Return the project directory for ``zip_url``, fetching it if needed.

        An existing project directory for the URL is reused. Otherwise the
        archive is downloaded (or taken from the ZIP cache) and extracted.
        Extraction happens in a staging directory that is renamed into place
        only on success, so a failed extraction never leaves a partial
        project directory that a later call would treat as a valid cache hit.

        Args:
            zip_url: URL of the ZIP file.

        Returns:
            The project directory path on success, None on failure.
        """
        if not self._is_valid_url(zip_url):
            print(f"无效的URL: {zip_url}")
            return None

        # Reuse an already extracted project.
        url_hash = self._get_url_hash(zip_url)
        cached_project_dir = self.projects_dir / url_hash
        if cached_project_dir.exists():
            print(f"使用缓存的项目目录: {cached_project_dir}")
            return str(cached_project_dir)

        # Download the archive unless it is already cached.
        zip_path = self.cache_dir / f"{url_hash}.zip"
        if zip_path.exists():
            print(f"使用缓存的ZIP文件: {zip_path}")
        else:
            print(f"下载ZIP文件: {zip_url}")
            if not self._download_file(zip_url, str(zip_path)):
                return None

        # Extract into a staging directory, then move it into place.
        print(f"解压ZIP文件到: {cached_project_dir}")
        staging_dir = self.cache_dir / f"{url_hash}.extracting"
        # Drop leftovers from a previously interrupted run.
        shutil.rmtree(staging_dir, ignore_errors=True)
        if not self._extract_zip(str(zip_path), str(staging_dir)):
            shutil.rmtree(staging_dir, ignore_errors=True)
            return None
        try:
            # An archive without members creates nothing on extraction.
            staging_dir.mkdir(exist_ok=True)
            os.replace(staging_dir, cached_project_dir)
        except OSError as e:
            print(f"解压ZIP文件失败: {e}")
            shutil.rmtree(staging_dir, ignore_errors=True)
            return None

        print(f"项目准备完成: {cached_project_dir}")
        return str(cached_project_dir)

    def collect_document_files(self, project_dir: str) -> List[str]:
        """Collect every ``document.txt`` file below ``project_dir``.

        Args:
            project_dir: Project directory path.

        Returns:
            Paths of all ``document.txt`` files found by a recursive search;
            an empty list when ``project_dir`` does not exist.
        """
        project_path = Path(project_dir)
        if not project_path.exists():
            print(f"项目目录不存在: {project_dir}")
            return []

        document_files = [
            str(file_path)
            for file_path in project_path.rglob("document.txt")
            if file_path.is_file()
        ]

        print(f"在项目目录 {project_dir} 中找到 {len(document_files)} 个 document.txt 文件")
        # Print only the first five paths as a sample.
        for file_path in document_files[:5]:
            print(f" - {file_path}")
        if len(document_files) > 5:
            print(f" ... 还有 {len(document_files) - 5} 个文件")

        return document_files

    def cleanup_cache(self) -> None:
        """Delete the cache directory and recreate it empty.

        Failures are printed, not raised.
        """
        try:
            if self.cache_dir.exists():
                shutil.rmtree(self.cache_dir)
            self.cache_dir.mkdir(exist_ok=True)
            print("缓存清理完成")
        except Exception as e:
            print(f"清理缓存失败: {e}")


# Global ZIP project handler instance
zip_handler = ZipProjectHandler()