qwen_agent/routes/files.py
朱潮 425f3c5bb4 chore: replace Chinese comments and log messages with English
Convert all Chinese comments, docstrings, logger/print output,
HTTPException detail messages, and API response messages to English
across the entire codebase. Functional zh/ja localized strings
(e.g. prompt templates, timezone display names, date formats) are
preserved as-is.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-30 19:45:35 +08:00

469 lines
18 KiB
Python

import os
import uuid
import shutil
import zipfile
from datetime import datetime
from typing import Optional, List
from fastapi import APIRouter, HTTPException, Header, UploadFile, File, Form
from pydantic import BaseModel
import logging
logger = logging.getLogger('app')
from utils import (
DatasetRequest, QueueTaskRequest, IncrementalTaskRequest, QueueTaskResponse,
load_processed_files_log, remove_file_or_directory, remove_dataset_directory_by_key
)
from utils.fastapi_utils import get_versioned_filename
from task_queue.manager import queue_manager
from task_queue.integration_tasks import process_files_async, process_files_incremental_async, cleanup_project_async
from task_queue.task_status import task_status_store
router = APIRouter()
@router.post("/api/v1/files/process/async")
async def process_files_async_endpoint(request: QueueTaskRequest, authorization: Optional[str] = Header(None)):
    """
    Submit a file processing job to the task queue and return immediately.

    A "pending" record is written to ``task_status_store`` under a freshly
    generated task ID, the work is handed to ``process_files_async``, and the
    task ID is returned so the caller can poll for progress.

    Args:
        request: QueueTaskRequest. This handler reads ``dataset_id``, ``files``
            and ``upload_folder`` from it.
        authorization: Authorization header (Bearer <API_KEY>). Accepted as part
            of the endpoint signature; this handler does not inspect it.

    Returns:
        QueueTaskResponse: Submission result with the task ID for tracking and
        a rough processing-time estimate in seconds.

    Raises:
        HTTPException: 400 when ``dataset_id`` is missing; 500 when anything
            unexpected fails while recording or submitting the task.
    """
    try:
        dataset_id = request.dataset_id
        if not dataset_id:
            raise HTTPException(status_code=400, detail="dataset_id is required")

        # Estimate processing time. The explicit file count is computed once
        # and reused for the response message below.
        total_files = 0
        if request.upload_folder:
            # Folder contents are only scanned by the worker, so the file
            # count is unknown here; fall back to a fixed 2 minutes.
            estimated_time = 120
        elif request.files:
            total_files = sum(len(file_list) for file_list in request.files.values())
            # 10 seconds per file, never less than 30 seconds.
            estimated_time = max(30, total_files * 10)
        else:
            estimated_time = 0

        # Create the status record before queueing, keyed by our own task ID.
        task_id = str(uuid.uuid4())
        task_status_store.set_status(
            task_id=task_id,
            unique_id=dataset_id,
            status="pending"
        )

        # Submit async task; tracking uses task_id, so the returned handle is not needed.
        process_files_async(
            dataset_id=dataset_id,
            files=request.files,
            upload_folder=request.upload_folder,
            task_id=task_id
        )

        # Build a more detailed message
        message = f"File processing task has been submitted to the queue, project ID: {dataset_id}"
        if request.upload_folder:
            group_count = len(request.upload_folder)
            message += f", files will be scanned automatically from {group_count} uploaded folders"
        elif request.files:
            message += f", including {total_files} files"

        return QueueTaskResponse(
            success=True,
            message=message,
            dataset_id=dataset_id,
            task_id=task_id,  # Use our own task_id
            task_status="pending",
            estimated_processing_time=estimated_time
        )
    except HTTPException:
        # Deliberate client errors (e.g. the 400 above) pass through untouched.
        raise
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception(f"Error submitting async file processing task: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e
@router.post("/api/v1/files/process/incremental")
async def process_files_incremental_endpoint(request: IncrementalTaskRequest, authorization: Optional[str] = Header(None)):
    """
    Submit an incremental file processing job (additions and/or removals) to the queue.

    A "pending" record is written to ``task_status_store`` under a freshly
    generated task ID and the work is handed to
    ``process_files_incremental_async``.

    Args:
        request: IncrementalTaskRequest. This handler reads ``dataset_id``,
            ``files_to_add``, ``files_to_remove``, ``system_prompt`` and
            ``mcp_settings`` from it.
        authorization: Authorization header (Bearer <API_KEY>). Accepted as part
            of the endpoint signature; this handler does not inspect it.

    Returns:
        QueueTaskResponse: Submission result with the task ID for tracking and
        a rough processing-time estimate in seconds.

    Raises:
        HTTPException: 400 when ``dataset_id`` is missing or when neither
            ``files_to_add`` nor ``files_to_remove`` is provided; 500 when
            anything unexpected fails while recording or submitting the task.
    """
    try:
        dataset_id = request.dataset_id
        if not dataset_id:
            raise HTTPException(status_code=400, detail="dataset_id is required")

        # Validate that there is at least one add or delete operation
        if not request.files_to_add and not request.files_to_remove:
            raise HTTPException(status_code=400, detail="At least one of files_to_add or files_to_remove must be provided")

        # Estimate processing time from the number of files touched.
        # Either mapping may be absent, hence the `or {}` fallback.
        total_add_files = sum(len(file_list) for file_list in (request.files_to_add or {}).values())
        total_remove_files = sum(len(file_list) for file_list in (request.files_to_remove or {}).values())
        total_files = total_add_files + total_remove_files
        # 10 seconds per file, never less than 30 seconds.
        estimated_time = max(30, total_files * 10)

        # Create the status record before queueing, keyed by our own task ID.
        task_id = str(uuid.uuid4())
        task_status_store.set_status(
            task_id=task_id,
            unique_id=dataset_id,
            status="pending"
        )

        # Submit incremental async task; tracking uses task_id, so the
        # returned handle is not needed.
        process_files_incremental_async(
            dataset_id=dataset_id,
            files_to_add=request.files_to_add,
            files_to_remove=request.files_to_remove,
            system_prompt=request.system_prompt,
            mcp_settings=request.mcp_settings,
            task_id=task_id
        )

        return QueueTaskResponse(
            success=True,
            message=f"Incremental file processing task has been submitted to the queue - added {total_add_files} files, removed {total_remove_files} files, project ID: {dataset_id}",
            dataset_id=dataset_id,
            task_id=task_id,
            task_status="pending",
            estimated_processing_time=estimated_time
        )
    except HTTPException:
        # Deliberate client errors (the 400s above) pass through untouched.
        raise
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception(f"Error submitting incremental file processing task: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e
@router.get("/api/v1/files/{dataset_id}/status")
async def get_files_processing_status(dataset_id: str):
    """
    Report what has been processed so far for a project.

    Loads the processed-files log via ``load_processed_files_log`` and walks
    ``projects/data/<dataset_id>`` collecting every ``document.txt`` found.

    Args:
        dataset_id: Project identifier taken from the URL path. It must be a
            single plain path component.

    Returns:
        dict: ``dataset_id``, ``project_exists``, the processed-files log and
        its size, the list and count of ``document.txt`` paths, and whether
        ``processed_files.json`` exists in the project directory.

    Raises:
        HTTPException: 400 when ``dataset_id`` is not a plain path component;
            500 when reading the log or the directory tree fails.
    """
    # dataset_id comes straight from the URL and is joined into a filesystem
    # path below. Reject "", ".", ".." and anything containing a separator so
    # the walk cannot leave projects/data/.
    if dataset_id in ("", ".", "..") or os.path.basename(dataset_id) != dataset_id:
        raise HTTPException(status_code=400, detail="Invalid dataset_id")

    try:
        # Load processed files log
        processed_log = load_processed_files_log(dataset_id)

        # Get project directory info
        project_dir = os.path.join("projects", "data", dataset_id)
        project_exists = os.path.exists(project_dir)

        # Collect document.txt files anywhere below the project directory
        document_files = []
        if project_exists:
            document_files = [
                os.path.join(root, name)
                for root, _dirs, names in os.walk(project_dir)
                for name in names
                if name == "document.txt"
            ]

        return {
            "dataset_id": dataset_id,
            "project_exists": project_exists,
            "processed_files_count": len(processed_log),
            "processed_files": processed_log,
            "document_files_count": len(document_files),
            "document_files": document_files,
            "log_file_exists": os.path.exists(os.path.join(project_dir, "processed_files.json"))
        }
    except Exception as e:
        # Log with traceback, as the other handlers in this module log failures.
        logger.exception(f"Error getting file processing status: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to retrieve file processing status: {str(e)}") from e
@router.post("/api/v1/files/{dataset_id}/reset")
async def reset_files_processing(dataset_id: str):
    """
    Reset a project's file processing state by deleting its log and generated files.

    For every entry in the processed-files log the recorded local file and its
    dataset directory are removed. Afterwards the log file, the ``files`` and
    ``datasets`` directories and ``README.md`` under
    ``projects/data/<dataset_id>`` are removed as a final sweep.

    Args:
        dataset_id: Project identifier taken from the URL path. It must be a
            single plain path component.

    Returns:
        dict: A confirmation message plus the list (and count) of paths for
        which the removal helpers returned a truthy result.

    Raises:
        HTTPException: 400 when ``dataset_id`` is not a plain path component;
            500 when loading the log or removing files fails.
    """
    # This endpoint deletes from paths built out of dataset_id. Reject "",
    # ".", ".." and anything containing a separator so a crafted ID cannot
    # point the deletions outside projects/data/.
    if dataset_id in ("", ".", "..") or os.path.basename(dataset_id) != dataset_id:
        raise HTTPException(status_code=400, detail="Invalid dataset_id")

    try:
        project_dir = os.path.join("projects", "data", dataset_id)

        # Load processed log to know what files to remove
        processed_log = load_processed_files_log(dataset_id)
        removed_files = []

        # Remove all processed files and their dataset directories
        for file_info in processed_log.values():
            # Remove local file in files directory
            if 'local_path' in file_info:
                if remove_file_or_directory(file_info['local_path']):
                    removed_files.append(file_info['local_path'])

            # Handle new key-based structure first
            if 'key' in file_info:
                # Remove dataset directory by key
                key = file_info['key']
                if remove_dataset_directory_by_key(dataset_id, key):
                    removed_files.append(f"dataset/{key}")
            elif 'filename' in file_info:
                # Fallback to old filename-based structure
                filename_without_ext = os.path.splitext(file_info['filename'])[0]
                legacy_dataset_dir = os.path.join(project_dir, "datasets", filename_without_ext)
                if remove_file_or_directory(legacy_dataset_dir):
                    removed_files.append(legacy_dataset_dir)

            # Also remove any specific dataset path if exists (fallback)
            if 'dataset_path' in file_info:
                if remove_file_or_directory(file_info['dataset_path']):
                    removed_files.append(file_info['dataset_path'])

        # Final sweep, in order: the log file, the whole files directory, the
        # whole datasets directory (catches leftovers), then README.md.
        for leftover in (
            os.path.join(project_dir, "processed_files.json"),
            os.path.join(project_dir, "files"),
            os.path.join(project_dir, "datasets"),
            os.path.join(project_dir, "README.md"),
        ):
            if remove_file_or_directory(leftover):
                removed_files.append(leftover)

        return {
            "message": f"File processing status reset successfully: {dataset_id}",
            "removed_files_count": len(removed_files),
            "removed_files": removed_files
        }
    except Exception as e:
        # Log with traceback, as the other handlers in this module log failures.
        logger.exception(f"Error resetting file processing status: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to reset file processing status: {str(e)}") from e
@router.post("/api/v1/files/{dataset_id}/cleanup/async")
async def cleanup_project_async_endpoint(dataset_id: str, remove_all: bool = False):
    """
    Queue a cleanup job for a project.

    Args:
        dataset_id: Project identifier taken from the URL path.
        remove_all: Forwarded to ``cleanup_project_async``; also selects the
            ``action`` label reported in the response.

    Returns:
        dict: Submission confirmation including the queued task's ``id`` and
        the action label (``"remove_all"`` or ``"cleanup_logs"``).

    Raises:
        HTTPException: 500 when the task cannot be submitted.
    """
    try:
        queued = cleanup_project_async(dataset_id=dataset_id, remove_all=remove_all)

        if remove_all:
            action = "remove_all"
        else:
            action = "cleanup_logs"

        return dict(
            success=True,
            message=f"Project cleanup task has been submitted to the queue, project ID: {dataset_id}",
            dataset_id=dataset_id,
            task_id=queued.id,
            action=action,
        )
    except Exception as e:
        reason = str(e)
        logger.error(f"Error submitting cleanup task: {reason}")
        raise HTTPException(status_code=500, detail=f"Failed to submit cleanup task: {reason}")
@router.post("/api/v1/upload")
async def upload_file(file: UploadFile = File(...), folder: Optional[str] = Form(None)):
    """
    Store an uploaded file under ./projects/uploads/<folder>/.

    With a custom ``folder`` the file keeps its own name and
    ``get_versioned_filename`` picks the stored name and version number.
    Without one, the file goes into a folder named after today's date
    (YYYYMMDD) under a random UUID name that keeps the original extension.

    Both ``folder`` and the uploaded filename are client-controlled, so each is
    reduced to its final path component and rejected if that component is
    empty, "." or "..". This keeps the write inside the uploads directory.

    Args:
        file: Uploaded file.
        folder: Optional custom folder name.

    Returns:
        dict: ``success``, ``message``, ``file_path``, ``folder`` and
        ``original_filename`` (as sent by the client), plus ``version`` when a
        custom folder was given.

    Raises:
        HTTPException: 400 for an empty or invalid filename or an invalid
            folder name; 500 when saving the file fails.
    """
    try:
        # Debug information
        logger.info(f"Received folder parameter: {folder}")
        logger.info(f"File received: {file.filename if file else 'None'}")

        # Validate the filename before touching the filesystem, so a bad
        # request does not leave an empty upload directory behind.
        if not file.filename:
            raise HTTPException(status_code=400, detail="Filename cannot be empty")
        original_filename = file.filename

        # Keep only the final path component; a name such as "../../x" must
        # not be able to steer the write outside upload_dir.
        safe_filename = os.path.basename(original_filename)
        if safe_filename in ("", ".", ".."):
            raise HTTPException(status_code=400, detail="Invalid filename")
        name_without_ext, file_extension = os.path.splitext(safe_filename)

        # Determine the upload folder
        if folder:
            # basename() strips directories but still lets ".." through and
            # yields "" for input ending in a separator; refuse both.
            target_folder = os.path.basename(folder)
            if target_folder in ("", ".", ".."):
                raise HTTPException(status_code=400, detail="Invalid folder name")
        else:
            # Default to the current date formatted as YYYYMMDD
            target_folder = datetime.now().strftime("%Y%m%d")

        # Create upload directory
        upload_dir = os.path.join("projects", "uploads", target_folder)
        os.makedirs(upload_dir, exist_ok=True)

        # Choose naming strategy based on whether a folder is specified
        version = None
        if folder:
            stored_filename, version = get_versioned_filename(upload_dir, name_without_ext, file_extension)
        else:
            # Use UUID unique filename (original logic)
            stored_filename = f"{uuid.uuid4()}{file_extension}"

        # Save file
        file_path = os.path.join(upload_dir, stored_filename)
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        message = "File uploaded successfully"
        if version is not None and version > 1:
            message += f" (version: {version})"

        response = {
            "success": True,
            "message": message,
            "file_path": file_path,
            "folder": target_folder,
            "original_filename": original_filename
        }
        # "version" is only reported for custom-folder uploads.
        if folder:
            response["version"] = version
        return response
    except HTTPException:
        # Deliberate client errors (the 400s above) pass through untouched.
        raise
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception(f"Error uploading file: {str(e)}")
        raise HTTPException(status_code=500, detail=f"File upload failed: {str(e)}") from e
# Task management routes that are related to file processing
@router.get("/api/v1/task/{task_id}/status")
async def get_task_status(task_id: str):
    """
    Look up the stored status record for a task.

    Args:
        task_id: Task identifier taken from the URL path.

    Returns:
        dict: When ``task_status_store`` has a record: its status, unique_id,
        timestamps and optional ``result``/``error``. Otherwise a
        ``success: False`` payload with ``status`` set to ``"not_found"``.

    Raises:
        HTTPException: 500 when the lookup itself fails.
    """
    try:
        record = task_status_store.get_status(task_id)

        if record:
            return {
                "success": True,
                "message": "Task status retrieved successfully",
                "task_id": task_id,
                "status": record["status"],
                "unique_id": record["unique_id"],
                "created_at": record["created_at"],
                "updated_at": record["updated_at"],
                # result and error are optional on the record
                "result": record.get("result"),
                "error": record.get("error")
            }

        # A missing record is reported in the payload, not as an HTTP error.
        return {
            "success": False,
            "message": "Task does not exist or has expired",
            "task_id": task_id,
            "status": "not_found"
        }
    except Exception as e:
        reason = str(e)
        logger.error(f"Error getting task status: {reason}")
        raise HTTPException(status_code=500, detail=f"Failed to retrieve task status: {reason}")
@router.delete("/api/v1/task/{task_id}")
async def delete_task(task_id: str):
    """
    Delete the stored status record for a task.

    Args:
        task_id: Task identifier taken from the URL path.

    Returns:
        dict: ``success`` is True when ``task_status_store.delete_status``
        returned a truthy result and False otherwise, with a matching message.

    Raises:
        HTTPException: 500 when the delete call itself fails.
    """
    try:
        deleted = bool(task_status_store.delete_status(task_id))

        if deleted:
            outcome = f"Task record deleted: {task_id}"
        else:
            outcome = f"Task record does not exist: {task_id}"

        return {
            "success": deleted,
            "message": outcome,
            "task_id": task_id
        }
    except Exception as e:
        reason = str(e)
        logger.error(f"Error deleting task: {reason}")
        raise HTTPException(status_code=500, detail=f"Failed to delete task record: {reason}")
@router.get("/api/v1/tasks")
async def list_tasks(status: Optional[str] = None, dataset_id: Optional[str] = None, limit: int = 100):
    """
    List task records, optionally filtered by status and/or dataset.

    Args:
        status: Optional status filter.
        dataset_id: Optional filter, passed to the store as ``unique_id``.
        limit: Maximum number of tasks to return.

    Returns:
        dict: The matching tasks, their count, and an echo of the filters used.

    Raises:
        HTTPException: 500 when the store query fails.
    """
    try:
        no_filters = not (status or dataset_id)

        if no_filters:
            # Nothing to filter on: take every record and truncate to `limit`.
            tasks = list(task_status_store.list_all().values())[:limit]
        else:
            # Let the store apply the filters and the limit.
            tasks = task_status_store.search_tasks(status=status, unique_id=dataset_id, limit=limit)

        applied_filters = {
            "status": status,
            "dataset_id": dataset_id,
            "limit": limit
        }
        return {
            "success": True,
            "message": "Task list retrieved successfully",
            "total_tasks": len(tasks),
            "tasks": tasks,
            "filters": applied_filters
        }
    except Exception as e:
        reason = str(e)
        logger.error(f"Error listing tasks: {reason}")
        raise HTTPException(status_code=500, detail=f"Failed to retrieve task list: {reason}")
@router.get("/api/v1/tasks/statistics")
async def get_task_statistics():
    """
    Return the statistics reported by ``task_status_store.get_statistics``.

    Returns:
        dict: ``success``, a confirmation message and the ``statistics`` value.

    Raises:
        HTTPException: 500 when the statistics cannot be retrieved.
    """
    try:
        return dict(
            success=True,
            message="Statistics retrieved successfully",
            statistics=task_status_store.get_statistics(),
        )
    except Exception as e:
        reason = str(e)
        logger.error(f"Error getting statistics: {reason}")
        raise HTTPException(status_code=500, detail=f"Failed to retrieve statistics: {reason}")
@router.post("/api/v1/tasks/cleanup")
async def cleanup_tasks(older_than_days: int = 7):
    """
    Purge old task records from the status store.

    Args:
        older_than_days: Age threshold in days, forwarded to
            ``task_status_store.cleanup_old_tasks``.

    Returns:
        dict: The value returned by the store as ``deleted_count`` and the
        threshold that was applied.

    Raises:
        HTTPException: 500 when the cleanup call fails.
    """
    try:
        purged = task_status_store.cleanup_old_tasks(older_than_days=older_than_days)
        summary = f"Cleaned up {purged} old task records"
        return dict(
            success=True,
            message=summary,
            deleted_count=purged,
            older_than_days=older_than_days,
        )
    except Exception as e:
        reason = str(e)
        logger.error(f"Error cleaning up tasks: {reason}")
        raise HTTPException(status_code=500, detail=f"Failed to clean up task records: {reason}")