#!/usr/bin/env python3
"""
Dataset management functions for organizing and processing datasets.
"""

import os
import shutil
import json
import tempfile
import zipfile
from typing import Dict, List, Optional
from pathlib import Path

from utils.file_utils import (
    download_file,
    extract_zip_file,
    get_file_hash,
    load_processed_files_log,
    save_processed_files_log,
    remove_file_or_directory
)
from utils.excel_csv_processor import (
    is_excel_file,
    is_csv_file,
    process_excel_file,
    process_csv_file
)


async def download_dataset_files(unique_id: str, files: Dict[str, List[str]]) -> Dict[str, List[str]]:
    """Download or copy dataset files and organize them by key into dataset/{key}/document.txt.

    Supports zip file extraction and combines content using '# Page' separators."""
    if not files:
        return {}

    # Set up directories
    project_dir = os.path.join("projects", unique_id)
    files_dir = os.path.join(project_dir, "files")
    dataset_dir = os.path.join(project_dir, "dataset")

    # Create directories if they don't exist
    os.makedirs(files_dir, exist_ok=True)
    os.makedirs(dataset_dir, exist_ok=True)

    processed_files_by_key = {}

    def extract_zip_file_func(zip_path: str, extract_dir: str) -> List[str]:
        """Extract zip file and return list of extracted txt/md/xlsx/xls/csv files"""
        extracted_files = []
        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(extract_dir)

            # Find all extracted txt, md, xlsx, xls, and csv files
            for root, dirs, files in os.walk(extract_dir):
                for file in files:
                    if file.lower().endswith(('.txt', '.md', '.xlsx', '.xls', '.csv')):
                        extracted_files.append(os.path.join(root, file))

            print(f"Extracted {len(extracted_files)} txt/md/xlsx/csv files from {zip_path}")
            return extracted_files
        except Exception as e:
            print(f"Error extracting zip file {zip_path}: {str(e)}")
            return []

    # Process each key and its associated files
    for key, file_list in files.items():
        print(f"Processing key '{key}' with {len(file_list)} files")
        processed_files_by_key[key] = []

        # Create target directory for this key
        target_dir = os.path.join(dataset_dir, key)
        os.makedirs(target_dir, exist_ok=True)

        # Check if files are already processed before doing any work
        document_file = os.path.join(target_dir, "document.txt")
        pagination_file = os.path.join(target_dir, "pagination.txt")
        embeddings_file = os.path.join(target_dir, "document_embeddings.pkl")

        already_processed = (
            os.path.exists(document_file) and
            os.path.exists(pagination_file) and
            os.path.exists(embeddings_file) and
            os.path.getsize(document_file) > 0 and
            os.path.getsize(pagination_file) > 0 and
            os.path.getsize(embeddings_file) > 0
        )

        if already_processed:
            print(f" Skipping already processed files for {key}")
            processed_files_by_key[key].append(document_file)
            continue  # Skip to next key

        # Read and combine all files for this key
        combined_content = []
        pagination_lines = []  # Collect pagination lines from all files
        all_processed_files = []

        for file_path in file_list:
            # Check if it's a URL (remote file) or local file
            is_remote = file_path.startswith(('http://', 'https://'))
            filename = file_path.split("/")[-1] if file_path else f"file_{len(all_processed_files)}"

            # Create temporary extraction directory for zip files
            temp_extract_dir = None
            files_to_process = []

            try:
                if is_remote:
                    # Handle remote file
                    temp_file = os.path.join(files_dir, filename)
                    print(f"Downloading {file_path} -> {temp_file}")

                    success = await download_file(file_path, temp_file)
                    if not success:
                        print(f"Failed to download {file_path}")
                        continue

                    # Check if it's a zip file
                    if filename.lower().endswith('.zip'):
                        temp_extract_dir = tempfile.mkdtemp(prefix=f"extract_{key}_")
                        print(f"Extracting zip to temporary directory: {temp_extract_dir}")
                        extracted_files = extract_zip_file_func(temp_file, temp_extract_dir)
                        files_to_process.extend(extracted_files)
                        # The downloaded zip was saved directly into the project
                        # files directory, so no additional copy is needed here.
                    else:
                        files_to_process.append(temp_file)
                else:
                    # Handle local file
                    if not os.path.exists(file_path):
                        print(f"Local file not found: {file_path}")
                        continue

                    if filename.lower().endswith('.zip'):
                        # Copy to project directory first
                        local_zip_path = os.path.join(files_dir, filename)
                        shutil.copy2(file_path, local_zip_path)
                        print(f"Copied local zip file: {file_path} -> {local_zip_path}")

                        # Extract zip file
                        temp_extract_dir = tempfile.mkdtemp(prefix=f"extract_{key}_")
                        print(f"Extracting local zip to temporary directory: {temp_extract_dir}")
                        extracted_files = extract_zip_file_func(local_zip_path, temp_extract_dir)
                        files_to_process.extend(extracted_files)
                    else:
                        # Copy non-zip file directly
                        dest_file = os.path.join(files_dir, filename)
                        shutil.copy2(file_path, dest_file)
                        files_to_process.append(dest_file)
                        print(f"Copied local file: {file_path} -> {dest_file}")

                # Process all files (extracted from zip or single file)
                for process_file_path in files_to_process:
                    try:
                        base_filename = os.path.basename(process_file_path)

                        # Check if it's an Excel file
                        if is_excel_file(process_file_path):
                            print(f"Processing Excel file: {base_filename}")
                            document_content, excel_pagination_lines = process_excel_file(process_file_path)
                            if document_content:
                                combined_content.append(f"# Page {base_filename}")
                                combined_content.append(document_content)
                                # Collect pagination lines from Excel files
                                pagination_lines.extend(excel_pagination_lines)
                        # Check if it's a CSV file
                        elif is_csv_file(process_file_path):
                            print(f"Processing CSV file: {base_filename}")
                            document_content, csv_pagination_lines = process_csv_file(process_file_path)
                            if document_content:
                                combined_content.append(f"# Page {base_filename}")
                                combined_content.append(document_content)
                                # Collect pagination lines from CSV files
                                pagination_lines.extend(csv_pagination_lines)
                        # Handle text files (original logic)
                        else:
                            with open(process_file_path, 'r', encoding='utf-8') as f:
                                content = f.read().strip()
                                if content:
                                    # Add file content with page separator
                                    combined_content.append(f"# Page {base_filename}")
                                    combined_content.append(content)
                    except Exception as e:
                        print(f"Failed to read file content from {process_file_path}: {str(e)}")

            except Exception as e:
                print(f"Error processing file {file_path}: {str(e)}")
            finally:
                # Clean up temporary extraction directory
                if temp_extract_dir and os.path.exists(temp_extract_dir):
                    try:
                        shutil.rmtree(temp_extract_dir)
                        print(f"Cleaned up temporary directory: {temp_extract_dir}")
                    except Exception as e:
                        print(f"Failed to clean up temporary directory {temp_extract_dir}: {str(e)}")

        # Write combined content to dataset/{key}/document.txt
        if combined_content:
            try:
                with open(document_file, 'w', encoding='utf-8') as f:
                    f.write('\n\n'.join(combined_content))
                print(f"Created combined document: {document_file}")

                # Generate pagination and embeddings for the combined document
                try:
                    import sys
                    sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'embedding'))
                    from embedding import embed_document

                    # Generate the pagination file from collected pagination lines.
                    # For Excel/CSV files, use the pagination format we collected;
                    # for text files, fall back to the original pagination generation.
                    if pagination_lines:
                        print(f" Writing pagination data from Excel/CSV files for {key}")
                        with open(pagination_file, 'w', encoding='utf-8') as f:
                            for line in pagination_lines:
                                if line.strip():
                                    f.write(f"{line}\n")
                        print(f" Generated {len(pagination_lines)} pagination lines")
                    else:
                        # For text-only files, use the original pagination generation
                        from embedding import split_document_by_pages
                        print(f" Generating pagination from text files for {key}")
                        pages = split_document_by_pages(str(document_file), str(pagination_file))
                        print(f" Generated {len(pages)} pages")

                    # Generate embeddings
                    print(f" Generating embeddings for {key}")
                    # Use paragraph chunking strategy with default settings
                    embedding_data = embed_document(
                        str(document_file),
                        str(embeddings_file),
                        chunking_strategy='paragraph'
                    )

                    if embedding_data:
                        print(f" Generated embeddings for {len(embedding_data['chunks'])} chunks")
                        # Add to processed files only after successful embedding
                        processed_files_by_key[key].append(document_file)
                    else:
                        print(f" Failed to generate embeddings")

                except Exception as e:
                    print(f" Failed to generate pagination/embeddings for {key}: {str(e)}")

            except Exception as e:
                print(f"Failed to write combined document: {str(e)}")

    # Load existing log
    processed_log = load_processed_files_log(unique_id)

    # Update log with newly processed files
    for key, file_list in files.items():
        if key not in processed_log:
            processed_log[key] = {}

        # Record the timestamp of this key's own document file (the loop
        # variable from the processing pass above would otherwise point at
        # whichever key happened to be processed last).
        key_document_file = os.path.join(dataset_dir, key, "document.txt")

        for file_path in file_list:
            filename = os.path.basename(file_path)
            processed_log[key][filename] = {
                "original_path": file_path,
                "processed_at": str(os.path.getmtime(key_document_file) if os.path.exists(key_document_file) else 0),
                "status": "processed" if processed_files_by_key.get(key) else "failed"
            }

    # Save the updated processed log
    save_processed_files_log(unique_id, processed_log)

    return processed_files_by_key


def generate_dataset_structure(unique_id: str) -> str:
    """Generate a string representation of the dataset structure"""
    dataset_dir = os.path.join("projects", unique_id, "dataset")
    structure = []

    def add_directory_contents(dir_path: str, prefix: str = ""):
        try:
            items = sorted(os.listdir(dir_path))
            for i, item in enumerate(items):
                item_path = os.path.join(dir_path, item)
                is_last = i == len(items) - 1
                current_prefix = "└── " if is_last else "├── "
                structure.append(f"{prefix}{current_prefix}{item}")

                if os.path.isdir(item_path):
                    next_prefix = prefix + ("    " if is_last else "│   ")
                    add_directory_contents(item_path, next_prefix)
        except Exception as e:
            structure.append(f"{prefix}└── Error: {str(e)}")

    if os.path.exists(dataset_dir):
        structure.append("dataset/")
        add_directory_contents(dataset_dir, "")
    else:
        structure.append("dataset/ (not found)")

    return "\n".join(structure)


def remove_dataset_directory(unique_id: str, filename_without_ext: str):
    """Remove a specific dataset directory"""
    dataset_path = os.path.join("projects", unique_id, "dataset", filename_without_ext)
    remove_file_or_directory(dataset_path)


def remove_dataset_directory_by_key(unique_id: str, key: str):
    """Remove dataset directory by key"""
    dataset_path = os.path.join("projects", unique_id, "dataset", key)
    remove_file_or_directory(dataset_path)
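

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only): a minimal example of wiring the helpers
# above together. The project id, keys, paths, and URL below are hypothetical
# placeholders, not values assumed to exist anywhere in this project.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import asyncio

    async def _demo():
        # Hypothetical input: one dataset key backed by a local text file and
        # one backed by a remote zip archive.
        demo_files = {
            "reports": ["./samples/report.txt"],
            "archives": ["https://example.com/data.zip"],
        }
        processed = await download_dataset_files("demo-project", demo_files)
        print(processed)
        print(generate_dataset_structure("demo-project"))

    asyncio.run(_demo())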