| .. | ||
| __init__.py | ||
| config.py | ||
| consumer.py | ||
| example.py | ||
| integration_tasks.py | ||
| manager.py | ||
| README.md | ||
| task_status.py | ||
| tasks.py | ||
队列系统使用说明
概述
本项目集成了基于 huey 和 SqliteHuey 的异步队列系统,用于处理文件的异步处理任务。
安装依赖
pip install huey
目录结构
queue/
├── __init__.py # 包初始化文件
├── config.py # 队列配置(SqliteHuey配置)
├── tasks.py # 文件处理任务定义
├── manager.py # 队列管理器
├── consumer.py # 队列消费者(工作进程)
├── example.py # 使用示例
└── README.md # 说明文档
核心功能
1. 队列配置 (config.py)
- 使用 SqliteHuey 作为消息队列
- 数据库文件存储在
queue_data/huey.db - 支持任务重试和错误存储
2. 文件处理任务 (tasks.py)
process_file_async: 异步处理单个文件process_multiple_files_async: 批量异步处理文件process_zip_file_async: 异步处理zip压缩文件cleanup_processed_files: 清理旧的文件
3. 队列管理器 (manager.py)
- 任务提交和管理
- 队列状态监控
- 任务结果查询
- 任务记录清理
使用方法
1. 启动队列消费者
# 启动默认配置的消费者
python queue/consumer.py
# 指定工作线程数
python queue/consumer.py --workers 4
# 查看队列统计信息
python queue/consumer.py --stats
# 检查队列状态
python queue/consumer.py --check
# 清空队列
python queue/consumer.py --flush
2. 在代码中使用队列
from queue.manager import queue_manager
# 处理单个文件
task_id = queue_manager.enqueue_file(
project_id="my_project",
file_path="/path/to/file.txt",
original_filename="myfile.txt"
)
# 批量处理文件
task_ids = queue_manager.enqueue_multiple_files(
project_id="my_project",
file_paths=["/path/file1.txt", "/path/file2.txt"],
original_filenames=["file1.txt", "file2.txt"]
)
# 处理zip文件
task_id = queue_manager.enqueue_zip_file(
project_id="my_project",
zip_path="/path/to/archive.zip"
)
# 查看任务状态
status = queue_manager.get_task_status(task_id)
print(status)
# 获取队列统计信息
stats = queue_manager.get_queue_stats()
print(stats)
3. 运行示例
python queue/example.py
配置说明
队列配置参数 (config.py)
filename: SQLite数据库文件路径always_eager: 是否立即执行任务(开发时可设为True)utc: 是否使用UTC时间compression_level: 压缩级别store_errors: 是否存储错误信息max_retries: 最大重试次数retry_delay: 重试延迟
消费者参数 (consumer.py)
--workers: 工作线程数(默认2)--worker-type: 工作类型(threads/greenlets/processes)--stats: 显示统计信息--check: 检查队列状态--flush: 清空队列
任务状态
pending: 等待处理running: 正在处理complete/finished: 处理完成error: 处理失败scheduled: 定时任务
最佳实践
-
生产环境建议:
- 设置合适的工作线程数(建议CPU核心数的1-2倍)
- 定期清理旧的任务记录
- 监控队列状态和任务执行情况
-
开发环境建议:
- 可以设置
always_eager=True立即执行任务进行调试 - 使用
--check参数查看队列状态 - 运行示例代码了解功能
- 可以设置
-
错误处理:
- 任务失败后会自动重试(最多3次)
- 错误信息会存储在数据库中
- 可以通过
get_task_status()查看错误详情
故障排除
- 数据库锁定: 确保只有一个消费者实例在运行
- 任务卡住: 检查文件路径和权限
- 内存不足: 调整工作线程数或使用进程模式
- 磁盘空间: 定期清理旧文件和任务记录