# qwen_agent/zip_project_handler.py
#!/usr/bin/env python3
"""
ZIP项目处理器
负责处理从URL下载ZIP文件并解压到项目目录的功能
"""
import hashlib
import logging
import os
import shutil
import tempfile
import zipfile

import requests

from pathlib import Path
from typing import List, Optional
from urllib.parse import urlparse
# Module logger — deliberately uses the application-wide 'app' logger
# (not __name__) so output is routed through the app's handler config.
logger = logging.getLogger('app')
class ZipProjectHandler:
    """ZIP project handler.

    Downloads (or copies) a ZIP archive into a local cache, extracts it
    into a per-project directory under ``projects_dir``, and collects
    ``document.txt`` files from the extracted tree.
    """

    def __init__(self, projects_dir: str = "./projects"):
        """Create the handler and ensure the project/cache directories exist.

        Args:
            projects_dir: Root directory under which projects are extracted.
        """
        self.projects_dir = Path(projects_dir).resolve()
        # parents=True so a nested projects_dir (e.g. ./a/b/projects) also works.
        self.projects_dir.mkdir(parents=True, exist_ok=True)
        self.cache_dir = self.projects_dir / "_cache"
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def _get_url_hash(self, url: str) -> str:
        """Return a 16-char hex digest of ``url`` used as a cache key.

        NOTE: md5 is acceptable here — this is a cache key, not a
        security-sensitive hash.
        """
        return hashlib.md5(url.encode('utf-8')).hexdigest()[:16]

    @staticmethod
    def _is_remote_url(path: str) -> bool:
        """Return True when ``path`` parses as a URL with scheme and netloc."""
        try:
            result = urlparse(path)
            return all([result.scheme, result.netloc])
        except Exception:
            return False

    def _is_valid_url_or_path(self, path: str) -> bool:
        """Return True if ``path`` is a well-formed URL or an existing local path."""
        # First try validating as a URL.
        if self._is_remote_url(path):
            return True
        # Then fall back to checking it as a local filesystem path.
        try:
            return Path(path).exists()
        except Exception:
            return False

    def _download_file(self, url: str, local_path: str) -> bool:
        """Stream-download ``url`` to ``local_path``; return True on success."""
        try:
            response = requests.get(url, stream=True, timeout=30)
            response.raise_for_status()
            with open(local_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # skip keep-alive chunks
                        f.write(chunk)
            return True
        except Exception as e:
            logger.error(f"下载文件失败: {e}")
            return False

    def _copy_local_file(self, local_path: str, target_path: str) -> bool:
        """Copy a local file to ``target_path``; return True on success."""
        try:
            shutil.copy2(local_path, target_path)
            return True
        except Exception as e:
            logger.error(f"复制本地文件失败: {e}")
            return False

    def _extract_zip(self, zip_path: str, extract_to: str) -> bool:
        """Extract a ZIP archive into ``extract_to``; return True on success.

        Rejects archives containing entries that would resolve outside the
        target directory (zip-slip / path traversal), instead of blindly
        calling ``extractall`` on untrusted input.
        """
        try:
            target_root = Path(extract_to).resolve()
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                for member in zip_ref.namelist():
                    member_path = (target_root / member).resolve()
                    if member_path != target_root and target_root not in member_path.parents:
                        raise ValueError(f"ZIP entry escapes target dir: {member}")
                zip_ref.extractall(extract_to)
            return True
        except Exception as e:
            logger.error(f"解压ZIP文件失败: {e}")
            return False

    def get_project_from_zip(self, zip_url: str, unique_id: Optional[str] = None) -> Optional[str]:
        """Fetch project data from a ZIP URL or local path and extract it.

        Args:
            zip_url: URL of the ZIP file, or a local (possibly relative) path.
            unique_id: Optional unique identifier used as the folder name.
        Returns:
            Optional[str]: the project directory path on success, None on failure.
        """
        if not self._is_valid_url_or_path(zip_url):
            logger.error(f"无效的URL或路径: {zip_url}")
            return None

        url_hash = self._get_url_hash(zip_url)
        # Use unique_id as the directory name when given; otherwise the URL hash.
        if unique_id:
            # With a unique_id we skip the extraction cache and re-extract,
            # to guarantee the project structure is correct.
            project_dir_name = unique_id
        else:
            project_dir_name = url_hash
        cached_project_dir = self.projects_dir / project_dir_name

        if cached_project_dir.exists() and not unique_id:
            logger.info(f"使用缓存的项目目录: {cached_project_dir}")
            return str(cached_project_dir)

        # When unique_id is used, prefix the cached ZIP filename to avoid clashes.
        zip_filename = f"{unique_id}_{url_hash}.zip" if unique_id else f"{url_hash}.zip"
        zip_path = self.cache_dir / zip_filename

        if not zip_path.exists():
            if self._is_remote_url(zip_url):
                logger.info(f"下载ZIP文件: {zip_url}")
                if not self._download_file(zip_url, str(zip_path)):
                    return None
            else:
                logger.info(f"复制本地ZIP文件: {zip_url}")
                # Resolve a possibly-relative local path before copying.
                local_path = Path(zip_url).resolve()
                if not self._copy_local_file(str(local_path), str(zip_path)):
                    return None
        else:
            logger.info(f"使用缓存的ZIP文件: {zip_path}")

        # Extract into the project directory.
        logger.info(f"解压ZIP文件到: {cached_project_dir}")
        if not self._extract_zip(str(zip_path), str(cached_project_dir)):
            return None
        logger.info(f"项目准备完成: {cached_project_dir}")
        return str(cached_project_dir)

    def collect_document_files(self, project_dir: str) -> List[str]:
        """Recursively collect every ``document.txt`` under ``project_dir``.

        Args:
            project_dir: Project directory path to search.
        Returns:
            List[str]: full paths of all ``document.txt`` files found
            (empty list when the directory does not exist).
        """
        document_files: List[str] = []
        project_path = Path(project_dir)
        if not project_path.exists():
            logger.error(f"项目目录不存在: {project_dir}")
            return document_files
        # Recursively search for all document.txt files.
        for file_path in project_path.rglob("document.txt"):
            if file_path.is_file():
                document_files.append(str(file_path))
        logger.info(f"在项目目录 {project_dir} 中找到 {len(document_files)} 个 document.txt 文件")
        for file_path in document_files[:5]:  # log only the first 5 paths as a sample
            logger.info(f" - {file_path}")
        if len(document_files) > 5:
            logger.info(f" ... 还有 {len(document_files) - 5} 个文件")
        return document_files

    def cleanup_cache(self):
        """Delete and recreate the cache directory (best-effort; errors are logged)."""
        try:
            if self.cache_dir.exists():
                shutil.rmtree(self.cache_dir)
            self.cache_dir.mkdir(parents=True, exist_ok=True)
            logger.info("缓存清理完成")
        except Exception as e:
            logger.error(f"清理缓存失败: {e}")
# Global ZIP project handler instance (module-level singleton, created at import time).
zip_handler = ZipProjectHandler()