#!/usr/bin/env python3 """ Dataset management functions for organizing and processing datasets. New implementation with per-file processing and group merging. """ import os import json from typing import Dict, List # Import new modules from utils.file_manager import ( ensure_directories, sync_files_to_group, cleanup_orphaned_files, get_group_files_list ) from utils.single_file_processor import ( process_single_file, check_file_already_processed ) from utils.data_merger import ( merge_all_data_by_group, cleanup_dataset_group ) async def download_dataset_files(unique_id: str, files: Dict[str, List[str]]) -> Dict[str, List[str]]: """ Process dataset files with new architecture: 1. Sync files to group directories 2. Process each file individually 3. Merge results by group 4. Clean up orphaned files """ if not files: return {} print(f"Starting new file processing for project: {unique_id}") # Ensure project directories exist ensure_directories(unique_id) # Step 1: Sync files to group directories print("Step 1: Syncing files to group directories...") synced_files, failed_files = sync_files_to_group(unique_id, files) # Step 2: Detect changes and cleanup orphaned files from utils.file_manager import detect_file_changes changes = detect_file_changes(unique_id, files) if any(changes["removed"].values()): print("Step 2: Cleaning up orphaned files...") removed_files = cleanup_orphaned_files(unique_id, changes) print(f"Removed orphaned files: {removed_files}") # Step 3: Process individual files print("Step 3: Processing individual files...") processed_files_by_group = {} processing_results = {} for group_name, file_list in files.items(): processed_files_by_group[group_name] = [] processing_results[group_name] = [] for file_path in file_list: filename = os.path.basename(file_path) # Get local file path local_path = os.path.join("projects", "data", unique_id, "files", group_name, filename) # Skip if file doesn't exist (might be remote file that failed to download) if not os.path.exists(local_path) and not file_path.startswith(('http://', 'https://')): print(f"Skipping non-existent file: {filename}") continue # Check if already processed if check_file_already_processed(unique_id, group_name, filename): print(f"Skipping already processed file: {filename}") processed_files_by_group[group_name].append(filename) processing_results[group_name].append({ "filename": filename, "status": "existing" }) continue # Process the file print(f"Processing file: {filename} (group: {group_name})") result = await process_single_file(unique_id, group_name, filename, file_path, local_path) processing_results[group_name].append(result) if result["success"]: processed_files_by_group[group_name].append(filename) print(f" Successfully processed {filename}") else: print(f" Failed to process {filename}: {result['error']}") # Step 4: Merge results by group print("Step 4: Merging results by group...") merge_results = {} for group_name in processed_files_by_group.keys(): # Get all files in the group (including existing ones) group_files = get_group_files_list(unique_id, group_name) if group_files: print(f"Merging group: {group_name} with {len(group_files)} files") merge_result = merge_all_data_by_group(unique_id, group_name) merge_results[group_name] = merge_result if merge_result["success"]: print(f" Successfully merged group {group_name}") else: print(f" Failed to merge group {group_name}: {merge_result['errors']}") # Step 5: Save processing log print("Step 5: Saving processing log...") await save_processing_log(unique_id, files, 
synced_files, processing_results, merge_results) print(f"File processing completed for project: {unique_id}") return processed_files_by_group async def save_processing_log( unique_id: str, requested_files: Dict[str, List[str]], synced_files: Dict, processing_results: Dict, merge_results: Dict ): """Save comprehensive processing log.""" log_data = { "unique_id": unique_id, "timestamp": str(os.path.getmtime("projects") if os.path.exists("projects") else 0), "requested_files": requested_files, "synced_files": synced_files, "processing_results": processing_results, "merge_results": merge_results, "summary": { "total_groups": len(requested_files), "total_files_requested": sum(len(files) for files in requested_files.values()), "total_files_processed": sum( len([r for r in results if r.get("success", False)]) for results in processing_results.values() ), "total_groups_merged": len([r for r in merge_results.values() if r.get("success", False)]) } } log_file_path = os.path.join("projects", "data", unique_id, "processing_log.json") try: with open(log_file_path, 'w', encoding='utf-8') as f: json.dump(log_data, f, ensure_ascii=False, indent=2) print(f"Processing log saved to: {log_file_path}") except Exception as e: print(f"Error saving processing log: {str(e)}") def generate_dataset_structure(unique_id: str) -> str: """Generate a string representation of the dataset structure""" project_dir = os.path.join("projects", "data", unique_id) structure = [] def add_directory_contents(dir_path: str, prefix: str = ""): try: if not os.path.exists(dir_path): structure.append(f"{prefix}└── (not found)") return items = sorted(os.listdir(dir_path)) for i, item in enumerate(items): item_path = os.path.join(dir_path, item) is_last = i == len(items) - 1 current_prefix = "└── " if is_last else "├── " structure.append(f"{prefix}{current_prefix}{item}") if os.path.isdir(item_path): next_prefix = prefix + (" " if is_last else "│ ") add_directory_contents(item_path, next_prefix) except Exception as e: structure.append(f"{prefix}└── Error: {str(e)}") # Add files directory structure files_dir = os.path.join(project_dir, "files") structure.append("files/") add_directory_contents(files_dir, "") # Add processed directory structure processed_dir = os.path.join(project_dir, "processed") structure.append("\nprocessed/") add_directory_contents(processed_dir, "") # Add dataset directory structure dataset_dir = os.path.join(project_dir, "dataset") structure.append("\ndataset/") add_directory_contents(dataset_dir, "") return "\n".join(structure) def get_processing_status(unique_id: str) -> Dict: """Get comprehensive processing status for a project.""" project_dir = os.path.join("projects", "data", unique_id) if not os.path.exists(project_dir): return { "project_exists": False, "unique_id": unique_id } status = { "project_exists": True, "unique_id": unique_id, "directories": { "files": os.path.exists(os.path.join(project_dir, "files")), "processed": os.path.exists(os.path.join(project_dir, "processed")), "dataset": os.path.exists(os.path.join(project_dir, "dataset")) }, "groups": {}, "processing_log_exists": os.path.exists(os.path.join(project_dir, "processing_log.json")) } # Check each group's status files_dir = os.path.join(project_dir, "files") if os.path.exists(files_dir): for group_name in os.listdir(files_dir): group_path = os.path.join(files_dir, group_name) if os.path.isdir(group_path): status["groups"][group_name] = { "files_count": len([ f for f in os.listdir(group_path) if os.path.isfile(os.path.join(group_path, f)) ]), 
"merge_status": "pending" } # Check merge status for each group dataset_dir = os.path.join(project_dir, "dataset") if os.path.exists(dataset_dir): for group_name in os.listdir(dataset_dir): group_path = os.path.join(dataset_dir, group_name) if os.path.isdir(group_path): if group_name in status["groups"]: # Check if merge is complete document_path = os.path.join(group_path, "document.txt") pagination_path = os.path.join(group_path, "pagination.txt") embedding_path = os.path.join(group_path, "embedding.pkl") if (os.path.exists(document_path) and os.path.exists(pagination_path) and os.path.exists(embedding_path)): status["groups"][group_name]["merge_status"] = "completed" else: status["groups"][group_name]["merge_status"] = "incomplete" else: status["groups"][group_name] = { "files_count": 0, "merge_status": "completed" } return status def remove_dataset_directory(unique_id: str, filename_without_ext: str): """Remove a specific dataset directory (deprecated - use new structure)""" # This function is kept for compatibility but delegates to new structure dataset_path = os.path.join("projects", "data", unique_id, "processed", filename_without_ext) if os.path.exists(dataset_path): import shutil shutil.rmtree(dataset_path) def remove_dataset_directory_by_key(unique_id: str, key: str): """Remove dataset directory by key (group name)""" # Remove files directory files_group_path = os.path.join("projects", "data", unique_id, "files", key) if os.path.exists(files_group_path): import shutil shutil.rmtree(files_group_path) # Remove processed directory processed_group_path = os.path.join("projects", "data", unique_id, "processed", key) if os.path.exists(processed_group_path): import shutil shutil.rmtree(processed_group_path) # Remove dataset directory cleanup_dataset_group(unique_id, key)