qwen_agent/plans/schedule-job.md
朱潮 3b9c7165a9 feat: 添加定时任务调度系统(schedule-job)
- 新增 schedule-job skill,支持 cron 周期任务和一次性定时任务
- 新增 schedule_manager.py CLI 工具(list/add/edit/delete/toggle/logs)
- 新增 ScheduleExecutor 全局异步调度器,每 60s 扫描到期任务并调用 agent 执行
- 任务数据存储在 projects/robot/{bot_id}/users/{user_id}/tasks.yaml
- 执行结果写入 task_logs/execution.log
- 集成到 FastAPI lifespan 生命周期管理
- 添加 croniter、pyyaml 依赖

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 23:17:47 +08:00

250 lines
9.1 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# feat: Schedule Job 定时任务系统
## 概述
为每个用户的每个 bot 提供定时任务能力,支持 cron 周期任务和一次性定时任务。到期后系统自动调用 Agent 执行任务,结果写入日志并通过 PostAgent Hook 推送通知。
## 背景
### 当前状态
系统已有的后台任务机制:
- **Huey 任务队列** (`task_queue/`): SQLite 后端,用于文件处理
- **AsyncIO 后台任务**: `asyncio.create_task()` 用于非阻塞操作
- **Checkpoint 清理调度器** (`agent/db_pool_manager.py`): asyncio loop 方式
**缺失能力**
- 无法为用户设置周期性或定时触发的 Agent 任务
- 无 cron 式调度器
- 无用户级任务管理接口
### 设计目标
- 用户通过 AI 对话即可创建/管理定时任务
- 任务数据以 YAML 文件存储在用户专属目录,便于查看和备份
- 全局调度器以 asyncio 后台任务运行,无需额外进程
- 复用现有 `create_agent_and_generate_response()` 执行任务
## 架构设计
```
┌─────────────────┐ ┌──────────────────────┐ ┌────────────────────┐
│ AI Agent 对话 │ │ 全局异步调度器 │ │ Agent 执行引擎 │
│ (Shell脚本工具) │────▶│ (asyncio loop) │────▶│ create_agent_and_ │
│ 增删改查 tasks │ │ 每60s扫描tasks.yaml │ │ generate_response │
└─────────────────┘ └──────────────────────┘ └────────────────────┘
│ │ │
▼ ▼ ▼
tasks.yaml tasks.yaml task_logs/
(用户任务数据) (更新执行时间) (执行结果日志)
```
## 数据模型
### tasks.yaml
**存储路径**: `projects/robot/{bot_id}/users/{user_id}/tasks.yaml`
```yaml
tasks:
- id: "task_20260330143000_abc123" # 自动生成
name: "每日新闻摘要" # 任务名称
type: "cron" # cron | once
schedule: "0 9 * * *" # cron 表达式type=cron 时)
scheduled_at: null # ISO 8601 UTCtype=once 时)
timezone: "Asia/Tokyo" # 用户时区,用于 cron 解析
message: "请帮我总结今天的科技新闻" # 发送给 agent 的消息
status: "active" # active | paused | done | expired
created_at: "2026-03-30T05:30:00Z"
last_executed_at: null # 上次执行时间UTC
next_run_at: "2026-03-31T00:00:00Z" # 下次执行时间UTC调度器用此判断
execution_count: 0 # 已执行次数
```
**关键设计决策**
- 所有时间统一 UTC 存储cron 表达式结合 timezone 字段在本地时间计算后转 UTC
- `next_run_at` 预计算,调度器只需简单比较时间戳即可判断是否到期
- 一次性任务执行后 status 自动变为 `done`
### execution.log
**存储路径**: `projects/robot/{bot_id}/users/{user_id}/task_logs/execution.log`
```yaml
- task_id: "task_20260330143000_abc123"
task_name: "每日新闻摘要"
executed_at: "2026-03-31T00:00:15Z"
status: "success" # success | error
response: "今天的科技新闻摘要:..."
duration_ms: 12500
```
保留最近 100 条日志,自动清理旧记录。
## 新增文件
### 1. `skills/schedule-job/scripts/schedule_manager.py`
Shell 命令行工具argparse CLIAI 通过 shell 调用来管理用户的定时任务。
**环境变量输入**(通过 plugin hook 机制传入):
- `BOT_ID`: 当前 bot ID
- `USER_IDENTIFIER`: 当前用户标识
**支持的命令**
| 命令 | 说明 | 示例 |
|------|------|------|
| `list` | 列出任务 | `list --format brief` |
| `add` | 添加任务 | `add --name "每日新闻" --type cron --schedule "0 9 * * *" --timezone "Asia/Tokyo" --message "..."` |
| `edit` | 编辑任务 | `edit <task_id> --schedule "0 10 * * 1-5"` |
| `delete` | 删除任务 | `delete <task_id>` |
| `toggle` | 暂停/恢复 | `toggle <task_id>` |
| `logs` | 查看日志 | `logs --task-id <id> --limit 10` |
**核心逻辑**
- 一次性任务 `--scheduled-at` 接受 ISO 8601 格式(带时区偏移),内部转 UTC
- cron 任务通过 `croniter` + timezone 计算 `next_run_at`
- 自动创建 `users/{user_id}/` 目录
### 2. `skills/schedule-job/SKILL.md`
Skill 描述文件,注入 AI prompt告诉 AI
- 如何使用 schedule_manager.py CLI
- cron 表达式语法说明
- 时区映射规则(语言 → 时区)
- 使用示例
### 3. `skills/schedule-job/.claude-plugin/plugin.json`
```json
{
"hooks": {
"PrePrompt": {
"command": "python scripts/schedule_manager.py list --format brief"
}
}
}
```
通过 PrePrompt hookAI 每次对话时自动看到用户当前的任务列表。
### 4. `services/schedule_executor.py`
全局异步调度器,核心类 `ScheduleExecutor`
**运行机制**:参考 `db_pool_manager.py``_cleanup_loop()` 模式
```
启动 → asyncio.create_task(_scan_loop)
每 SCAN_INTERVAL 秒
遍历 projects/robot/*/users/*/tasks.yaml
找到 status=active && next_run_at <= now 的任务
asyncio.create_task(_execute_task) → 受 Semaphore 并发控制
构建 AgentConfig → create_agent_and_generate_response(stream=False)
写入 execution.log → 更新 tasks.yaml (next_run_at / status)
```
**并发与防重复**
- `_executing_tasks: set` 记录正在执行的任务 ID防止同一任务被重复触发
- `asyncio.Semaphore(MAX_CONCURRENT)` 限制最大并发数(默认 5
**任务执行后更新**
- cron 任务:使用 croniter 计算下次 UTC 时间写入 `next_run_at`
- once 任务:将 status 设为 `done`,清空 `next_run_at`
- 失败时也更新 `next_run_at`,避免无限重试
**Agent 调用构建**
```python
bot_config = await fetch_bot_config(bot_id)
project_dir = create_project_directory(dataset_ids, bot_id, skills)
config = AgentConfig(
bot_id=bot_id,
user_identifier=user_id,
session_id=f"schedule_{task_id}", # 专用 session
stream=False,
tool_response=False,
messages=[{"role": "user", "content": task["message"]}],
... # 其余参数从 bot_config 获取
)
result = await create_agent_and_generate_response(config)
```
## 修改文件
### `pyproject.toml`
新增依赖:
```toml
"croniter (>=2.0.0,<4.0.0)",
"pyyaml (>=6.0,<7.0)",
```
### `utils/settings.py`
新增环境变量:
```python
SCHEDULE_ENABLED = os.getenv("SCHEDULE_ENABLED", "true") == "true"
SCHEDULE_SCAN_INTERVAL = int(os.getenv("SCHEDULE_SCAN_INTERVAL", "60")) # 扫描间隔(秒)
SCHEDULE_MAX_CONCURRENT = int(os.getenv("SCHEDULE_MAX_CONCURRENT", "5")) # 最大并发数
```
### `fastapi_app.py`
在 lifespan 中集成调度器生命周期:
- **startup**: `schedule_executor.start()` — 在 checkpoint 清理调度器之后启动
- **shutdown**: `await schedule_executor.stop()` — 在 Mem0 关闭之前停止
## 复用的现有组件
| 组件 | 路径 | 用途 |
|------|------|------|
| `create_agent_and_generate_response()` | `routes/chat.py:246` | 执行 agent 调用 |
| `fetch_bot_config()` | `utils/fastapi_utils.py` | 获取 bot 配置 |
| `create_project_directory()` | `utils/fastapi_utils.py` | 创建项目目录 |
| `AgentConfig` | `agent/agent_config.py` | 构建执行配置 |
| `_cleanup_loop()` 模式 | `agent/db_pool_manager.py:240` | asyncio 后台循环参考 |
| `execute_hooks('PostAgent')` | `agent/plugin_hook_loader.py` | 执行结果通知 |
## 验证方式
```bash
# 1. 启动服务
poetry run uvicorn fastapi_app:app --host 0.0.0.0 --port 8001
# 2. CLI 创建测试任务
BOT_ID=<bot_id> USER_IDENTIFIER=test_user \
poetry run python skills/schedule-job/scripts/schedule_manager.py add \
--name "测试任务" --type once \
--scheduled-at "$(date -u -v+2M '+%Y-%m-%dT%H:%M:%SZ')" \
--message "你好,这是一个定时任务测试"
# 3. 检查 tasks.yaml
cat projects/robot/<bot_id>/users/test_user/tasks.yaml
# 4. 等待调度器执行(最多 60 秒),观察应用日志
# 5. 检查执行结果
cat projects/robot/<bot_id>/users/test_user/task_logs/execution.log
# 6. 通过 API 对话测试 skill
curl -X POST http://localhost:8001/api/v2/chat/completions \
-H 'authorization: Bearer <token>' \
-H 'content-type: application/json' \
-d '{"messages":[{"role":"user","content":"帮我创建一个每天早上9点的定时任务"}],"stream":false,"bot_id":"<bot_id>","user_identifier":"test_user"}'
```
## 后续扩展方向
- **任务失败重试**: 可增加 `max_retries``retry_delay` 字段
- **任务执行超时**: 为单个任务设置超时限制
- **通知渠道**: 集成飞书/邮件等 PostAgent Hook 推送执行结果
- **Web 管理界面**: 提供 REST API 查询/管理定时任务
- **任务模板**: 预设常用任务模板(每日新闻、天气播报等)