qwen_agent/utils/dataset_manager.py
#!/usr/bin/env python3
"""
Dataset management functions for organizing and processing datasets.
"""

import os
import shutil
import json
import tempfile
import zipfile
from typing import Dict, List, Optional
from pathlib import Path
from utils.file_utils import (
download_file, extract_zip_file, get_file_hash,
load_processed_files_log, save_processed_files_log,
remove_file_or_directory
)
from utils.excel_csv_processor import (
is_excel_file, is_csv_file, process_excel_file, process_csv_file
)


async def download_dataset_files(unique_id: str, files: Dict[str, List[str]]) -> Dict[str, List[str]]:
"""Download or copy dataset files and organize them by key into dataset/{key}/document.txt.
Supports zip file extraction and combines content using '# Page' separators."""
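    # Usage sketch (the project id and file mapping below are hypothetical examples,
    # not values used elsewhere in the project):
    #
    #   processed = await download_dataset_files(
    #       "demo_project",
    #       {"reports": ["https://example.com/reports.zip", "notes/overview.txt"]},
    #   )
    #   # -> {"reports": ["projects/demo_project/dataset/reports/document.txt"]}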
if not files:
return {}
# Set up directories
project_dir = os.path.join("projects", unique_id)
files_dir = os.path.join(project_dir, "files")
dataset_dir = os.path.join(project_dir, "dataset")
# Create directories if they don't exist
os.makedirs(files_dir, exist_ok=True)
os.makedirs(dataset_dir, exist_ok=True)
processed_files_by_key = {}

    def extract_zip_file_func(zip_path: str, extract_dir: str) -> List[str]:
"""Extract zip file and return list of extracted txt/md files"""
extracted_files = []
try:
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(extract_dir)
# Find all extracted txt, md, xlsx, xls, and csv files
for root, dirs, files in os.walk(extract_dir):
for file in files:
if file.lower().endswith(('.txt', '.md', '.xlsx', '.xls', '.csv')):
extracted_files.append(os.path.join(root, file))
print(f"Extracted {len(extracted_files)} txt/md/xlsx/csv files from {zip_path}")
return extracted_files
except Exception as e:
print(f"Error extracting zip file {zip_path}: {str(e)}")
return []

    # Process each key and its associated files
for key, file_list in files.items():
print(f"Processing key '{key}' with {len(file_list)} files")
processed_files_by_key[key] = []
# Create target directory for this key
target_dir = os.path.join(dataset_dir, key)
os.makedirs(target_dir, exist_ok=True)
# Check if files are already processed before doing any work
document_file = os.path.join(target_dir, "document.txt")
pagination_file = os.path.join(target_dir, "pagination.txt")
embeddings_file = os.path.join(target_dir, "document_embeddings.pkl")
already_processed = (
os.path.exists(document_file) and
os.path.exists(pagination_file) and
os.path.exists(embeddings_file) and
os.path.getsize(document_file) > 0 and
os.path.getsize(pagination_file) > 0 and
os.path.getsize(embeddings_file) > 0
)
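        # All three artifacts (combined document, pagination, embeddings) must exist and be
        # non-empty; a partially completed earlier run is reprocessed from scratch.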
if already_processed:
print(f" Skipping already processed files for {key}")
processed_files_by_key[key].append(document_file)
continue # Skip to next key
# Read and combine all files for this key
combined_content = []
pagination_lines = [] # Collect pagination lines from all files
all_processed_files = []
for file_path in file_list:
# Check if it's a URL (remote file) or local file
is_remote = file_path.startswith(('http://', 'https://'))
filename = file_path.split("/")[-1] if file_path else f"file_{len(all_processed_files)}"
# Create temporary extraction directory for zip files
temp_extract_dir = None
files_to_process = []
try:
if is_remote:
# Handle remote file
temp_file = os.path.join(files_dir, filename)
print(f"Downloading {file_path} -> {temp_file}")
success = await download_file(file_path, temp_file)
if not success:
print(f"Failed to download {file_path}")
continue
# Check if it's a zip file
if filename.lower().endswith('.zip'):
temp_extract_dir = tempfile.mkdtemp(prefix=f"extract_{key}_")
print(f"Extracting zip to temporary directory: {temp_extract_dir}")
extracted_files = extract_zip_file_func(temp_file, temp_extract_dir)
files_to_process.extend(extracted_files)
                        # The downloaded zip was saved directly into files_dir above,
                        # so no extra copy is needed here
else:
files_to_process.append(temp_file)
else:
# Handle local file
if not os.path.exists(file_path):
print(f"Local file not found: {file_path}")
continue
if filename.lower().endswith('.zip'):
# Copy to project directory first
local_zip_path = os.path.join(files_dir, filename)
shutil.copy2(file_path, local_zip_path)
print(f"Copied local zip file: {file_path} -> {local_zip_path}")
# Extract zip file
temp_extract_dir = tempfile.mkdtemp(prefix=f"extract_{key}_")
print(f"Extracting local zip to temporary directory: {temp_extract_dir}")
extracted_files = extract_zip_file_func(local_zip_path, temp_extract_dir)
files_to_process.extend(extracted_files)
else:
# Copy non-zip file directly
dest_file = os.path.join(files_dir, filename)
shutil.copy2(file_path, dest_file)
files_to_process.append(dest_file)
print(f"Copied local file: {file_path} -> {dest_file}")
# Process all files (extracted from zip or single file)
for process_file_path in files_to_process:
try:
base_filename = os.path.basename(process_file_path)
# Check if it's an Excel file
if is_excel_file(process_file_path):
print(f"Processing Excel file: {base_filename}")
document_content, excel_pagination_lines = process_excel_file(process_file_path)
if document_content:
combined_content.append(f"# Page {base_filename}")
combined_content.append(document_content)
# Collect pagination lines from Excel files
pagination_lines.extend(excel_pagination_lines)
# Check if it's a CSV file
elif is_csv_file(process_file_path):
print(f"Processing CSV file: {base_filename}")
document_content, csv_pagination_lines = process_csv_file(process_file_path)
if document_content:
combined_content.append(f"# Page {base_filename}")
combined_content.append(document_content)
# Collect pagination lines from CSV files
pagination_lines.extend(csv_pagination_lines)
# Handle text files (original logic)
else:
with open(process_file_path, 'r', encoding='utf-8') as f:
content = f.read().strip()
if content:
# Add file content with page separator
combined_content.append(f"# Page {base_filename}")
combined_content.append(content)
except Exception as e:
print(f"Failed to read file content from {process_file_path}: {str(e)}")
except Exception as e:
print(f"Error processing file {file_path}: {str(e)}")
finally:
# Clean up temporary extraction directory
if temp_extract_dir and os.path.exists(temp_extract_dir):
try:
shutil.rmtree(temp_extract_dir)
print(f"Cleaned up temporary directory: {temp_extract_dir}")
except Exception as e:
print(f"Failed to clean up temporary directory {temp_extract_dir}: {str(e)}")
# Write combined content to dataset/{key}/document.txt
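        # The combined document roughly looks like this (filenames are illustrative):
        #   # Page report_2023.txt
        #   <text content>
        #
        #   # Page summary.csv
        #   <converted sheet text>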
if combined_content:
try:
with open(document_file, 'w', encoding='utf-8') as f:
f.write('\n\n'.join(combined_content))
print(f"Created combined document: {document_file}")
# Generate pagination and embeddings for the combined document
try:
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'embedding'))
from embedding import embed_document
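                    # Assumes the sibling embedding/ package (added to sys.path above)
                    # provides embed_document and split_document_by_pages as used below.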
# Generate pagination file from collected pagination lines
# For Excel/CSV files, use the pagination format we collected
# For text files, fall back to the original pagination generation
if pagination_lines:
print(f" Writing pagination data from Excel/CSV files for {key}")
with open(pagination_file, 'w', encoding='utf-8') as f:
for line in pagination_lines:
if line.strip():
f.write(f"{line}\n")
print(f" Generated {len(pagination_lines)} pagination lines")
else:
# For text-only files, use the original pagination generation
from embedding import split_document_by_pages
print(f" Generating pagination from text files for {key}")
pages = split_document_by_pages(str(document_file), str(pagination_file))
print(f" Generated {len(pages)} pages")
# Generate embeddings
print(f" Generating embeddings for {key}")
# Use paragraph chunking strategy with default settings
embedding_data = embed_document(
str(document_file),
str(embeddings_file),
chunking_strategy='paragraph'
)
if embedding_data:
print(f" Generated embeddings for {len(embedding_data['chunks'])} chunks")
# Add to processed files only after successful embedding
processed_files_by_key[key].append(document_file)
else:
print(f" Failed to generate embeddings")
except Exception as e:
print(f" Failed to generate pagination/embeddings for {key}: {str(e)}")
except Exception as e:
print(f"Failed to write combined document: {str(e)}")

    # Load existing log
processed_log = load_processed_files_log(unique_id)
# Update log with newly processed files
for key, file_list in files.items():
if key not in processed_log:
processed_log[key] = {}
        # Stamp each entry with this key's own document file, not the leftover path from the loop above
        key_document_file = os.path.join(dataset_dir, key, "document.txt")
        for file_path in file_list:
            filename = os.path.basename(file_path)
            processed_log[key][filename] = {
                "original_path": file_path,
                "processed_at": str(os.path.getmtime(key_document_file) if os.path.exists(key_document_file) else 0),
                "status": "processed" if processed_files_by_key.get(key) else "failed"
            }
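    # The saved log is keyed by dataset key, then filename; roughly (values illustrative):
    #   {"reports": {"data.zip": {"original_path": "...", "processed_at": "...", "status": "processed"}}}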
# Save the updated processed log
save_processed_files_log(unique_id, processed_log)
return processed_files_by_key


def generate_dataset_structure(unique_id: str) -> str:
"""Generate a string representation of the dataset structure"""
dataset_dir = os.path.join("projects", unique_id, "dataset")
structure = []
def add_directory_contents(dir_path: str, prefix: str = ""):
try:
items = sorted(os.listdir(dir_path))
for i, item in enumerate(items):
item_path = os.path.join(dir_path, item)
is_last = i == len(items) - 1
current_prefix = "└── " if is_last else "├── "
structure.append(f"{prefix}{current_prefix}{item}")
if os.path.isdir(item_path):
                    next_prefix = prefix + ("    " if is_last else "│   ")
add_directory_contents(item_path, next_prefix)
except Exception as e:
structure.append(f"{prefix}└── Error: {str(e)}")
if os.path.exists(dataset_dir):
structure.append(f"dataset/")
add_directory_contents(dataset_dir, "")
else:
structure.append("dataset/ (not found)")
return "\n".join(structure)


def remove_dataset_directory(unique_id: str, filename_without_ext: str):
"""Remove a specific dataset directory"""
dataset_path = os.path.join("projects", unique_id, "dataset", filename_without_ext)
remove_file_or_directory(dataset_path)


def remove_dataset_directory_by_key(unique_id: str, key: str):
"""Remove dataset directory by key"""
dataset_path = os.path.join("projects", unique_id, "dataset", key)
remove_file_or_directory(dataset_path)
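

# Minimal manual-run sketch; the project id and URL below are hypothetical placeholders
# and the call assumes this module's package imports resolve from the working directory.
if __name__ == "__main__":
    import asyncio

    demo_files = {"reports": ["https://example.com/reports.zip"]}
    # Download/copy, combine, paginate and embed the demo files, then show the resulting layout
    print(asyncio.run(download_dataset_files("demo_project", demo_files)))
    print(generate_dataset_structure("demo_project"))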