#!/usr/bin/env python3
"""
File management functions for syncing files and detecting changes.
"""

import os
import shutil
from pathlib import Path
from typing import Dict, List, Set, Tuple


def get_existing_files(unique_id: str) -> Dict[str, Set[str]]:
    """Get existing files organized by group."""
    existing_files = {}
    files_dir = os.path.join("projects", "data", unique_id, "files")

    if not os.path.exists(files_dir):
        return existing_files

    for group_name in os.listdir(files_dir):
        group_dir = os.path.join(files_dir, group_name)
        if os.path.isdir(group_dir):
            existing_files[group_name] = set()
            for file_name in os.listdir(group_dir):
                file_path = os.path.join(group_dir, file_name)
                if os.path.isfile(file_path):
                    existing_files[group_name].add(file_name)

    return existing_files


def detect_file_changes(unique_id: str, new_files: Dict[str, List[str]]) -> Dict:
    """Detect file changes: added, removed, and existing files."""
    existing_files = get_existing_files(unique_id)

    changes = {
        "added": {},
        "removed": {},
        "existing": {},
        "removed_groups": set()  # Track completely removed groups
    }

    # Convert new_files to sets of basenames for comparison
    new_files_sets = {}
    for group, file_list in new_files.items():
        new_files_sets[group] = {os.path.basename(file_path) for file_path in file_list}

    # Detect added and existing files
    for group, new_filenames in new_files_sets.items():
        existing_filenames = existing_files.get(group, set())

        changes["added"][group] = new_filenames - existing_filenames
        changes["existing"][group] = new_filenames & existing_filenames

        # Ensure every requested group has a "removed" entry; the loop
        # below fills it in for groups that already exist on disk
        changes["removed"].setdefault(group, set())

    # Detect removed files (files that exist on disk but not in the new request)
    for group, existing_filenames in existing_files.items():
        if group in new_files_sets:
            # Group still requested: check for individual file removals
            changes["removed"][group] = existing_filenames - new_files_sets[group]
        else:
            # Group completely removed from the new request
            changes["removed_groups"].add(group)
            changes["removed"][group] = existing_filenames

    return changes


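# Illustrative only: for a hypothetical project whose "docs" group already
# holds a.txt and whose "images" group holds logo.png, calling
# detect_file_changes(uid, {"docs": ["/tmp/a.txt", "/tmp/b.txt"]}) returns a
# structure shaped like:
#
#     {
#         "added":          {"docs": {"b.txt"}},
#         "existing":       {"docs": {"a.txt"}},
#         "removed":        {"docs": set(), "images": {"logo.png"}},
#         "removed_groups": {"images"},
#     }

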
def sync_files_to_group(unique_id: str, files: Dict[str, List[str]]) -> Tuple[Dict, Dict]:
    """
    Sync files to group directories and return sync results.

    Returns:
        Tuple of (synced_files, failed_files)
    """
    project_dir = os.path.join("projects", "data", unique_id)
    files_dir = os.path.join(project_dir, "files")

    # Create files directory
    os.makedirs(files_dir, exist_ok=True)

    # Detect changes first
    changes = detect_file_changes(unique_id, files)

    synced_files = {}
    failed_files = {}

    # Process each group
    for group_name, file_list in files.items():
        group_dir = os.path.join(files_dir, group_name)
        os.makedirs(group_dir, exist_ok=True)

        synced_files[group_name] = []
        failed_files[group_name] = []

        # Only process files that are new or need updating
        filenames_to_sync = changes["added"].get(group_name, set())

        for file_path in file_list:
            filename = os.path.basename(file_path)

            # Skip if the file already exists and is in the "existing" category
            if filename in changes["existing"].get(group_name, set()):
                synced_files[group_name].append({
                    "original_path": file_path,
                    "local_path": os.path.join(group_dir, filename),
                    "status": "existing"
                })
                continue

            # Only process files that are actually new
            if filename not in filenames_to_sync:
                continue

            try:
                # Check if it's a URL (remote file) or a local file
                is_remote = file_path.startswith(('http://', 'https://'))

                if is_remote:
                    # Remote files are downloaded later, in the processing stage
                    synced_files[group_name].append({
                        "original_path": file_path,
                        "local_path": os.path.join(group_dir, filename),
                        "status": "pending_download"
                    })
                else:
                    # Handle a local file
                    if not os.path.exists(file_path):
                        failed_files[group_name].append({
                            "original_path": file_path,
                            "error": "Local file not found"
                        })
                        continue

                    # Copy the file into the group directory
                    dest_path = os.path.join(group_dir, filename)
                    shutil.copy2(file_path, dest_path)

                    synced_files[group_name].append({
                        "original_path": file_path,
                        "local_path": dest_path,
                        "status": "copied"
                    })

            except Exception as e:
                failed_files[group_name].append({
                    "original_path": file_path,
                    "error": str(e)
                })

    return synced_files, failed_files


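# Illustrative only: syncing one local file and one remote URL into a
# hypothetical "docs" group yields entries shaped like
#
#     synced_files == {"docs": [
#         {"original_path": "/tmp/a.txt",
#          "local_path": "projects/data/<id>/files/docs/a.txt",
#          "status": "copied"},
#         {"original_path": "https://example.com/b.pdf",
#          "local_path": "projects/data/<id>/files/docs/b.pdf",
#          "status": "pending_download"},
#     ]}
#
# while failed_files collects {"original_path": ..., "error": ...} records.

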
def cleanup_orphaned_files(unique_id: str, changes: Dict) -> Dict[str, List[str]]:
    """Remove files and their processing results that are no longer needed."""
    removed_files = {}
    project_dir = os.path.join("projects", "data", unique_id)

    # Handle individual file removals
    for group_name, removed_filenames in changes["removed"].items():
        if not removed_filenames:
            continue

        # Completely removed groups are handled wholesale below; deleting
        # their files one by one here would be redundant, and the log entry
        # would be overwritten by the group loop
        if group_name in changes.get("removed_groups", set()):
            continue

        removed_files[group_name] = []

        for filename in removed_filenames:
            try:
                # Remove the original file
                file_path = os.path.join(project_dir, "files", group_name, filename)
                if os.path.exists(file_path):
                    os.remove(file_path)
                    removed_files[group_name].append(f"file: {filename}")

                # Remove the processed directory for this file
                processed_dir = os.path.join(project_dir, "processed", group_name,
                                             Path(filename).stem)
                if os.path.exists(processed_dir):
                    shutil.rmtree(processed_dir)
                    removed_files[group_name].append(f"processed: {Path(filename).stem}")

            except Exception as e:
                print(f"Error cleaning up {filename}: {str(e)}")

    # Handle completely removed groups
    for group_name in changes.get("removed_groups", set()):
        print(f"Cleaning up completely removed group: {group_name}")
        removed_files[group_name] = []

        try:
            # Remove the entire files/<group> directory
            files_group_dir = os.path.join(project_dir, "files", group_name)
            if os.path.exists(files_group_dir):
                shutil.rmtree(files_group_dir)
                removed_files[group_name].append("files group directory")

            # Remove the entire processed/<group> directory
            processed_group_dir = os.path.join(project_dir, "processed", group_name)
            if os.path.exists(processed_group_dir):
                shutil.rmtree(processed_group_dir)
                removed_files[group_name].append("processed group directory")

            # Remove the entire dataset/<group> directory
            dataset_group_dir = os.path.join(project_dir, "dataset", group_name)
            if os.path.exists(dataset_group_dir):
                shutil.rmtree(dataset_group_dir)
                removed_files[group_name].append("dataset group directory")

            print(f"Completely removed group '{group_name}' and all its data")

        except Exception as e:
            print(f"Error cleaning up group {group_name}: {str(e)}")

    return removed_files


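# Illustrative only: after dropping one file from "docs" and deleting a whole
# "images" group, the returned log might look like
#
#     {"docs": ["file: old.txt", "processed: old"],
#      "images": ["files group directory", "processed group directory"]}

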
def get_group_files_list(unique_id: str, group_name: str) -> List[str]:
    """Get list of files in a specific group."""
    group_dir = os.path.join("projects", "data", unique_id, "files", group_name)

    if not os.path.exists(group_dir):
        return []

    files = []
    for filename in os.listdir(group_dir):
        file_path = os.path.join(group_dir, filename)
        if os.path.isfile(file_path):
            files.append(filename)

    return sorted(files)


def ensure_directories(unique_id: str):
    """Ensure all necessary directories exist for a project."""
    base_dir = os.path.join("projects", "data", unique_id)
    directories = [
        "files",
        "processed",
        "dataset"
    ]

    for dir_name in directories:
        dir_path = os.path.join(base_dir, dir_name)
        os.makedirs(dir_path, exist_ok=True)
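

if __name__ == "__main__":
    # Minimal smoke-test sketch: the project id, group name, and file below
    # are made up for illustration. Running it creates directories under
    # projects/data/demo-project and removes them again at the end.
    import tempfile

    demo_id = "demo-project"
    ensure_directories(demo_id)

    # Create a throwaway local file and sync it into a "reports" group
    with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as tmp:
        tmp.write("hello")
        tmp_path = tmp.name

    synced, failed = sync_files_to_group(demo_id, {"reports": [tmp_path]})
    print("synced:", synced)
    print("failed:", failed)
    print("group listing:", get_group_files_list(demo_id, "reports"))

    # An empty request marks everything as removed; clean it all up
    changes = detect_file_changes(demo_id, {})
    print("removed:", cleanup_orphaned_files(demo_id, changes))

    os.remove(tmp_path)
    shutil.rmtree(os.path.join("projects", "data", demo_id), ignore_errors=True)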