#!/usr/bin/env python3
"""
File utility functions for file processing, downloading, and management.
"""

import hashlib
import json
import os
import shutil
import tempfile
import zipfile
from pathlib import Path
from typing import Dict, List, Optional

import aiofiles
import aiohttp


async def download_file(url: str, destination_path: str) -> bool:
    """Download file from URL asynchronously"""
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                if response.status == 200:
                    async with aiofiles.open(destination_path, 'wb') as f:
                        async for chunk in response.content.iter_chunked(8192):
                            await f.write(chunk)
                    return True
                else:
                    print(f"Failed to download {url}, status code: {response.status}")
                    return False
    except Exception as e:
        print(f"Error downloading {url}: {str(e)}")
        return False
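
# Example usage (an illustrative sketch; the URL and destination path below are
# hypothetical). download_file is a coroutine, so it has to be driven by an
# event loop, e.g. via asyncio.run:
#
#     import asyncio
#     ok = asyncio.run(download_file("https://example.com/docs.zip", "downloads/docs.zip"))
#     print("downloaded" if ok else "failed")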


def get_file_hash(file_path: str) -> str:
    """Return the MD5 hex digest of a file path/URL string (a stable identifier,
    not a hash of the file contents)."""
    return hashlib.md5(file_path.encode('utf-8')).hexdigest()
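
# Illustrative note (not part of the original API surface): because the *string*
# is hashed, equal paths/URLs always map to the same 32-character hex digest,
# which makes the value handy as a cache key or directory name.
#
#     get_file_hash("https://example.com/docs.zip")   # hypothetical URL -> hex digest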


def remove_file_or_directory(path: str):
    """Remove file or directory recursively"""
    try:
        if os.path.exists(path):
            if os.path.isfile(path):
                os.remove(path)
            elif os.path.isdir(path):
                shutil.rmtree(path)
            print(f"Removed: {path}")
        else:
            print(f"Path does not exist: {path}")
    except Exception as e:
        print(f"Error removing {path}: {str(e)}")


def extract_zip_file(zip_path: str, extract_dir: str) -> List[str]:
    """Extract zip file and return list of extracted txt/md files"""
    extracted_files = []
    try:
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(extract_dir)

        # Find all extracted txt and md files
        for root, dirs, files in os.walk(extract_dir):
            for file in files:
                if file.lower().endswith(('.txt', '.md')):
                    extracted_files.append(os.path.join(root, file))

        print(f"Extracted {len(extracted_files)} txt/md files from {zip_path}")
        return extracted_files

    except Exception as e:
        print(f"Error extracting zip file {zip_path}: {str(e)}")
        return []
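
# Example usage (an illustrative sketch; the paths are hypothetical):
#
#     docs = extract_zip_file("downloads/docs.zip", "projects/abc123/extracted")
#     for path in docs:
#         print(get_document_preview(path, max_lines=3))   # defined below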


def get_document_preview(document_path: str, max_lines: int = 10) -> str:
    """Return the first max_lines lines of a document as a preview"""
    try:
        with open(document_path, 'r', encoding='utf-8') as f:
            lines = []
            for i, line in enumerate(f):
                if i >= max_lines:
                    break
                lines.append(line.rstrip())
            return '\n'.join(lines)
    except Exception as e:
        return f"Error reading document: {str(e)}"


def is_file_already_processed(target_file: Path, pagination_file: Path, embeddings_file: Path) -> bool:
    """Check whether a file has already been processed: the target document exists
    and the pagination and embeddings files exist and are non-empty."""
    if not target_file.exists():
        return False

    # Check if pagination and embeddings files exist and are not empty
    if pagination_file.exists() and embeddings_file.exists():
        # Check file sizes to ensure they're not empty
        if pagination_file.stat().st_size > 0 and embeddings_file.stat().st_size > 0:
            return True

    return False
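
# Example usage (an illustrative sketch; this particular project layout is an
# assumption, not something the module enforces):
#
#     doc_dir = Path("projects") / "abc123" / "doc_0"
#     if not is_file_already_processed(doc_dir / "document.txt",
#                                      doc_dir / "pagination.txt",
#                                      doc_dir / "embeddings.npy"):
#         ...  # run the processing pipeline for this document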


def load_processed_files_log(unique_id: str) -> Dict[str, Dict]:
    """Load processed files log for a project"""
    log_file = os.path.join("projects", unique_id, "processed_files.json")
    if os.path.exists(log_file):
        try:
            with open(log_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            print(f"Error loading processed files log: {e}")
    return {}


def save_processed_files_log(unique_id: str, processed_log: Dict[str, Dict]):
    """Save processed files log for a project"""
    log_file = os.path.join("projects", unique_id, "processed_files.json")
    try:
        os.makedirs(os.path.dirname(log_file), exist_ok=True)
        with open(log_file, 'w', encoding='utf-8') as f:
            json.dump(processed_log, f, ensure_ascii=False, indent=2)
    except Exception as e:
        print(f"Error saving processed files log: {e}")