#!/usr/bin/env python3
"""
Dataset management functions for organizing and processing datasets.
"""
import os
import shutil
import json
import tempfile
import zipfile
from typing import Dict, List, Optional
from pathlib import Path
from utils.file_utils import (
download_file, extract_zip_file, get_file_hash,
load_processed_files_log, save_processed_files_log,
remove_file_or_directory
)
async def download_dataset_files(unique_id: str, files: Dict[str, List[str]]) -> Dict[str, List[str]]:
"""Download or copy dataset files and organize them by key into dataset/{key}/document.txt.
Supports zip file extraction and combines content using '# Page' separators."""
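    # `files` maps each dataset key to a list of sources (local paths or http(s) URLs).
    # The return value maps each key to the combined document files that were written;
    # a key maps to an empty list when nothing could be processed for it.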
if not files:
return {}
# Set up directories
project_dir = os.path.join("projects", unique_id)
files_dir = os.path.join(project_dir, "files")
dataset_dir = os.path.join(project_dir, "dataset")
# Create directories if they don't exist
os.makedirs(files_dir, exist_ok=True)
os.makedirs(dataset_dir, exist_ok=True)
processed_files_by_key = {}
def extract_zip_file_func(zip_path: str, extract_dir: str) -> List[str]:
"""Extract zip file and return list of extracted txt/md files"""
extracted_files = []
try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(extract_dir)
# Find all extracted txt and md files
            for root, _dirs, filenames in os.walk(extract_dir):
                for name in filenames:
                    if name.lower().endswith(('.txt', '.md')):
                        extracted_files.append(os.path.join(root, name))
print(f"Extracted {len(extracted_files)} txt/md files from {zip_path}")
return extracted_files
except Exception as e:
print(f"Error extracting zip file {zip_path}: {str(e)}")
return []
# Process each key and its associated files
for key, file_list in files.items():
print(f"Processing key '{key}' with {len(file_list)} files")
processed_files_by_key[key] = []
# Create target directory for this key
target_dir = os.path.join(dataset_dir, key)
os.makedirs(target_dir, exist_ok=True)
# Check if files are already processed before doing any work
document_file = os.path.join(target_dir, "document.txt")
pagination_file = os.path.join(target_dir, "pagination.txt")
embeddings_file = os.path.join(target_dir, "document_embeddings.pkl")
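        # A key is considered processed only when all three artifacts exist and are
        # non-empty: the combined text, its page index, and the pickled embeddings.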
already_processed = (
os.path.exists(document_file) and
os.path.exists(pagination_file) and
os.path.exists(embeddings_file) and
os.path.getsize(document_file) > 0 and
os.path.getsize(pagination_file) > 0 and
os.path.getsize(embeddings_file) > 0
)
if already_processed:
print(f" Skipping already processed files for {key}")
processed_files_by_key[key].append(document_file)
continue # Skip to next key
# Read and combine all files for this key
combined_content = []
all_processed_files = []
for file_path in file_list:
# Check if it's a URL (remote file) or local file
is_remote = file_path.startswith(('http://', 'https://'))
filename = file_path.split("/")[-1] if file_path else f"file_{len(all_processed_files)}"
# Create temporary extraction directory for zip files
temp_extract_dir = None
files_to_process = []
try:
if is_remote:
# Handle remote file
temp_file = os.path.join(files_dir, filename)
print(f"Downloading {file_path} -> {temp_file}")
success = await download_file(file_path, temp_file)
if not success:
print(f"Failed to download {file_path}")
continue
# Check if it's a zip file
if filename.lower().endswith('.zip'):
temp_extract_dir = tempfile.mkdtemp(prefix=f"extract_{key}_")
print(f"Extracting zip to temporary directory: {temp_extract_dir}")
extracted_files = extract_zip_file_func(temp_file, temp_extract_dir)
files_to_process.extend(extracted_files)
                        # The downloaded zip already resides in the project files
                        # directory (files_dir), so no additional copy is needed.
else:
files_to_process.append(temp_file)
else:
# Handle local file
if not os.path.exists(file_path):
print(f"Local file not found: {file_path}")
continue
if filename.lower().endswith('.zip'):
# Copy to project directory first
local_zip_path = os.path.join(files_dir, filename)
shutil.copy2(file_path, local_zip_path)
print(f"Copied local zip file: {file_path} -> {local_zip_path}")
# Extract zip file
temp_extract_dir = tempfile.mkdtemp(prefix=f"extract_{key}_")
print(f"Extracting local zip to temporary directory: {temp_extract_dir}")
extracted_files = extract_zip_file_func(local_zip_path, temp_extract_dir)
files_to_process.extend(extracted_files)
else:
# Copy non-zip file directly
dest_file = os.path.join(files_dir, filename)
shutil.copy2(file_path, dest_file)
files_to_process.append(dest_file)
print(f"Copied local file: {file_path} -> {dest_file}")
# Process all files (extracted from zip or single file)
for process_file_path in files_to_process:
try:
with open(process_file_path, 'r', encoding='utf-8') as f:
content = f.read().strip()
if content:
# Add file content with page separator
base_filename = os.path.basename(process_file_path)
combined_content.append(f"# Page {base_filename}")
combined_content.append(content)
except Exception as e:
print(f"Failed to read file content from {process_file_path}: {str(e)}")
except Exception as e:
print(f"Error processing file {file_path}: {str(e)}")
finally:
# Clean up temporary extraction directory
if temp_extract_dir and os.path.exists(temp_extract_dir):
try:
shutil.rmtree(temp_extract_dir)
print(f"Cleaned up temporary directory: {temp_extract_dir}")
except Exception as e:
print(f"Failed to clean up temporary directory {temp_extract_dir}: {str(e)}")
# Write combined content to dataset/{key}/document.txt
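        # The combined file holds one block per source file: a "# Page <filename>"
        # header followed by that file's text, with blocks separated by blank lines.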
if combined_content:
try:
with open(document_file, 'w', encoding='utf-8') as f:
f.write('\n\n'.join(combined_content))
print(f"Created combined document: {document_file}")
# Generate pagination and embeddings for the combined document
try:
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'embedding'))
from embedding import split_document_by_pages, embed_document
# Generate pagination
print(f" Generating pagination for {key}")
pages = split_document_by_pages(str(document_file), str(pagination_file))
print(f" Generated {len(pages)} pages")
# Generate embeddings
print(f" Generating embeddings for {key}")
local_model_path = "./models/paraphrase-multilingual-MiniLM-L12-v2"
if not os.path.exists(local_model_path):
local_model_path = None # Fallback to HuggingFace model
# Use paragraph chunking strategy with default settings
embedding_data = embed_document(
str(document_file),
str(embeddings_file),
chunking_strategy='paragraph',
model_path=local_model_path
)
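                    # embed_document is assumed to return a dict that includes a
                    # 'chunks' list (one entry per embedded chunk) on success.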
if embedding_data:
print(f" Generated embeddings for {len(embedding_data['chunks'])} chunks")
# Add to processed files only after successful embedding
processed_files_by_key[key].append(document_file)
else:
print(f" Failed to generate embeddings")
except Exception as e:
print(f" Failed to generate pagination/embeddings for {key}: {str(e)}")
except Exception as e:
print(f"Failed to write combined document: {str(e)}")
# Load existing log
processed_log = load_processed_files_log(unique_id)
# Update log with newly processed files
    for key, file_list in files.items():
        if key not in processed_log:
            processed_log[key] = {}
        # Path of the combined document written for this key (if any).
        key_document_file = os.path.join(dataset_dir, key, "document.txt")
        for file_path in file_list:
            filename = os.path.basename(file_path)
            processed_log[key][filename] = {
                "original_path": file_path,
                "processed_at": str(os.path.getmtime(key_document_file) if os.path.exists(key_document_file) else 0),
                "status": "processed" if processed_files_by_key.get(key) else "failed"
            }
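    # Resulting log shape (illustrative):
    #   {key: {filename: {"original_path": ..., "processed_at": ..., "status": "processed" | "failed"}}}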
# Save the updated processed log
save_processed_files_log(unique_id, processed_log)
return processed_files_by_key
def generate_dataset_structure(unique_id: str) -> str:
"""Generate a string representation of the dataset structure"""
dataset_dir = os.path.join("projects", unique_id, "dataset")
structure = []
def add_directory_contents(dir_path: str, prefix: str = ""):
try:
items = sorted(os.listdir(dir_path))
for i, item in enumerate(items):
item_path = os.path.join(dir_path, item)
is_last = i == len(items) - 1
current_prefix = "└── " if is_last else "├── "
structure.append(f"{prefix}{current_prefix}{item}")
if os.path.isdir(item_path):
                    next_prefix = prefix + ("    " if is_last else "│   ")
add_directory_contents(item_path, next_prefix)
except Exception as e:
structure.append(f"{prefix}└── Error: {str(e)}")
if os.path.exists(dataset_dir):
structure.append(f"dataset/")
add_directory_contents(dataset_dir, "")
else:
structure.append("dataset/ (not found)")
return "\n".join(structure)
def remove_dataset_directory(unique_id: str, filename_without_ext: str):
"""Remove a specific dataset directory"""
dataset_path = os.path.join("projects", unique_id, "dataset", filename_without_ext)
remove_file_or_directory(dataset_path)
def remove_dataset_directory_by_key(unique_id: str, key: str):
"""Remove dataset directory by key"""
dataset_path = os.path.join("projects", unique_id, "dataset", key)
remove_file_or_directory(dataset_path)
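

# Minimal usage sketch. The project id, keys, paths, and URL below are illustrative
# placeholders, not part of the module's API.
if __name__ == "__main__":
    import asyncio

    async def _demo():
        demo_files = {
            "manual": ["https://example.com/manual.zip"],  # hypothetical remote zip
            "specs": ["./local_docs/spec.txt"],            # hypothetical local file
        }
        processed = await download_dataset_files("demo-project", demo_files)
        print(processed)
        print(generate_dataset_structure("demo-project"))

    asyncio.run(_demo())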