feat: 添加定时任务调度系统(schedule-job)

- 新增 schedule-job skill,支持 cron 周期任务和一次性定时任务
- 新增 schedule_manager.py CLI 工具(list/add/edit/delete/toggle/logs)
- 新增 ScheduleExecutor 全局异步调度器,每 60s 扫描到期任务并调用 agent 执行
- 任务数据存储在 projects/robot/{bot_id}/users/{user_id}/tasks.yaml
- 执行结果写入 task_logs/execution.log
- 集成到 FastAPI lifespan 生命周期管理
- 添加 croniter、pyyaml 依赖

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
朱潮 2026-03-30 23:17:47 +08:00
parent 89b7bb9928
commit 3b9c7165a9
10 changed files with 1180 additions and 1 deletions

View File

@ -108,6 +108,7 @@ async def lifespan(app: FastAPI):
close_global_mem0
)
from utils.settings import CHECKPOINT_CLEANUP_ENABLED, MEM0_ENABLED
from utils.settings import SCHEDULE_ENABLED
# 1. 初始化共享的数据库连接池
db_pool_manager = await init_global_db_pool()
@ -141,10 +142,28 @@ async def lifespan(app: FastAPI):
db_pool_manager.start_cleanup_scheduler()
logger.info("Checkpoint cleanup scheduler started")
# 6. 启动定时任务调度器
schedule_executor = None
if SCHEDULE_ENABLED:
try:
from services.schedule_executor import get_schedule_executor
schedule_executor = get_schedule_executor()
schedule_executor.start()
logger.info("Schedule executor started")
except Exception as e:
logger.warning(f"Schedule executor start failed (non-fatal): {e}")
yield
# 关闭时清理(按相反顺序)
logger.info("Shutting down...")
# 关闭定时任务调度器
if schedule_executor:
try:
await schedule_executor.stop()
logger.info("Schedule executor stopped")
except Exception as e:
logger.warning(f"Schedule executor stop failed (non-fatal): {e}")
# 关闭 Mem0
if MEM0_ENABLED:
try:

249
plans/schedule-job.md Normal file
View File

@ -0,0 +1,249 @@
# feat: Schedule Job 定时任务系统
## 概述
为每个用户的每个 bot 提供定时任务能力,支持 cron 周期任务和一次性定时任务。到期后系统自动调用 Agent 执行任务,结果写入日志并通过 PostAgent Hook 推送通知。
## 背景
### 当前状态
系统已有的后台任务机制:
- **Huey 任务队列** (`task_queue/`): SQLite 后端,用于文件处理
- **AsyncIO 后台任务**: `asyncio.create_task()` 用于非阻塞操作
- **Checkpoint 清理调度器** (`agent/db_pool_manager.py`): asyncio loop 方式
**缺失能力**
- 无法为用户设置周期性或定时触发的 Agent 任务
- 无 cron 式调度器
- 无用户级任务管理接口
### 设计目标
- 用户通过 AI 对话即可创建/管理定时任务
- 任务数据以 YAML 文件存储在用户专属目录,便于查看和备份
- 全局调度器以 asyncio 后台任务运行,无需额外进程
- 复用现有 `create_agent_and_generate_response()` 执行任务
## 架构设计
```
┌─────────────────┐ ┌──────────────────────┐ ┌────────────────────┐
│ AI Agent 对话 │ │ 全局异步调度器 │ │ Agent 执行引擎 │
│ (Shell脚本工具) │────▶│ (asyncio loop) │────▶│ create_agent_and_ │
│ 增删改查 tasks │ │ 每60s扫描tasks.yaml │ │ generate_response │
└─────────────────┘ └──────────────────────┘ └────────────────────┘
│ │ │
▼ ▼ ▼
tasks.yaml tasks.yaml task_logs/
(用户任务数据) (更新执行时间) (执行结果日志)
```
## 数据模型
### tasks.yaml
**存储路径**: `projects/robot/{bot_id}/users/{user_id}/tasks.yaml`
```yaml
tasks:
- id: "task_20260330143000_abc123" # 自动生成
name: "每日新闻摘要" # 任务名称
type: "cron" # cron | once
schedule: "0 9 * * *" # cron 表达式type=cron 时)
scheduled_at: null # ISO 8601 UTCtype=once 时)
timezone: "Asia/Tokyo" # 用户时区,用于 cron 解析
message: "请帮我总结今天的科技新闻" # 发送给 agent 的消息
status: "active" # active | paused | done | expired
created_at: "2026-03-30T05:30:00Z"
last_executed_at: null # 上次执行时间UTC
next_run_at: "2026-03-31T00:00:00Z" # 下次执行时间UTC调度器用此判断
execution_count: 0 # 已执行次数
```
**关键设计决策**
- 所有时间统一 UTC 存储cron 表达式结合 timezone 字段在本地时间计算后转 UTC
- `next_run_at` 预计算,调度器只需简单比较时间戳即可判断是否到期
- 一次性任务执行后 status 自动变为 `done`
### execution.log
**存储路径**: `projects/robot/{bot_id}/users/{user_id}/task_logs/execution.log`
```yaml
- task_id: "task_20260330143000_abc123"
task_name: "每日新闻摘要"
executed_at: "2026-03-31T00:00:15Z"
status: "success" # success | error
response: "今天的科技新闻摘要:..."
duration_ms: 12500
```
保留最近 100 条日志,自动清理旧记录。
## 新增文件
### 1. `skills/schedule-job/scripts/schedule_manager.py`
Shell 命令行工具argparse CLIAI 通过 shell 调用来管理用户的定时任务。
**环境变量输入**(通过 plugin hook 机制传入):
- `BOT_ID`: 当前 bot ID
- `USER_IDENTIFIER`: 当前用户标识
**支持的命令**
| 命令 | 说明 | 示例 |
|------|------|------|
| `list` | 列出任务 | `list --format brief` |
| `add` | 添加任务 | `add --name "每日新闻" --type cron --schedule "0 9 * * *" --timezone "Asia/Tokyo" --message "..."` |
| `edit` | 编辑任务 | `edit <task_id> --schedule "0 10 * * 1-5"` |
| `delete` | 删除任务 | `delete <task_id>` |
| `toggle` | 暂停/恢复 | `toggle <task_id>` |
| `logs` | 查看日志 | `logs --task-id <id> --limit 10` |
**核心逻辑**
- 一次性任务 `--scheduled-at` 接受 ISO 8601 格式(带时区偏移),内部转 UTC
- cron 任务通过 `croniter` + timezone 计算 `next_run_at`
- 自动创建 `users/{user_id}/` 目录
### 2. `skills/schedule-job/SKILL.md`
Skill 描述文件,注入 AI prompt告诉 AI
- 如何使用 schedule_manager.py CLI
- cron 表达式语法说明
- 时区映射规则(语言 → 时区)
- 使用示例
### 3. `skills/schedule-job/.claude-plugin/plugin.json`
```json
{
"hooks": {
"PrePrompt": {
"command": "python scripts/schedule_manager.py list --format brief"
}
}
}
```
通过 PrePrompt hookAI 每次对话时自动看到用户当前的任务列表。
### 4. `services/schedule_executor.py`
全局异步调度器,核心类 `ScheduleExecutor`
**运行机制**:参考 `db_pool_manager.py``_cleanup_loop()` 模式
```
启动 → asyncio.create_task(_scan_loop)
每 SCAN_INTERVAL 秒
遍历 projects/robot/*/users/*/tasks.yaml
找到 status=active && next_run_at <= now 的任务
asyncio.create_task(_execute_task) → 受 Semaphore 并发控制
构建 AgentConfig → create_agent_and_generate_response(stream=False)
写入 execution.log → 更新 tasks.yaml (next_run_at / status)
```
**并发与防重复**
- `_executing_tasks: set` 记录正在执行的任务 ID防止同一任务被重复触发
- `asyncio.Semaphore(MAX_CONCURRENT)` 限制最大并发数(默认 5
**任务执行后更新**
- cron 任务:使用 croniter 计算下次 UTC 时间写入 `next_run_at`
- once 任务:将 status 设为 `done`,清空 `next_run_at`
- 失败时也更新 `next_run_at`,避免无限重试
**Agent 调用构建**
```python
bot_config = await fetch_bot_config(bot_id)
project_dir = create_project_directory(dataset_ids, bot_id, skills)
config = AgentConfig(
bot_id=bot_id,
user_identifier=user_id,
session_id=f"schedule_{task_id}", # 专用 session
stream=False,
tool_response=False,
messages=[{"role": "user", "content": task["message"]}],
... # 其余参数从 bot_config 获取
)
result = await create_agent_and_generate_response(config)
```
## 修改文件
### `pyproject.toml`
新增依赖:
```toml
"croniter (>=2.0.0,<4.0.0)",
"pyyaml (>=6.0,<7.0)",
```
### `utils/settings.py`
新增环境变量:
```python
SCHEDULE_ENABLED = os.getenv("SCHEDULE_ENABLED", "true") == "true"
SCHEDULE_SCAN_INTERVAL = int(os.getenv("SCHEDULE_SCAN_INTERVAL", "60")) # 扫描间隔(秒)
SCHEDULE_MAX_CONCURRENT = int(os.getenv("SCHEDULE_MAX_CONCURRENT", "5")) # 最大并发数
```
### `fastapi_app.py`
在 lifespan 中集成调度器生命周期:
- **startup**: `schedule_executor.start()` — 在 checkpoint 清理调度器之后启动
- **shutdown**: `await schedule_executor.stop()` — 在 Mem0 关闭之前停止
## 复用的现有组件
| 组件 | 路径 | 用途 |
|------|------|------|
| `create_agent_and_generate_response()` | `routes/chat.py:246` | 执行 agent 调用 |
| `fetch_bot_config()` | `utils/fastapi_utils.py` | 获取 bot 配置 |
| `create_project_directory()` | `utils/fastapi_utils.py` | 创建项目目录 |
| `AgentConfig` | `agent/agent_config.py` | 构建执行配置 |
| `_cleanup_loop()` 模式 | `agent/db_pool_manager.py:240` | asyncio 后台循环参考 |
| `execute_hooks('PostAgent')` | `agent/plugin_hook_loader.py` | 执行结果通知 |
## 验证方式
```bash
# 1. 启动服务
poetry run uvicorn fastapi_app:app --host 0.0.0.0 --port 8001
# 2. CLI 创建测试任务
BOT_ID=<bot_id> USER_IDENTIFIER=test_user \
poetry run python skills/schedule-job/scripts/schedule_manager.py add \
--name "测试任务" --type once \
--scheduled-at "$(date -u -v+2M '+%Y-%m-%dT%H:%M:%SZ')" \
--message "你好,这是一个定时任务测试"
# 3. 检查 tasks.yaml
cat projects/robot/<bot_id>/users/test_user/tasks.yaml
# 4. 等待调度器执行(最多 60 秒),观察应用日志
# 5. 检查执行结果
cat projects/robot/<bot_id>/users/test_user/task_logs/execution.log
# 6. 通过 API 对话测试 skill
curl -X POST http://localhost:8001/api/v2/chat/completions \
-H 'authorization: Bearer <token>' \
-H 'content-type: application/json' \
-d '{"messages":[{"role":"user","content":"帮我创建一个每天早上9点的定时任务"}],"stream":false,"bot_id":"<bot_id>","user_identifier":"test_user"}'
```
## 后续扩展方向
- **任务失败重试**: 可增加 `max_retries``retry_delay` 字段
- **任务执行超时**: 为单个任务设置超时限制
- **通知渠道**: 集成飞书/邮件等 PostAgent Hook 推送执行结果
- **Web 管理界面**: 提供 REST API 查询/管理定时任务
- **任务模板**: 预设常用任务模板(每日新闻、天气播报等)

18
poetry.lock generated
View File

@ -677,6 +677,22 @@ files = [
]
markers = {main = "platform_system == \"Windows\"", dev = "sys_platform == \"win32\""}
[[package]]
name = "croniter"
version = "3.0.4"
description = "croniter provides iteration for datetime object with cron like format"
optional = false
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,>=2.6"
groups = ["main"]
files = [
{file = "croniter-3.0.4-py2.py3-none-any.whl", hash = "sha256:96e14cdd5dcb479dd48d7db14b53d8434b188dfb9210448bef6f65663524a6f0"},
{file = "croniter-3.0.4.tar.gz", hash = "sha256:f9dcd4bdb6c97abedb6f09d6ed3495b13ede4d4544503fa580b6372a56a0c520"},
]
[package.dependencies]
python-dateutil = "*"
pytz = ">2021.1"
[[package]]
name = "cryptography"
version = "46.0.5"
@ -6619,4 +6635,4 @@ cffi = ["cffi (>=1.17,<2.0) ; platform_python_implementation != \"PyPy\" and pyt
[metadata]
lock-version = "2.1"
python-versions = ">=3.12,<4.0"
content-hash = "dbe40b78bc1b7796331da5e14512ae6992f783cadca977bc66b098642d1e8cd9"
content-hash = "3ed06e25ab7936d04d523544c24df6e8678eda0e99388ed1e4de0acbb8e3e63e"

View File

@ -37,6 +37,8 @@ dependencies = [
"json-repair (>=0.29.0,<0.30.0)",
"tiktoken (>=0.5.0,<1.0.0)",
"wsgidav (>=4.3.3,<5.0.0)",
"croniter (>=2.0.0,<4.0.0)",
"pyyaml (>=6.0,<7.0)",
]
[tool.poetry.requires-plugins]

View File

@ -19,6 +19,7 @@ chardet==5.2.0 ; python_version >= "3.12" and python_version < "4.0"
charset-normalizer==3.4.4 ; python_version >= "3.12" and python_version < "4.0"
click==8.3.0 ; python_version >= "3.12" and python_version < "4.0"
colorama==0.4.6 ; python_version >= "3.12" and python_version < "4.0" and platform_system == "Windows"
croniter==3.0.4 ; python_version >= "3.12" and python_version < "4.0"
cryptography==46.0.5 ; python_version >= "3.12" and python_version < "4.0"
daytona-api-client-async==0.127.0 ; python_version >= "3.12" and python_version < "4.0"
daytona-api-client==0.127.0 ; python_version >= "3.12" and python_version < "4.0"

View File

@ -0,0 +1,306 @@
"""
全局定时任务调度器
扫描所有 projects/robot/{bot_id}/users/{user_id}/tasks.yaml 文件
找到到期的任务并调用 create_agent_and_generate_response 执行
"""
import asyncio
import logging
import time
import yaml
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Optional
logger = logging.getLogger('app')
class ScheduleExecutor:
"""定时任务调度器,以 asyncio 后台任务运行"""
def __init__(self, scan_interval: int = 60, max_concurrent: int = 5):
self._scan_interval = scan_interval
self._max_concurrent = max_concurrent
self._task: Optional[asyncio.Task] = None
self._stop_event = asyncio.Event()
self._executing_tasks: set = set() # 正在执行的任务 ID防重复
self._semaphore: Optional[asyncio.Semaphore] = None
def start(self):
"""启动调度器"""
if self._task is not None and not self._task.done():
logger.warning("Schedule executor is already running")
return
self._stop_event.clear()
self._semaphore = asyncio.Semaphore(self._max_concurrent)
self._task = asyncio.create_task(self._scan_loop())
logger.info(
f"Schedule executor started: interval={self._scan_interval}s, "
f"max_concurrent={self._max_concurrent}"
)
async def stop(self):
"""停止调度器"""
self._stop_event.set()
if self._task and not self._task.done():
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
logger.info("Schedule executor stopped")
async def _scan_loop(self):
"""主扫描循环"""
while not self._stop_event.is_set():
try:
await self._scan_and_execute()
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Schedule scan error: {e}")
# 等待下一次扫描或停止信号
try:
await asyncio.wait_for(
self._stop_event.wait(),
timeout=self._scan_interval
)
break # 收到停止信号
except asyncio.TimeoutError:
pass # 超时继续下一轮扫描
async def _scan_and_execute(self):
"""扫描所有 tasks.yaml找到到期任务并触发执行"""
now = datetime.now(timezone.utc)
robot_dir = Path("projects/robot")
if not robot_dir.exists():
return
tasks_files = list(robot_dir.glob("*/users/*/tasks.yaml"))
if not tasks_files:
return
for tasks_file in tasks_files:
try:
with open(tasks_file, 'r', encoding='utf-8') as f:
data = yaml.safe_load(f)
if not data or not data.get("tasks"):
continue
# 从路径提取 bot_id 和 user_id
parts = tasks_file.parts
# 路径格式: .../projects/robot/{bot_id}/users/{user_id}/tasks.yaml
bot_id = parts[-4]
user_id = parts[-2]
for task in data["tasks"]:
if task.get("status") != "active":
continue
if task["id"] in self._executing_tasks:
continue
next_run_str = task.get("next_run_at")
if not next_run_str:
continue
try:
next_run = datetime.fromisoformat(next_run_str)
if next_run.tzinfo is None:
next_run = next_run.replace(tzinfo=timezone.utc)
except (ValueError, TypeError):
logger.warning(f"Invalid next_run_at for task {task['id']}: {next_run_str}")
continue
if next_run <= now:
# 到期,触发执行
asyncio.create_task(
self._execute_task(bot_id, user_id, task, tasks_file)
)
except Exception as e:
logger.error(f"Error reading {tasks_file}: {e}")
async def _execute_task(self, bot_id: str, user_id: str, task: dict, tasks_file: Path):
"""执行单个到期任务"""
task_id = task["id"]
self._executing_tasks.add(task_id)
start_time = time.time()
try:
async with self._semaphore:
logger.info(f"Executing scheduled task: {task_id} ({task.get('name', '')}) for bot={bot_id} user={user_id}")
# 调用 agent
response_text = await self._call_agent(bot_id, user_id, task)
# 写入日志
duration_ms = int((time.time() - start_time) * 1000)
self._write_log(bot_id, user_id, task, response_text, "success", duration_ms)
# 更新 tasks.yaml
self._update_task_after_execution(task_id, tasks_file)
logger.info(f"Task {task_id} completed in {duration_ms}ms")
except Exception as e:
duration_ms = int((time.time() - start_time) * 1000)
logger.error(f"Task {task_id} execution failed: {e}")
self._write_log(bot_id, user_id, task, f"ERROR: {e}", "error", duration_ms)
# 即使失败也更新 next_run_at避免无限重试
self._update_task_after_execution(task_id, tasks_file)
finally:
self._executing_tasks.discard(task_id)
async def _call_agent(self, bot_id: str, user_id: str, task: dict) -> str:
"""构建 AgentConfig 并调用 agent"""
from routes.chat import create_agent_and_generate_response
from utils.fastapi_utils import fetch_bot_config, create_project_directory
from agent.agent_config import AgentConfig
bot_config = await fetch_bot_config(bot_id)
project_dir = create_project_directory(
bot_config.get("dataset_ids", []),
bot_id,
bot_config.get("skills")
)
messages = [{"role": "user", "content": task["message"]}]
config = AgentConfig(
bot_id=bot_id,
api_key=bot_config.get("api_key"),
model_name=bot_config.get("model", "qwen3-next"),
model_server=bot_config.get("model_server", ""),
language=bot_config.get("language", "ja"),
system_prompt=bot_config.get("system_prompt"),
mcp_settings=bot_config.get("mcp_settings", []),
user_identifier=user_id,
session_id=f"schedule_{task['id']}",
project_dir=project_dir,
stream=False,
tool_response=False,
messages=messages,
dataset_ids=bot_config.get("dataset_ids", []),
enable_memori=bot_config.get("enable_memory", False),
shell_env=bot_config.get("shell_env") or {},
)
result = await create_agent_and_generate_response(config)
return result.choices[0]["message"]["content"]
def _update_task_after_execution(self, task_id: str, tasks_file: Path):
"""执行后更新 tasks.yaml"""
try:
with open(tasks_file, 'r', encoding='utf-8') as f:
data = yaml.safe_load(f)
if not data or not data.get("tasks"):
return
now_utc = datetime.now(timezone.utc).isoformat()
for task in data["tasks"]:
if task["id"] != task_id:
continue
task["last_executed_at"] = now_utc
task["execution_count"] = task.get("execution_count", 0) + 1
if task["type"] == "once":
task["status"] = "done"
task["next_run_at"] = None
elif task["type"] == "cron" and task.get("schedule"):
# 计算下次执行时间
task["next_run_at"] = self._compute_next_run(
task["schedule"],
task.get("timezone", "UTC")
)
break
with open(tasks_file, 'w', encoding='utf-8') as f:
yaml.dump(data, f, allow_unicode=True, default_flow_style=False, sort_keys=False)
except Exception as e:
logger.error(f"Failed to update task {task_id}: {e}")
def _compute_next_run(self, schedule: str, tz: str) -> str:
"""计算 cron 任务的下次执行 UTC 时间"""
from croniter import croniter
# 时区偏移映射
tz_offsets = {
'Asia/Shanghai': 8,
'Asia/Tokyo': 9,
'UTC': 0,
'America/New_York': -5,
'America/Los_Angeles': -8,
'Europe/London': 0,
'Europe/Berlin': 1,
}
offset_hours = tz_offsets.get(tz, 0)
offset = timedelta(hours=offset_hours)
now_utc = datetime.now(timezone.utc)
now_local = (now_utc + offset).replace(tzinfo=None)
cron = croniter(schedule, now_local)
next_local = cron.get_next(datetime)
next_utc = next_local - offset
return next_utc.replace(tzinfo=timezone.utc).isoformat()
def _write_log(self, bot_id: str, user_id: str, task: dict,
response: str, status: str, duration_ms: int):
"""写入执行日志"""
logs_dir = Path("projects/robot") / bot_id / "users" / user_id / "task_logs"
logs_dir.mkdir(parents=True, exist_ok=True)
log_file = logs_dir / "execution.log"
log_entry = {
"task_id": task["id"],
"task_name": task.get("name", ""),
"executed_at": datetime.now(timezone.utc).isoformat(),
"status": status,
"response": response[:2000] if response else "", # 截断过长响应
"duration_ms": duration_ms,
}
# 追加写入 YAML 列表
existing_logs = []
if log_file.exists():
try:
with open(log_file, 'r', encoding='utf-8') as f:
existing_logs = yaml.safe_load(f) or []
except Exception:
existing_logs = []
existing_logs.append(log_entry)
# 保留最近 100 条日志
if len(existing_logs) > 100:
existing_logs = existing_logs[-100:]
with open(log_file, 'w', encoding='utf-8') as f:
yaml.dump(existing_logs, f, allow_unicode=True, default_flow_style=False, sort_keys=False)
# 全局单例
_executor: Optional[ScheduleExecutor] = None
def get_schedule_executor() -> ScheduleExecutor:
"""获取全局调度器实例"""
global _executor
if _executor is None:
from utils.settings import SCHEDULE_SCAN_INTERVAL, SCHEDULE_MAX_CONCURRENT
_executor = ScheduleExecutor(
scan_interval=SCHEDULE_SCAN_INTERVAL,
max_concurrent=SCHEDULE_MAX_CONCURRENT,
)
return _executor

View File

@ -0,0 +1,7 @@
{
"hooks": {
"PrePrompt": {
"command": "python scripts/schedule_manager.py list --format brief"
}
}
}

View File

@ -0,0 +1,144 @@
---
name: schedule-job
description: 定时任务管理 - 为用户创建、管理和查看定时任务(支持 cron 周期任务和一次性任务)
---
# Schedule Job - 定时任务管理
管理用户的定时任务,支持 cron 周期性任务和一次性定时任务。任务到期后系统自动执行 AI 对话。
## Quick Start
用户请求创建定时任务时:
1. 确认任务类型(周期 cron / 一次性 once
2. 确定时间cron 表达式或具体时间)和时区
3. 确定发送给 AI 的消息内容
4. 调用 schedule_manager.py 创建任务
## Instructions
### 工具路径
所有操作通过 shell 命令执行:
```bash
python {skill_dir}/scripts/schedule_manager.py <command> [options]
```
### 可用命令
#### 列出任务
```bash
python {skill_dir}/scripts/schedule_manager.py list
python {skill_dir}/scripts/schedule_manager.py list --format brief
```
#### 添加 Cron 周期任务
```bash
python {skill_dir}/scripts/schedule_manager.py add \
--name "任务名称" \
--type cron \
--schedule "0 9 * * *" \
--timezone "Asia/Tokyo" \
--message "请帮我总结今天的科技新闻"
```
#### 添加一次性任务
```bash
python {skill_dir}/scripts/schedule_manager.py add \
--name "会议提醒" \
--type once \
--scheduled-at "2026-04-01T10:00:00+09:00" \
--message "提醒我10点有会议"
```
#### 编辑任务
```bash
python {skill_dir}/scripts/schedule_manager.py edit <task_id> \
--schedule "0 10 * * 1-5" \
--message "新的消息内容"
```
#### 删除任务
```bash
python {skill_dir}/scripts/schedule_manager.py delete <task_id>
```
#### 暂停/恢复任务
```bash
python {skill_dir}/scripts/schedule_manager.py toggle <task_id>
```
#### 查看执行日志
```bash
python {skill_dir}/scripts/schedule_manager.py logs --limit 10
python {skill_dir}/scripts/schedule_manager.py logs --task-id <task_id>
```
### 时区映射
根据用户语言自动推荐时区:
- 中文 (zh) → Asia/Shanghai (UTC+8)
- 日语 (ja/jp) → Asia/Tokyo (UTC+9)
- 英语 (en) → UTC
### Cron 表达式说明
标准 5 字段格式:`分 时 日 月 星期`
常用示例:
| 表达式 | 含义 |
|--------|------|
| `0 9 * * *` | 每天 9:00 |
| `0 9 * * 1-5` | 周一到周五 9:00 |
| `30 8 * * 1` | 每周一 8:30 |
| `0 */2 * * *` | 每 2 小时 |
| `0 9,18 * * *` | 每天 9:00 和 18:00 |
**注意**: cron 表达式的时间基于 --timezone 指定的时区。
### 一次性任务时间格式
支持 ISO 8601 格式(推荐带时区偏移):
- `2026-04-01T10:00:00+09:00` (日本时间)
- `2026-04-01T01:00:00Z` (UTC)
- `2026-04-01T08:00:00+08:00` (中国时间)
## Examples
**用户**: "帮我设置一个每天早上9点的新闻总结任务"
```bash
python {skill_dir}/scripts/schedule_manager.py add \
--name "每日新闻总结" \
--type cron \
--schedule "0 9 * * *" \
--timezone "Asia/Tokyo" \
--message "请帮我搜索并总结今天的重要科技新闻,用简洁的方式列出 Top 5"
```
**用户**: "提醒我明天下午3点开会"
```bash
python {skill_dir}/scripts/schedule_manager.py add \
--name "开会提醒" \
--type once \
--scheduled-at "2026-03-31T15:00:00+09:00" \
--message "提醒:你现在有一个会议要参加"
```
**用户**: "把每日新闻任务改到早上10点"
```bash
# 先查看任务列表获取 task_id
python {skill_dir}/scripts/schedule_manager.py list
# 然后编辑
python {skill_dir}/scripts/schedule_manager.py edit <task_id> --schedule "0 10 * * *"
```
## Guidelines
- 创建任务前先用 `list` 确认用户已有的任务,避免创建重复任务
- 根据用户语言自动设置合适的时区
- message 内容应该是完整的、可独立执行的指令,因为 AI 执行时没有对话上下文
- 一次性任务的时间不能是过去的时间
- 编辑任务时只修改用户要求改的字段,不要改动其他字段

View File

@ -0,0 +1,421 @@
#!/usr/bin/env python3
"""
定时任务管理 CLI 工具
用于增删改查用户的定时任务数据存储在 tasks.yaml 文件中
环境变量:
BOT_ID: 当前 bot ID
USER_IDENTIFIER: 当前用户标识
"""
import argparse
import os
import sys
import yaml
import json
import string
import random
from datetime import datetime, timezone, timedelta
from pathlib import Path
try:
from croniter import croniter
except ImportError:
print("Error: croniter is required. Install with: pip install croniter", file=sys.stderr)
sys.exit(1)
# 语言到时区的映射
LANGUAGE_TIMEZONE_MAP = {
'zh': 'Asia/Shanghai',
'ja': 'Asia/Tokyo',
'jp': 'Asia/Tokyo',
'en': 'UTC',
}
# 时区到 UTC 偏移(小时)
TIMEZONE_OFFSET_MAP = {
'Asia/Shanghai': 8,
'Asia/Tokyo': 9,
'UTC': 0,
'America/New_York': -5,
'America/Los_Angeles': -8,
'Europe/London': 0,
'Europe/Berlin': 1,
}
def get_tasks_dir(bot_id: str, user_id: str) -> Path:
"""获取用户任务目录路径"""
base = Path(os.getenv("PROJECT_ROOT", "."))
return base / "projects" / "robot" / bot_id / "users" / user_id
def get_tasks_file(bot_id: str, user_id: str) -> Path:
"""获取 tasks.yaml 文件路径"""
return get_tasks_dir(bot_id, user_id) / "tasks.yaml"
def get_logs_dir(bot_id: str, user_id: str) -> Path:
"""获取任务日志目录"""
return get_tasks_dir(bot_id, user_id) / "task_logs"
def load_tasks(bot_id: str, user_id: str) -> dict:
"""加载 tasks.yaml"""
tasks_file = get_tasks_file(bot_id, user_id)
if not tasks_file.exists():
return {"tasks": []}
with open(tasks_file, 'r', encoding='utf-8') as f:
data = yaml.safe_load(f)
return data if data and "tasks" in data else {"tasks": []}
def save_tasks(bot_id: str, user_id: str, data: dict):
"""保存 tasks.yaml"""
tasks_file = get_tasks_file(bot_id, user_id)
tasks_file.parent.mkdir(parents=True, exist_ok=True)
with open(tasks_file, 'w', encoding='utf-8') as f:
yaml.dump(data, f, allow_unicode=True, default_flow_style=False, sort_keys=False)
def generate_task_id() -> str:
"""生成唯一任务 ID"""
ts = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
rand = ''.join(random.choices(string.ascii_lowercase + string.digits, k=6))
return f"task_{ts}_{rand}"
def parse_timezone_offset(tz: str) -> int:
"""获取时区的 UTC 偏移小时数"""
return TIMEZONE_OFFSET_MAP.get(tz, 0)
def compute_next_run_cron(schedule: str, tz: str, after: datetime = None) -> str:
"""
根据 cron 表达式和时区计算下次执行的 UTC 时间
cron 表达式是基于用户本地时间的需要先在本地时间计算下次触发再转换为 UTC
"""
offset_hours = parse_timezone_offset(tz)
offset = timedelta(hours=offset_hours)
# 当前 UTC 时间
now_utc = after or datetime.now(timezone.utc)
# 转为用户本地时间naive
now_local = (now_utc + offset).replace(tzinfo=None)
# 在本地时间上计算下次 cron 触发
cron = croniter(schedule, now_local)
next_local = cron.get_next(datetime)
# 转回 UTC
next_utc = next_local - offset
return next_utc.replace(tzinfo=timezone.utc).isoformat()
def parse_scheduled_at(scheduled_at_str: str) -> str:
"""解析一次性任务的时间字符串,返回 UTC ISO 格式"""
# 尝试解析带时区偏移的 ISO 格式
try:
dt = datetime.fromisoformat(scheduled_at_str)
if dt.tzinfo is None:
# 无时区信息,假设 UTC
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc).isoformat()
except ValueError:
pass
# 尝试解析常见格式
for fmt in ["%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M"]:
try:
dt = datetime.strptime(scheduled_at_str, fmt).replace(tzinfo=timezone.utc)
return dt.isoformat()
except ValueError:
continue
raise ValueError(f"无法解析时间格式: {scheduled_at_str}")
def cmd_list(args, bot_id: str, user_id: str):
"""列出所有任务"""
data = load_tasks(bot_id, user_id)
tasks = data.get("tasks", [])
if not tasks:
print("当前没有定时任务。")
return
if args.format == "brief":
# 简洁格式,用于 PrePrompt hook
print(f"定时任务列表 ({len(tasks)} 个):")
for t in tasks:
status_icon = {"active": "", "paused": "⏸️", "done": "✔️", "expired": ""}.get(t["status"], "")
if t["type"] == "cron":
print(f" {status_icon} [{t['id']}] {t['name']} | cron: {t['schedule']} ({t.get('timezone', 'UTC')}) | 已执行{t.get('execution_count', 0)}")
else:
print(f" {status_icon} [{t['id']}] {t['name']} | 一次性: {t.get('scheduled_at', 'N/A')}")
else:
# 详细格式
for t in tasks:
print(f"--- 任务: {t['name']} ---")
print(f" ID: {t['id']}")
print(f" 类型: {t['type']}")
print(f" 状态: {t['status']}")
if t["type"] == "cron":
print(f" Cron: {t['schedule']}")
print(f" 时区: {t.get('timezone', 'UTC')}")
else:
print(f" 计划时间: {t.get('scheduled_at', 'N/A')}")
print(f" 消息: {t['message']}")
print(f" 下次执行: {t.get('next_run_at', 'N/A')}")
print(f" 上次执行: {t.get('last_executed_at', 'N/A')}")
print(f" 执行次数: {t.get('execution_count', 0)}")
print(f" 创建时间: {t.get('created_at', 'N/A')}")
print()
def cmd_add(args, bot_id: str, user_id: str):
"""添加任务"""
data = load_tasks(bot_id, user_id)
task_id = generate_task_id()
now_utc = datetime.now(timezone.utc).isoformat()
task = {
"id": task_id,
"name": args.name,
"type": args.type,
"schedule": None,
"scheduled_at": None,
"timezone": args.timezone or "UTC",
"message": args.message,
"status": "active",
"created_at": now_utc,
"last_executed_at": None,
"next_run_at": None,
"execution_count": 0,
}
if args.type == "cron":
if not args.schedule:
print("Error: cron 类型任务必须提供 --schedule 参数", file=sys.stderr)
sys.exit(1)
# 验证 cron 表达式
try:
croniter(args.schedule)
except (ValueError, KeyError) as e:
print(f"Error: 无效的 cron 表达式 '{args.schedule}': {e}", file=sys.stderr)
sys.exit(1)
task["schedule"] = args.schedule
task["next_run_at"] = compute_next_run_cron(args.schedule, task["timezone"])
elif args.type == "once":
if not args.scheduled_at:
print("Error: once 类型任务必须提供 --scheduled-at 参数", file=sys.stderr)
sys.exit(1)
try:
utc_time = parse_scheduled_at(args.scheduled_at)
except ValueError as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
task["scheduled_at"] = utc_time
task["next_run_at"] = utc_time
data["tasks"].append(task)
save_tasks(bot_id, user_id, data)
print(f"任务已创建: {task_id}")
print(f" 名称: {args.name}")
print(f" 类型: {args.type}")
print(f" 下次执行 (UTC): {task['next_run_at']}")
def cmd_edit(args, bot_id: str, user_id: str):
"""编辑任务"""
data = load_tasks(bot_id, user_id)
task = None
for t in data["tasks"]:
if t["id"] == args.task_id:
task = t
break
if not task:
print(f"Error: 任务 {args.task_id} 不存在", file=sys.stderr)
sys.exit(1)
if args.name:
task["name"] = args.name
if args.message:
task["message"] = args.message
if args.schedule:
try:
croniter(args.schedule)
except (ValueError, KeyError) as e:
print(f"Error: 无效的 cron 表达式 '{args.schedule}': {e}", file=sys.stderr)
sys.exit(1)
task["schedule"] = args.schedule
if args.timezone:
task["timezone"] = args.timezone
if args.scheduled_at:
try:
task["scheduled_at"] = parse_scheduled_at(args.scheduled_at)
task["next_run_at"] = task["scheduled_at"]
except ValueError as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
# 重新计算 next_run_at如果是 cron 且修改了 schedule 或 timezone
if task["type"] == "cron" and task.get("schedule"):
task["next_run_at"] = compute_next_run_cron(task["schedule"], task.get("timezone", "UTC"))
save_tasks(bot_id, user_id, data)
print(f"任务已更新: {args.task_id}")
def cmd_delete(args, bot_id: str, user_id: str):
"""删除任务"""
data = load_tasks(bot_id, user_id)
original_count = len(data["tasks"])
data["tasks"] = [t for t in data["tasks"] if t["id"] != args.task_id]
if len(data["tasks"]) == original_count:
print(f"Error: 任务 {args.task_id} 不存在", file=sys.stderr)
sys.exit(1)
save_tasks(bot_id, user_id, data)
print(f"任务已删除: {args.task_id}")
def cmd_toggle(args, bot_id: str, user_id: str):
"""暂停/恢复任务"""
data = load_tasks(bot_id, user_id)
for t in data["tasks"]:
if t["id"] == args.task_id:
if t["status"] == "active":
t["status"] = "paused"
print(f"任务已暂停: {args.task_id}")
elif t["status"] == "paused":
t["status"] = "active"
# 恢复时重新计算 next_run_at
if t["type"] == "cron" and t.get("schedule"):
t["next_run_at"] = compute_next_run_cron(t["schedule"], t.get("timezone", "UTC"))
print(f"任务已恢复: {args.task_id}")
else:
print(f"任务状态为 {t['status']},无法切换", file=sys.stderr)
sys.exit(1)
save_tasks(bot_id, user_id, data)
return
print(f"Error: 任务 {args.task_id} 不存在", file=sys.stderr)
sys.exit(1)
def cmd_logs(args, bot_id: str, user_id: str):
"""查看执行日志"""
logs_dir = get_logs_dir(bot_id, user_id)
log_file = logs_dir / "execution.log"
if not log_file.exists():
print("暂无执行日志。")
return
with open(log_file, 'r', encoding='utf-8') as f:
logs = yaml.safe_load(f)
if not logs:
print("暂无执行日志。")
return
# 按任务 ID 过滤
if args.task_id:
logs = [l for l in logs if l.get("task_id") == args.task_id]
# 限制数量
limit = args.limit or 10
logs = logs[-limit:]
if not logs:
print("没有匹配的执行日志。")
return
for log in logs:
status_icon = "" if log.get("status") == "success" else ""
print(f"{status_icon} [{log.get('executed_at', 'N/A')}] {log.get('task_name', 'N/A')}")
response = log.get("response", "")
# 截断过长的响应
if len(response) > 200:
response = response[:200] + "..."
print(f" {response}")
if log.get("duration_ms"):
print(f" 耗时: {log['duration_ms']}ms")
print()
def main():
bot_id = os.getenv("BOT_ID", "")
user_id = os.getenv("USER_IDENTIFIER", "")
if not bot_id or not user_id:
print("Error: BOT_ID 和 USER_IDENTIFIER 环境变量必须设置", file=sys.stderr)
sys.exit(1)
parser = argparse.ArgumentParser(description="定时任务管理工具")
subparsers = parser.add_subparsers(dest="command", help="可用命令")
# list
p_list = subparsers.add_parser("list", help="列出所有任务")
p_list.add_argument("--format", choices=["brief", "detail"], default="detail", help="输出格式")
# add
p_add = subparsers.add_parser("add", help="添加任务")
p_add.add_argument("--name", required=True, help="任务名称")
p_add.add_argument("--type", required=True, choices=["cron", "once"], help="任务类型")
p_add.add_argument("--schedule", help="Cron 表达式 (cron 类型必填)")
p_add.add_argument("--scheduled-at", help="执行时间 ISO 8601 格式 (once 类型必填)")
p_add.add_argument("--timezone", help="时区 (如 Asia/Tokyo),默认 UTC")
p_add.add_argument("--message", required=True, help="发送给 AI 的消息内容")
# edit
p_edit = subparsers.add_parser("edit", help="编辑任务")
p_edit.add_argument("task_id", help="任务 ID")
p_edit.add_argument("--name", help="新任务名称")
p_edit.add_argument("--schedule", help="新 Cron 表达式")
p_edit.add_argument("--scheduled-at", help="新执行时间")
p_edit.add_argument("--timezone", help="新时区")
p_edit.add_argument("--message", help="新消息内容")
# delete
p_delete = subparsers.add_parser("delete", help="删除任务")
p_delete.add_argument("task_id", help="任务 ID")
# toggle
p_toggle = subparsers.add_parser("toggle", help="暂停/恢复任务")
p_toggle.add_argument("task_id", help="任务 ID")
# logs
p_logs = subparsers.add_parser("logs", help="查看执行日志")
p_logs.add_argument("--task-id", help="按任务 ID 过滤")
p_logs.add_argument("--limit", type=int, default=10, help="显示条数")
args = parser.parse_args()
if not args.command:
parser.print_help()
sys.exit(1)
commands = {
"list": cmd_list,
"add": cmd_add,
"edit": cmd_edit,
"delete": cmd_delete,
"toggle": cmd_toggle,
"logs": cmd_logs,
}
commands[args.command](args, bot_id, user_id)
if __name__ == "__main__":
main()

View File

@ -84,4 +84,18 @@ MEM0_ENABLED = os.getenv("MEM0_ENABLED", "true") == "true"
# 召回记忆数量
MEM0_SEMANTIC_SEARCH_TOP_K = int(os.getenv("MEM0_SEMANTIC_SEARCH_TOP_K", "20"))
# ============================================================
# Schedule Job 定时任务配置
# ============================================================
# 是否启用定时任务调度器
SCHEDULE_ENABLED = os.getenv("SCHEDULE_ENABLED", "true") == "true"
# 调度器扫描间隔(秒)
SCHEDULE_SCAN_INTERVAL = int(os.getenv("SCHEDULE_SCAN_INTERVAL", "60"))
# 最大并发执行任务数
SCHEDULE_MAX_CONCURRENT = int(os.getenv("SCHEDULE_MAX_CONCURRENT", "5"))
os.environ["OPENAI_API_KEY"] = "your_api_key"