#!/usr/bin/env python3
"""
Dataset management functions for organizing and processing datasets.
New implementation with per-file processing and group merging.
"""

import os
import json
import shutil
from datetime import datetime
from typing import Dict, List

# Import new modules
from utils.file_manager import (
    ensure_directories, sync_files_to_group, cleanup_orphaned_files,
    get_group_files_list
)
from utils.single_file_processor import (
    process_single_file, check_file_already_processed
)
from utils.data_merger import (
    merge_all_data_by_group, cleanup_dataset_group
)

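# Expected on-disk layout (inferred from the helper functions used below):
#   projects/<unique_id>/files/<group>/<filename>        synced source files
#   projects/<unique_id>/processed/<group>/...           per-file processing output
#   projects/<unique_id>/dataset/<group>/document.txt,
#       pagination.txt, embedding.pkl                     merged group output
#   projects/<unique_id>/processing_log.json              log written by this module

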
async def download_dataset_files(unique_id: str, files: Dict[str, List[str]]) -> Dict[str, List[str]]:
    """
    Process dataset files with the new architecture:
    1. Sync files to group directories
    2. Detect changes and clean up orphaned files
    3. Process each file individually
    4. Merge results by group
    5. Save a processing log
    """
    if not files:
        return {}

    print(f"Starting new file processing for project: {unique_id}")

    # Ensure project directories exist
    ensure_directories(unique_id)

    # Step 1: Sync files to group directories
    print("Step 1: Syncing files to group directories...")
    synced_files, failed_files = sync_files_to_group(unique_id, files)
    # NOTE: failed_files is currently unused by the steps below.

    # Step 2: Detect changes and clean up orphaned files
    from utils.file_manager import detect_file_changes
    changes = detect_file_changes(unique_id, files)

    if any(changes["removed"].values()):
        print("Step 2: Cleaning up orphaned files...")
        removed_files = cleanup_orphaned_files(unique_id, changes)
        print(f"Removed orphaned files: {removed_files}")

    # Step 3: Process individual files
    print("Step 3: Processing individual files...")
    processed_files_by_group = {}
    processing_results = {}

    for group_name, file_list in files.items():
        processed_files_by_group[group_name] = []
        processing_results[group_name] = []

        for file_path in file_list:
            filename = os.path.basename(file_path)

            # Get local file path
            local_path = os.path.join("projects", unique_id, "files", group_name, filename)

            # Skip local files that no longer exist; remote URLs are still passed
            # to process_single_file even when no local copy is present yet.
            if not os.path.exists(local_path) and not file_path.startswith(('http://', 'https://')):
                print(f"Skipping non-existent file: {filename}")
                continue

            # Check if already processed
            if check_file_already_processed(unique_id, group_name, filename):
                print(f"Skipping already processed file: {filename}")
                processed_files_by_group[group_name].append(filename)
                processing_results[group_name].append({
                    "filename": filename,
                    "status": "existing"
                })
                continue

            # Process the file
            print(f"Processing file: {filename} (group: {group_name})")
            result = await process_single_file(unique_id, group_name, filename, file_path, local_path)
            processing_results[group_name].append(result)

            if result["success"]:
                processed_files_by_group[group_name].append(filename)
                print(f"  Successfully processed {filename}")
            else:
                print(f"  Failed to process {filename}: {result['error']}")

    # Step 4: Merge results by group
    print("Step 4: Merging results by group...")
    merge_results = {}

    for group_name in processed_files_by_group.keys():
        # Get all files in the group (including existing ones)
        group_files = get_group_files_list(unique_id, group_name)

        if group_files:
            print(f"Merging group: {group_name} with {len(group_files)} files")
            merge_result = merge_all_data_by_group(unique_id, group_name)
            merge_results[group_name] = merge_result

            if merge_result["success"]:
                print(f"  Successfully merged group {group_name}")
            else:
                print(f"  Failed to merge group {group_name}: {merge_result['errors']}")

    # Step 5: Save processing log
    print("Step 5: Saving processing log...")
    await save_processing_log(unique_id, files, synced_files, processing_results, merge_results)

    print(f"File processing completed for project: {unique_id}")
    return processed_files_by_group


async def save_processing_log(
    unique_id: str,
    requested_files: Dict[str, List[str]],
    synced_files: Dict,
    processing_results: Dict,
    merge_results: Dict
):
    """Save a comprehensive processing log for the project."""

    log_data = {
        "unique_id": unique_id,
        "timestamp": datetime.now().isoformat(),
        "requested_files": requested_files,
        "synced_files": synced_files,
        "processing_results": processing_results,
        "merge_results": merge_results,
        "summary": {
            "total_groups": len(requested_files),
            "total_files_requested": sum(len(files) for files in requested_files.values()),
            "total_files_processed": sum(
                len([r for r in results if r.get("success", False)])
                for results in processing_results.values()
            ),
            "total_groups_merged": len([r for r in merge_results.values() if r.get("success", False)])
        }
    }

    log_file_path = os.path.join("projects", unique_id, "processing_log.json")
    try:
        with open(log_file_path, 'w', encoding='utf-8') as f:
            json.dump(log_data, f, ensure_ascii=False, indent=2)
        print(f"Processing log saved to: {log_file_path}")
    except Exception as e:
        print(f"Error saving processing log: {str(e)}")


def generate_dataset_structure(unique_id: str) -> str:
    """Generate a string representation of the dataset structure"""
    project_dir = os.path.join("projects", unique_id)
    structure = []

    def add_directory_contents(dir_path: str, prefix: str = ""):
        try:
            if not os.path.exists(dir_path):
                structure.append(f"{prefix}└── (not found)")
                return

            items = sorted(os.listdir(dir_path))
            for i, item in enumerate(items):
                item_path = os.path.join(dir_path, item)
                is_last = i == len(items) - 1
                current_prefix = "└── " if is_last else "├── "
                structure.append(f"{prefix}{current_prefix}{item}")

                if os.path.isdir(item_path):
                    next_prefix = prefix + ("    " if is_last else "│   ")
                    add_directory_contents(item_path, next_prefix)
        except Exception as e:
            structure.append(f"{prefix}└── Error: {str(e)}")

    # Add files directory structure
    files_dir = os.path.join(project_dir, "files")
    structure.append("files/")
    add_directory_contents(files_dir, "")

    # Add processed directory structure
    processed_dir = os.path.join(project_dir, "processed")
    structure.append("\nprocessed/")
    add_directory_contents(processed_dir, "")

    # Add dataset directory structure
    dataset_dir = os.path.join(project_dir, "dataset")
    structure.append("\ndataset/")
    add_directory_contents(dataset_dir, "")

    return "\n".join(structure)


def get_processing_status(unique_id: str) -> Dict:
    """Get comprehensive processing status for a project."""
    project_dir = os.path.join("projects", unique_id)

    if not os.path.exists(project_dir):
        return {
            "project_exists": False,
            "unique_id": unique_id
        }

    status = {
        "project_exists": True,
        "unique_id": unique_id,
        "directories": {
            "files": os.path.exists(os.path.join(project_dir, "files")),
            "processed": os.path.exists(os.path.join(project_dir, "processed")),
            "dataset": os.path.exists(os.path.join(project_dir, "dataset"))
        },
        "groups": {},
        "processing_log_exists": os.path.exists(os.path.join(project_dir, "processing_log.json"))
    }

    # Check each group's status
    files_dir = os.path.join(project_dir, "files")
    if os.path.exists(files_dir):
        for group_name in os.listdir(files_dir):
            group_path = os.path.join(files_dir, group_name)
            if os.path.isdir(group_path):
                status["groups"][group_name] = {
                    "files_count": len([
                        f for f in os.listdir(group_path)
                        if os.path.isfile(os.path.join(group_path, f))
                    ]),
                    "merge_status": "pending"
                }

    # Check merge status for each group
    dataset_dir = os.path.join(project_dir, "dataset")
    if os.path.exists(dataset_dir):
        for group_name in os.listdir(dataset_dir):
            group_path = os.path.join(dataset_dir, group_name)
            if os.path.isdir(group_path):
                if group_name in status["groups"]:
                    # Check if merge is complete
                    document_path = os.path.join(group_path, "document.txt")
                    pagination_path = os.path.join(group_path, "pagination.txt")
                    embedding_path = os.path.join(group_path, "embedding.pkl")

                    if (os.path.exists(document_path) and os.path.exists(pagination_path) and
                            os.path.exists(embedding_path)):
                        status["groups"][group_name]["merge_status"] = "completed"
                    else:
                        status["groups"][group_name]["merge_status"] = "incomplete"
                else:
                    # Merged output exists for a group with no remaining source files
                    status["groups"][group_name] = {
                        "files_count": 0,
                        "merge_status": "completed"
                    }

    return status


def remove_dataset_directory(unique_id: str, filename_without_ext: str):
    """Remove a specific dataset directory (deprecated; use the group-based structure instead)"""
    # Kept for backwards compatibility; operates on the per-file directory
    # under "processed" rather than delegating to the group-based cleanup.
    dataset_path = os.path.join("projects", unique_id, "processed", filename_without_ext)
    if os.path.exists(dataset_path):
        shutil.rmtree(dataset_path)


def remove_dataset_directory_by_key(unique_id: str, key: str):
    """Remove dataset directories by key (group name)"""
    # Remove files directory
    files_group_path = os.path.join("projects", unique_id, "files", key)
    if os.path.exists(files_group_path):
        shutil.rmtree(files_group_path)

    # Remove processed directory
    processed_group_path = os.path.join("projects", unique_id, "processed", key)
    if os.path.exists(processed_group_path):
        shutil.rmtree(processed_group_path)

    # Remove dataset directory
    cleanup_dataset_group(unique_id, key)
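

if __name__ == "__main__":
    # Minimal usage sketch. The project ID and file mapping below are
    # hypothetical placeholders; real callers supply their own values and
    # typically await download_dataset_files() from their async code.
    import asyncio

    example_files = {
        "manuals": ["docs/user_manual.pdf"],
        "faq": ["https://example.com/faq.html"],
    }
    processed = asyncio.run(download_dataset_files("demo-project", example_files))
    print(json.dumps(processed, ensure_ascii=False, indent=2))
    print(generate_dataset_structure("demo-project"))
    print(json.dumps(get_processing_status("demo-project"), ensure_ascii=False, indent=2))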