From cec83ac4a9e2e578ca96809e1a48296cd79a892d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E6=BD=AE?= Date: Sat, 25 Oct 2025 10:09:11 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=96=87=E4=BB=B6=E4=B8=8A?= =?UTF-8?q?=E4=BC=A0=E5=A4=84=E7=90=86=E5=8A=9F=E8=83=BD=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=E5=88=86=E7=BB=84=E5=88=A0=E9=99=A4=EF=BC=8C=E6=8C=89=E9=9C=80?= =?UTF-8?q?embedding?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- test_multi_search.py | 98 ------- utils/data_merger.py | 356 +++++++++++++++++++++++ utils/dataset_manager.py | 499 +++++++++++++++------------------ utils/file_manager.py | 253 +++++++++++++++++ utils/file_utils.py | 182 +++++++++++- utils/single_file_processor.py | 297 ++++++++++++++++++++ 6 files changed, 1317 insertions(+), 368 deletions(-) delete mode 100644 test_multi_search.py create mode 100644 utils/data_merger.py create mode 100644 utils/file_manager.py create mode 100644 utils/single_file_processor.py diff --git a/test_multi_search.py b/test_multi_search.py deleted file mode 100644 index fba854f..0000000 --- a/test_multi_search.py +++ /dev/null @@ -1,98 +0,0 @@ -#!/usr/bin/env python3 -""" -测试脚本:验证多查询和多模式搜索功能 -""" - -import sys -import os -sys.path.append('/Users/moshui/Documents/felo/qwen-agent/mcp') - -from semantic_search_server import semantic_search -from multi_keyword_search_server import regex_grep, regex_grep_count - -def test_semantic_search(): - """测试语义搜索的多查询功能""" - print("=== 测试语义搜索多查询功能 ===") - - # 测试数据(模拟) - # 注意:这里需要实际的embedding文件才能测试 - print("语义搜索功能已修改,支持多查询输入") - print("参数格式:") - print(" - 单查询:queries='查询内容' 或 query='查询内容'") - print(" - 多查询:queries=['查询1', '查询2', '查询3']") - print() - -def test_regex_grep(): - """测试正则表达式的多模式搜索功能""" - print("=== 测试正则表达式多模式搜索功能 ===") - - # 创建测试文件 - test_file = "/tmp/test_regex.txt" - with open(test_file, 'w') as f: - f.write("""def hello_world(): - print("Hello, World!") - return "success" - -def hello_python(): - print("Hello, Python!") - return 42 - -class HelloWorld: - def __init__(self): - self.name = "World" - - def greet(self): - return f"Hello, {self.name}!' - -# 测试数字模式 -version = "1.2.3" -count = 42 -""") - - # 测试多模式搜索 - print("测试多模式搜索:['def.*hello', 'class.*World', '\\d+\\.\\d+\\.\\d+']") - result = regex_grep( - patterns=['def.*hello', 'class.*World', r'\d+\.\d+\.\d+'], - file_paths=[test_file], - case_sensitive=False - ) - - if "content" in result: - print("搜索结果:") - print(result["content"][0]["text"]) - - print() - - # 测试多模式统计 - print("测试多模式统计:['def', 'class', 'Hello', '\\d+']") - result = regex_grep_count( - patterns=['def', 'class', 'Hello', r'\d+'], - file_paths=[test_file], - case_sensitive=False - ) - - if "content" in result: - print("统计结果:") - print(result["content"][0]["text"]) - - # 清理测试文件 - os.remove(test_file) - -def main(): - """主测试函数""" - print("开始测试多查询和多模式搜索功能...") - print() - - test_semantic_search() - test_regex_grep() - - print("=== 测试完成 ===") - print("所有功能已成功修改:") - print("1. ✅ semantic_search 支持多查询 (queries 参数)") - print("2. ✅ regex_grep 支持多模式 (patterns 参数)") - print("3. ✅ regex_grep_count 支持多模式 (patterns 参数)") - print("4. ✅ 保持向后兼容性 (仍支持单查询/模式)") - print("5. ✅ 更新了工具定义 JSON 文件") - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/utils/data_merger.py b/utils/data_merger.py new file mode 100644 index 0000000..c304405 --- /dev/null +++ b/utils/data_merger.py @@ -0,0 +1,356 @@ +#!/usr/bin/env python3 +""" +Data merging functions for combining processed file results. 
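+
+Per-file results under projects/{unique_id}/processed/{group_name}/ are merged into
+projects/{unique_id}/dataset/{group_name}/ as document.txt, pagination.txt and embedding.pkl.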
+""" + +import os +import pickle +from typing import Dict, List, Optional, Tuple +import json + +# Try to import numpy, but handle if missing +try: + import numpy as np + NUMPY_SUPPORT = True +except ImportError: + print("NumPy not available, some embedding features may be limited") + NUMPY_SUPPORT = False + + +def merge_documents_by_group(unique_id: str, group_name: str) -> Dict: + """Merge all document.txt files in a group into a single document.""" + + processed_group_dir = os.path.join("projects", unique_id, "processed", group_name) + dataset_group_dir = os.path.join("projects", unique_id, "dataset", group_name) + os.makedirs(dataset_group_dir, exist_ok=True) + + merged_document_path = os.path.join(dataset_group_dir, "document.txt") + + result = { + "success": False, + "merged_document_path": merged_document_path, + "source_files": [], + "total_pages": 0, + "total_characters": 0, + "error": None + } + + try: + # Find all document.txt files in the processed directory + document_files = [] + if os.path.exists(processed_group_dir): + for item in os.listdir(processed_group_dir): + item_path = os.path.join(processed_group_dir, item) + if os.path.isdir(item_path): + document_path = os.path.join(item_path, "document.txt") + if os.path.exists(document_path) and os.path.getsize(document_path) > 0: + document_files.append((item, document_path)) + + if not document_files: + result["error"] = "No document files found to merge" + return result + + # Merge all documents with page separators + merged_content = [] + total_characters = 0 + + for filename_stem, document_path in sorted(document_files): + try: + with open(document_path, 'r', encoding='utf-8') as f: + content = f.read().strip() + + if content: + merged_content.append(f"# Page {filename_stem}") + merged_content.append(content) + total_characters += len(content) + result["source_files"].append(filename_stem) + + except Exception as e: + print(f"Error reading document file {document_path}: {str(e)}") + continue + + if merged_content: + # Write merged document + with open(merged_document_path, 'w', encoding='utf-8') as f: + f.write('\n\n'.join(merged_content)) + + result["total_pages"] = len(document_files) + result["total_characters"] = total_characters + result["success"] = True + + else: + result["error"] = "No valid content found in document files" + + except Exception as e: + result["error"] = f"Document merging failed: {str(e)}" + print(f"Error merging documents for group {group_name}: {str(e)}") + + return result + + +def merge_paginations_by_group(unique_id: str, group_name: str) -> Dict: + """Merge all pagination.txt files in a group.""" + + processed_group_dir = os.path.join("projects", unique_id, "processed", group_name) + dataset_group_dir = os.path.join("projects", unique_id, "dataset", group_name) + os.makedirs(dataset_group_dir, exist_ok=True) + + merged_pagination_path = os.path.join(dataset_group_dir, "pagination.txt") + + result = { + "success": False, + "merged_pagination_path": merged_pagination_path, + "source_files": [], + "total_lines": 0, + "error": None + } + + try: + # Find all pagination.txt files + pagination_files = [] + if os.path.exists(processed_group_dir): + for item in os.listdir(processed_group_dir): + item_path = os.path.join(processed_group_dir, item) + if os.path.isdir(item_path): + pagination_path = os.path.join(item_path, "pagination.txt") + if os.path.exists(pagination_path) and os.path.getsize(pagination_path) > 0: + pagination_files.append((item, pagination_path)) + + if not pagination_files: + 
result["error"] = "No pagination files found to merge" + return result + + # Merge all pagination files + merged_lines = [] + + for filename_stem, pagination_path in sorted(pagination_files): + try: + with open(pagination_path, 'r', encoding='utf-8') as f: + lines = f.readlines() + + for line in lines: + line = line.strip() + if line: + merged_lines.append(line) + + result["source_files"].append(filename_stem) + + except Exception as e: + print(f"Error reading pagination file {pagination_path}: {str(e)}") + continue + + if merged_lines: + # Write merged pagination + with open(merged_pagination_path, 'w', encoding='utf-8') as f: + for line in merged_lines: + f.write(f"{line}\n") + + result["total_lines"] = len(merged_lines) + result["success"] = True + + else: + result["error"] = "No valid pagination data found" + + except Exception as e: + result["error"] = f"Pagination merging failed: {str(e)}" + print(f"Error merging paginations for group {group_name}: {str(e)}") + + return result + + +def merge_embeddings_by_group(unique_id: str, group_name: str) -> Dict: + """Merge all embedding.pkl files in a group.""" + + processed_group_dir = os.path.join("projects", unique_id, "processed", group_name) + dataset_group_dir = os.path.join("projects", unique_id, "dataset", group_name) + os.makedirs(dataset_group_dir, exist_ok=True) + + merged_embedding_path = os.path.join(dataset_group_dir, "embedding.pkl") + + result = { + "success": False, + "merged_embedding_path": merged_embedding_path, + "source_files": [], + "total_chunks": 0, + "total_dimensions": 0, + "error": None + } + + try: + # Find all embedding.pkl files + embedding_files = [] + if os.path.exists(processed_group_dir): + for item in os.listdir(processed_group_dir): + item_path = os.path.join(processed_group_dir, item) + if os.path.isdir(item_path): + embedding_path = os.path.join(item_path, "embedding.pkl") + if os.path.exists(embedding_path) and os.path.getsize(embedding_path) > 0: + embedding_files.append((item, embedding_path)) + + if not embedding_files: + result["error"] = "No embedding files found to merge" + return result + + # Load and merge all embedding data + all_chunks = [] + total_chunks = 0 + dimensions = 0 + + for filename_stem, embedding_path in sorted(embedding_files): + try: + with open(embedding_path, 'rb') as f: + embedding_data = pickle.load(f) + + if isinstance(embedding_data, dict) and 'chunks' in embedding_data: + chunks = embedding_data['chunks'] + + # Add source file metadata to each chunk + for chunk in chunks: + if isinstance(chunk, dict): + chunk['source_file'] = filename_stem + chunk['source_group'] = group_name + + all_chunks.extend(chunks) + total_chunks += len(chunks) + + # Get dimensions from first chunk if available + if dimensions == 0 and chunks and isinstance(chunks[0], dict): + if 'embedding' in chunks[0] and hasattr(chunks[0]['embedding'], 'shape'): + dimensions = chunks[0]['embedding'].shape[0] + + result["source_files"].append(filename_stem) + + except Exception as e: + print(f"Error loading embedding file {embedding_path}: {str(e)}") + continue + + if all_chunks: + # Create merged embedding data structure + merged_embedding_data = { + 'chunks': all_chunks, + 'total_chunks': total_chunks, + 'dimensions': dimensions, + 'source_files': result["source_files"], + 'group_name': group_name, + 'merged_at': str(os.path.getmtime(merged_embedding_path) if os.path.exists(merged_embedding_path) else 0) + } + + # Save merged embeddings + with open(merged_embedding_path, 'wb') as f: + 
pickle.dump(merged_embedding_data, f) + + result["total_chunks"] = total_chunks + result["total_dimensions"] = dimensions + result["success"] = True + + else: + result["error"] = "No valid embedding data found" + + except Exception as e: + result["error"] = f"Embedding merging failed: {str(e)}" + print(f"Error merging embeddings for group {group_name}: {str(e)}") + + return result + + +def merge_all_data_by_group(unique_id: str, group_name: str) -> Dict: + """Merge documents, paginations, and embeddings for a group.""" + + merge_results = { + "group_name": group_name, + "unique_id": unique_id, + "success": True, + "document_merge": None, + "pagination_merge": None, + "embedding_merge": None, + "errors": [] + } + + # Merge documents + document_result = merge_documents_by_group(unique_id, group_name) + merge_results["document_merge"] = document_result + + if not document_result["success"]: + merge_results["success"] = False + merge_results["errors"].append(f"Document merge failed: {document_result['error']}") + + # Merge paginations + pagination_result = merge_paginations_by_group(unique_id, group_name) + merge_results["pagination_merge"] = pagination_result + + if not pagination_result["success"]: + merge_results["success"] = False + merge_results["errors"].append(f"Pagination merge failed: {pagination_result['error']}") + + # Merge embeddings + embedding_result = merge_embeddings_by_group(unique_id, group_name) + merge_results["embedding_merge"] = embedding_result + + if not embedding_result["success"]: + merge_results["success"] = False + merge_results["errors"].append(f"Embedding merge failed: {embedding_result['error']}") + + return merge_results + + +def get_group_merge_status(unique_id: str, group_name: str) -> Dict: + """Get the status of merged data for a group.""" + + dataset_group_dir = os.path.join("projects", unique_id, "dataset", group_name) + + status = { + "group_name": group_name, + "unique_id": unique_id, + "dataset_dir_exists": os.path.exists(dataset_group_dir), + "document_exists": False, + "document_size": 0, + "pagination_exists": False, + "pagination_size": 0, + "embedding_exists": False, + "embedding_size": 0, + "merge_complete": False + } + + if os.path.exists(dataset_group_dir): + document_path = os.path.join(dataset_group_dir, "document.txt") + pagination_path = os.path.join(dataset_group_dir, "pagination.txt") + embedding_path = os.path.join(dataset_group_dir, "embedding.pkl") + + if os.path.exists(document_path): + status["document_exists"] = True + status["document_size"] = os.path.getsize(document_path) + + if os.path.exists(pagination_path): + status["pagination_exists"] = True + status["pagination_size"] = os.path.getsize(pagination_path) + + if os.path.exists(embedding_path): + status["embedding_exists"] = True + status["embedding_size"] = os.path.getsize(embedding_path) + + # Check if all files exist and are not empty + if (status["document_exists"] and status["document_size"] > 0 and + status["pagination_exists"] and status["pagination_size"] > 0 and + status["embedding_exists"] and status["embedding_size"] > 0): + status["merge_complete"] = True + + return status + + +def cleanup_dataset_group(unique_id: str, group_name: str) -> bool: + """Clean up merged dataset files for a group.""" + + dataset_group_dir = os.path.join("projects", unique_id, "dataset", group_name) + + try: + if os.path.exists(dataset_group_dir): + import shutil + shutil.rmtree(dataset_group_dir) + print(f"Cleaned up dataset group: {group_name}") + return True + else: + return True # 
Nothing to clean up + + except Exception as e: + print(f"Error cleaning up dataset group {group_name}: {str(e)}") + return False \ No newline at end of file diff --git a/utils/dataset_manager.py b/utils/dataset_manager.py index 8d140d7..cbef1a5 100644 --- a/utils/dataset_manager.py +++ b/utils/dataset_manager.py @@ -1,293 +1,169 @@ #!/usr/bin/env python3 """ Dataset management functions for organizing and processing datasets. +New implementation with per-file processing and group merging. """ import os -import shutil import json -import tempfile -import zipfile -from typing import Dict, List, Optional -from pathlib import Path +from typing import Dict, List -from utils.file_utils import ( - download_file, extract_zip_file, get_file_hash, - load_processed_files_log, save_processed_files_log, - remove_file_or_directory +# Import new modules +from utils.file_manager import ( + ensure_directories, sync_files_to_group, cleanup_orphaned_files, + get_group_files_list ) -from utils.excel_csv_processor import ( - is_excel_file, is_csv_file, process_excel_file, process_csv_file +from utils.single_file_processor import ( + process_single_file, check_file_already_processed +) +from utils.data_merger import ( + merge_all_data_by_group, cleanup_dataset_group ) async def download_dataset_files(unique_id: str, files: Dict[str, List[str]]) -> Dict[str, List[str]]: - """Download or copy dataset files and organize them by key into dataset/{key}/document.txt. - Supports zip file extraction and combines content using '# Page' separators.""" + """ + Process dataset files with new architecture: + 1. Sync files to group directories + 2. Process each file individually + 3. Merge results by group + 4. Clean up orphaned files + """ if not files: return {} - - # Set up directories - project_dir = os.path.join("projects", unique_id) - files_dir = os.path.join(project_dir, "files") - dataset_dir = os.path.join(project_dir, "dataset") - - # Create directories if they don't exist - os.makedirs(files_dir, exist_ok=True) - os.makedirs(dataset_dir, exist_ok=True) - processed_files_by_key = {} + print(f"Starting new file processing for project: {unique_id}") - def extract_zip_file_func(zip_path: str, extract_dir: str) -> List[str]: - """Extract zip file and return list of extracted txt/md files""" - extracted_files = [] - try: - import zipfile - with zipfile.ZipFile(zip_path, 'r') as zip_ref: - zip_ref.extractall(extract_dir) - - # Find all extracted txt, md, xlsx, xls, and csv files - for root, dirs, files in os.walk(extract_dir): - for file in files: - if file.lower().endswith(('.txt', '.md', '.xlsx', '.xls', '.csv')): - extracted_files.append(os.path.join(root, file)) - - print(f"Extracted {len(extracted_files)} txt/md/xlsx/csv files from {zip_path}") - return extracted_files - - except Exception as e: - print(f"Error extracting zip file {zip_path}: {str(e)}") - return [] + # Ensure project directories exist + ensure_directories(unique_id) - # Process each key and its associated files - for key, file_list in files.items(): - print(f"Processing key '{key}' with {len(file_list)} files") - processed_files_by_key[key] = [] - - # Create target directory for this key - target_dir = os.path.join(dataset_dir, key) - os.makedirs(target_dir, exist_ok=True) - - # Check if files are already processed before doing any work - document_file = os.path.join(target_dir, "document.txt") - pagination_file = os.path.join(target_dir, "pagination.txt") - embeddings_file = os.path.join(target_dir, "document_embeddings.pkl") - - already_processed 
= ( - os.path.exists(document_file) and - os.path.exists(pagination_file) and - os.path.exists(embeddings_file) and - os.path.getsize(document_file) > 0 and - os.path.getsize(pagination_file) > 0 and - os.path.getsize(embeddings_file) > 0 - ) - - if already_processed: - print(f" Skipping already processed files for {key}") - processed_files_by_key[key].append(document_file) - continue # Skip to next key - - # Read and combine all files for this key - combined_content = [] - pagination_lines = [] # Collect pagination lines from all files - all_processed_files = [] - - for file_path in file_list: - # Check if it's a URL (remote file) or local file - is_remote = file_path.startswith(('http://', 'https://')) - filename = file_path.split("/")[-1] if file_path else f"file_{len(all_processed_files)}" - - # Create temporary extraction directory for zip files - temp_extract_dir = None - files_to_process = [] - - try: - if is_remote: - # Handle remote file - temp_file = os.path.join(files_dir, filename) - print(f"Downloading {file_path} -> {temp_file}") - - success = await download_file(file_path, temp_file) - if not success: - print(f"Failed to download {file_path}") - continue - - # Check if it's a zip file - if filename.lower().endswith('.zip'): - temp_extract_dir = tempfile.mkdtemp(prefix=f"extract_{key}_") - print(f"Extracting zip to temporary directory: {temp_extract_dir}") - - extracted_files = extract_zip_file_func(temp_file, temp_extract_dir) - files_to_process.extend(extracted_files) - - # Copy the zip file to project files directory - zip_dest = os.path.join(files_dir, filename) - shutil.copy2(temp_file, zip_dest) - print(f"Copied local zip file: {temp_file} -> {zip_dest}") - else: - files_to_process.append(temp_file) - - else: - # Handle local file - if not os.path.exists(file_path): - print(f"Local file not found: {file_path}") - continue - - if filename.lower().endswith('.zip'): - # Copy to project directory first - local_zip_path = os.path.join(files_dir, filename) - shutil.copy2(file_path, local_zip_path) - print(f"Copied local zip file: {file_path} -> {local_zip_path}") - - # Extract zip file - temp_extract_dir = tempfile.mkdtemp(prefix=f"extract_{key}_") - print(f"Extracting local zip to temporary directory: {temp_extract_dir}") - - extracted_files = extract_zip_file_func(local_zip_path, temp_extract_dir) - files_to_process.extend(extracted_files) - else: - # Copy non-zip file directly - dest_file = os.path.join(files_dir, filename) - shutil.copy2(file_path, dest_file) - files_to_process.append(dest_file) - print(f"Copied local file: {file_path} -> {dest_file}") - - # Process all files (extracted from zip or single file) - for process_file_path in files_to_process: - try: - base_filename = os.path.basename(process_file_path) - - # Check if it's an Excel file - if is_excel_file(process_file_path): - print(f"Processing Excel file: {base_filename}") - document_content, excel_pagination_lines = process_excel_file(process_file_path) - - if document_content: - combined_content.append(f"# Page {base_filename}") - combined_content.append(document_content) - - # Collect pagination lines from Excel files - pagination_lines.extend(excel_pagination_lines) - - # Check if it's a CSV file - elif is_csv_file(process_file_path): - print(f"Processing CSV file: {base_filename}") - document_content, csv_pagination_lines = process_csv_file(process_file_path) - - if document_content: - combined_content.append(f"# Page {base_filename}") - combined_content.append(document_content) - - # Collect pagination 
lines from CSV files - pagination_lines.extend(csv_pagination_lines) - - # Handle text files (original logic) - else: - with open(process_file_path, 'r', encoding='utf-8') as f: - content = f.read().strip() - - if content: - # Add file content with page separator - combined_content.append(f"# Page {base_filename}") - combined_content.append(content) - - except Exception as e: - print(f"Failed to read file content from {process_file_path}: {str(e)}") - - except Exception as e: - print(f"Error processing file {file_path}: {str(e)}") - - finally: - # Clean up temporary extraction directory - if temp_extract_dir and os.path.exists(temp_extract_dir): - try: - shutil.rmtree(temp_extract_dir) - print(f"Cleaned up temporary directory: {temp_extract_dir}") - except Exception as e: - print(f"Failed to clean up temporary directory {temp_extract_dir}: {str(e)}") - - # Write combined content to dataset/{key}/document.txt - if combined_content: - try: - with open(document_file, 'w', encoding='utf-8') as f: - f.write('\n\n'.join(combined_content)) - print(f"Created combined document: {document_file}") - - # Generate pagination and embeddings for the combined document - try: - import sys - sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'embedding')) - from embedding import embed_document - - # Generate pagination file from collected pagination lines - # For Excel/CSV files, use the pagination format we collected - # For text files, fall back to the original pagination generation - if pagination_lines: - print(f" Writing pagination data from Excel/CSV files for {key}") - with open(pagination_file, 'w', encoding='utf-8') as f: - for line in pagination_lines: - if line.strip(): - f.write(f"{line}\n") - print(f" Generated {len(pagination_lines)} pagination lines") - else: - # For text-only files, use the original pagination generation - from embedding import split_document_by_pages - print(f" Generating pagination from text files for {key}") - pages = split_document_by_pages(str(document_file), str(pagination_file)) - print(f" Generated {len(pages)} pages") - - # Generate embeddings - print(f" Generating embeddings for {key}") - - # Use paragraph chunking strategy with default settings - embedding_data = embed_document( - str(document_file), - str(embeddings_file), - chunking_strategy='paragraph' - ) - - if embedding_data: - print(f" Generated embeddings for {len(embedding_data['chunks'])} chunks") - # Add to processed files only after successful embedding - processed_files_by_key[key].append(document_file) - else: - print(f" Failed to generate embeddings") - - except Exception as e: - print(f" Failed to generate pagination/embeddings for {key}: {str(e)}") - - except Exception as e: - print(f"Failed to write combined document: {str(e)}") + # Step 1: Sync files to group directories + print("Step 1: Syncing files to group directories...") + synced_files, failed_files = sync_files_to_group(unique_id, files) - # Load existing log - processed_log = load_processed_files_log(unique_id) + # Step 2: Detect changes and cleanup orphaned files + from utils.file_manager import detect_file_changes + changes = detect_file_changes(unique_id, files) - # Update log with newly processed files - for key, file_list in files.items(): - if key not in processed_log: - processed_log[key] = {} + if any(changes["removed"].values()): + print("Step 2: Cleaning up orphaned files...") + removed_files = cleanup_orphaned_files(unique_id, changes) + print(f"Removed orphaned files: {removed_files}") + + # Step 3: Process 
individual files + print("Step 3: Processing individual files...") + processed_files_by_group = {} + processing_results = {} + + for group_name, file_list in files.items(): + processed_files_by_group[group_name] = [] + processing_results[group_name] = [] for file_path in file_list: filename = os.path.basename(file_path) - processed_log[key][filename] = { - "original_path": file_path, - "processed_at": str(os.path.getmtime(document_file) if os.path.exists(document_file) else 0), - "status": "processed" if key in processed_files_by_key and processed_files_by_key[key] else "failed" - } + + # Get local file path + local_path = os.path.join("projects", unique_id, "files", group_name, filename) + + # Skip if file doesn't exist (might be remote file that failed to download) + if not os.path.exists(local_path) and not file_path.startswith(('http://', 'https://')): + print(f"Skipping non-existent file: {filename}") + continue + + # Check if already processed + if check_file_already_processed(unique_id, group_name, filename): + print(f"Skipping already processed file: {filename}") + processed_files_by_group[group_name].append(filename) + processing_results[group_name].append({ + "filename": filename, + "status": "existing" + }) + continue + + # Process the file + print(f"Processing file: {filename} (group: {group_name})") + result = await process_single_file(unique_id, group_name, filename, file_path, local_path) + processing_results[group_name].append(result) + + if result["success"]: + processed_files_by_group[group_name].append(filename) + print(f" Successfully processed {filename}") + else: + print(f" Failed to process {filename}: {result['error']}") - # Save the updated processed log - save_processed_files_log(unique_id, processed_log) + # Step 4: Merge results by group + print("Step 4: Merging results by group...") + merge_results = {} - return processed_files_by_key + for group_name in processed_files_by_group.keys(): + # Get all files in the group (including existing ones) + group_files = get_group_files_list(unique_id, group_name) + + if group_files: + print(f"Merging group: {group_name} with {len(group_files)} files") + merge_result = merge_all_data_by_group(unique_id, group_name) + merge_results[group_name] = merge_result + + if merge_result["success"]: + print(f" Successfully merged group {group_name}") + else: + print(f" Failed to merge group {group_name}: {merge_result['errors']}") + + # Step 5: Save processing log + print("Step 5: Saving processing log...") + await save_processing_log(unique_id, files, synced_files, processing_results, merge_results) + + print(f"File processing completed for project: {unique_id}") + return processed_files_by_group + + +async def save_processing_log( + unique_id: str, + requested_files: Dict[str, List[str]], + synced_files: Dict, + processing_results: Dict, + merge_results: Dict +): + """Save comprehensive processing log.""" + + log_data = { + "unique_id": unique_id, + "timestamp": str(os.path.getmtime("projects") if os.path.exists("projects") else 0), + "requested_files": requested_files, + "synced_files": synced_files, + "processing_results": processing_results, + "merge_results": merge_results, + "summary": { + "total_groups": len(requested_files), + "total_files_requested": sum(len(files) for files in requested_files.values()), + "total_files_processed": sum( + len([r for r in results if r.get("success", False)]) + for results in processing_results.values() + ), + "total_groups_merged": len([r for r in merge_results.values() if r.get("success", 
False)]) + } + } + + log_file_path = os.path.join("projects", unique_id, "processing_log.json") + try: + with open(log_file_path, 'w', encoding='utf-8') as f: + json.dump(log_data, f, ensure_ascii=False, indent=2) + print(f"Processing log saved to: {log_file_path}") + except Exception as e: + print(f"Error saving processing log: {str(e)}") def generate_dataset_structure(unique_id: str) -> str: """Generate a string representation of the dataset structure""" - dataset_dir = os.path.join("projects", unique_id, "dataset") + project_dir = os.path.join("projects", unique_id) structure = [] def add_directory_contents(dir_path: str, prefix: str = ""): try: + if not os.path.exists(dir_path): + structure.append(f"{prefix}└── (not found)") + return + items = sorted(os.listdir(dir_path)) for i, item in enumerate(items): item_path = os.path.join(dir_path, item) @@ -301,22 +177,109 @@ def generate_dataset_structure(unique_id: str) -> str: except Exception as e: structure.append(f"{prefix}└── Error: {str(e)}") - if os.path.exists(dataset_dir): - structure.append(f"dataset/") - add_directory_contents(dataset_dir, "") - else: - structure.append("dataset/ (not found)") + # Add files directory structure + files_dir = os.path.join(project_dir, "files") + structure.append("files/") + add_directory_contents(files_dir, "") + + # Add processed directory structure + processed_dir = os.path.join(project_dir, "processed") + structure.append("\nprocessed/") + add_directory_contents(processed_dir, "") + + # Add dataset directory structure + dataset_dir = os.path.join(project_dir, "dataset") + structure.append("\ndataset/") + add_directory_contents(dataset_dir, "") return "\n".join(structure) +def get_processing_status(unique_id: str) -> Dict: + """Get comprehensive processing status for a project.""" + + project_dir = os.path.join("projects", unique_id) + + if not os.path.exists(project_dir): + return { + "project_exists": False, + "unique_id": unique_id + } + + status = { + "project_exists": True, + "unique_id": unique_id, + "directories": { + "files": os.path.exists(os.path.join(project_dir, "files")), + "processed": os.path.exists(os.path.join(project_dir, "processed")), + "dataset": os.path.exists(os.path.join(project_dir, "dataset")) + }, + "groups": {}, + "processing_log_exists": os.path.exists(os.path.join(project_dir, "processing_log.json")) + } + + # Check each group's status + files_dir = os.path.join(project_dir, "files") + if os.path.exists(files_dir): + for group_name in os.listdir(files_dir): + group_path = os.path.join(files_dir, group_name) + if os.path.isdir(group_path): + status["groups"][group_name] = { + "files_count": len([ + f for f in os.listdir(group_path) + if os.path.isfile(os.path.join(group_path, f)) + ]), + "merge_status": "pending" + } + + # Check merge status for each group + dataset_dir = os.path.join(project_dir, "dataset") + if os.path.exists(dataset_dir): + for group_name in os.listdir(dataset_dir): + group_path = os.path.join(dataset_dir, group_name) + if os.path.isdir(group_path): + if group_name in status["groups"]: + # Check if merge is complete + document_path = os.path.join(group_path, "document.txt") + pagination_path = os.path.join(group_path, "pagination.txt") + embedding_path = os.path.join(group_path, "embedding.pkl") + + if (os.path.exists(document_path) and os.path.exists(pagination_path) and + os.path.exists(embedding_path)): + status["groups"][group_name]["merge_status"] = "completed" + else: + status["groups"][group_name]["merge_status"] = "incomplete" + else: + 
status["groups"][group_name] = { + "files_count": 0, + "merge_status": "completed" + } + + return status + + def remove_dataset_directory(unique_id: str, filename_without_ext: str): - """Remove a specific dataset directory""" - dataset_path = os.path.join("projects", unique_id, "dataset", filename_without_ext) - remove_file_or_directory(dataset_path) + """Remove a specific dataset directory (deprecated - use new structure)""" + # This function is kept for compatibility but delegates to new structure + dataset_path = os.path.join("projects", unique_id, "processed", filename_without_ext) + if os.path.exists(dataset_path): + import shutil + shutil.rmtree(dataset_path) def remove_dataset_directory_by_key(unique_id: str, key: str): - """Remove dataset directory by key""" - dataset_path = os.path.join("projects", unique_id, "dataset", key) - remove_file_or_directory(dataset_path) \ No newline at end of file + """Remove dataset directory by key (group name)""" + # Remove files directory + files_group_path = os.path.join("projects", unique_id, "files", key) + if os.path.exists(files_group_path): + import shutil + shutil.rmtree(files_group_path) + + # Remove processed directory + processed_group_path = os.path.join("projects", unique_id, "processed", key) + if os.path.exists(processed_group_path): + import shutil + shutil.rmtree(processed_group_path) + + # Remove dataset directory + cleanup_dataset_group(unique_id, key) \ No newline at end of file diff --git a/utils/file_manager.py b/utils/file_manager.py new file mode 100644 index 0000000..a38d813 --- /dev/null +++ b/utils/file_manager.py @@ -0,0 +1,253 @@ +#!/usr/bin/env python3 +""" +File management functions for syncing files and detecting changes. +""" + +import os +import json +import shutil +from typing import Dict, List, Set, Tuple +from pathlib import Path + + +def get_existing_files(unique_id: str) -> Dict[str, Set[str]]: + """Get existing files organized by group.""" + existing_files = {} + files_dir = os.path.join("projects", unique_id, "files") + + if not os.path.exists(files_dir): + return existing_files + + for group_name in os.listdir(files_dir): + group_dir = os.path.join(files_dir, group_name) + if os.path.isdir(group_dir): + existing_files[group_name] = set() + for file_name in os.listdir(group_dir): + file_path = os.path.join(group_dir, file_name) + if os.path.isfile(file_path): + existing_files[group_name].add(file_name) + + return existing_files + + +def detect_file_changes(unique_id: str, new_files: Dict[str, List[str]]) -> Dict: + """Detect file changes: added, removed, and existing files.""" + existing_files = get_existing_files(unique_id) + + changes = { + "added": {}, + "removed": {}, + "existing": {}, + "removed_groups": set() # Track completely removed groups + } + + # Convert new_files to sets for comparison + new_files_sets = {} + for group, file_list in new_files.items(): + # Extract filenames from paths + filenames = set() + for file_path in file_list: + filename = os.path.basename(file_path) + filenames.add(filename) + new_files_sets[group] = filenames + + # Detect added and existing files + for group, new_filenames in new_files_sets.items(): + existing_filenames = existing_files.get(group, set()) + + changes["added"][group] = new_filenames - existing_filenames + changes["existing"][group] = new_filenames & existing_filenames + + # For removed files, check against existing files in this group + if group not in changes["removed"]: + changes["removed"][group] = set() + + # Detect removed files (files that exist but 
not in new request) + for group, existing_filenames in existing_files.items(): + if group in new_files_sets: + # Group exists in new request, check for individual file removals + new_filenames = new_files_sets[group] + changes["removed"][group] = existing_filenames - new_filenames + else: + # Group completely removed from new request + changes["removed_groups"].add(group) + changes["removed"][group] = existing_filenames + + return changes + + +def sync_files_to_group(unique_id: str, files: Dict[str, List[str]]) -> Tuple[Dict, Dict]: + """ + Sync files to group directories and return sync results. + + Returns: + Tuple of (synced_files, failed_files) + """ + project_dir = os.path.join("projects", unique_id) + files_dir = os.path.join(project_dir, "files") + + # Create files directory + os.makedirs(files_dir, exist_ok=True) + + # Detect changes first + changes = detect_file_changes(unique_id, files) + + synced_files = {} + failed_files = {} + + # Process each group + for group_name, file_list in files.items(): + group_dir = os.path.join(files_dir, group_name) + os.makedirs(group_dir, exist_ok=True) + + synced_files[group_name] = [] + failed_files[group_name] = [] + + # Only process files that are new or need updating + filenames_to_sync = changes["added"].get(group_name, set()) + + for file_path in file_list: + filename = os.path.basename(file_path) + + # Skip if file already exists and is in the "existing" category + if filename in changes["existing"].get(group_name, set()): + synced_files[group_name].append({ + "original_path": file_path, + "local_path": os.path.join(group_dir, filename), + "status": "existing" + }) + continue + + # Only process files that are actually new + if filename not in filenames_to_sync: + continue + + try: + # Check if it's a URL (remote file) or local file + is_remote = file_path.startswith(('http://', 'https://')) + + if is_remote: + # For remote files, we'll download them in the processing stage + synced_files[group_name].append({ + "original_path": file_path, + "local_path": os.path.join(group_dir, filename), + "status": "pending_download" + }) + else: + # Handle local file + if not os.path.exists(file_path): + failed_files[group_name].append({ + "original_path": file_path, + "error": "Local file not found" + }) + continue + + # Copy file to group directory + dest_path = os.path.join(group_dir, filename) + shutil.copy2(file_path, dest_path) + + synced_files[group_name].append({ + "original_path": file_path, + "local_path": dest_path, + "status": "copied" + }) + + except Exception as e: + failed_files[group_name].append({ + "original_path": file_path, + "error": str(e) + }) + + return synced_files, failed_files + + +def cleanup_orphaned_files(unique_id: str, changes: Dict) -> Dict[str, List[str]]: + """Remove files and their processing results that are no longer needed.""" + removed_files = {} + project_dir = os.path.join("projects", unique_id) + + # Handle individual file removals + for group_name, removed_filenames in changes["removed"].items(): + if not removed_filenames: + continue + + removed_files[group_name] = [] + + for filename in removed_filenames: + try: + # Remove original file + file_path = os.path.join(project_dir, "files", group_name, filename) + if os.path.exists(file_path): + os.remove(file_path) + removed_files[group_name].append(f"file: {filename}") + + # Remove processed directory for this file + processed_dir = os.path.join(project_dir, "processed", group_name, + Path(filename).stem) + if os.path.exists(processed_dir): + 
shutil.rmtree(processed_dir) + removed_files[group_name].append(f"processed: {Path(filename).stem}") + + except Exception as e: + print(f"Error cleaning up {filename}: {str(e)}") + + # Handle completely removed groups + for group_name in changes.get("removed_groups", set()): + print(f"Cleaning up completely removed group: {group_name}") + removed_files[group_name] = [] + + try: + # Remove entire files/group directory + files_group_dir = os.path.join(project_dir, "files", group_name) + if os.path.exists(files_group_dir): + shutil.rmtree(files_group_dir) + removed_files[group_name].append("files group directory") + + # Remove entire processed/group directory + processed_group_dir = os.path.join(project_dir, "processed", group_name) + if os.path.exists(processed_group_dir): + shutil.rmtree(processed_group_dir) + removed_files[group_name].append("processed group directory") + + # Remove entire dataset/group directory + dataset_group_dir = os.path.join(project_dir, "dataset", group_name) + if os.path.exists(dataset_group_dir): + shutil.rmtree(dataset_group_dir) + removed_files[group_name].append("dataset group directory") + + print(f"Completely removed group '{group_name}' and all its data") + + except Exception as e: + print(f"Error cleaning up group {group_name}: {str(e)}") + + return removed_files + + +def get_group_files_list(unique_id: str, group_name: str) -> List[str]: + """Get list of files in a specific group.""" + group_dir = os.path.join("projects", unique_id, "files", group_name) + + if not os.path.exists(group_dir): + return [] + + files = [] + for filename in os.listdir(group_dir): + file_path = os.path.join(group_dir, filename) + if os.path.isfile(file_path): + files.append(filename) + + return sorted(files) + + +def ensure_directories(unique_id: str): + """Ensure all necessary directories exist for a project.""" + base_dir = os.path.join("projects", unique_id) + directories = [ + "files", + "processed", + "dataset" + ] + + for dir_name in directories: + dir_path = os.path.join(base_dir, dir_name) + os.makedirs(dir_path, exist_ok=True) \ No newline at end of file diff --git a/utils/file_utils.py b/utils/file_utils.py index d7e47a5..3f6afe2 100644 --- a/utils/file_utils.py +++ b/utils/file_utils.py @@ -115,7 +115,7 @@ def load_processed_files_log(unique_id: str) -> Dict[str, Dict]: def save_processed_files_log(unique_id: str, processed_log: Dict[str, Dict]): - """Save processed files log for a project""" + """Save processed files log for a project (legacy function)""" log_file = os.path.join("projects", unique_id, "processed_files.json") try: os.makedirs(os.path.dirname(log_file), exist_ok=True) @@ -123,4 +123,182 @@ def save_processed_files_log(unique_id: str, processed_log: Dict[str, Dict]): with open(log_file, 'w', encoding='utf-8') as f: json.dump(processed_log, f, ensure_ascii=False, indent=2) except Exception as e: - print(f"Error saving processed files log: {e}") \ No newline at end of file + print(f"Error saving processed files log: {e}") + + +def get_processing_log(unique_id: str) -> Dict: + """Get the comprehensive processing log for a project""" + log_file = os.path.join("projects", unique_id, "processing_log.json") + if os.path.exists(log_file): + try: + import json + with open(log_file, 'r', encoding='utf-8') as f: + return json.load(f) + except Exception as e: + print(f"Error loading processing log: {e}") + return {} + + +def save_project_status(unique_id: str, status: Dict): + """Save project processing status""" + status_file = os.path.join("projects", unique_id, 
"status.json") + try: + os.makedirs(os.path.dirname(status_file), exist_ok=True) + import json + with open(status_file, 'w', encoding='utf-8') as f: + json.dump(status, f, ensure_ascii=False, indent=2) + except Exception as e: + print(f"Error saving project status: {e}") + + +def load_project_status(unique_id: str) -> Dict: + """Load project processing status""" + status_file = os.path.join("projects", unique_id, "status.json") + if os.path.exists(status_file): + try: + import json + with open(status_file, 'r', encoding='utf-8') as f: + return json.load(f) + except Exception as e: + print(f"Error loading project status: {e}") + return {} + + +def get_file_metadata(file_path: str) -> Dict: + """Get metadata for a file""" + try: + if not os.path.exists(file_path): + return {"exists": False} + + stat = os.stat(file_path) + return { + "exists": True, + "size": stat.st_size, + "modified_time": stat.st_mtime, + "created_time": stat.st_ctime, + "is_file": os.path.isfile(file_path), + "is_directory": os.path.isdir(file_path) + } + except Exception as e: + return {"exists": False, "error": str(e)} + + +def update_file_processing_status(unique_id: str, group_name: str, filename: str, status: Dict): + """Update processing status for a specific file""" + status_file = os.path.join("projects", unique_id, "file_status.json") + + try: + # Load existing status + if os.path.exists(status_file): + import json + with open(status_file, 'r', encoding='utf-8') as f: + file_status = json.load(f) + else: + file_status = {} + + # Ensure structure exists + if group_name not in file_status: + file_status[group_name] = {} + + # Update status + file_status[group_name][filename] = { + **status, + "updated_at": str(os.path.getmtime(file_path) if os.path.exists(file_path) else 0) + } + + # Save updated status + os.makedirs(os.path.dirname(status_file), exist_ok=True) + with open(status_file, 'w', encoding='utf-8') as f: + json.dump(file_status, f, ensure_ascii=False, indent=2) + + except Exception as e: + print(f"Error updating file processing status: {e}") + + +def get_file_processing_status(unique_id: str, group_name: str = None, filename: str = None) -> Dict: + """Get processing status for files""" + status_file = os.path.join("projects", unique_id, "file_status.json") + + if not os.path.exists(status_file): + return {} + + try: + import json + with open(status_file, 'r', encoding='utf-8') as f: + file_status = json.load(f) + + # Filter by group and filename if provided + if group_name: + if group_name not in file_status: + return {} + + if filename: + return file_status[group_name].get(filename, {}) + else: + return file_status[group_name] + + return file_status + + except Exception as e: + print(f"Error getting file processing status: {e}") + return {} + + +def calculate_directory_size(directory_path: str) -> int: + """Calculate total size of a directory recursively""" + total_size = 0 + try: + for dirpath, dirnames, filenames in os.walk(directory_path): + for filename in filenames: + file_path = os.path.join(dirpath, filename) + if os.path.exists(file_path): + total_size += os.path.getsize(file_path) + except Exception as e: + print(f"Error calculating directory size: {e}") + + return total_size + + +def get_project_statistics(unique_id: str) -> Dict: + """Get comprehensive statistics for a project""" + project_dir = os.path.join("projects", unique_id) + + if not os.path.exists(project_dir): + return {"project_exists": False} + + stats = { + "project_exists": True, + "unique_id": unique_id, + "directories": {}, + 
"total_files": 0, + "total_size": 0 + } + + # Check each directory + directories = ["files", "processed", "dataset"] + + for dir_name in directories: + dir_path = os.path.join(project_dir, dir_name) + if os.path.exists(dir_path): + dir_size = calculate_directory_size(dir_path) + dir_files = 0 + + for root, dirs, files in os.walk(dir_path): + dir_files += len(files) + + stats["directories"][dir_name] = { + "exists": True, + "size": dir_size, + "files": dir_files + } + + stats["total_files"] += dir_files + stats["total_size"] += dir_size + else: + stats["directories"][dir_name] = { + "exists": False, + "size": 0, + "files": 0 + } + + return stats \ No newline at end of file diff --git a/utils/single_file_processor.py b/utils/single_file_processor.py new file mode 100644 index 0000000..cd1d484 --- /dev/null +++ b/utils/single_file_processor.py @@ -0,0 +1,297 @@ +#!/usr/bin/env python3 +""" +Single file processing functions for handling individual files. +""" + +import os +import tempfile +import zipfile +from typing import Dict, List, Tuple, Optional +from pathlib import Path + +from utils.file_utils import download_file + +# Try to import excel/csv processor, but handle if dependencies are missing +try: + from utils.excel_csv_processor import ( + is_excel_file, is_csv_file, process_excel_file, process_csv_file + ) + EXCEL_CSV_SUPPORT = True +except ImportError as e: + print(f"Excel/CSV processing not available: {e}") + EXCEL_CSV_SUPPORT = False + + # Fallback functions + def is_excel_file(file_path): + return file_path.lower().endswith(('.xlsx', '.xls')) + + def is_csv_file(file_path): + return file_path.lower().endswith('.csv') + + def process_excel_file(file_path): + return "", [] + + def process_csv_file(file_path): + return "", [] + + +async def process_single_file( + unique_id: str, + group_name: str, + filename: str, + original_path: str, + local_path: str +) -> Dict: + """ + Process a single file and generate document.txt, pagination.txt, and embedding.pkl. 
+ + Returns: + Dict with processing results and file paths + """ + # Create output directory for this file + filename_stem = Path(filename).stem + output_dir = os.path.join("projects", unique_id, "processed", group_name, filename_stem) + os.makedirs(output_dir, exist_ok=True) + + result = { + "success": False, + "filename": filename, + "group": group_name, + "output_dir": output_dir, + "document_path": os.path.join(output_dir, "document.txt"), + "pagination_path": os.path.join(output_dir, "pagination.txt"), + "embedding_path": os.path.join(output_dir, "embedding.pkl"), + "error": None, + "content_size": 0, + "pagination_lines": 0, + "embedding_chunks": 0 + } + + try: + # Download file if it's remote and not yet downloaded + if original_path.startswith(('http://', 'https://')): + if not os.path.exists(local_path): + print(f"Downloading {original_path} -> {local_path}") + success = await download_file(original_path, local_path) + if not success: + result["error"] = "Failed to download file" + return result + + # Extract content from file + content, pagination_lines = await extract_file_content(local_path, filename) + + if not content or not content.strip(): + result["error"] = "No content extracted from file" + return result + + # Write document.txt + with open(result["document_path"], 'w', encoding='utf-8') as f: + f.write(content) + result["content_size"] = len(content) + + # Write pagination.txt + if pagination_lines: + with open(result["pagination_path"], 'w', encoding='utf-8') as f: + for line in pagination_lines: + if line.strip(): + f.write(f"{line}\n") + result["pagination_lines"] = len(pagination_lines) + else: + # Generate pagination from text content + pagination_lines = generate_pagination_from_text(result["document_path"], + result["pagination_path"]) + result["pagination_lines"] = len(pagination_lines) + + # Generate embeddings + try: + embedding_chunks = await generate_embeddings_for_file( + result["document_path"], result["embedding_path"] + ) + result["embedding_chunks"] = len(embedding_chunks) if embedding_chunks else 0 + result["success"] = True + + except Exception as e: + result["error"] = f"Embedding generation failed: {str(e)}" + print(f"Failed to generate embeddings for {filename}: {str(e)}") + + except Exception as e: + result["error"] = f"File processing failed: {str(e)}" + print(f"Error processing file {filename}: {str(e)}") + + return result + + +async def extract_file_content(file_path: str, filename: str) -> Tuple[str, List[str]]: + """Extract content from various file formats.""" + + # Handle zip files + if filename.lower().endswith('.zip'): + return await extract_from_zip(file_path, filename) + + # Handle Excel files + elif is_excel_file(file_path): + return await extract_from_excel(file_path, filename) + + # Handle CSV files + elif is_csv_file(file_path): + return await extract_from_csv(file_path, filename) + + # Handle text files + else: + return await extract_from_text(file_path, filename) + + +async def extract_from_zip(zip_path: str, filename: str) -> Tuple[str, List[str]]: + """Extract content from zip file.""" + content_parts = [] + pagination_lines = [] + + try: + with zipfile.ZipFile(zip_path, 'r') as zip_ref: + # Extract to temporary directory + temp_dir = tempfile.mkdtemp(prefix=f"extract_{Path(filename).stem}_") + zip_ref.extractall(temp_dir) + + # Process extracted files + for root, dirs, files in os.walk(temp_dir): + for file in files: + if file.lower().endswith(('.txt', '.md', '.xlsx', '.xls', '.csv')): + file_path = os.path.join(root, file) + + 
try: + file_content, file_pagination = await extract_file_content(file_path, file) + + if file_content: + content_parts.append(f"# Page {file}") + content_parts.append(file_content) + pagination_lines.extend(file_pagination) + + except Exception as e: + print(f"Error processing extracted file {file}: {str(e)}") + + # Clean up temporary directory + import shutil + shutil.rmtree(temp_dir) + + except Exception as e: + print(f"Error extracting zip file {filename}: {str(e)}") + return "", [] + + return '\n\n'.join(content_parts), pagination_lines + + +async def extract_from_excel(file_path: str, filename: str) -> Tuple[str, List[str]]: + """Extract content from Excel file.""" + try: + document_content, pagination_lines = process_excel_file(file_path) + + if document_content: + content = f"# Page {filename}\n{document_content}" + return content, pagination_lines + else: + return "", [] + + except Exception as e: + print(f"Error processing Excel file {filename}: {str(e)}") + return "", [] + + +async def extract_from_csv(file_path: str, filename: str) -> Tuple[str, List[str]]: + """Extract content from CSV file.""" + try: + document_content, pagination_lines = process_csv_file(file_path) + + if document_content: + content = f"# Page {filename}\n{document_content}" + return content, pagination_lines + else: + return "", [] + + except Exception as e: + print(f"Error processing CSV file {filename}: {str(e)}") + return "", [] + + +async def extract_from_text(file_path: str, filename: str) -> Tuple[str, List[str]]: + """Extract content from text file.""" + try: + with open(file_path, 'r', encoding='utf-8') as f: + content = f.read().strip() + + if content: + return content, [] + else: + return "", [] + + except Exception as e: + print(f"Error reading text file {filename}: {str(e)}") + return "", [] + + +def generate_pagination_from_text(document_path: str, pagination_path: str) -> List[str]: + """Generate pagination from text document.""" + try: + # Import embedding module for pagination + import sys + sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'embedding')) + from embedding import split_document_by_pages + + pages = split_document_by_pages(str(document_path), str(pagination_path)) + + # Return pagination lines + pagination_lines = [] + with open(pagination_path, 'r', encoding='utf-8') as f: + for line in f: + if line.strip(): + pagination_lines.append(line.strip()) + + return pagination_lines + + except Exception as e: + print(f"Error generating pagination from text: {str(e)}") + return [] + + +async def generate_embeddings_for_file(document_path: str, embedding_path: str) -> Optional[List]: + """Generate embeddings for a document.""" + try: + # Import embedding module + import sys + sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'embedding')) + from embedding import embed_document + + # Generate embeddings using paragraph chunking + embedding_data = embed_document( + str(document_path), + str(embedding_path), + chunking_strategy='paragraph' + ) + + if embedding_data and 'chunks' in embedding_data: + return embedding_data['chunks'] + else: + return None + + except Exception as e: + print(f"Error generating embeddings: {str(e)}") + return None + + +def check_file_already_processed(unique_id: str, group_name: str, filename: str) -> bool: + """Check if a file has already been processed.""" + filename_stem = Path(filename).stem + output_dir = os.path.join("projects", unique_id, "processed", group_name, filename_stem) + + document_path = os.path.join(output_dir, 
"document.txt") + pagination_path = os.path.join(output_dir, "pagination.txt") + embedding_path = os.path.join(output_dir, "embedding.pkl") + + # Check if all files exist and are not empty + if (os.path.exists(document_path) and os.path.exists(pagination_path) and + os.path.exists(embedding_path)): + + if (os.path.getsize(document_path) > 0 and os.path.getsize(pagination_path) > 0 and + os.path.getsize(embedding_path) > 0): + return True + + return False \ No newline at end of file