qwen_agent/task_queue
2025-12-23 17:36:45 +08:00
..
__init__.py add task 2025-10-18 09:20:59 +08:00
config.py 日志优化 2025-11-27 21:50:03 +08:00
consumer.py 新增队列管理后台 2025-11-09 10:31:23 +08:00
example.py add task 2025-10-18 09:20:59 +08:00
integration_tasks.py 新增folder功能,删除unique_id 2025-11-09 11:54:03 +08:00
manager.py 日志优化 2025-11-27 21:50:03 +08:00
optimized_consumer.py 日志优化 2025-11-27 21:50:03 +08:00
README.md add task 2025-10-18 09:20:59 +08:00
task_status.py 新增队列管理后台 2025-11-09 10:31:23 +08:00
tasks.py add symlink 2025-12-23 17:36:45 +08:00

队列系统使用说明

概述

本项目集成了基于 huey 和 SqliteHuey 的异步队列系统,用于处理文件的异步处理任务。

安装依赖

pip install huey

目录结构

queue/
├── __init__.py          # 包初始化文件
├── config.py           # 队列配置SqliteHuey配置
├── tasks.py            # 文件处理任务定义
├── manager.py          # 队列管理器
├── consumer.py         # 队列消费者(工作进程)
├── example.py          # 使用示例
└── README.md           # 说明文档

核心功能

1. 队列配置 (config.py)

  • 使用 SqliteHuey 作为消息队列
  • 数据库文件存储在 queue_data/huey.db
  • 支持任务重试和错误存储

2. 文件处理任务 (tasks.py)

  • process_file_async: 异步处理单个文件
  • process_multiple_files_async: 批量异步处理文件
  • process_zip_file_async: 异步处理zip压缩文件
  • cleanup_processed_files: 清理旧的文件

3. 队列管理器 (manager.py)

  • 任务提交和管理
  • 队列状态监控
  • 任务结果查询
  • 任务记录清理

使用方法

1. 启动队列消费者

# 启动默认配置的消费者
python queue/consumer.py

# 指定工作线程数
python queue/consumer.py --workers 4

# 查看队列统计信息
python queue/consumer.py --stats

# 检查队列状态
python queue/consumer.py --check

# 清空队列
python queue/consumer.py --flush

2. 在代码中使用队列

from queue.manager import queue_manager

# 处理单个文件
task_id = queue_manager.enqueue_file(
    project_id="my_project",
    file_path="/path/to/file.txt",
    original_filename="myfile.txt"
)

# 批量处理文件
task_ids = queue_manager.enqueue_multiple_files(
    project_id="my_project",
    file_paths=["/path/file1.txt", "/path/file2.txt"],
    original_filenames=["file1.txt", "file2.txt"]
)

# 处理zip文件
task_id = queue_manager.enqueue_zip_file(
    project_id="my_project",
    zip_path="/path/to/archive.zip"
)

# 查看任务状态
status = queue_manager.get_task_status(task_id)
print(status)

# 获取队列统计信息
stats = queue_manager.get_queue_stats()
print(stats)

3. 运行示例

python queue/example.py

配置说明

队列配置参数 (config.py)

  • filename: SQLite数据库文件路径
  • always_eager: 是否立即执行任务开发时可设为True
  • utc: 是否使用UTC时间
  • compression_level: 压缩级别
  • store_errors: 是否存储错误信息
  • max_retries: 最大重试次数
  • retry_delay: 重试延迟

消费者参数 (consumer.py)

  • --workers: 工作线程数默认2
  • --worker-type: 工作类型threads/greenlets/processes
  • --stats: 显示统计信息
  • --check: 检查队列状态
  • --flush: 清空队列

任务状态

  • pending: 等待处理
  • running: 正在处理
  • complete/finished: 处理完成
  • error: 处理失败
  • scheduled: 定时任务

最佳实践

  1. 生产环境建议:

    • 设置合适的工作线程数建议CPU核心数的1-2倍
    • 定期清理旧的任务记录
    • 监控队列状态和任务执行情况
  2. 开发环境建议:

    • 可以设置 always_eager=True 立即执行任务进行调试
    • 使用 --check 参数查看队列状态
    • 运行示例代码了解功能
  3. 错误处理:

    • 任务失败后会自动重试最多3次
    • 错误信息会存储在数据库中
    • 可以通过 get_task_status() 查看错误详情

故障排除

  1. 数据库锁定: 确保只有一个消费者实例在运行
  2. 任务卡住: 检查文件路径和权限
  3. 内存不足: 调整工作线程数或使用进程模式
  4. 磁盘空间: 定期清理旧文件和任务记录