catalog-agent/task_queue/README.md
2025-10-18 09:20:59 +08:00

154 lines
3.8 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 队列系统使用说明
## 概述
本项目集成了基于 huey 和 SqliteHuey 的异步队列系统,用于处理文件的异步处理任务。
## 安装依赖
```bash
pip install huey
```
## 目录结构
```
queue/
├── __init__.py # 包初始化文件
├── config.py # 队列配置SqliteHuey配置
├── tasks.py # 文件处理任务定义
├── manager.py # 队列管理器
├── consumer.py # 队列消费者(工作进程)
├── example.py # 使用示例
└── README.md # 说明文档
```
## 核心功能
### 1. 队列配置 (config.py)
- 使用 SqliteHuey 作为消息队列
- 数据库文件存储在 `queue_data/huey.db`
- 支持任务重试和错误存储
### 2. 文件处理任务 (tasks.py)
- `process_file_async`: 异步处理单个文件
- `process_multiple_files_async`: 批量异步处理文件
- `process_zip_file_async`: 异步处理zip压缩文件
- `cleanup_processed_files`: 清理旧的文件
### 3. 队列管理器 (manager.py)
- 任务提交和管理
- 队列状态监控
- 任务结果查询
- 任务记录清理
## 使用方法
### 1. 启动队列消费者
```bash
# 启动默认配置的消费者
python queue/consumer.py
# 指定工作线程数
python queue/consumer.py --workers 4
# 查看队列统计信息
python queue/consumer.py --stats
# 检查队列状态
python queue/consumer.py --check
# 清空队列
python queue/consumer.py --flush
```
### 2. 在代码中使用队列
```python
from queue.manager import queue_manager
# 处理单个文件
task_id = queue_manager.enqueue_file(
project_id="my_project",
file_path="/path/to/file.txt",
original_filename="myfile.txt"
)
# 批量处理文件
task_ids = queue_manager.enqueue_multiple_files(
project_id="my_project",
file_paths=["/path/file1.txt", "/path/file2.txt"],
original_filenames=["file1.txt", "file2.txt"]
)
# 处理zip文件
task_id = queue_manager.enqueue_zip_file(
project_id="my_project",
zip_path="/path/to/archive.zip"
)
# 查看任务状态
status = queue_manager.get_task_status(task_id)
print(status)
# 获取队列统计信息
stats = queue_manager.get_queue_stats()
print(stats)
```
### 3. 运行示例
```bash
python queue/example.py
```
## 配置说明
### 队列配置参数 (config.py)
- `filename`: SQLite数据库文件路径
- `always_eager`: 是否立即执行任务开发时可设为True
- `utc`: 是否使用UTC时间
- `compression_level`: 压缩级别
- `store_errors`: 是否存储错误信息
- `max_retries`: 最大重试次数
- `retry_delay`: 重试延迟
### 消费者参数 (consumer.py)
- `--workers`: 工作线程数默认2
- `--worker-type`: 工作类型threads/greenlets/processes
- `--stats`: 显示统计信息
- `--check`: 检查队列状态
- `--flush`: 清空队列
## 任务状态
- `pending`: 等待处理
- `running`: 正在处理
- `complete/finished`: 处理完成
- `error`: 处理失败
- `scheduled`: 定时任务
## 最佳实践
1. **生产环境建议**:
- 设置合适的工作线程数建议CPU核心数的1-2倍
- 定期清理旧的任务记录
- 监控队列状态和任务执行情况
2. **开发环境建议**:
- 可以设置 `always_eager=True` 立即执行任务进行调试
- 使用 `--check` 参数查看队列状态
- 运行示例代码了解功能
3. **错误处理**:
- 任务失败后会自动重试最多3次
- 错误信息会存储在数据库中
- 可以通过 `get_task_status()` 查看错误详情
## 故障排除
1. **数据库锁定**: 确保只有一个消费者实例在运行
2. **任务卡住**: 检查文件路径和权限
3. **内存不足**: 调整工作线程数或使用进程模式
4. **磁盘空间**: 定期清理旧文件和任务记录