qwen_agent/task_queue/tasks.py
朱潮 425f3c5bb4 chore: replace Chinese comments and log messages with English
Convert all Chinese comments, docstrings, logger/print output,
HTTPException detail messages, and API response messages to English
across the entire codebase. Functional zh/ja localized strings
(e.g. prompt templates, timezone display names, date formats) are
preserved as-is.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-30 19:45:35 +08:00

360 lines
11 KiB
Python

#!/usr/bin/env python3
"""
File processing tasks for the queue system.
"""
import os
import json
import time
import shutil
import logging
from pathlib import Path
from typing import Dict, List, Optional, Any
from huey import crontab
# Configure logging
logger = logging.getLogger('app')
from .config import huey
from utils.file_utils import (
extract_zip_file,
get_file_hash,
load_processed_files_log,
save_processed_files_log,
get_document_preview
)
@huey.task()
def process_file_async(
    project_id: str,
    file_path: str,
    original_filename: Optional[str] = None,
    target_directory: str = "files"
) -> Dict[str, Any]:
    """
    Asynchronously process a single file.

    Copies ``file_path`` into ``projects/<project_id>/<target_directory>``
    (via ``_process_single_file``) and records successful runs in the
    project's processed-files log, keyed by ``get_file_hash(file_path)``.
    A file whose hash is already in that log is skipped.

    Args:
        project_id: Project ID
        file_path: Path of the file to process
        original_filename: Original filename; defaults to the basename of
            ``file_path`` when omitted or empty
        target_directory: Subdirectory of the project directory to copy into

    Returns:
        Processing result dictionary. It always contains ``status``
        ("success", "skipped" or "error") and ``project_id``.
    """
    try:
        logger.info(f"Starting file processing: {file_path}")
        display_name = original_filename or os.path.basename(file_path)

        # Ensure project directory exists
        project_dir = os.path.join("projects", project_id)
        files_dir = os.path.join(project_dir, target_directory)
        os.makedirs(files_dir, exist_ok=True)

        # The hash is the deduplication key for the processed-files log.
        file_hash = get_file_hash(file_path)

        # NOTE(review): the log is loaded here and saved below without any
        # locking visible in this module; concurrent tasks for the same
        # project may overwrite each other's entries. Confirm whether
        # save_processed_files_log guards against this.
        processed_log = load_processed_files_log(project_id)
        if file_hash in processed_log:
            logger.info(f"File already processed, skipping: {file_path}")
            return {
                "status": "skipped",
                "message": "File already processed",
                "file_hash": file_hash,
                "project_id": project_id
            }

        result = _process_single_file(file_path, files_dir, display_name)

        # Only successful runs are recorded, so a failed file can be retried.
        if result["status"] == "success":
            processed_log[file_hash] = {
                "original_path": file_path,
                "original_filename": display_name,
                "processed_at": str(time.time()),
                "status": "processed",
                "result": result
            }
            save_processed_files_log(project_id, processed_log)

        result["file_hash"] = file_hash
        result["project_id"] = project_id
        logger.info(f"File processing complete: {file_path}, status: {result['status']}")
        return result
    except Exception as e:
        # Task boundary: report the failure as a result dict instead of raising,
        # but keep the traceback in the log.
        error_msg = f"Error processing file: {str(e)}"
        logger.exception(error_msg)
        return {
            "status": "error",
            "message": error_msg,
            "file_path": file_path,
            "project_id": project_id
        }
@huey.task()
def process_multiple_files_async(
    project_id: str,
    file_paths: List[str],
    original_filenames: Optional[List[str]] = None
) -> List[Any]:
    """
    Submit one ``process_file_async`` task per file in ``file_paths``.

    Args:
        project_id: Project ID
        file_paths: List of file paths
        original_filenames: Optional list of original filenames, matched to
            ``file_paths`` by position. Missing entries (list omitted or
            shorter than ``file_paths``) fall back to None.

    Returns:
        One entry per file: the value returned by calling the
        ``process_file_async`` task. NOTE(review): with huey, calling a task
        presumably enqueues it and returns a result handle rather than the
        per-file result dict. Confirm that this is the intended return value
        for this task. On an unexpected error, returns a single-element list
        holding an error dict.
    """
    try:
        logger.info(f"Starting batch processing of {len(file_paths)} files")
        names = original_filenames or []
        results = []
        for index, file_path in enumerate(file_paths):
            # A None name lets process_file_async derive it from the path.
            original_filename = names[index] if index < len(names) else None
            results.append(process_file_async(project_id, file_path, original_filename))
        logger.info(f"Batch file processing tasks submitted, total {len(results)} files")
        return results
    except Exception as e:
        error_msg = f"Error during batch file processing: {str(e)}"
        logger.exception(error_msg)
        return [{
            "status": "error",
            "message": error_msg,
            "project_id": project_id
        }]
@huey.task()
def process_zip_file_async(
    project_id: str,
    zip_path: str,
    extract_to: Optional[str] = None
) -> Dict[str, Any]:
    """
    Asynchronously process a zip archive file.

    Extracts ``zip_path`` and hands the extracted files to
    ``process_multiple_files_async``.

    Args:
        project_id: Project ID
        zip_path: Zip file path
        extract_to: Extraction target directory. Defaults to
            ``projects/<project_id>/extracted/<zip file basename>``.

    Returns:
        Processing result dictionary with ``status`` "success" or "error".
        On success, ``batch_task_result`` holds the value returned by calling
        the batch task. NOTE(review): with huey this is presumably an
        enqueued-task handle, not the finished per-file results.
    """
    try:
        logger.info(f"Starting zip file processing: {zip_path}")

        # Set extraction directory
        if extract_to is None:
            extract_to = os.path.join("projects", project_id, "extracted", os.path.basename(zip_path))
        os.makedirs(extract_to, exist_ok=True)

        # NOTE(review): protection against archive members escaping extract_to
        # ("zip slip") is up to extract_zip_file; confirm it validates entries.
        extracted_files = extract_zip_file(zip_path, extract_to)
        if not extracted_files:
            logger.warning(f"Extraction failed or no supported files found: {zip_path}")
            return {
                "status": "error",
                "message": "Extraction failed or no supported files found",
                "zip_path": zip_path,
                "project_id": project_id
            }

        # Batch process extracted files
        result = process_multiple_files_async(project_id, extracted_files)
        return {
            "status": "success",
            "message": f"Zip file processing complete, extracted {len(extracted_files)} files",
            "zip_path": zip_path,
            "extract_to": extract_to,
            "extracted_files": extracted_files,
            "project_id": project_id,
            "batch_task_result": result
        }
    except Exception as e:
        error_msg = f"Error processing zip file: {str(e)}"
        logger.exception(error_msg)
        return {
            "status": "error",
            "message": error_msg,
            "zip_path": zip_path,
            "project_id": project_id
        }
@huey.task()
def cleanup_processed_files(
    project_id: str,
    older_than_days: int = 30
) -> Dict[str, Any]:
    """
    Clean up old processed files.

    Deletes every file under ``projects/<project_id>`` whose modification
    time is older than ``older_than_days`` days, then removes subdirectories
    left empty. The project directory itself is never removed. A file or
    directory that cannot be handled is logged and skipped.

    Args:
        project_id: Project ID
        older_than_days: Delete files last modified more than this many days ago

    Returns:
        Cleanup result dictionary. On success it includes the list of deleted
        file paths under ``cleaned_files``.
    """
    try:
        logger.info(f"Starting cleanup of files older than {older_than_days} days in project {project_id}")
        project_dir = os.path.join("projects", project_id)
        if not os.path.exists(project_dir):
            return {
                "status": "error",
                "message": "Project directory does not exist",
                "project_id": project_id
            }

        cutoff_time = time.time() - older_than_days * 24 * 3600
        cleaned_files = _delete_files_older_than(project_dir, cutoff_time)
        _remove_empty_subdirs(project_dir)

        return {
            "status": "success",
            "message": f"Cleanup complete, deleted {len(cleaned_files)} files",
            "project_id": project_id,
            "cleaned_files": cleaned_files,
            "older_than_days": older_than_days
        }
    except Exception as e:
        error_msg = f"Error during file cleanup: {str(e)}"
        logger.exception(error_msg)
        return {
            "status": "error",
            "message": error_msg,
            "project_id": project_id
        }


def _delete_files_older_than(root_dir: str, cutoff_time: float) -> List[str]:
    """Delete files under root_dir whose mtime is before cutoff_time; return the deleted paths."""
    deleted = []
    for root, _subdirs, files in os.walk(root_dir):
        for name in files:
            file_path = os.path.join(root, name)
            try:
                # getmtime is inside the try so that a file which disappears
                # mid-walk (or a dangling symlink) is skipped instead of
                # aborting the whole cleanup.
                if os.path.getmtime(file_path) >= cutoff_time:
                    continue
                os.remove(file_path)
            except OSError as e:
                logger.error(f"Failed to delete file {file_path}: {str(e)}")
                continue
            deleted.append(file_path)
            logger.info(f"Deleted old file: {file_path}")
    return deleted


def _remove_empty_subdirs(root_dir: str) -> None:
    """Remove empty directories below root_dir (root_dir itself is kept)."""
    # Bottom-up walk: a child directory is visited, and its own empty children
    # removed, before its parent checks whether it is empty, so whole chains
    # of emptied directories get removed.
    for root, subdirs, _files in os.walk(root_dir, topdown=False):
        for name in subdirs:
            dir_path = os.path.join(root, name)
            try:
                if not os.listdir(dir_path):
                    os.rmdir(dir_path)
                    logger.info(f"Deleted empty directory: {dir_path}")
            except OSError as e:
                logger.error(f"Failed to delete directory {dir_path}: {str(e)}")
def _process_single_file(
    file_path: str,
    target_dir: str,
    original_filename: str
) -> Dict[str, Any]:
    """
    Copy one file into target_dir after validating its extension.

    The extension is taken from ``original_filename`` (case-insensitive) and
    must be one of ``.txt``, ``.md``, ``.csv``, ``.xlsx``, ``.zip``. The copy
    is written into ``target_dir`` under the final path component of
    ``original_filename``. If that name is taken, a ``_<unix timestamp>``
    suffix (plus ``_<n>`` if needed) is added, so existing files are never
    overwritten. For ``.txt``/``.md`` files a short preview is included.

    Args:
        file_path: Source file path
        target_dir: Target directory (must already exist)
        original_filename: Original filename; only its last path component
            is used for the copy

    Returns:
        Processing result dictionary with ``status`` "success" or "error".
    """
    try:
        # Check if file exists
        if not os.path.exists(file_path):
            return {
                "status": "error",
                "message": "Source file does not exist",
                "file_path": file_path
            }

        file_size = os.path.getsize(file_path)
        file_ext = os.path.splitext(original_filename)[1].lower()

        supported_extensions = ['.txt', '.md', '.csv', '.xlsx', '.zip']
        if file_ext not in supported_extensions:
            return {
                "status": "error",
                "message": f"Unsupported file type: {file_ext}",
                "file_path": file_path,
                "supported_extensions": supported_extensions
            }

        # original_filename may come from a client and contain "../" or an
        # absolute path, which os.path.join would honor and so write outside
        # target_dir. Keep only the final component.
        safe_name = os.path.basename(original_filename)
        target_file_path = _unique_target_path(target_dir, safe_name)
        shutil.copy2(file_path, target_file_path)

        # Get file preview (text formats only)
        preview = None
        if file_ext in ['.txt', '.md']:
            preview = get_document_preview(target_file_path, max_lines=5)

        return {
            "status": "success",
            "message": "File processed successfully",
            "original_path": file_path,
            "target_path": target_file_path,
            "file_size": file_size,
            "file_extension": file_ext,
            "preview": preview
        }
    except Exception as e:
        return {
            "status": "error",
            "message": f"Error processing file: {str(e)}",
            "file_path": file_path
        }


def _unique_target_path(target_dir: str, filename: str) -> str:
    """Return a path for filename inside target_dir that does not currently exist."""
    candidate = os.path.join(target_dir, filename)
    if not os.path.exists(candidate):
        return candidate
    name, ext = os.path.splitext(filename)
    timestamp = int(time.time())
    candidate = os.path.join(target_dir, f"{name}_{timestamp}{ext}")
    # The timestamp has one-second resolution, so repeated uploads of the
    # same name within a second need an extra counter to avoid overwriting.
    counter = 1
    while os.path.exists(candidate):
        candidate = os.path.join(target_dir, f"{name}_{timestamp}_{counter}{ext}")
        counter += 1
    return candidate
# Periodic task scheduled by crontab(hour=2, minute=0). It is currently a
# placeholder: it only logs and reports completion, and deletes nothing.
@huey.periodic_task(crontab(hour=2, minute=0))
def daily_cleanup():
    """Placeholder daily maintenance hook; logs and returns a completion dict."""
    logger.info("Running daily cleanup task")
    # TODO: add real cleanup logic here (no files are removed yet).
    outcome = {"status": "completed", "message": "Daily cleanup task completed"}
    return outcome