#!/usr/bin/env python3
"""
Data merging functions for combining processed file results.
"""

import os
import pickle
import time
from typing import Dict

# Try to import numpy, but handle if missing
try:
    import numpy as np
    NUMPY_SUPPORT = True
except ImportError:
    print("NumPy not available, some embedding features may be limited")
    NUMPY_SUPPORT = False


def merge_documents_by_group(unique_id: str, group_name: str) -> Dict:
    """Merge all document.txt files in a group into a single document."""
    processed_group_dir = os.path.join("projects", "data", unique_id, "processed", group_name)
    dataset_group_dir = os.path.join("projects", "data", unique_id, "dataset", group_name)
    os.makedirs(dataset_group_dir, exist_ok=True)

    merged_document_path = os.path.join(dataset_group_dir, "document.txt")

    result = {
        "success": False,
        "merged_document_path": merged_document_path,
        "source_files": [],
        "total_pages": 0,
        "total_characters": 0,
        "error": None
    }

    try:
        # Find all non-empty document.txt files in the processed directory
        document_files = []
        if os.path.exists(processed_group_dir):
            for item in os.listdir(processed_group_dir):
                item_path = os.path.join(processed_group_dir, item)
                if os.path.isdir(item_path):
                    document_path = os.path.join(item_path, "document.txt")
                    if os.path.exists(document_path) and os.path.getsize(document_path) > 0:
                        document_files.append((item, document_path))

        if not document_files:
            result["error"] = "No document files found to merge"
            return result

        # Merge all documents with page separators
        merged_content = []
        total_characters = 0

        for filename_stem, document_path in sorted(document_files):
            try:
                with open(document_path, 'r', encoding='utf-8') as f:
                    content = f.read().strip()
                if content:
                    merged_content.append(f"# Page {filename_stem}")
                    merged_content.append(content)
                    total_characters += len(content)
                    result["source_files"].append(filename_stem)
            except Exception as e:
                print(f"Error reading document file {document_path}: {str(e)}")
                continue

        if merged_content:
            # Write merged document
            with open(merged_document_path, 'w', encoding='utf-8') as f:
                f.write('\n\n'.join(merged_content))

            # Count only the pages that actually contributed content
            result["total_pages"] = len(result["source_files"])
            result["total_characters"] = total_characters
            result["success"] = True
        else:
            result["error"] = "No valid content found in document files"

    except Exception as e:
        result["error"] = f"Document merging failed: {str(e)}"
        print(f"Error merging documents for group {group_name}: {str(e)}")

    return result


def merge_paginations_by_group(unique_id: str, group_name: str) -> Dict:
    """Merge all pagination.txt files in a group."""
    processed_group_dir = os.path.join("projects", "data", unique_id, "processed", group_name)
    dataset_group_dir = os.path.join("projects", "data", unique_id, "dataset", group_name)
    os.makedirs(dataset_group_dir, exist_ok=True)

    merged_pagination_path = os.path.join(dataset_group_dir, "pagination.txt")

    result = {
        "success": False,
        "merged_pagination_path": merged_pagination_path,
        "source_files": [],
        "total_lines": 0,
        "error": None
    }

    try:
        # Find all non-empty pagination.txt files
        pagination_files = []
        if os.path.exists(processed_group_dir):
            for item in os.listdir(processed_group_dir):
                item_path = os.path.join(processed_group_dir, item)
                if os.path.isdir(item_path):
                    pagination_path = os.path.join(item_path, "pagination.txt")
                    if os.path.exists(pagination_path) and os.path.getsize(pagination_path) > 0:
                        pagination_files.append((item, pagination_path))

        if not pagination_files:
            result["error"] = "No pagination files found to merge"
            return result

        # Merge all pagination files, skipping blank lines
        merged_lines = []
        for filename_stem, pagination_path in sorted(pagination_files):
            try:
                with open(pagination_path, 'r', encoding='utf-8') as f:
                    lines = f.readlines()
                for line in lines:
                    line = line.strip()
                    if line:
                        merged_lines.append(line)
                result["source_files"].append(filename_stem)
            except Exception as e:
                print(f"Error reading pagination file {pagination_path}: {str(e)}")
                continue

        if merged_lines:
            # Write merged pagination, one entry per line
            with open(merged_pagination_path, 'w', encoding='utf-8') as f:
                for line in merged_lines:
                    f.write(f"{line}\n")

            result["total_lines"] = len(merged_lines)
            result["success"] = True
        else:
            result["error"] = "No valid pagination data found"

    except Exception as e:
        result["error"] = f"Pagination merging failed: {str(e)}"
        print(f"Error merging paginations for group {group_name}: {str(e)}")

    return result


def merge_embeddings_by_group(unique_id: str, group_name: str) -> Dict:
    """Merge all embedding.pkl files in a group."""
    processed_group_dir = os.path.join("projects", "data", unique_id, "processed", group_name)
    dataset_group_dir = os.path.join("projects", "data", unique_id, "dataset", group_name)
    os.makedirs(dataset_group_dir, exist_ok=True)

    merged_embedding_path = os.path.join(dataset_group_dir, "embedding.pkl")

    result = {
        "success": False,
        "merged_embedding_path": merged_embedding_path,
        "source_files": [],
        "total_chunks": 0,
        "total_dimensions": 0,
        "error": None
    }

    try:
        # Find all non-empty embedding.pkl files
        embedding_files = []
        if os.path.exists(processed_group_dir):
            for item in os.listdir(processed_group_dir):
                item_path = os.path.join(processed_group_dir, item)
                if os.path.isdir(item_path):
                    embedding_path = os.path.join(item_path, "embedding.pkl")
                    if os.path.exists(embedding_path) and os.path.getsize(embedding_path) > 0:
                        embedding_files.append((item, embedding_path))

        if not embedding_files:
            result["error"] = "No embedding files found to merge"
            return result

        # Load and merge all embedding data
        all_chunks = []
        total_chunks = 0
        dimensions = 0

        for filename_stem, embedding_path in sorted(embedding_files):
            try:
                with open(embedding_path, 'rb') as f:
                    embedding_data = pickle.load(f)

                if isinstance(embedding_data, dict) and 'chunks' in embedding_data:
                    chunks = embedding_data['chunks']

                    # Add source file metadata to each chunk
                    for chunk in chunks:
                        if isinstance(chunk, dict):
                            chunk['source_file'] = filename_stem
                            chunk['source_group'] = group_name

                    all_chunks.extend(chunks)
                    total_chunks += len(chunks)

                    # Get dimensions from the first chunk that carries an embedding array
                    if dimensions == 0 and chunks and isinstance(chunks[0], dict):
                        if 'embedding' in chunks[0] and hasattr(chunks[0]['embedding'], 'shape'):
                            dimensions = chunks[0]['embedding'].shape[0]

                    result["source_files"].append(filename_stem)
            except Exception as e:
                print(f"Error loading embedding file {embedding_path}: {str(e)}")
                continue

        if all_chunks:
            # Create merged embedding data structure
            merged_embedding_data = {
                'chunks': all_chunks,
                'total_chunks': total_chunks,
                'dimensions': dimensions,
                'source_files': result["source_files"],
                'group_name': group_name,
                'merged_at': time.time()  # timestamp of this merge
            }

            # Save merged embeddings
            with open(merged_embedding_path, 'wb') as f:
                pickle.dump(merged_embedding_data, f)

            result["total_chunks"] = total_chunks
            result["total_dimensions"] = dimensions
            result["success"] = True
        else:
            result["error"] = "No valid embedding data found"

    except Exception as e:
        result["error"] = f"Embedding merging failed: {str(e)}"
        print(f"Error merging embeddings for group {group_name}: {str(e)}")

    return result

def merge_all_data_by_group(unique_id: str, group_name: str) -> Dict:
    """Merge documents, paginations, and embeddings for a group."""
    merge_results = {
        "group_name": group_name,
        "unique_id": unique_id,
        "success": True,
        "document_merge": None,
        "pagination_merge": None,
        "embedding_merge": None,
        "errors": []
    }

    # Merge documents
    document_result = merge_documents_by_group(unique_id, group_name)
    merge_results["document_merge"] = document_result
    if not document_result["success"]:
        merge_results["success"] = False
        merge_results["errors"].append(f"Document merge failed: {document_result['error']}")

    # Merge paginations
    pagination_result = merge_paginations_by_group(unique_id, group_name)
    merge_results["pagination_merge"] = pagination_result
    if not pagination_result["success"]:
        merge_results["success"] = False
        merge_results["errors"].append(f"Pagination merge failed: {pagination_result['error']}")

    # Merge embeddings
    embedding_result = merge_embeddings_by_group(unique_id, group_name)
    merge_results["embedding_merge"] = embedding_result
    if not embedding_result["success"]:
        merge_results["success"] = False
        merge_results["errors"].append(f"Embedding merge failed: {embedding_result['error']}")

    return merge_results


def get_group_merge_status(unique_id: str, group_name: str) -> Dict:
    """Get the status of merged data for a group."""
    dataset_group_dir = os.path.join("projects", "data", unique_id, "dataset", group_name)

    status = {
        "group_name": group_name,
        "unique_id": unique_id,
        "dataset_dir_exists": os.path.exists(dataset_group_dir),
        "document_exists": False,
        "document_size": 0,
        "pagination_exists": False,
        "pagination_size": 0,
        "embedding_exists": False,
        "embedding_size": 0,
        "merge_complete": False
    }

    if os.path.exists(dataset_group_dir):
        document_path = os.path.join(dataset_group_dir, "document.txt")
        pagination_path = os.path.join(dataset_group_dir, "pagination.txt")
        embedding_path = os.path.join(dataset_group_dir, "embedding.pkl")

        if os.path.exists(document_path):
            status["document_exists"] = True
            status["document_size"] = os.path.getsize(document_path)

        if os.path.exists(pagination_path):
            status["pagination_exists"] = True
            status["pagination_size"] = os.path.getsize(pagination_path)

        if os.path.exists(embedding_path):
            status["embedding_exists"] = True
            status["embedding_size"] = os.path.getsize(embedding_path)

        # Merge is complete only if all three files exist and are non-empty
        if (status["document_exists"] and status["document_size"] > 0 and
                status["pagination_exists"] and status["pagination_size"] > 0 and
                status["embedding_exists"] and status["embedding_size"] > 0):
            status["merge_complete"] = True

    return status


def cleanup_dataset_group(unique_id: str, group_name: str) -> bool:
    """Clean up merged dataset files for a group."""
    dataset_group_dir = os.path.join("projects", "data", unique_id, "dataset", group_name)

    try:
        if os.path.exists(dataset_group_dir):
            import shutil
            shutil.rmtree(dataset_group_dir)
            print(f"Cleaned up dataset group: {group_name}")
            return True
        else:
            return True  # Nothing to clean up
    except Exception as e:
        print(f"Error cleaning up dataset group {group_name}: {str(e)}")
        return False
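

# --- Example usage (illustrative sketch) ---
# This block only demonstrates how the functions above might be driven; the
# identifiers below ("demo-project", "group_a") are hypothetical placeholders,
# and the expected on-disk layout is projects/data/<unique_id>/processed/<group>/.
if __name__ == "__main__":
    example_unique_id = "demo-project"  # hypothetical project identifier
    example_group = "group_a"           # hypothetical group name

    # Merge documents, paginations, and embeddings for one group,
    # then report whether all three merged artifacts are present.
    results = merge_all_data_by_group(example_unique_id, example_group)
    if not results["success"]:
        for error in results["errors"]:
            print(f"Merge error: {error}")

    status = get_group_merge_status(example_unique_id, example_group)
    print(f"Merge complete: {status['merge_complete']}")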