catalog-agent/utils/dataset_manager.py
#!/usr/bin/env python3
"""
Dataset management functions for organizing and processing datasets.
"""
import os
import shutil
import json
import tempfile  # needed for tempfile.mkdtemp() during zip extraction below
from typing import Dict, List, Optional
from pathlib import Path
from utils.file_utils import (
    download_file, extract_zip_file, get_file_hash,
    load_processed_files_log, save_processed_files_log,
    remove_file_or_directory
)


async def download_dataset_files(unique_id: str, files: Dict[str, List[str]]) -> Dict[str, List[str]]:
    """Download or copy dataset files and organize them by key into dataset/{key}/document.txt.

    Supports zip file extraction and combines content using '# Page' separators.
    """
    if not files:
        return {}

    # Set up directories
    project_dir = os.path.join("projects", unique_id)
    files_dir = os.path.join(project_dir, "files")
    dataset_dir = os.path.join(project_dir, "dataset")

    # Create directories if they don't exist
    os.makedirs(files_dir, exist_ok=True)
    os.makedirs(dataset_dir, exist_ok=True)

    processed_files_by_key = {}
    def extract_zip_file_func(zip_path: str, extract_dir: str) -> List[str]:
        """Extract zip file and return list of extracted txt/md files"""
        extracted_files = []
        try:
            import zipfile
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(extract_dir)
            # Find all extracted txt and md files
            for root, dirs, files in os.walk(extract_dir):
                for file in files:
                    if file.lower().endswith(('.txt', '.md')):
                        extracted_files.append(os.path.join(root, file))
            print(f"Extracted {len(extracted_files)} txt/md files from {zip_path}")
            return extracted_files
        except Exception as e:
            print(f"Error extracting zip file {zip_path}: {str(e)}")
            return []
    # Process each key and its associated files
    for key, file_list in files.items():
        print(f"Processing key '{key}' with {len(file_list)} files")
        processed_files_by_key[key] = []

        # Create target directory for this key
        target_dir = os.path.join(dataset_dir, key)
        os.makedirs(target_dir, exist_ok=True)

        # Check if files are already processed before doing any work
        document_file = os.path.join(target_dir, "document.txt")
        pagination_file = os.path.join(target_dir, "pagination.txt")
        embeddings_file = os.path.join(target_dir, "document_embeddings.pkl")
        already_processed = (
            os.path.exists(document_file) and
            os.path.exists(pagination_file) and
            os.path.exists(embeddings_file) and
            os.path.getsize(document_file) > 0 and
            os.path.getsize(pagination_file) > 0 and
            os.path.getsize(embeddings_file) > 0
        )
        if already_processed:
            print(f" Skipping already processed files for {key}")
            processed_files_by_key[key].append(document_file)
            continue  # Skip to next key

        # Read and combine all files for this key
        combined_content = []
        all_processed_files = []

        for file_path in file_list:
            # Check if it's a URL (remote file) or local file
            is_remote = file_path.startswith(('http://', 'https://'))
            filename = file_path.split("/")[-1] if file_path else f"file_{len(all_processed_files)}"

            # Create temporary extraction directory for zip files
            temp_extract_dir = None
            files_to_process = []
            try:
                if is_remote:
                    # Handle remote file
                    temp_file = os.path.join(files_dir, filename)
                    print(f"Downloading {file_path} -> {temp_file}")
                    success = await download_file(file_path, temp_file)
                    if not success:
                        print(f"Failed to download {file_path}")
                        continue

                    # Check if it's a zip file
                    if filename.lower().endswith('.zip'):
                        temp_extract_dir = tempfile.mkdtemp(prefix=f"extract_{key}_")
                        print(f"Extracting zip to temporary directory: {temp_extract_dir}")
                        extracted_files = extract_zip_file_func(temp_file, temp_extract_dir)
                        files_to_process.extend(extracted_files)
                        # The downloaded zip already lives in the project files directory,
                        # so no extra copy is needed (copying a file onto itself would fail).
                        print(f"Stored downloaded zip file: {temp_file}")
                    else:
                        files_to_process.append(temp_file)
                else:
                    # Handle local file
                    if not os.path.exists(file_path):
                        print(f"Local file not found: {file_path}")
                        continue

                    if filename.lower().endswith('.zip'):
                        # Copy to project directory first
                        local_zip_path = os.path.join(files_dir, filename)
                        shutil.copy2(file_path, local_zip_path)
                        print(f"Copied local zip file: {file_path} -> {local_zip_path}")

                        # Extract zip file
                        temp_extract_dir = tempfile.mkdtemp(prefix=f"extract_{key}_")
                        print(f"Extracting local zip to temporary directory: {temp_extract_dir}")
                        extracted_files = extract_zip_file_func(local_zip_path, temp_extract_dir)
                        files_to_process.extend(extracted_files)
                    else:
                        # Copy non-zip file directly
                        dest_file = os.path.join(files_dir, filename)
                        shutil.copy2(file_path, dest_file)
                        files_to_process.append(dest_file)
                        print(f"Copied local file: {file_path} -> {dest_file}")
                # Process all files (extracted from zip or single file)
                for process_file_path in files_to_process:
                    try:
                        with open(process_file_path, 'r', encoding='utf-8') as f:
                            content = f.read().strip()
                        if content:
                            # Add file content with page separator
                            base_filename = os.path.basename(process_file_path)
                            combined_content.append(f"# Page {base_filename}")
                            combined_content.append(content)
                    except Exception as e:
                        print(f"Failed to read file content from {process_file_path}: {str(e)}")
            except Exception as e:
                print(f"Error processing file {file_path}: {str(e)}")
            finally:
                # Clean up temporary extraction directory
                if temp_extract_dir and os.path.exists(temp_extract_dir):
                    try:
                        shutil.rmtree(temp_extract_dir)
                        print(f"Cleaned up temporary directory: {temp_extract_dir}")
                    except Exception as e:
                        print(f"Failed to clean up temporary directory {temp_extract_dir}: {str(e)}")
        # Write combined content to dataset/{key}/document.txt
        if combined_content:
            try:
                with open(document_file, 'w', encoding='utf-8') as f:
                    f.write('\n\n'.join(combined_content))
                print(f"Created combined document: {document_file}")

                # Generate pagination and embeddings for the combined document
                try:
                    import sys
                    sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'embedding'))
                    from embedding import split_document_by_pages, embed_document

                    # Generate pagination
                    print(f" Generating pagination for {key}")
                    pages = split_document_by_pages(str(document_file), str(pagination_file))
                    print(f" Generated {len(pages)} pages")

                    # Generate embeddings
                    print(f" Generating embeddings for {key}")
                    local_model_path = "./models/paraphrase-multilingual-MiniLM-L12-v2"
                    if not os.path.exists(local_model_path):
                        local_model_path = None  # Fallback to HuggingFace model

                    # Use paragraph chunking strategy with default settings
                    embedding_data = embed_document(
                        str(document_file),
                        str(embeddings_file),
                        chunking_strategy='paragraph',
                        model_path=local_model_path
                    )
                    if embedding_data:
                        print(f" Generated embeddings for {len(embedding_data['chunks'])} chunks")
                        # Add to processed files only after successful embedding
                        processed_files_by_key[key].append(document_file)
                    else:
                        print(" Failed to generate embeddings")
                except Exception as e:
                    print(f" Failed to generate pagination/embeddings for {key}: {str(e)}")
            except Exception as e:
                print(f"Failed to write combined document: {str(e)}")
    # Load existing log
    processed_log = load_processed_files_log(unique_id)

    # Update log with newly processed files
    for key, file_list in files.items():
        if key not in processed_log:
            processed_log[key] = {}
        # Use this key's own document file rather than the loop variable
        # left over from the processing loop above.
        key_document_file = os.path.join(dataset_dir, key, "document.txt")
        for file_path in file_list:
            filename = os.path.basename(file_path)
            processed_log[key][filename] = {
                "original_path": file_path,
                "processed_at": str(os.path.getmtime(key_document_file) if os.path.exists(key_document_file) else 0),
                "status": "processed" if key in processed_files_by_key and processed_files_by_key[key] else "failed"
            }

    # Save the updated processed log
    save_processed_files_log(unique_id, processed_log)
    return processed_files_by_key
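
# Illustrative input/output sketch for download_dataset_files (the key and
# paths below are example values, not part of this module). `files` maps each
# dataset key to local paths and/or http(s) URLs, e.g.
#     {"manual-v1": ["https://example.com/manual.zip", "notes/intro.md"]}
# For each key the function produces, under projects/{unique_id}/:
#     files/                                 raw downloads and copies
#     dataset/{key}/document.txt             combined content with '# Page' separators
#     dataset/{key}/pagination.txt           pagination from split_document_by_pages
#     dataset/{key}/document_embeddings.pkl  embeddings from embed_document
# The returned dict maps each key to [path to document.txt] when the key was
# processed (or had already been processed), and to [] otherwise.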


def generate_dataset_structure(unique_id: str) -> str:
    """Generate a string representation of the dataset structure"""
    dataset_dir = os.path.join("projects", unique_id, "dataset")
    structure = []

    def add_directory_contents(dir_path: str, prefix: str = ""):
        try:
            items = sorted(os.listdir(dir_path))
            for i, item in enumerate(items):
                item_path = os.path.join(dir_path, item)
                is_last = i == len(items) - 1
                current_prefix = "└── " if is_last else "├── "
                structure.append(f"{prefix}{current_prefix}{item}")
                if os.path.isdir(item_path):
                    next_prefix = prefix + ("    " if is_last else "│   ")
                    add_directory_contents(item_path, next_prefix)
        except Exception as e:
            structure.append(f"{prefix}└── Error: {str(e)}")

    if os.path.exists(dataset_dir):
        structure.append("dataset/")
        add_directory_contents(dataset_dir, "")
    else:
        structure.append("dataset/ (not found)")

    return "\n".join(structure)
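
# Example of the string this returns for a project with a single dataset key
# ("manual-v1" is a hypothetical key name used only for illustration):
#
#     dataset/
#     └── manual-v1
#         ├── document.txt
#         ├── document_embeddings.pkl
#         └── pagination.txt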


def remove_dataset_directory(unique_id: str, filename_without_ext: str):
    """Remove a specific dataset directory"""
    dataset_path = os.path.join("projects", unique_id, "dataset", filename_without_ext)
    remove_file_or_directory(dataset_path)


def remove_dataset_directory_by_key(unique_id: str, key: str):
    """Remove dataset directory by key"""
    dataset_path = os.path.join("projects", unique_id, "dataset", key)
    remove_file_or_directory(dataset_path)
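

if __name__ == "__main__":
    # Minimal manual smoke test (illustrative only; the project id, key, and
    # sample file below are made-up values). It writes a throwaway text file,
    # runs the async pipeline on it, and prints the resulting dataset layout.
    import asyncio

    demo_id = "demo-project"
    sample_path = os.path.join(tempfile.gettempdir(), "dataset_manager_demo.txt")
    with open(sample_path, "w", encoding="utf-8") as sample:
        sample.write("Hello from the dataset manager smoke test.")

    result = asyncio.run(download_dataset_files(demo_id, {"demo-key": [sample_path]}))
    print("Processed files by key:", result)
    print(generate_dataset_structure(demo_id))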