# feat: Schedule Job 定时任务系统 ## 概述 为每个用户的每个 bot 提供定时任务能力,支持 cron 周期任务和一次性定时任务。到期后系统自动调用 Agent 执行任务,结果写入日志并通过 PostAgent Hook 推送通知。 ## 背景 ### 当前状态 系统已有的后台任务机制: - **Huey 任务队列** (`task_queue/`): SQLite 后端,用于文件处理 - **AsyncIO 后台任务**: `asyncio.create_task()` 用于非阻塞操作 - **Checkpoint 清理调度器** (`agent/db_pool_manager.py`): asyncio loop 方式 **缺失能力**: - 无法为用户设置周期性或定时触发的 Agent 任务 - 无 cron 式调度器 - 无用户级任务管理接口 ### 设计目标 - 用户通过 AI 对话即可创建/管理定时任务 - 任务数据以 YAML 文件存储在用户专属目录,便于查看和备份 - 全局调度器以 asyncio 后台任务运行,无需额外进程 - 复用现有 `create_agent_and_generate_response()` 执行任务 ## 架构设计 ``` ┌─────────────────┐ ┌──────────────────────┐ ┌────────────────────┐ │ AI Agent 对话 │ │ 全局异步调度器 │ │ Agent 执行引擎 │ │ (Shell脚本工具) │────▶│ (asyncio loop) │────▶│ create_agent_and_ │ │ 增删改查 tasks │ │ 每60s扫描tasks.yaml │ │ generate_response │ └─────────────────┘ └──────────────────────┘ └────────────────────┘ │ │ │ ▼ ▼ ▼ tasks.yaml tasks.yaml task_logs/ (用户任务数据) (更新执行时间) (执行结果日志) ``` ## 数据模型 ### tasks.yaml **存储路径**: `projects/robot/{bot_id}/users/{user_id}/tasks.yaml` ```yaml tasks: - id: "task_20260330143000_abc123" # 自动生成 name: "每日新闻摘要" # 任务名称 type: "cron" # cron | once schedule: "0 9 * * *" # cron 表达式(type=cron 时) scheduled_at: null # ISO 8601 UTC(type=once 时) timezone: "Asia/Tokyo" # 用户时区,用于 cron 解析 message: "请帮我总结今天的科技新闻" # 发送给 agent 的消息 status: "active" # active | paused | done | expired created_at: "2026-03-30T05:30:00Z" last_executed_at: null # 上次执行时间(UTC) next_run_at: "2026-03-31T00:00:00Z" # 下次执行时间(UTC,调度器用此判断) execution_count: 0 # 已执行次数 ``` **关键设计决策**: - 所有时间统一 UTC 存储,cron 表达式结合 timezone 字段在本地时间计算后转 UTC - `next_run_at` 预计算,调度器只需简单比较时间戳即可判断是否到期 - 一次性任务执行后 status 自动变为 `done` ### execution.log **存储路径**: `projects/robot/{bot_id}/users/{user_id}/task_logs/execution.log` ```yaml - task_id: "task_20260330143000_abc123" task_name: "每日新闻摘要" executed_at: "2026-03-31T00:00:15Z" status: "success" # success | error response: "今天的科技新闻摘要:..." duration_ms: 12500 ``` 保留最近 100 条日志,自动清理旧记录。 ## 新增文件 ### 1. `skills/schedule-job/scripts/schedule_manager.py` Shell 命令行工具(argparse CLI),AI 通过 shell 调用来管理用户的定时任务。 **环境变量输入**(通过 plugin hook 机制传入): - `BOT_ID`: 当前 bot ID - `USER_IDENTIFIER`: 当前用户标识 **支持的命令**: | 命令 | 说明 | 示例 | |------|------|------| | `list` | 列出任务 | `list --format brief` | | `add` | 添加任务 | `add --name "每日新闻" --type cron --schedule "0 9 * * *" --timezone "Asia/Tokyo" --message "..."` | | `edit` | 编辑任务 | `edit --schedule "0 10 * * 1-5"` | | `delete` | 删除任务 | `delete ` | | `toggle` | 暂停/恢复 | `toggle ` | | `logs` | 查看日志 | `logs --task-id --limit 10` | **核心逻辑**: - 一次性任务 `--scheduled-at` 接受 ISO 8601 格式(带时区偏移),内部转 UTC - cron 任务通过 `croniter` + timezone 计算 `next_run_at` - 自动创建 `users/{user_id}/` 目录 ### 2. `skills/schedule-job/SKILL.md` Skill 描述文件,注入 AI prompt,告诉 AI: - 如何使用 schedule_manager.py CLI - cron 表达式语法说明 - 时区映射规则(语言 → 时区) - 使用示例 ### 3. `skills/schedule-job/.claude-plugin/plugin.json` ```json { "hooks": { "PrePrompt": { "command": "python scripts/schedule_manager.py list --format brief" } } } ``` 通过 PrePrompt hook,AI 每次对话时自动看到用户当前的任务列表。 ### 4. `services/schedule_executor.py` 全局异步调度器,核心类 `ScheduleExecutor`。 **运行机制**:参考 `db_pool_manager.py` 的 `_cleanup_loop()` 模式 ``` 启动 → asyncio.create_task(_scan_loop) ↓ 每 SCAN_INTERVAL 秒 ↓ 遍历 projects/robot/*/users/*/tasks.yaml ↓ 找到 status=active && next_run_at <= now 的任务 ↓ asyncio.create_task(_execute_task) → 受 Semaphore 并发控制 ↓ 构建 AgentConfig → create_agent_and_generate_response(stream=False) ↓ 写入 execution.log → 更新 tasks.yaml (next_run_at / status) ``` **并发与防重复**: - `_executing_tasks: set` 记录正在执行的任务 ID,防止同一任务被重复触发 - `asyncio.Semaphore(MAX_CONCURRENT)` 限制最大并发数(默认 5) **任务执行后更新**: - cron 任务:使用 croniter 计算下次 UTC 时间写入 `next_run_at` - once 任务:将 status 设为 `done`,清空 `next_run_at` - 失败时也更新 `next_run_at`,避免无限重试 **Agent 调用构建**: ```python bot_config = await fetch_bot_config(bot_id) project_dir = create_project_directory(dataset_ids, bot_id, skills) config = AgentConfig( bot_id=bot_id, user_identifier=user_id, session_id=f"schedule_{task_id}", # 专用 session stream=False, tool_response=False, messages=[{"role": "user", "content": task["message"]}], ... # 其余参数从 bot_config 获取 ) result = await create_agent_and_generate_response(config) ``` ## 修改文件 ### `pyproject.toml` 新增依赖: ```toml "croniter (>=2.0.0,<4.0.0)", "pyyaml (>=6.0,<7.0)", ``` ### `utils/settings.py` 新增环境变量: ```python SCHEDULE_ENABLED = os.getenv("SCHEDULE_ENABLED", "true") == "true" SCHEDULE_SCAN_INTERVAL = int(os.getenv("SCHEDULE_SCAN_INTERVAL", "60")) # 扫描间隔(秒) SCHEDULE_MAX_CONCURRENT = int(os.getenv("SCHEDULE_MAX_CONCURRENT", "5")) # 最大并发数 ``` ### `fastapi_app.py` 在 lifespan 中集成调度器生命周期: - **startup**: `schedule_executor.start()` — 在 checkpoint 清理调度器之后启动 - **shutdown**: `await schedule_executor.stop()` — 在 Mem0 关闭之前停止 ## 复用的现有组件 | 组件 | 路径 | 用途 | |------|------|------| | `create_agent_and_generate_response()` | `routes/chat.py:246` | 执行 agent 调用 | | `fetch_bot_config()` | `utils/fastapi_utils.py` | 获取 bot 配置 | | `create_project_directory()` | `utils/fastapi_utils.py` | 创建项目目录 | | `AgentConfig` | `agent/agent_config.py` | 构建执行配置 | | `_cleanup_loop()` 模式 | `agent/db_pool_manager.py:240` | asyncio 后台循环参考 | | `execute_hooks('PostAgent')` | `agent/plugin_hook_loader.py` | 执行结果通知 | ## 验证方式 ```bash # 1. 启动服务 poetry run uvicorn fastapi_app:app --host 0.0.0.0 --port 8001 # 2. CLI 创建测试任务 BOT_ID= USER_IDENTIFIER=test_user \ poetry run python skills/schedule-job/scripts/schedule_manager.py add \ --name "测试任务" --type once \ --scheduled-at "$(date -u -v+2M '+%Y-%m-%dT%H:%M:%SZ')" \ --message "你好,这是一个定时任务测试" # 3. 检查 tasks.yaml cat projects/robot//users/test_user/tasks.yaml # 4. 等待调度器执行(最多 60 秒),观察应用日志 # 5. 检查执行结果 cat projects/robot//users/test_user/task_logs/execution.log # 6. 通过 API 对话测试 skill curl -X POST http://localhost:8001/api/v2/chat/completions \ -H 'authorization: Bearer ' \ -H 'content-type: application/json' \ -d '{"messages":[{"role":"user","content":"帮我创建一个每天早上9点的定时任务"}],"stream":false,"bot_id":"","user_identifier":"test_user"}' ``` ## 后续扩展方向 - **任务失败重试**: 可增加 `max_retries` 和 `retry_delay` 字段 - **任务执行超时**: 为单个任务设置超时限制 - **通知渠道**: 集成飞书/邮件等 PostAgent Hook 推送执行结果 - **Web 管理界面**: 提供 REST API 查询/管理定时任务 - **任务模板**: 预设常用任务模板(每日新闻、天气播报等)