"""FastAPI routes for file processing, file upload and task-status management.

The handlers in this module either submit work to the task queue (recording a
"pending" entry in ``task_status_store``), inspect or delete files under
``projects/data/<dataset_id>``, store uploads under ``projects/uploads``, or
read and maintain records held by ``task_status_store``.
"""

import logging
import os
import shutil
import uuid
import zipfile
from datetime import datetime
from typing import List, Optional

from fastapi import APIRouter, HTTPException, Header, UploadFile, File, Form
from pydantic import BaseModel

from utils import (
    DatasetRequest,
    QueueTaskRequest,
    IncrementalTaskRequest,
    QueueTaskResponse,
    load_processed_files_log,
    remove_file_or_directory,
    remove_dataset_directory_by_key
)
from utils.fastapi_utils import get_versioned_filename
from task_queue.manager import queue_manager
from task_queue.integration_tasks import process_files_async, process_files_incremental_async, cleanup_project_async
from task_queue.task_status import task_status_store

logger = logging.getLogger('app')

router = APIRouter()


def _safe_path_component(raw_name: str) -> str:
    """Reduce client-supplied text to one path component.

    Backslashes are treated as separators and trailing separators are dropped
    before the last component is taken, so ``"a/b/"`` and ``"a\\b"`` both yield
    ``"b"``.

    Returns:
        The final path component, or ``""`` when nothing usable remains
        (empty, ``"."``, ``".."`` or containing a NUL byte).
    """
    candidate = os.path.basename(raw_name.replace("\\", "/").rstrip("/"))
    if candidate in ("", ".", "..") or "\x00" in candidate:
        return ""
    return candidate


def _require_safe_dataset_id(dataset_id: str) -> None:
    """Raise a 400 error unless ``dataset_id`` is a single plain path component.

    The id is joined into ``projects/data/<dataset_id>``; values such as
    ``".."`` would otherwise make the handlers read or delete files outside
    that directory.
    """
    if _safe_path_component(dataset_id) != dataset_id:
        raise HTTPException(status_code=400, detail="Invalid dataset_id")


def _count_grouped_files(groups) -> int:
    """Return the total number of entries across all lists in a group mapping."""
    return sum(len(file_list) for file_list in (groups or {}).values())


def _submit_tracked_task(submit, dataset_id: str, **task_kwargs) -> str:
    """Record a "pending" status entry and submit the task that belongs to it.

    Args:
        submit: Callable that enqueues the task; it is called with
            ``dataset_id``, ``task_id`` and ``task_kwargs`` as keyword arguments.
        dataset_id: Stored as ``unique_id`` on the status record.
        **task_kwargs: Extra keyword arguments forwarded to ``submit``.

    Returns:
        The generated task id.

    Raises:
        Exception: Whatever ``submit`` raises, after the status record has
            been removed again.
    """
    task_id = str(uuid.uuid4())
    task_status_store.set_status(
        task_id=task_id,
        unique_id=dataset_id,
        status="pending"
    )
    try:
        submit(dataset_id=dataset_id, task_id=task_id, **task_kwargs)
    except Exception:
        # Nothing was queued, so do not leave a record that claims "pending".
        task_status_store.delete_status(task_id)
        raise
    return task_id


@router.post("/api/v1/files/process/async")
async def process_files_async_endpoint(request: QueueTaskRequest, authorization: Optional[str] = Header(None)):
    """
    Queue-based API for asynchronous file processing.

    Args:
        request: QueueTaskRequest; this handler reads dataset_id, files and upload_folder
        authorization: Authorization header; accepted but not inspected by this handler

    Returns:
        QueueTaskResponse: Submission result with the task ID for tracking

    Raises:
        HTTPException: 400 when dataset_id is missing, 500 on any other failure
    """
    try:
        dataset_id = request.dataset_id
        if not dataset_id:
            raise HTTPException(status_code=400, detail="dataset_id is required")

        message = f"File processing task has been submitted to the queue, project ID: {dataset_id}"
        estimated_time = 0
        if request.upload_folder:
            # The file count is unknown until the folders are scanned, so use
            # a fixed default of 2 minutes.
            estimated_time = 120
            group_count = len(request.upload_folder)
            message += f", files will be scanned automatically from {group_count} uploaded folders"
        elif request.files:
            total_files = _count_grouped_files(request.files)
            # Estimated 10 seconds per file, minimum 30 seconds.
            estimated_time = max(30, total_files * 10)
            message += f", including {total_files} files"

        task_id = _submit_tracked_task(
            process_files_async,
            dataset_id,
            files=request.files,
            upload_folder=request.upload_folder
        )

        return QueueTaskResponse(
            success=True,
            message=message,
            dataset_id=dataset_id,
            task_id=task_id,  # Our own id, the one stored in task_status_store
            task_status="pending",
            estimated_processing_time=estimated_time
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error submitting async file processing task: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e


@router.post("/api/v1/files/process/incremental")
async def process_files_incremental_endpoint(request: IncrementalTaskRequest, authorization: Optional[str] = Header(None)):
    """
    Queue-based API for incremental file processing, supporting file additions and deletions.

    Args:
        request: IncrementalTaskRequest; this handler reads dataset_id, files_to_add,
            files_to_remove, system_prompt and mcp_settings
        authorization: Authorization header; accepted but not inspected by this handler

    Returns:
        QueueTaskResponse: Submission result with the task ID for tracking

    Raises:
        HTTPException: 400 when dataset_id is missing or no add/remove
            operation is given, 500 on any other failure
    """
    try:
        dataset_id = request.dataset_id
        if not dataset_id:
            raise HTTPException(status_code=400, detail="dataset_id is required")

        # At least one add or delete operation is required.
        if not request.files_to_add and not request.files_to_remove:
            raise HTTPException(status_code=400, detail="At least one of files_to_add or files_to_remove must be provided")

        total_add_files = _count_grouped_files(request.files_to_add)
        total_remove_files = _count_grouped_files(request.files_to_remove)
        # Estimated 10 seconds per file, minimum 30 seconds.
        estimated_time = max(30, (total_add_files + total_remove_files) * 10)

        task_id = _submit_tracked_task(
            process_files_incremental_async,
            dataset_id,
            files_to_add=request.files_to_add,
            files_to_remove=request.files_to_remove,
            system_prompt=request.system_prompt,
            mcp_settings=request.mcp_settings
        )

        return QueueTaskResponse(
            success=True,
            message=f"Incremental file processing task has been submitted to the queue - added {total_add_files} files, removed {total_remove_files} files, project ID: {dataset_id}",
            dataset_id=dataset_id,
            task_id=task_id,
            task_status="pending",
            estimated_processing_time=estimated_time
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error submitting incremental file processing task: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e


@router.get("/api/v1/files/{dataset_id}/status")
async def get_files_processing_status(dataset_id: str):
    """Get the file processing status for the project.

    Args:
        dataset_id: Project id; must be a single plain path component

    Returns:
        dict: The processed-files log and its size, whether the project
        directory and log file exist, and every ``document.txt`` found under
        the project directory

    Raises:
        HTTPException: 400 for an unsafe dataset_id, 500 on any other failure
    """
    try:
        _require_safe_dataset_id(dataset_id)

        processed_log = load_processed_files_log(dataset_id)

        project_dir = os.path.join("projects", "data", dataset_id)
        project_exists = os.path.exists(project_dir)

        # A directory holds at most one entry per name, so a membership test
        # finds the same files as comparing each entry in turn.
        document_files = []
        if project_exists:
            document_files = [
                os.path.join(root, "document.txt")
                for root, _dirs, files in os.walk(project_dir)
                if "document.txt" in files
            ]

        return {
            "dataset_id": dataset_id,
            "project_exists": project_exists,
            "processed_files_count": len(processed_log),
            "processed_files": processed_log,
            "document_files_count": len(document_files),
            "document_files": document_files,
            "log_file_exists": os.path.exists(os.path.join(project_dir, "processed_files.json"))
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error retrieving file processing status: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to retrieve file processing status: {str(e)}") from e


@router.post("/api/v1/files/{dataset_id}/reset")
async def reset_files_processing(dataset_id: str):
    """Reset the project's file processing status by deleting the processing log and all files.

    Args:
        dataset_id: Project id; must be a single plain path component

    Returns:
        dict: Confirmation message plus the list and count of removed paths

    Raises:
        HTTPException: 400 for an unsafe dataset_id, 500 on any other failure
    """
    try:
        _require_safe_dataset_id(dataset_id)

        project_dir = os.path.join("projects", "data", dataset_id)
        log_file = os.path.join(project_dir, "processed_files.json")

        # The log tells us which per-file artefacts exist.
        processed_log = load_processed_files_log(dataset_id)

        removed_files = []

        for file_info in processed_log.values():
            # Local copy in the files directory
            if 'local_path' in file_info:
                if remove_file_or_directory(file_info['local_path']):
                    removed_files.append(file_info['local_path'])

            if 'key' in file_info:
                # New key-based structure
                key = file_info['key']
                if remove_dataset_directory_by_key(dataset_id, key):
                    removed_files.append(f"dataset/{key}")
            elif 'filename' in file_info:
                # Fallback to the old filename-based structure
                filename_without_ext = os.path.splitext(file_info['filename'])[0]
                dataset_dir = os.path.join(project_dir, "datasets", filename_without_ext)
                if remove_file_or_directory(dataset_dir):
                    removed_files.append(dataset_dir)

            # Explicit dataset path recorded in the log, if any (fallback)
            if 'dataset_path' in file_info:
                if remove_file_or_directory(file_info['dataset_path']):
                    removed_files.append(file_info['dataset_path'])

        # Finally sweep the log, the whole files/ and datasets/ directories
        # (catching anything the log did not list) and README.md.
        leftovers = (
            log_file,
            os.path.join(project_dir, "files"),
            os.path.join(project_dir, "datasets"),
            os.path.join(project_dir, "README.md"),
        )
        for leftover in leftovers:
            if remove_file_or_directory(leftover):
                removed_files.append(leftover)

        return {
            "message": f"File processing status reset successfully: {dataset_id}",
            "removed_files_count": len(removed_files),
            "removed_files": removed_files
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error resetting file processing status: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to reset file processing status: {str(e)}") from e


@router.post("/api/v1/files/{dataset_id}/cleanup/async")
async def cleanup_project_async_endpoint(dataset_id: str, remove_all: bool = False):
    """Asynchronously clean up project files.

    Args:
        dataset_id: Project id forwarded to the cleanup task
        remove_all: Forwarded to the cleanup task; also selects the
            ``action`` value reported in the response

    Returns:
        dict: Submission result including the queue task's id

    Raises:
        HTTPException: 500 when the task cannot be submitted
    """
    try:
        task = cleanup_project_async(dataset_id=dataset_id, remove_all=remove_all)

        return {
            "success": True,
            "message": f"Project cleanup task has been submitted to the queue, project ID: {dataset_id}",
            "dataset_id": dataset_id,
            "task_id": task.id,
            "action": "remove_all" if remove_all else "cleanup_logs"
        }
    except Exception as e:
        logger.error(f"Error submitting cleanup task: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to submit cleanup task: {str(e)}") from e


@router.post("/api/v1/upload")
async def upload_file(file: UploadFile = File(...), folder: Optional[str] = Form(None)):
    """
    File upload API endpoint that stores files under ./projects/uploads/.

    With a folder, the file keeps its own name and is versioned through
    get_versioned_filename. Without one, a YYYYMMDD folder is used and the
    file is stored under a UUID name that keeps the original extension.

    Both folder and filename come from the client, so each is reduced to a
    single path component before it is used to build a path.

    Args:
        file: Uploaded file
        folder: Optional custom folder name; only its last path component is used

    Returns:
        dict: Response containing file path and folder information, plus the
        version when a folder was specified

    Raises:
        HTTPException: 400 for an empty or unsafe filename or folder, 500 on
            any other failure
    """
    try:
        # Debug information
        logger.info(f"Received folder parameter: {folder}")
        logger.info(f"File received: {file.filename if file else 'None'}")

        # Validate everything before touching the filesystem so that a
        # rejected request leaves no directory behind.
        if not file.filename:
            raise HTTPException(status_code=400, detail="Filename cannot be empty")
        original_filename = file.filename
        safe_filename = _safe_path_component(original_filename)
        if not safe_filename:
            raise HTTPException(status_code=400, detail="Invalid filename")

        if folder:
            target_folder = _safe_path_component(folder)
            if not target_folder:
                raise HTTPException(status_code=400, detail="Invalid folder name")
        else:
            # Current date formatted as YYYYMMDD
            target_folder = datetime.now().strftime("%Y%m%d")

        upload_dir = os.path.join("projects", "uploads", target_folder)
        os.makedirs(upload_dir, exist_ok=True)

        name_without_ext, file_extension = os.path.splitext(safe_filename)

        message = "File uploaded successfully"
        version = None
        if folder:
            final_filename, version = get_versioned_filename(upload_dir, name_without_ext, file_extension)
            if version > 1:
                message += f" (version: {version})"
        else:
            # UUID-based unique filename
            final_filename = f"{uuid.uuid4()}{file_extension}"
        file_path = os.path.join(upload_dir, final_filename)

        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        response = {
            "success": True,
            "message": message,
            "file_path": file_path,
            "folder": target_folder,
            "original_filename": original_filename
        }
        if folder:
            response["version"] = version
        return response
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error uploading file: {str(e)}")
        raise HTTPException(status_code=500, detail=f"File upload failed: {str(e)}") from e


# Task management routes that are related to file processing
@router.get("/api/v1/task/{task_id}/status")
async def get_task_status(task_id: str):
    """Get task status - simple and reliable.

    Args:
        task_id: Id of the record to look up in task_status_store

    Returns:
        dict: The stored status fields, or a ``not_found`` payload (still an
        HTTP 200 response) when the store has no record for the id

    Raises:
        HTTPException: 500 when the lookup fails
    """
    try:
        status_data = task_status_store.get_status(task_id)

        if not status_data:
            return {
                "success": False,
                "message": "Task does not exist or has expired",
                "task_id": task_id,
                "status": "not_found"
            }

        return {
            "success": True,
            "message": "Task status retrieved successfully",
            "task_id": task_id,
            "status": status_data["status"],
            "unique_id": status_data["unique_id"],
            "created_at": status_data["created_at"],
            "updated_at": status_data["updated_at"],
            "result": status_data.get("result"),
            "error": status_data.get("error")
        }
    except Exception as e:
        logger.error(f"Error getting task status: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to retrieve task status: {str(e)}") from e


@router.delete("/api/v1/task/{task_id}")
async def delete_task(task_id: str):
    """Delete task record.

    Args:
        task_id: Id of the record to delete from task_status_store

    Returns:
        dict: ``success`` mirrors the truthiness of what the store returned

    Raises:
        HTTPException: 500 when the deletion fails
    """
    try:
        success = task_status_store.delete_status(task_id)
        if success:
            return {
                "success": True,
                "message": f"Task record deleted: {task_id}",
                "task_id": task_id
            }
        return {
            "success": False,
            "message": f"Task record does not exist: {task_id}",
            "task_id": task_id
        }
    except Exception as e:
        logger.error(f"Error deleting task: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to delete task record: {str(e)}") from e


@router.get("/api/v1/tasks")
async def list_tasks(status: Optional[str] = None, dataset_id: Optional[str] = None, limit: int = 100):
    """List tasks with optional filters.

    Args:
        status: Optional status filter
        dataset_id: Optional filter, matched against the records' unique_id
        limit: Maximum number of tasks to return

    Returns:
        dict: The matching tasks, their count and the filters that were applied

    Raises:
        HTTPException: 500 when the store cannot be queried
    """
    try:
        if status or dataset_id:
            # Filtering is delegated to the store's search function.
            tasks = task_status_store.search_tasks(status=status, unique_id=dataset_id, limit=limit)
        else:
            all_tasks = task_status_store.list_all()
            tasks = list(all_tasks.values())[:limit]

        return {
            "success": True,
            "message": "Task list retrieved successfully",
            "total_tasks": len(tasks),
            "tasks": tasks,
            "filters": {
                "status": status,
                "dataset_id": dataset_id,
                "limit": limit
            }
        }
    except Exception as e:
        logger.error(f"Error listing tasks: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to retrieve task list: {str(e)}") from e


@router.get("/api/v1/tasks/statistics")
async def get_task_statistics():
    """Get task statistics.

    Returns:
        dict: Whatever task_status_store.get_statistics() reports

    Raises:
        HTTPException: 500 when the statistics cannot be retrieved
    """
    try:
        stats = task_status_store.get_statistics()

        return {
            "success": True,
            "message": "Statistics retrieved successfully",
            "statistics": stats
        }
    except Exception as e:
        logger.error(f"Error getting statistics: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to retrieve statistics: {str(e)}") from e


@router.post("/api/v1/tasks/cleanup")
async def cleanup_tasks(older_than_days: int = 7):
    """Clean up old task records.

    Args:
        older_than_days: Age threshold forwarded to the store's cleanup

    Returns:
        dict: Number of records the store reports as deleted

    Raises:
        HTTPException: 500 when the cleanup fails
    """
    try:
        deleted_count = task_status_store.cleanup_old_tasks(older_than_days=older_than_days)

        return {
            "success": True,
            "message": f"Cleaned up {deleted_count} old task records",
            "deleted_count": deleted_count,
            "older_than_days": older_than_days
        }
    except Exception as e:
        logger.error(f"Error cleaning up tasks: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to clean up task records: {str(e)}") from e