#!/usr/bin/env python3
"""
Dataset management functions for organizing and processing datasets.
"""

import os
import shutil
import json
import tempfile
import zipfile
from typing import Dict, List, Optional
from pathlib import Path

from utils.file_utils import (
    download_file, extract_zip_file, get_file_hash,
    load_processed_files_log, save_processed_files_log,
    remove_file_or_directory
)


async def download_dataset_files(unique_id: str, files: Dict[str, List[str]]) -> Dict[str, List[str]]:
    """Download or copy dataset files and organize them by key into dataset/{key}/document.txt.

    Supports zip file extraction and combines content using '# Page' separators."""
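    # Illustrative input/output (keys, URLs and paths below are hypothetical, not
    # taken from any real project):
    #   files = {
    #       "manual": ["https://example.com/manual.zip", "/data/intro.txt"],
    #       "faq": ["/data/faq.md"],
    #   }
    # would produce projects/{unique_id}/dataset/manual/document.txt and
    # projects/{unique_id}/dataset/faq/document.txt, each concatenating its source
    # files with a "# Page <filename>" separator line before each file's content.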
    if not files:
        return {}

    # Set up directories
    project_dir = os.path.join("projects", unique_id)
    files_dir = os.path.join(project_dir, "files")
    dataset_dir = os.path.join(project_dir, "dataset")

    # Create directories if they don't exist
    os.makedirs(files_dir, exist_ok=True)
    os.makedirs(dataset_dir, exist_ok=True)

    processed_files_by_key = {}

    def extract_zip_file_func(zip_path: str, extract_dir: str) -> List[str]:
        """Extract zip file and return list of extracted txt/md files"""
        extracted_files = []
        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(extract_dir)

            # Find all extracted txt and md files
            for root, _dirs, filenames in os.walk(extract_dir):
                for name in filenames:
                    if name.lower().endswith(('.txt', '.md')):
                        extracted_files.append(os.path.join(root, name))

            print(f"Extracted {len(extracted_files)} txt/md files from {zip_path}")
            return extracted_files

        except Exception as e:
            print(f"Error extracting zip file {zip_path}: {str(e)}")
            return []

    # Process each key and its associated files
    for key, file_list in files.items():
        print(f"Processing key '{key}' with {len(file_list)} files")
        processed_files_by_key[key] = []

        # Create target directory for this key
        target_dir = os.path.join(dataset_dir, key)
        os.makedirs(target_dir, exist_ok=True)

        # Check if files are already processed before doing any work
        document_file = os.path.join(target_dir, "document.txt")
        pagination_file = os.path.join(target_dir, "pagination.txt")
        embeddings_file = os.path.join(target_dir, "document_embeddings.pkl")

        already_processed = (
            os.path.exists(document_file) and
            os.path.exists(pagination_file) and
            os.path.exists(embeddings_file) and
            os.path.getsize(document_file) > 0 and
            os.path.getsize(pagination_file) > 0 and
            os.path.getsize(embeddings_file) > 0
        )

        if already_processed:
            print(f" Skipping already processed files for {key}")
            processed_files_by_key[key].append(document_file)
            continue  # Skip to next key

        # Read and combine all files for this key
        combined_content = []
        all_processed_files = []

        for file_path in file_list:
            # Check if it's a URL (remote file) or local file
            is_remote = file_path.startswith(('http://', 'https://'))
            filename = file_path.split("/")[-1] if file_path else f"file_{len(all_processed_files)}"

            # Create temporary extraction directory for zip files
            temp_extract_dir = None
            files_to_process = []

            try:
                if is_remote:
                    # Handle remote file
                    temp_file = os.path.join(files_dir, filename)
                    print(f"Downloading {file_path} -> {temp_file}")

                    success = await download_file(file_path, temp_file)
                    if not success:
                        print(f"Failed to download {file_path}")
                        continue

                    # Check if it's a zip file
                    if filename.lower().endswith('.zip'):
                        temp_extract_dir = tempfile.mkdtemp(prefix=f"extract_{key}_")
                        print(f"Extracting zip to temporary directory: {temp_extract_dir}")

                        extracted_files = extract_zip_file_func(temp_file, temp_extract_dir)
                        files_to_process.extend(extracted_files)
                        # The downloaded zip already lives in the project files
                        # directory (files_dir), so no extra copy is needed here;
                        # copying it onto itself would raise shutil.SameFileError.
                    else:
                        files_to_process.append(temp_file)

                else:
                    # Handle local file
                    if not os.path.exists(file_path):
                        print(f"Local file not found: {file_path}")
                        continue

                    if filename.lower().endswith('.zip'):
                        # Copy to project directory first
                        local_zip_path = os.path.join(files_dir, filename)
                        shutil.copy2(file_path, local_zip_path)
                        print(f"Copied local zip file: {file_path} -> {local_zip_path}")

                        # Extract zip file
                        temp_extract_dir = tempfile.mkdtemp(prefix=f"extract_{key}_")
                        print(f"Extracting local zip to temporary directory: {temp_extract_dir}")

                        extracted_files = extract_zip_file_func(local_zip_path, temp_extract_dir)
                        files_to_process.extend(extracted_files)
                    else:
                        # Copy non-zip file directly
                        dest_file = os.path.join(files_dir, filename)
                        shutil.copy2(file_path, dest_file)
                        files_to_process.append(dest_file)
                        print(f"Copied local file: {file_path} -> {dest_file}")

                # Process all files (extracted from zip or single file)
                for process_file_path in files_to_process:
                    try:
                        with open(process_file_path, 'r', encoding='utf-8') as f:
                            content = f.read().strip()

                        if content:
                            # Add file content with page separator
                            base_filename = os.path.basename(process_file_path)
                            combined_content.append(f"# Page {base_filename}")
                            combined_content.append(content)

                    except Exception as e:
                        print(f"Failed to read file content from {process_file_path}: {str(e)}")

            except Exception as e:
                print(f"Error processing file {file_path}: {str(e)}")

            finally:
                # Clean up temporary extraction directory
                if temp_extract_dir and os.path.exists(temp_extract_dir):
                    try:
                        shutil.rmtree(temp_extract_dir)
                        print(f"Cleaned up temporary directory: {temp_extract_dir}")
                    except Exception as e:
                        print(f"Failed to clean up temporary directory {temp_extract_dir}: {str(e)}")

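        # Illustrative shape of the combined document written below (filenames are
        # examples only): each source file contributes a "# Page <name>" header
        # followed by its content, with blank lines between blocks:
        #   # Page chapter1.txt
        #   <contents of chapter1.txt>
        #
        #   # Page chapter2.md
        #   <contents of chapter2.md>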
        # Write combined content to dataset/{key}/document.txt
        if combined_content:
            try:
                with open(document_file, 'w', encoding='utf-8') as f:
                    f.write('\n\n'.join(combined_content))
                print(f"Created combined document: {document_file}")

                # Generate pagination and embeddings for the combined document
                try:
                    import sys
                    sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'embedding'))
                    from embedding import split_document_by_pages, embed_document

                    # Generate pagination
                    print(f" Generating pagination for {key}")
                    pages = split_document_by_pages(str(document_file), str(pagination_file))
                    print(f" Generated {len(pages)} pages")

                    # Generate embeddings
                    print(f" Generating embeddings for {key}")
                    local_model_path = "./models/paraphrase-multilingual-MiniLM-L12-v2"
                    if not os.path.exists(local_model_path):
                        local_model_path = None  # Fallback to HuggingFace model

                    # Use paragraph chunking strategy with default settings
                    embedding_data = embed_document(
                        str(document_file),
                        str(embeddings_file),
                        chunking_strategy='paragraph',
                        model_path=local_model_path
                    )

                    if embedding_data:
                        print(f" Generated embeddings for {len(embedding_data['chunks'])} chunks")
                        # Add to processed files only after successful embedding
                        processed_files_by_key[key].append(document_file)
                    else:
                        print(" Failed to generate embeddings")

                except Exception as e:
                    print(f" Failed to generate pagination/embeddings for {key}: {str(e)}")

            except Exception as e:
                print(f"Failed to write combined document: {str(e)}")

    # Load existing log
    processed_log = load_processed_files_log(unique_id)

    # Update log with newly processed files
    for key, file_list in files.items():
        if key not in processed_log:
            processed_log[key] = {}

        # Use this key's document path; without this, the timestamp below would be
        # taken from whichever document_file the processing loop touched last.
        document_file = os.path.join(dataset_dir, key, "document.txt")

        for file_path in file_list:
            filename = os.path.basename(file_path)
            processed_log[key][filename] = {
                "original_path": file_path,
                "processed_at": str(os.path.getmtime(document_file) if os.path.exists(document_file) else 0),
                "status": "processed" if key in processed_files_by_key and processed_files_by_key[key] else "failed"
            }

    # Save the updated processed log
    save_processed_files_log(unique_id, processed_log)

    return processed_files_by_key


def generate_dataset_structure(unique_id: str) -> str:
    """Generate a string representation of the dataset structure"""
    dataset_dir = os.path.join("projects", unique_id, "dataset")
    structure = []

    def add_directory_contents(dir_path: str, prefix: str = ""):
        try:
            items = sorted(os.listdir(dir_path))
            for i, item in enumerate(items):
                item_path = os.path.join(dir_path, item)
                is_last = i == len(items) - 1
                current_prefix = "└── " if is_last else "├── "
                structure.append(f"{prefix}{current_prefix}{item}")

                if os.path.isdir(item_path):
                    next_prefix = prefix + ("    " if is_last else "│   ")
                    add_directory_contents(item_path, next_prefix)
        except Exception as e:
            structure.append(f"{prefix}└── Error: {str(e)}")

    if os.path.exists(dataset_dir):
        structure.append("dataset/")
        add_directory_contents(dataset_dir, "")
    else:
        structure.append("dataset/ (not found)")

    return "\n".join(structure)


def remove_dataset_directory(unique_id: str, filename_without_ext: str):
    """Remove a specific dataset directory"""
    dataset_path = os.path.join("projects", unique_id, "dataset", filename_without_ext)
    remove_file_or_directory(dataset_path)


def remove_dataset_directory_by_key(unique_id: str, key: str):
    """Remove dataset directory by key"""
    dataset_path = os.path.join("projects", unique_id, "dataset", key)
    remove_file_or_directory(dataset_path)
remove_file_or_directory(dataset_path) |