#!/usr/bin/env python3
"""
ZIP project handler.

Handles downloading a ZIP file from a URL and extracting it into a project
directory.
"""

import hashlib
import logging
import os
import shutil
import tempfile
import zipfile
from pathlib import Path
from typing import List, Optional
from urllib.parse import urlparse

import requests

# Logger used throughout this module (registered under the name 'app').
logger = logging.getLogger('app')


class ZipProjectHandler:
    """Fetch ZIP archives and unpack them into per-project directories.

    An archive is identified by a URL (downloaded with ``requests``) or by a
    local path (copied). Archives are stored in ``<projects_dir>/_cache`` and
    extracted into ``<projects_dir>/<unique_id>`` or, when no ``unique_id`` is
    given, ``<projects_dir>/<hash of the URL/path>``.
    """

    # Name of the archive cache directory inside ``projects_dir``.
    _CACHE_DIR_NAME = "_cache"

    def __init__(self, projects_dir: str = "./projects"):
        """Create the projects directory and its cache sub-directory.

        Args:
            projects_dir: Root directory for extracted projects. It is
                resolved to an absolute path and created if missing.
        """
        self.projects_dir = Path(projects_dir).resolve()
        # parents=True: a nested root whose parents do not exist yet must not raise.
        self.projects_dir.mkdir(parents=True, exist_ok=True)
        self.cache_dir = self.projects_dir / self._CACHE_DIR_NAME
        self.cache_dir.mkdir(exist_ok=True)

    def _get_url_hash(self, url: str) -> str:
        """Return the first 16 hex digits of the MD5 of ``url``, used as a cache key."""
        return hashlib.md5(url.encode('utf-8')).hexdigest()[:16]

    @staticmethod
    def _is_url(location: str) -> bool:
        """Return True when ``location`` parses with both a scheme and a network location."""
        try:
            parsed = urlparse(location)
        except Exception:
            # Unparseable input is simply "not a URL"; validation must not raise.
            return False
        return bool(parsed.scheme and parsed.netloc)

    @classmethod
    def _is_safe_dir_name(cls, name: str) -> bool:
        """Return True when ``name`` is a single path component safe to use as a directory name.

        Rejects names containing path separators, ``.``/``..`` and the cache
        directory name, so the project directory cannot escape or overwrite
        anything outside its own folder under ``projects_dir``.
        """
        if name in (".", "..", cls._CACHE_DIR_NAME):
            return False
        return Path(name).name == name

    @staticmethod
    def _discard_file(path) -> None:
        """Best-effort removal of ``path``; a missing or undeletable file is ignored."""
        try:
            os.remove(path)
        except OSError:
            # Cleanup only: failing to delete must not mask the original error.
            pass

    def _is_valid_url_or_path(self, path: str) -> bool:
        """Return True when ``path`` looks like a URL or names an existing local path."""
        if self._is_url(path):
            return True
        try:
            return Path(path).exists()
        except Exception:
            return False

    def _download_file(self, url: str, local_path: str) -> bool:
        """Stream ``url`` into ``local_path``.

        The body is written to ``<local_path>.part`` and moved into place only
        after the whole download succeeded, so an interrupted transfer never
        leaves a truncated file at ``local_path``.

        Returns:
            True on success, False on any failure (the error is logged).
        """
        partial_path = f"{local_path}.part"
        try:
            # Context manager releases the streamed connection in every case.
            with requests.get(url, stream=True, timeout=30) as response:
                response.raise_for_status()
                with open(partial_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
            os.replace(partial_path, local_path)
            return True
        except Exception as e:
            logger.error(f"下载文件失败: {e}")
            self._discard_file(partial_path)
            return False

    def _copy_local_file(self, local_path: str, target_path: str) -> bool:
        """Copy ``local_path`` to ``target_path`` (metadata included).

        The copy goes through ``<target_path>.part`` so a failed copy never
        leaves a partial file at ``target_path``.

        Returns:
            True on success, False on any failure (the error is logged).
        """
        partial_path = f"{target_path}.part"
        try:
            shutil.copy2(local_path, partial_path)
            os.replace(partial_path, target_path)
            return True
        except Exception as e:
            logger.error(f"复制本地文件失败: {e}")
            self._discard_file(partial_path)
            return False

    def _extract_zip(self, zip_path: str, extract_to: str) -> bool:
        """Extract every member of ``zip_path`` into ``extract_to``.

        Returns:
            True on success, False on any failure (the error is logged).
        """
        try:
            # NOTE: per the zipfile documentation, member names are sanitised
            # on extraction (leading slashes and ".." components are dropped).
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(extract_to)
            return True
        except Exception as e:
            logger.error(f"解压ZIP文件失败: {e}")
            return False

    def get_project_from_zip(self, zip_url: str, unique_id: Optional[str] = None) -> Optional[str]:
        """Obtain a project directory from a ZIP URL or local ZIP path.

        Without ``unique_id`` the directory is named after the hash of
        ``zip_url`` and reused as-is when it already exists. With
        ``unique_id`` the archive is always extracted again into
        ``<projects_dir>/<unique_id>``.

        Args:
            zip_url: URL of the ZIP file, or a local (possibly relative) path.
            unique_id: Optional folder name for the project. Must be a single
                path component (no separators, not ``.``/``..``/``_cache``).

        Returns:
            The project directory path on success, ``None`` on failure.
        """
        if not self._is_valid_url_or_path(zip_url):
            logger.error(f"无效的URL或路径: {zip_url}")
            return None

        url_hash = self._get_url_hash(zip_url)
        if unique_id:
            # unique_id becomes a directory and file name: refuse anything
            # that could point outside projects_dir or at the cache itself.
            if not self._is_safe_dir_name(unique_id):
                logger.error(f"无效的unique_id: {unique_id}")
                return None
            project_dir = self.projects_dir / unique_id
            # Prefix the archive name with unique_id to avoid cache collisions.
            zip_filename = f"{unique_id}_{url_hash}.zip"
        else:
            project_dir = self.projects_dir / url_hash
            if project_dir.exists():
                logger.info(f"使用缓存的项目目录: {project_dir}")
                return str(project_dir)
            zip_filename = f"{url_hash}.zip"
        zip_path = self.cache_dir / zip_filename

        # Fetch the archive into the cache unless it is already there.
        if zip_path.exists():
            logger.info(f"使用缓存的ZIP文件: {zip_path}")
        elif self._is_url(zip_url):
            logger.info(f"下载ZIP文件: {zip_url}")
            if not self._download_file(zip_url, str(zip_path)):
                return None
        else:
            logger.info(f"复制本地ZIP文件: {zip_url}")
            local_path = Path(zip_url).resolve()
            if not self._copy_local_file(str(local_path), str(zip_path)):
                return None

        dir_preexisted = project_dir.exists()
        logger.info(f"解压ZIP文件到: {project_dir}")
        if not self._extract_zip(str(zip_path), str(project_dir)):
            # Evict the unusable archive so the next call fetches it again
            # instead of failing forever on the same cached file.
            self._discard_file(zip_path)
            # Remove a half-extracted directory we created ourselves, so it is
            # not mistaken for a valid cached project later. A directory that
            # existed before this call is left untouched.
            if not dir_preexisted:
                shutil.rmtree(project_dir, ignore_errors=True)
            return None

        logger.info(f"项目准备完成: {project_dir}")
        return str(project_dir)

    def collect_document_files(self, project_dir: str) -> List[str]:
        """Collect every ``document.txt`` file below ``project_dir``.

        Args:
            project_dir: Project directory to search recursively.

        Returns:
            Paths of all ``document.txt`` files found; an empty list when the
            directory does not exist.
        """
        project_path = Path(project_dir)
        if not project_path.exists():
            logger.error(f"项目目录不存在: {project_dir}")
            return []

        document_files = [
            str(file_path)
            for file_path in project_path.rglob("document.txt")
            if file_path.is_file()
        ]

        logger.info(f"在项目目录 {project_dir} 中找到 {len(document_files)} 个 document.txt 文件")
        # Log only the first five paths as a sample.
        for file_path in document_files[:5]:
            logger.info(f"  - {file_path}")
        if len(document_files) > 5:
            logger.info(f"  ... 还有 {len(document_files) - 5} 个文件")

        return document_files

    def cleanup_cache(self):
        """Delete the archive cache directory and recreate it empty."""
        try:
            if self.cache_dir.exists():
                shutil.rmtree(self.cache_dir)
            self.cache_dir.mkdir(exist_ok=True)
            logger.info("缓存清理完成")
        except Exception as e:
            logger.error(f"清理缓存失败: {e}")


# Global ZIP project handler instance (constructed at import time with the
# default "./projects" directory).
zip_handler = ZipProjectHandler()