#!/usr/bin/env python3
"""
Excel and CSV file processor for converting data to document.txt and
pagination.txt formats.
"""

import logging
import os
from typing import Any, Dict, List, Tuple

import pandas as pd

# Configure logging
logger = logging.getLogger('app')

# Encodings tried, in order, after the caller's preferred encoding.
_FALLBACK_ENCODINGS = ('utf-8', 'gbk', 'gb2312', 'utf-8-sig')


def _cell_to_text(value: Any) -> str:
    """Return a cell value as stripped text; missing values become ""."""
    # is_scalar guards pd.isna, which returns an array (with no single truth
    # value) when handed a list-like object.
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return ""
    return str(value).strip()


def _dataframe_to_rows(df: pd.DataFrame) -> List[Dict[str, Any]]:
    """Convert a DataFrame to a list of {column: text} dicts, dropping blank rows."""
    columns = [str(col) for col in df.columns]
    rows = []
    # itertuples keeps each column's own dtype. iterrows would build one
    # Series per row and upcast mixed int/float rows, rendering 1 as "1.0".
    for values in df.itertuples(index=False, name=None):
        row = {col: _cell_to_text(value) for col, value in zip(columns, values)}
        # Every value is already stripped, so truthiness means "has content".
        if any(row.values()):
            rows.append(row)
    return rows


def _single_line(text: str) -> str:
    """Make text safe for the pagination format (no ';' and no line breaks)."""
    return (
        text.replace(';', ',')
        .replace('\r\n', ' ')
        .replace('\r', ' ')
        .replace('\n', ' ')
        .strip()
    )


def _load_csv(file_path: str, encoding: str) -> pd.DataFrame:
    """Load a CSV into a DataFrame, trying several encodings before giving up."""
    # dict.fromkeys removes duplicate candidates while preserving order.
    for candidate in dict.fromkeys((encoding, *_FALLBACK_ENCODINGS)):
        try:
            return pd.read_csv(file_path, encoding=candidate)
        except (UnicodeDecodeError, LookupError):
            # Wrong or unknown encoding: move on to the next candidate.
            continue
    # Last resort: decode as UTF-8 and drop undecodable bytes. The decoding is
    # done by open() because read_csv has no ``errors`` keyword.
    with open(file_path, encoding='utf-8', errors='ignore', newline='') as handle:
        return pd.read_csv(handle)


def read_excel_sheets(file_path: str) -> Dict[str, List[Dict[str, Any]]]:
    """
    Read all sheets from an Excel file.

    A sheet that fails to load is logged and skipped; the remaining sheets are
    still returned. If the workbook itself cannot be opened, the error is
    logged and an empty dict is returned.

    Args:
        file_path: Path to the Excel file

    Returns:
        Dict: Keys are sheet names, values are lists of row data dictionaries
        (all values are strings; fully blank rows are omitted)
    """
    try:
        # The context manager closes the workbook handle even on errors, and
        # reading from the open workbook avoids re-opening the file per sheet.
        with pd.ExcelFile(file_path) as excel_file:
            sheets_data = {}
            for sheet_name in excel_file.sheet_names:
                try:
                    df = pd.read_excel(excel_file, sheet_name=sheet_name)
                    sheet_data = _dataframe_to_rows(df)
                except Exception as e:
                    logger.error(
                        "Failed to read Excel sheet '%s': %s", sheet_name, e
                    )
                    continue
                sheets_data[sheet_name] = sheet_data
                logger.info(
                    "Read Excel sheet '%s': %d rows of data",
                    sheet_name,
                    len(sheet_data),
                )
            return sheets_data
    except Exception as e:
        logger.error("Failed to read Excel file: %s", e)
        return {}


def read_csv_file(file_path: str, encoding: str = 'utf-8') -> List[Dict[str, Any]]:
    """
    Read a CSV file.

    The given encoding is tried first, followed by utf-8, gbk, gb2312 and
    utf-8-sig. If none of them decodes the file, it is read as UTF-8 with
    undecodable bytes dropped. Any other failure is logged and an empty list
    is returned.

    Args:
        file_path: Path to the CSV file
        encoding: File encoding to try first

    Returns:
        List: CSV data as a list of dictionaries (all values are strings;
        fully blank rows are omitted)
    """
    try:
        csv_data = _dataframe_to_rows(_load_csv(file_path, encoding))
    except Exception as e:
        logger.error("Failed to read CSV file: %s", e)
        return []
    logger.info("Read CSV file: %d rows of data", len(csv_data))
    return csv_data


def convert_to_markdown_format(data_list: List[Dict[str, Any]], sheet_name: str = "") -> str:
    """
    Convert data to markdown format.

    Each row becomes a run of "key: value" lines. Rows are separated by a
    "---" line followed by a blank line. Empty values, and rows with no
    non-empty values, are skipped.

    Args:
        data_list: List of data dictionaries
        sheet_name: Sheet name (optional); when given, a "# Sheet: ..." heading
            is emitted first

    Returns:
        str: Markdown formatted text ("" when data_list is empty)
    """
    if not data_list:
        return ""

    markdown_content = []
    if sheet_name:
        markdown_content.append(f"# Sheet: {sheet_name}")
        markdown_content.append("")

    emitted_rows = 0
    for row_data in data_list:
        row_markdown = []
        for key, value in row_data.items():
            text = _cell_to_text(value)
            if text:  # Only include non-empty values
                row_markdown.append(f"{key}: {text}")
        if not row_markdown:
            continue
        # Put the separator *before* every row after the first, so that blank
        # trailing rows can never leave a dangling "---" at the end.
        if emitted_rows:
            markdown_content.append("---")
            markdown_content.append("")
        markdown_content.extend(row_markdown)
        emitted_rows += 1

    return "\n".join(markdown_content)


def convert_to_pagination_format(data_list: List[Dict[str, Any]]) -> List[str]:
    """
    Convert data to key:value;key:value format.

    One output line is produced per row that has at least one non-empty value.
    Semicolons in keys and values are replaced with commas and line breaks
    with spaces, so each row stays on a single, unambiguous line.

    Args:
        data_list: List of data dictionaries

    Returns:
        List: Pagination formatted text lines
    """
    if not data_list:
        return []

    pagination_lines = []
    for row_data in data_list:
        row_pairs = []
        for key, value in row_data.items():
            clean_value = _single_line(_cell_to_text(value))
            if clean_value:  # Only include non-empty values
                row_pairs.append(f"{_single_line(str(key))}:{clean_value}")
        if row_pairs:
            pagination_lines.append(";".join(row_pairs))

    return pagination_lines


def process_excel_file(file_path: str) -> Tuple[str, List[str]]:
    """
    Process an Excel file and generate document.txt and pagination.txt content.

    Args:
        file_path: Path to the Excel file

    Returns:
        Tuple: (document_content, pagination_lines); the markdown of each
        non-empty sheet is joined with a blank line, and pagination lines of
        all sheets are concatenated in sheet order
    """
    sheets_data = read_excel_sheets(file_path)

    document_content_parts = []
    pagination_lines = []

    for sheet_name, sheet_data in sheets_data.items():
        if not sheet_data:
            continue

        # Generate markdown format document content
        markdown_content = convert_to_markdown_format(sheet_data, sheet_name)
        if markdown_content:
            document_content_parts.append(markdown_content)

        # Generate pagination format content
        pagination_lines.extend(convert_to_pagination_format(sheet_data))

    # Merge document content from all sheets
    document_content = "\n\n".join(document_content_parts)
    return document_content, pagination_lines


def process_csv_file(file_path: str) -> Tuple[str, List[str]]:
    """
    Process a CSV file and generate document.txt and pagination.txt content.

    Args:
        file_path: Path to the CSV file

    Returns:
        Tuple: (document_content, pagination_lines); ("", []) when the file
        yields no rows
    """
    csv_data = read_csv_file(file_path)
    if not csv_data:
        return "", []

    document_content = convert_to_markdown_format(csv_data)
    pagination_lines = convert_to_pagination_format(csv_data)
    return document_content, pagination_lines


def is_excel_file(file_path: str) -> bool:
    """Check if the file is an Excel file (.xlsx or .xls, case-insensitive)."""
    return file_path.lower().endswith(('.xlsx', '.xls'))


def is_csv_file(file_path: str) -> bool:
    """Check if the file is a CSV file (.csv, case-insensitive)."""
    return file_path.lower().endswith('.csv')