#!/usr/bin/env python3
"""
File management functions for syncing files and detecting changes.
"""

import os
import shutil
from pathlib import Path
from typing import Dict, List, Set, Tuple


def get_existing_files(unique_id: str) -> Dict[str, Set[str]]:
    """Get existing files organized by group."""
    existing_files = {}
    files_dir = os.path.join("projects", "data", unique_id, "files")

    if not os.path.exists(files_dir):
        return existing_files

    for group_name in os.listdir(files_dir):
        group_dir = os.path.join(files_dir, group_name)
        if os.path.isdir(group_dir):
            existing_files[group_name] = set()
            for file_name in os.listdir(group_dir):
                file_path = os.path.join(group_dir, file_name)
                if os.path.isfile(file_path):
                    existing_files[group_name].add(file_name)

    return existing_files


def detect_file_changes(unique_id: str, new_files: Dict[str, List[str]],
                        incremental_mode: bool = False) -> Dict:
    """
    Detect file changes: added, removed, and existing files.

    Args:
        unique_id: Project ID
        new_files: Dictionary of files to process, grouped by key
        incremental_mode: If True, skip removal detection entirely. This prevents
            accidental deletion of existing files during incremental additions;
            removals must then be requested explicitly.
    """
    existing_files = get_existing_files(unique_id)

    changes = {
        "added": {},
        "removed": {},
        "existing": {},
        "removed_groups": set()  # Track completely removed groups
    }

    # Convert new_files to sets of filenames for comparison
    new_files_sets = {}
    for group, file_list in new_files.items():
        # Extract filenames from paths
        filenames = set()
        for file_path in file_list:
            filename = os.path.basename(file_path)
            filenames.add(filename)
        new_files_sets[group] = filenames

    # Detect added and existing files
    for group, new_filenames in new_files_sets.items():
        existing_filenames = existing_files.get(group, set())
        changes["added"][group] = new_filenames - existing_filenames
        changes["existing"][group] = new_filenames & existing_filenames

        # For removed files, check against existing files in this group
        if group not in changes["removed"]:
            changes["removed"][group] = set()

    # Detect removed files (files that exist but are not in the new request).
    # Skip this step in incremental mode to preserve existing files.
    if not incremental_mode:
        for group, existing_filenames in existing_files.items():
            if group in new_files_sets:
                # Group exists in new request, check for individual file removals
                new_filenames = new_files_sets[group]
                changes["removed"][group] = existing_filenames - new_filenames
            else:
                # Group completely removed from new request
                changes["removed_groups"].add(group)
                changes["removed"][group] = existing_filenames

    return changes
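
# Illustrative sketch of the change structure returned by detect_file_changes.
# The group name and filenames below are hypothetical; assume "kept.pdf" and
# "old.pdf" already exist on disk under files/docs/ for this project:
#
#     changes = detect_file_changes("proj-123", {"docs": ["/tmp/new.pdf", "/tmp/kept.pdf"]})
#     # changes == {
#     #     "added":          {"docs": {"new.pdf"}},
#     #     "existing":       {"docs": {"kept.pdf"}},
#     #     "removed":        {"docs": {"old.pdf"}},
#     #     "removed_groups": set(),
#     # }
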
def sync_files_to_group(unique_id: str, files: Dict[str, List[str]],
                        incremental_mode: bool = False) -> Tuple[Dict, Dict]:
    """
    Sync files to group directories and return sync results.

    Args:
        unique_id: Project ID
        files: Dictionary of files to sync, grouped by key
        incremental_mode: If True, preserve existing files and only add new ones

    Returns:
        Tuple of (synced_files, failed_files)
    """
    project_dir = os.path.join("projects", "data", unique_id)
    files_dir = os.path.join(project_dir, "files")

    # Create files directory
    os.makedirs(files_dir, exist_ok=True)

    # Detect changes first
    changes = detect_file_changes(unique_id, files, incremental_mode)

    synced_files = {}
    failed_files = {}

    # Process each group
    for group_name, file_list in files.items():
        group_dir = os.path.join(files_dir, group_name)
        os.makedirs(group_dir, exist_ok=True)

        synced_files[group_name] = []
        failed_files[group_name] = []

        # Only process files that are new or need updating
        filenames_to_sync = changes["added"].get(group_name, set())

        for file_path in file_list:
            filename = os.path.basename(file_path)

            # Skip if the file already exists and is in the "existing" category
            if filename in changes["existing"].get(group_name, set()):
                synced_files[group_name].append({
                    "original_path": file_path,
                    "local_path": os.path.join(group_dir, filename),
                    "status": "existing"
                })
                continue

            # Only process files that are actually new
            if filename not in filenames_to_sync:
                continue

            try:
                # Check if it's a URL (remote file) or a local file
                is_remote = file_path.startswith(('http://', 'https://'))

                if is_remote:
                    # Remote files are downloaded in the processing stage
                    synced_files[group_name].append({
                        "original_path": file_path,
                        "local_path": os.path.join(group_dir, filename),
                        "status": "pending_download"
                    })
                else:
                    # Handle local file
                    if not os.path.exists(file_path):
                        failed_files[group_name].append({
                            "original_path": file_path,
                            "error": "Local file not found"
                        })
                        continue

                    # Copy file to the group directory
                    dest_path = os.path.join(group_dir, filename)
                    shutil.copy2(file_path, dest_path)

                    synced_files[group_name].append({
                        "original_path": file_path,
                        "local_path": dest_path,
                        "status": "copied"
                    })
            except Exception as e:
                failed_files[group_name].append({
                    "original_path": file_path,
                    "error": str(e)
                })

    return synced_files, failed_files
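
# Illustrative sketch of the per-group entries produced by sync_files_to_group.
# Paths, project ID, and group name below are hypothetical:
#
#     synced_files["docs"] == [
#         {"original_path": "https://example.com/a.pdf",
#          "local_path": "projects/data/proj-123/files/docs/a.pdf",
#          "status": "pending_download"},
#         {"original_path": "/tmp/b.pdf",
#          "local_path": "projects/data/proj-123/files/docs/b.pdf",
#          "status": "copied"},
#     ]
#     failed_files["docs"] == [
#         {"original_path": "/tmp/missing.pdf", "error": "Local file not found"},
#     ]
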
def cleanup_orphaned_files(unique_id: str, changes: Dict) -> Dict[str, List[str]]:
    """Remove files and their processing results that are no longer needed."""
    removed_files = {}
    project_dir = os.path.join("projects", "data", unique_id)

    # Handle individual file removals
    for group_name, removed_filenames in changes["removed"].items():
        if not removed_filenames:
            continue

        removed_files[group_name] = []

        for filename in removed_filenames:
            try:
                # Remove the original file
                file_path = os.path.join(project_dir, "files", group_name, filename)
                if os.path.exists(file_path):
                    os.remove(file_path)
                    removed_files[group_name].append(f"file: {filename}")

                # Remove the processed directory for this file
                processed_dir = os.path.join(project_dir, "processed", group_name,
                                             Path(filename).stem)
                if os.path.exists(processed_dir):
                    shutil.rmtree(processed_dir)
                    removed_files[group_name].append(f"processed: {Path(filename).stem}")
            except Exception as e:
                print(f"Error cleaning up {filename}: {str(e)}")

    # Handle completely removed groups
    for group_name in changes.get("removed_groups", set()):
        print(f"Cleaning up completely removed group: {group_name}")
        # Use setdefault so entries recorded above are not overwritten
        removed_files.setdefault(group_name, [])

        try:
            # Remove the entire files/<group> directory
            files_group_dir = os.path.join(project_dir, "files", group_name)
            if os.path.exists(files_group_dir):
                shutil.rmtree(files_group_dir)
                removed_files[group_name].append("files group directory")

            # Remove the entire processed/<group> directory
            processed_group_dir = os.path.join(project_dir, "processed", group_name)
            if os.path.exists(processed_group_dir):
                shutil.rmtree(processed_group_dir)
                removed_files[group_name].append("processed group directory")

            # Remove the entire dataset/<group> directory
            dataset_group_dir = os.path.join(project_dir, "dataset", group_name)
            if os.path.exists(dataset_group_dir):
                shutil.rmtree(dataset_group_dir)
                removed_files[group_name].append("dataset group directory")

            print(f"Completely removed group '{group_name}' and all its data")
        except Exception as e:
            print(f"Error cleaning up group {group_name}: {str(e)}")

    return removed_files


def get_group_files_list(unique_id: str, group_name: str) -> List[str]:
    """Get the list of files in a specific group."""
    group_dir = os.path.join("projects", "data", unique_id, "files", group_name)

    if not os.path.exists(group_dir):
        return []

    files = []
    for filename in os.listdir(group_dir):
        file_path = os.path.join(group_dir, filename)
        if os.path.isfile(file_path):
            files.append(filename)

    return sorted(files)


def ensure_directories(unique_id: str):
    """Ensure all necessary directories exist for a project."""
    base_dir = os.path.join("projects", "data", unique_id)

    directories = ["files", "processed", "dataset"]

    for dir_name in directories:
        dir_path = os.path.join(base_dir, dir_name)
        os.makedirs(dir_path, exist_ok=True)
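
if __name__ == "__main__":
    # Minimal usage sketch, assuming a hypothetical project ID and file paths;
    # a missing local file ends up in failed_files rather than raising, and a
    # remote URL is recorded with status "pending_download".
    demo_id = "demo-project"
    demo_files = {"docs": ["/tmp/example.pdf", "https://example.com/report.pdf"]}

    ensure_directories(demo_id)
    changes = detect_file_changes(demo_id, demo_files)
    synced, failed = sync_files_to_group(demo_id, demo_files)
    removed = cleanup_orphaned_files(demo_id, changes)

    print("added:", {g: sorted(names) for g, names in changes["added"].items()})
    print("synced:", synced)
    print("failed:", failed)
    print("removed:", removed)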