catalog-agent/utils/file_utils.py
#!/usr/bin/env python3
"""
File utility functions for file processing, downloading, and management.
"""
import os
import hashlib
import json
import shutil
import zipfile
import tempfile
from typing import Dict, List, Optional
from pathlib import Path

import aiofiles
import aiohttp


async def download_file(url: str, destination_path: str) -> bool:
    """Download file from URL asynchronously"""
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                if response.status == 200:
                    async with aiofiles.open(destination_path, 'wb') as f:
                        async for chunk in response.content.iter_chunked(8192):
                            await f.write(chunk)
                    return True
                else:
                    print(f"Failed to download {url}, status code: {response.status}")
                    return False
    except Exception as e:
        print(f"Error downloading {url}: {str(e)}")
        return False


def get_file_hash(file_path: str) -> str:
    """Return the MD5 hash of a file path/URL string (a stable identifier, not a content hash)"""
    return hashlib.md5(file_path.encode('utf-8')).hexdigest()


def remove_file_or_directory(path: str):
    """Remove file or directory recursively"""
    try:
        if os.path.exists(path):
            if os.path.isfile(path):
                os.remove(path)
            elif os.path.isdir(path):
                shutil.rmtree(path)
            print(f"Removed: {path}")
        else:
            print(f"Path does not exist: {path}")
    except Exception as e:
        print(f"Error removing {path}: {str(e)}")


def extract_zip_file(zip_path: str, extract_dir: str) -> List[str]:
    """Extract zip file and return list of extracted txt/md files"""
    extracted_files = []
    try:
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(extract_dir)
        # Find all extracted txt and md files
        for root, dirs, files in os.walk(extract_dir):
            for file in files:
                if file.lower().endswith(('.txt', '.md')):
                    extracted_files.append(os.path.join(root, file))
        print(f"Extracted {len(extracted_files)} txt/md files from {zip_path}")
        return extracted_files
    except Exception as e:
        print(f"Error extracting zip file {zip_path}: {str(e)}")
        return []


def get_document_preview(document_path: str, max_lines: int = 10) -> str:
    """Get preview of document content"""
    try:
        with open(document_path, 'r', encoding='utf-8') as f:
            lines = []
            for i, line in enumerate(f):
                if i >= max_lines:
                    break
                lines.append(line.rstrip())
        return '\n'.join(lines)
    except Exception as e:
        return f"Error reading document: {str(e)}"


def is_file_already_processed(target_file: Path, pagination_file: Path, embeddings_file: Path) -> bool:
    """Check if a file has already been processed (document.txt, pagination.txt, and embeddings exist)"""
    if not target_file.exists():
        return False
    # Check if pagination and embeddings files exist and are not empty
    if pagination_file.exists() and embeddings_file.exists():
        # Check file sizes to ensure they're not empty
        if pagination_file.stat().st_size > 0 and embeddings_file.stat().st_size > 0:
            return True
    return False


def load_processed_files_log(unique_id: str) -> Dict[str, Dict]:
    """Load processed files log for a project"""
    log_file = os.path.join("projects", unique_id, "processed_files.json")
    if os.path.exists(log_file):
        try:
            with open(log_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            print(f"Error loading processed files log: {e}")
    return {}


def save_processed_files_log(unique_id: str, processed_log: Dict[str, Dict]):
    """Save processed files log for a project"""
    log_file = os.path.join("projects", unique_id, "processed_files.json")
    try:
        os.makedirs(os.path.dirname(log_file), exist_ok=True)
        with open(log_file, 'w', encoding='utf-8') as f:
            json.dump(processed_log, f, ensure_ascii=False, indent=2)
    except Exception as e:
        print(f"Error saving processed files log: {e}")