#!/usr/bin/env python3
"""
Dataset management functions for organizing and processing datasets.
"""

import os
import shutil
import json
import tempfile
import zipfile
from typing import Dict, List, Optional
from pathlib import Path

from utils.file_utils import (
    download_file,
    extract_zip_file,
    get_file_hash,
    load_processed_files_log,
    save_processed_files_log,
    remove_file_or_directory
)


async def download_dataset_files(unique_id: str, files: Dict[str, List[str]]) -> Dict[str, List[str]]:
    """Download or copy dataset files and organize them by key into dataset/{key}/document.txt.

    Supports zip file extraction and combines content using '# Page' separators.
    """
    if not files:
        return {}

    # Set up directories
    project_dir = os.path.join("projects", unique_id)
    files_dir = os.path.join(project_dir, "files")
    dataset_dir = os.path.join(project_dir, "dataset")

    # Create directories if they don't exist
    os.makedirs(files_dir, exist_ok=True)
    os.makedirs(dataset_dir, exist_ok=True)

    processed_files_by_key = {}

    def extract_zip_file_func(zip_path: str, extract_dir: str) -> List[str]:
        """Extract zip file and return list of extracted txt/md files"""
        extracted_files = []
        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(extract_dir)

            # Find all extracted txt and md files
            for root, dirs, names in os.walk(extract_dir):
                for file in names:
                    if file.lower().endswith(('.txt', '.md')):
                        extracted_files.append(os.path.join(root, file))

            print(f"Extracted {len(extracted_files)} txt/md files from {zip_path}")
            return extracted_files
        except Exception as e:
            print(f"Error extracting zip file {zip_path}: {str(e)}")
            return []

    # Process each key and its associated files
    for key, file_list in files.items():
        print(f"Processing key '{key}' with {len(file_list)} files")
        processed_files_by_key[key] = []

        # Create target directory for this key
        target_dir = os.path.join(dataset_dir, key)
        os.makedirs(target_dir, exist_ok=True)

        # Check if files are already processed before doing any work
        document_file = os.path.join(target_dir, "document.txt")
        pagination_file = os.path.join(target_dir, "pagination.txt")
        embeddings_file = os.path.join(target_dir, "document_embeddings.pkl")

        already_processed = (
            os.path.exists(document_file) and
            os.path.exists(pagination_file) and
            os.path.exists(embeddings_file) and
            os.path.getsize(document_file) > 0 and
            os.path.getsize(pagination_file) > 0 and
            os.path.getsize(embeddings_file) > 0
        )

        if already_processed:
            print(f" Skipping already processed files for {key}")
            processed_files_by_key[key].append(document_file)
            continue  # Skip to next key

        # Read and combine all files for this key
        combined_content = []

        for file_index, file_path in enumerate(file_list):
            # Check if it's a URL (remote file) or local file
            is_remote = file_path.startswith(('http://', 'https://'))
            filename = file_path.split("/")[-1] if file_path else f"file_{file_index}"

            # Create temporary extraction directory for zip files
            temp_extract_dir = None
            files_to_process = []

            try:
                if is_remote:
                    # Handle remote file
                    temp_file = os.path.join(files_dir, filename)
                    print(f"Downloading {file_path} -> {temp_file}")
                    success = await download_file(file_path, temp_file)
                    if not success:
                        print(f"Failed to download {file_path}")
                        continue

                    # Check if it's a zip file
                    if filename.lower().endswith('.zip'):
                        temp_extract_dir = tempfile.mkdtemp(prefix=f"extract_{key}_")
                        print(f"Extracting zip to temporary directory: {temp_extract_dir}")
                        extracted_files = extract_zip_file_func(temp_file, temp_extract_dir)
                        files_to_process.extend(extracted_files)

                        # Keep the downloaded zip in the project files directory
                        zip_dest = os.path.join(files_dir, filename)
                        shutil.copy2(temp_file, zip_dest)
                        print(f"Copied downloaded zip file: {temp_file} -> {zip_dest}")
                    else:
                        files_to_process.append(temp_file)
                else:
                    # Handle local file
                    if not os.path.exists(file_path):
                        print(f"Local file not found: {file_path}")
                        continue

                    if filename.lower().endswith('.zip'):
                        # Copy to project directory first
                        local_zip_path = os.path.join(files_dir, filename)
                        shutil.copy2(file_path, local_zip_path)
                        print(f"Copied local zip file: {file_path} -> {local_zip_path}")

                        # Extract zip file
                        temp_extract_dir = tempfile.mkdtemp(prefix=f"extract_{key}_")
                        print(f"Extracting local zip to temporary directory: {temp_extract_dir}")
                        extracted_files = extract_zip_file_func(local_zip_path, temp_extract_dir)
                        files_to_process.extend(extracted_files)
                    else:
                        # Copy non-zip file directly
                        dest_file = os.path.join(files_dir, filename)
                        shutil.copy2(file_path, dest_file)
                        files_to_process.append(dest_file)
                        print(f"Copied local file: {file_path} -> {dest_file}")

                # Process all files (extracted from zip or single file)
                for process_file_path in files_to_process:
                    try:
                        with open(process_file_path, 'r', encoding='utf-8') as f:
                            content = f.read().strip()
                            if content:
                                # Add file content with page separator
                                base_filename = os.path.basename(process_file_path)
                                combined_content.append(f"# Page {base_filename}")
                                combined_content.append(content)
                    except Exception as e:
                        print(f"Failed to read file content from {process_file_path}: {str(e)}")

            except Exception as e:
                print(f"Error processing file {file_path}: {str(e)}")
            finally:
                # Clean up temporary extraction directory
                if temp_extract_dir and os.path.exists(temp_extract_dir):
                    try:
                        shutil.rmtree(temp_extract_dir)
                        print(f"Cleaned up temporary directory: {temp_extract_dir}")
                    except Exception as e:
                        print(f"Failed to clean up temporary directory {temp_extract_dir}: {str(e)}")

        # Write combined content to dataset/{key}/document.txt
        if combined_content:
            try:
                with open(document_file, 'w', encoding='utf-8') as f:
                    f.write('\n\n'.join(combined_content))
                print(f"Created combined document: {document_file}")

                # Generate pagination and embeddings for the combined document
                try:
                    import sys
                    sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'embedding'))
                    from embedding import split_document_by_pages, embed_document

                    # Generate pagination
                    print(f" Generating pagination for {key}")
                    pages = split_document_by_pages(str(document_file), str(pagination_file))
                    print(f" Generated {len(pages)} pages")

                    # Generate embeddings
                    print(f" Generating embeddings for {key}")
                    local_model_path = "./models/paraphrase-multilingual-MiniLM-L12-v2"
                    if not os.path.exists(local_model_path):
                        local_model_path = None  # Fallback to HuggingFace model

                    # Use paragraph chunking strategy with default settings
                    embedding_data = embed_document(
                        str(document_file),
                        str(embeddings_file),
                        chunking_strategy='paragraph',
                        model_path=local_model_path
                    )

                    if embedding_data:
                        print(f" Generated embeddings for {len(embedding_data['chunks'])} chunks")
                        # Add to processed files only after successful embedding
                        processed_files_by_key[key].append(document_file)
                    else:
                        print(" Failed to generate embeddings")
                except Exception as e:
                    print(f" Failed to generate pagination/embeddings for {key}: {str(e)}")
            except Exception as e:
                print(f"Failed to write combined document: {str(e)}")

    # Load existing log
    processed_log = load_processed_files_log(unique_id)

    # Update log with newly processed files
    for key, file_list in files.items():
        if key not in processed_log:
            processed_log[key] = {}

        # Resolve this key's combined document explicitly; the document_file
        # variable left over from the loop above points at the last key processed.
        document_file = os.path.join(dataset_dir, key, "document.txt")

        for file_path in file_list:
            filename = os.path.basename(file_path)
            processed_log[key][filename] = {
                "original_path": file_path,
                "processed_at": str(os.path.getmtime(document_file) if os.path.exists(document_file) else 0),
                "status": "processed" if key in processed_files_by_key and processed_files_by_key[key] else "failed"
            }

    # Save the updated processed log
    save_processed_files_log(unique_id, processed_log)

    return processed_files_by_key


def generate_dataset_structure(unique_id: str) -> str:
    """Generate a string representation of the dataset structure"""
    dataset_dir = os.path.join("projects", unique_id, "dataset")
    structure = []

    def add_directory_contents(dir_path: str, prefix: str = ""):
        try:
            items = sorted(os.listdir(dir_path))
            for i, item in enumerate(items):
                item_path = os.path.join(dir_path, item)
                is_last = i == len(items) - 1
                current_prefix = "└── " if is_last else "├── "
                structure.append(f"{prefix}{current_prefix}{item}")

                if os.path.isdir(item_path):
                    next_prefix = prefix + ("    " if is_last else "│   ")
                    add_directory_contents(item_path, next_prefix)
        except Exception as e:
            structure.append(f"{prefix}└── Error: {str(e)}")

    if os.path.exists(dataset_dir):
        structure.append("dataset/")
        add_directory_contents(dataset_dir, "")
    else:
        structure.append("dataset/ (not found)")

    return "\n".join(structure)


def remove_dataset_directory(unique_id: str, filename_without_ext: str):
    """Remove a specific dataset directory"""
    dataset_path = os.path.join("projects", unique_id, "dataset", filename_without_ext)
    remove_file_or_directory(dataset_path)


def remove_dataset_directory_by_key(unique_id: str, key: str):
    """Remove dataset directory by key"""
    dataset_path = os.path.join("projects", unique_id, "dataset", key)
    remove_file_or_directory(dataset_path)
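

# Minimal usage sketch, not part of the module's original behavior. It assumes the
# script is run from the repository root so that the relative "projects/" directory
# and the utils.file_utils / embedding imports resolve; the "demo-project" id, the
# "manuals" key, and the sample path/URL below are hypothetical placeholders.
if __name__ == "__main__":
    import asyncio

    example_files = {
        # key -> list of local paths and/or http(s) URLs; .zip entries are extracted
        "manuals": [
            "./samples/user_guide.txt",               # hypothetical local file
            "https://example.com/archive/notes.zip",  # hypothetical remote archive
        ],
    }

    async def _demo():
        processed = await download_dataset_files("demo-project", example_files)
        print("Processed files by key:", processed)
        print(generate_dataset_structure("demo-project"))

    asyncio.run(_demo())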