From 3b9c7165a93a701f41219ceacd2c33a335a432a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E6=BD=AE?= Date: Mon, 30 Mar 2026 23:17:47 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E5=AE=9A=E6=97=B6?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E8=B0=83=E5=BA=A6=E7=B3=BB=E7=BB=9F=EF=BC=88?= =?UTF-8?q?schedule-job=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 schedule-job skill,支持 cron 周期任务和一次性定时任务 - 新增 schedule_manager.py CLI 工具(list/add/edit/delete/toggle/logs) - 新增 ScheduleExecutor 全局异步调度器,每 60s 扫描到期任务并调用 agent 执行 - 任务数据存储在 projects/robot/{bot_id}/users/{user_id}/tasks.yaml - 执行结果写入 task_logs/execution.log - 集成到 FastAPI lifespan 生命周期管理 - 添加 croniter、pyyaml 依赖 Co-Authored-By: Claude Opus 4.6 (1M context) --- fastapi_app.py | 19 + plans/schedule-job.md | 249 +++++++++++ poetry.lock | 18 +- pyproject.toml | 2 + requirements.txt | 1 + services/schedule_executor.py | 306 +++++++++++++ .../schedule-job/.claude-plugin/plugin.json | 7 + skills/schedule-job/SKILL.md | 144 ++++++ .../schedule-job/scripts/schedule_manager.py | 421 ++++++++++++++++++ utils/settings.py | 14 + 10 files changed, 1180 insertions(+), 1 deletion(-) create mode 100644 plans/schedule-job.md create mode 100644 services/schedule_executor.py create mode 100644 skills/schedule-job/.claude-plugin/plugin.json create mode 100644 skills/schedule-job/SKILL.md create mode 100644 skills/schedule-job/scripts/schedule_manager.py diff --git a/fastapi_app.py b/fastapi_app.py index 20977f6..1f05332 100644 --- a/fastapi_app.py +++ b/fastapi_app.py @@ -108,6 +108,7 @@ async def lifespan(app: FastAPI): close_global_mem0 ) from utils.settings import CHECKPOINT_CLEANUP_ENABLED, MEM0_ENABLED + from utils.settings import SCHEDULE_ENABLED # 1. 初始化共享的数据库连接池 db_pool_manager = await init_global_db_pool() @@ -141,10 +142,28 @@ async def lifespan(app: FastAPI): db_pool_manager.start_cleanup_scheduler() logger.info("Checkpoint cleanup scheduler started") + # 6. 启动定时任务调度器 + schedule_executor = None + if SCHEDULE_ENABLED: + try: + from services.schedule_executor import get_schedule_executor + schedule_executor = get_schedule_executor() + schedule_executor.start() + logger.info("Schedule executor started") + except Exception as e: + logger.warning(f"Schedule executor start failed (non-fatal): {e}") + yield # 关闭时清理(按相反顺序) logger.info("Shutting down...") + # 关闭定时任务调度器 + if schedule_executor: + try: + await schedule_executor.stop() + logger.info("Schedule executor stopped") + except Exception as e: + logger.warning(f"Schedule executor stop failed (non-fatal): {e}") # 关闭 Mem0 if MEM0_ENABLED: try: diff --git a/plans/schedule-job.md b/plans/schedule-job.md new file mode 100644 index 0000000..b1b27ae --- /dev/null +++ b/plans/schedule-job.md @@ -0,0 +1,249 @@ +# feat: Schedule Job 定时任务系统 + +## 概述 + +为每个用户的每个 bot 提供定时任务能力,支持 cron 周期任务和一次性定时任务。到期后系统自动调用 Agent 执行任务,结果写入日志并通过 PostAgent Hook 推送通知。 + +## 背景 + +### 当前状态 + +系统已有的后台任务机制: +- **Huey 任务队列** (`task_queue/`): SQLite 后端,用于文件处理 +- **AsyncIO 后台任务**: `asyncio.create_task()` 用于非阻塞操作 +- **Checkpoint 清理调度器** (`agent/db_pool_manager.py`): asyncio loop 方式 + +**缺失能力**: +- 无法为用户设置周期性或定时触发的 Agent 任务 +- 无 cron 式调度器 +- 无用户级任务管理接口 + +### 设计目标 + +- 用户通过 AI 对话即可创建/管理定时任务 +- 任务数据以 YAML 文件存储在用户专属目录,便于查看和备份 +- 全局调度器以 asyncio 后台任务运行,无需额外进程 +- 复用现有 `create_agent_and_generate_response()` 执行任务 + +## 架构设计 + +``` +┌─────────────────┐ ┌──────────────────────┐ ┌────────────────────┐ +│ AI Agent 对话 │ │ 全局异步调度器 │ │ Agent 执行引擎 │ +│ (Shell脚本工具) │────▶│ (asyncio loop) │────▶│ create_agent_and_ │ +│ 增删改查 tasks │ │ 每60s扫描tasks.yaml │ │ generate_response │ +└─────────────────┘ └──────────────────────┘ └────────────────────┘ + │ │ │ + ▼ ▼ ▼ + tasks.yaml tasks.yaml task_logs/ + (用户任务数据) (更新执行时间) (执行结果日志) +``` + +## 数据模型 + +### tasks.yaml + +**存储路径**: `projects/robot/{bot_id}/users/{user_id}/tasks.yaml` + +```yaml +tasks: + - id: "task_20260330143000_abc123" # 自动生成 + name: "每日新闻摘要" # 任务名称 + type: "cron" # cron | once + schedule: "0 9 * * *" # cron 表达式(type=cron 时) + scheduled_at: null # ISO 8601 UTC(type=once 时) + timezone: "Asia/Tokyo" # 用户时区,用于 cron 解析 + message: "请帮我总结今天的科技新闻" # 发送给 agent 的消息 + status: "active" # active | paused | done | expired + created_at: "2026-03-30T05:30:00Z" + last_executed_at: null # 上次执行时间(UTC) + next_run_at: "2026-03-31T00:00:00Z" # 下次执行时间(UTC,调度器用此判断) + execution_count: 0 # 已执行次数 +``` + +**关键设计决策**: +- 所有时间统一 UTC 存储,cron 表达式结合 timezone 字段在本地时间计算后转 UTC +- `next_run_at` 预计算,调度器只需简单比较时间戳即可判断是否到期 +- 一次性任务执行后 status 自动变为 `done` + +### execution.log + +**存储路径**: `projects/robot/{bot_id}/users/{user_id}/task_logs/execution.log` + +```yaml +- task_id: "task_20260330143000_abc123" + task_name: "每日新闻摘要" + executed_at: "2026-03-31T00:00:15Z" + status: "success" # success | error + response: "今天的科技新闻摘要:..." + duration_ms: 12500 +``` + +保留最近 100 条日志,自动清理旧记录。 + +## 新增文件 + +### 1. `skills/schedule-job/scripts/schedule_manager.py` + +Shell 命令行工具(argparse CLI),AI 通过 shell 调用来管理用户的定时任务。 + +**环境变量输入**(通过 plugin hook 机制传入): +- `BOT_ID`: 当前 bot ID +- `USER_IDENTIFIER`: 当前用户标识 + +**支持的命令**: + +| 命令 | 说明 | 示例 | +|------|------|------| +| `list` | 列出任务 | `list --format brief` | +| `add` | 添加任务 | `add --name "每日新闻" --type cron --schedule "0 9 * * *" --timezone "Asia/Tokyo" --message "..."` | +| `edit` | 编辑任务 | `edit --schedule "0 10 * * 1-5"` | +| `delete` | 删除任务 | `delete ` | +| `toggle` | 暂停/恢复 | `toggle ` | +| `logs` | 查看日志 | `logs --task-id --limit 10` | + +**核心逻辑**: +- 一次性任务 `--scheduled-at` 接受 ISO 8601 格式(带时区偏移),内部转 UTC +- cron 任务通过 `croniter` + timezone 计算 `next_run_at` +- 自动创建 `users/{user_id}/` 目录 + +### 2. `skills/schedule-job/SKILL.md` + +Skill 描述文件,注入 AI prompt,告诉 AI: +- 如何使用 schedule_manager.py CLI +- cron 表达式语法说明 +- 时区映射规则(语言 → 时区) +- 使用示例 + +### 3. `skills/schedule-job/.claude-plugin/plugin.json` + +```json +{ + "hooks": { + "PrePrompt": { + "command": "python scripts/schedule_manager.py list --format brief" + } + } +} +``` + +通过 PrePrompt hook,AI 每次对话时自动看到用户当前的任务列表。 + +### 4. `services/schedule_executor.py` + +全局异步调度器,核心类 `ScheduleExecutor`。 + +**运行机制**:参考 `db_pool_manager.py` 的 `_cleanup_loop()` 模式 + +``` +启动 → asyncio.create_task(_scan_loop) + ↓ + 每 SCAN_INTERVAL 秒 + ↓ + 遍历 projects/robot/*/users/*/tasks.yaml + ↓ + 找到 status=active && next_run_at <= now 的任务 + ↓ + asyncio.create_task(_execute_task) → 受 Semaphore 并发控制 + ↓ + 构建 AgentConfig → create_agent_and_generate_response(stream=False) + ↓ + 写入 execution.log → 更新 tasks.yaml (next_run_at / status) +``` + +**并发与防重复**: +- `_executing_tasks: set` 记录正在执行的任务 ID,防止同一任务被重复触发 +- `asyncio.Semaphore(MAX_CONCURRENT)` 限制最大并发数(默认 5) + +**任务执行后更新**: +- cron 任务:使用 croniter 计算下次 UTC 时间写入 `next_run_at` +- once 任务:将 status 设为 `done`,清空 `next_run_at` +- 失败时也更新 `next_run_at`,避免无限重试 + +**Agent 调用构建**: +```python +bot_config = await fetch_bot_config(bot_id) +project_dir = create_project_directory(dataset_ids, bot_id, skills) +config = AgentConfig( + bot_id=bot_id, + user_identifier=user_id, + session_id=f"schedule_{task_id}", # 专用 session + stream=False, + tool_response=False, + messages=[{"role": "user", "content": task["message"]}], + ... # 其余参数从 bot_config 获取 +) +result = await create_agent_and_generate_response(config) +``` + +## 修改文件 + +### `pyproject.toml` + +新增依赖: +```toml +"croniter (>=2.0.0,<4.0.0)", +"pyyaml (>=6.0,<7.0)", +``` + +### `utils/settings.py` + +新增环境变量: +```python +SCHEDULE_ENABLED = os.getenv("SCHEDULE_ENABLED", "true") == "true" +SCHEDULE_SCAN_INTERVAL = int(os.getenv("SCHEDULE_SCAN_INTERVAL", "60")) # 扫描间隔(秒) +SCHEDULE_MAX_CONCURRENT = int(os.getenv("SCHEDULE_MAX_CONCURRENT", "5")) # 最大并发数 +``` + +### `fastapi_app.py` + +在 lifespan 中集成调度器生命周期: +- **startup**: `schedule_executor.start()` — 在 checkpoint 清理调度器之后启动 +- **shutdown**: `await schedule_executor.stop()` — 在 Mem0 关闭之前停止 + +## 复用的现有组件 + +| 组件 | 路径 | 用途 | +|------|------|------| +| `create_agent_and_generate_response()` | `routes/chat.py:246` | 执行 agent 调用 | +| `fetch_bot_config()` | `utils/fastapi_utils.py` | 获取 bot 配置 | +| `create_project_directory()` | `utils/fastapi_utils.py` | 创建项目目录 | +| `AgentConfig` | `agent/agent_config.py` | 构建执行配置 | +| `_cleanup_loop()` 模式 | `agent/db_pool_manager.py:240` | asyncio 后台循环参考 | +| `execute_hooks('PostAgent')` | `agent/plugin_hook_loader.py` | 执行结果通知 | + +## 验证方式 + +```bash +# 1. 启动服务 +poetry run uvicorn fastapi_app:app --host 0.0.0.0 --port 8001 + +# 2. CLI 创建测试任务 +BOT_ID= USER_IDENTIFIER=test_user \ + poetry run python skills/schedule-job/scripts/schedule_manager.py add \ + --name "测试任务" --type once \ + --scheduled-at "$(date -u -v+2M '+%Y-%m-%dT%H:%M:%SZ')" \ + --message "你好,这是一个定时任务测试" + +# 3. 检查 tasks.yaml +cat projects/robot//users/test_user/tasks.yaml + +# 4. 等待调度器执行(最多 60 秒),观察应用日志 + +# 5. 检查执行结果 +cat projects/robot//users/test_user/task_logs/execution.log + +# 6. 通过 API 对话测试 skill +curl -X POST http://localhost:8001/api/v2/chat/completions \ + -H 'authorization: Bearer ' \ + -H 'content-type: application/json' \ + -d '{"messages":[{"role":"user","content":"帮我创建一个每天早上9点的定时任务"}],"stream":false,"bot_id":"","user_identifier":"test_user"}' +``` + +## 后续扩展方向 + +- **任务失败重试**: 可增加 `max_retries` 和 `retry_delay` 字段 +- **任务执行超时**: 为单个任务设置超时限制 +- **通知渠道**: 集成飞书/邮件等 PostAgent Hook 推送执行结果 +- **Web 管理界面**: 提供 REST API 查询/管理定时任务 +- **任务模板**: 预设常用任务模板(每日新闻、天气播报等) diff --git a/poetry.lock b/poetry.lock index 91a8dd0..64027d2 100644 --- a/poetry.lock +++ b/poetry.lock @@ -677,6 +677,22 @@ files = [ ] markers = {main = "platform_system == \"Windows\"", dev = "sys_platform == \"win32\""} +[[package]] +name = "croniter" +version = "3.0.4" +description = "croniter provides iteration for datetime object with cron like format" +optional = false +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,>=2.6" +groups = ["main"] +files = [ + {file = "croniter-3.0.4-py2.py3-none-any.whl", hash = "sha256:96e14cdd5dcb479dd48d7db14b53d8434b188dfb9210448bef6f65663524a6f0"}, + {file = "croniter-3.0.4.tar.gz", hash = "sha256:f9dcd4bdb6c97abedb6f09d6ed3495b13ede4d4544503fa580b6372a56a0c520"}, +] + +[package.dependencies] +python-dateutil = "*" +pytz = ">2021.1" + [[package]] name = "cryptography" version = "46.0.5" @@ -6619,4 +6635,4 @@ cffi = ["cffi (>=1.17,<2.0) ; platform_python_implementation != \"PyPy\" and pyt [metadata] lock-version = "2.1" python-versions = ">=3.12,<4.0" -content-hash = "dbe40b78bc1b7796331da5e14512ae6992f783cadca977bc66b098642d1e8cd9" +content-hash = "3ed06e25ab7936d04d523544c24df6e8678eda0e99388ed1e4de0acbb8e3e63e" diff --git a/pyproject.toml b/pyproject.toml index a14f34b..f9143c6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,6 +37,8 @@ dependencies = [ "json-repair (>=0.29.0,<0.30.0)", "tiktoken (>=0.5.0,<1.0.0)", "wsgidav (>=4.3.3,<5.0.0)", + "croniter (>=2.0.0,<4.0.0)", + "pyyaml (>=6.0,<7.0)", ] [tool.poetry.requires-plugins] diff --git a/requirements.txt b/requirements.txt index a7a07e5..0479a06 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,6 +19,7 @@ chardet==5.2.0 ; python_version >= "3.12" and python_version < "4.0" charset-normalizer==3.4.4 ; python_version >= "3.12" and python_version < "4.0" click==8.3.0 ; python_version >= "3.12" and python_version < "4.0" colorama==0.4.6 ; python_version >= "3.12" and python_version < "4.0" and platform_system == "Windows" +croniter==3.0.4 ; python_version >= "3.12" and python_version < "4.0" cryptography==46.0.5 ; python_version >= "3.12" and python_version < "4.0" daytona-api-client-async==0.127.0 ; python_version >= "3.12" and python_version < "4.0" daytona-api-client==0.127.0 ; python_version >= "3.12" and python_version < "4.0" diff --git a/services/schedule_executor.py b/services/schedule_executor.py new file mode 100644 index 0000000..f926be8 --- /dev/null +++ b/services/schedule_executor.py @@ -0,0 +1,306 @@ +""" +全局定时任务调度器 + +扫描所有 projects/robot/{bot_id}/users/{user_id}/tasks.yaml 文件, +找到到期的任务并调用 create_agent_and_generate_response 执行。 +""" + +import asyncio +import logging +import time +import yaml +from datetime import datetime, timezone, timedelta +from pathlib import Path +from typing import Optional + +logger = logging.getLogger('app') + + +class ScheduleExecutor: + """定时任务调度器,以 asyncio 后台任务运行""" + + def __init__(self, scan_interval: int = 60, max_concurrent: int = 5): + self._scan_interval = scan_interval + self._max_concurrent = max_concurrent + self._task: Optional[asyncio.Task] = None + self._stop_event = asyncio.Event() + self._executing_tasks: set = set() # 正在执行的任务 ID,防重复 + self._semaphore: Optional[asyncio.Semaphore] = None + + def start(self): + """启动调度器""" + if self._task is not None and not self._task.done(): + logger.warning("Schedule executor is already running") + return + + self._stop_event.clear() + self._semaphore = asyncio.Semaphore(self._max_concurrent) + self._task = asyncio.create_task(self._scan_loop()) + logger.info( + f"Schedule executor started: interval={self._scan_interval}s, " + f"max_concurrent={self._max_concurrent}" + ) + + async def stop(self): + """停止调度器""" + self._stop_event.set() + if self._task and not self._task.done(): + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + logger.info("Schedule executor stopped") + + async def _scan_loop(self): + """主扫描循环""" + while not self._stop_event.is_set(): + try: + await self._scan_and_execute() + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"Schedule scan error: {e}") + + # 等待下一次扫描或停止信号 + try: + await asyncio.wait_for( + self._stop_event.wait(), + timeout=self._scan_interval + ) + break # 收到停止信号 + except asyncio.TimeoutError: + pass # 超时继续下一轮扫描 + + async def _scan_and_execute(self): + """扫描所有 tasks.yaml,找到到期任务并触发执行""" + now = datetime.now(timezone.utc) + robot_dir = Path("projects/robot") + + if not robot_dir.exists(): + return + + tasks_files = list(robot_dir.glob("*/users/*/tasks.yaml")) + if not tasks_files: + return + + for tasks_file in tasks_files: + try: + with open(tasks_file, 'r', encoding='utf-8') as f: + data = yaml.safe_load(f) + + if not data or not data.get("tasks"): + continue + + # 从路径提取 bot_id 和 user_id + parts = tasks_file.parts + # 路径格式: .../projects/robot/{bot_id}/users/{user_id}/tasks.yaml + bot_id = parts[-4] + user_id = parts[-2] + + for task in data["tasks"]: + if task.get("status") != "active": + continue + if task["id"] in self._executing_tasks: + continue + + next_run_str = task.get("next_run_at") + if not next_run_str: + continue + + try: + next_run = datetime.fromisoformat(next_run_str) + if next_run.tzinfo is None: + next_run = next_run.replace(tzinfo=timezone.utc) + except (ValueError, TypeError): + logger.warning(f"Invalid next_run_at for task {task['id']}: {next_run_str}") + continue + + if next_run <= now: + # 到期,触发执行 + asyncio.create_task( + self._execute_task(bot_id, user_id, task, tasks_file) + ) + + except Exception as e: + logger.error(f"Error reading {tasks_file}: {e}") + + async def _execute_task(self, bot_id: str, user_id: str, task: dict, tasks_file: Path): + """执行单个到期任务""" + task_id = task["id"] + self._executing_tasks.add(task_id) + start_time = time.time() + + try: + async with self._semaphore: + logger.info(f"Executing scheduled task: {task_id} ({task.get('name', '')}) for bot={bot_id} user={user_id}") + + # 调用 agent + response_text = await self._call_agent(bot_id, user_id, task) + + # 写入日志 + duration_ms = int((time.time() - start_time) * 1000) + self._write_log(bot_id, user_id, task, response_text, "success", duration_ms) + + # 更新 tasks.yaml + self._update_task_after_execution(task_id, tasks_file) + + logger.info(f"Task {task_id} completed in {duration_ms}ms") + + except Exception as e: + duration_ms = int((time.time() - start_time) * 1000) + logger.error(f"Task {task_id} execution failed: {e}") + self._write_log(bot_id, user_id, task, f"ERROR: {e}", "error", duration_ms) + # 即使失败也更新 next_run_at,避免无限重试 + self._update_task_after_execution(task_id, tasks_file) + finally: + self._executing_tasks.discard(task_id) + + async def _call_agent(self, bot_id: str, user_id: str, task: dict) -> str: + """构建 AgentConfig 并调用 agent""" + from routes.chat import create_agent_and_generate_response + from utils.fastapi_utils import fetch_bot_config, create_project_directory + from agent.agent_config import AgentConfig + + bot_config = await fetch_bot_config(bot_id) + project_dir = create_project_directory( + bot_config.get("dataset_ids", []), + bot_id, + bot_config.get("skills") + ) + + messages = [{"role": "user", "content": task["message"]}] + + config = AgentConfig( + bot_id=bot_id, + api_key=bot_config.get("api_key"), + model_name=bot_config.get("model", "qwen3-next"), + model_server=bot_config.get("model_server", ""), + language=bot_config.get("language", "ja"), + system_prompt=bot_config.get("system_prompt"), + mcp_settings=bot_config.get("mcp_settings", []), + user_identifier=user_id, + session_id=f"schedule_{task['id']}", + project_dir=project_dir, + stream=False, + tool_response=False, + messages=messages, + dataset_ids=bot_config.get("dataset_ids", []), + enable_memori=bot_config.get("enable_memory", False), + shell_env=bot_config.get("shell_env") or {}, + ) + + result = await create_agent_and_generate_response(config) + return result.choices[0]["message"]["content"] + + def _update_task_after_execution(self, task_id: str, tasks_file: Path): + """执行后更新 tasks.yaml""" + try: + with open(tasks_file, 'r', encoding='utf-8') as f: + data = yaml.safe_load(f) + + if not data or not data.get("tasks"): + return + + now_utc = datetime.now(timezone.utc).isoformat() + + for task in data["tasks"]: + if task["id"] != task_id: + continue + + task["last_executed_at"] = now_utc + task["execution_count"] = task.get("execution_count", 0) + 1 + + if task["type"] == "once": + task["status"] = "done" + task["next_run_at"] = None + elif task["type"] == "cron" and task.get("schedule"): + # 计算下次执行时间 + task["next_run_at"] = self._compute_next_run( + task["schedule"], + task.get("timezone", "UTC") + ) + break + + with open(tasks_file, 'w', encoding='utf-8') as f: + yaml.dump(data, f, allow_unicode=True, default_flow_style=False, sort_keys=False) + + except Exception as e: + logger.error(f"Failed to update task {task_id}: {e}") + + def _compute_next_run(self, schedule: str, tz: str) -> str: + """计算 cron 任务的下次执行 UTC 时间""" + from croniter import croniter + + # 时区偏移映射 + tz_offsets = { + 'Asia/Shanghai': 8, + 'Asia/Tokyo': 9, + 'UTC': 0, + 'America/New_York': -5, + 'America/Los_Angeles': -8, + 'Europe/London': 0, + 'Europe/Berlin': 1, + } + + offset_hours = tz_offsets.get(tz, 0) + offset = timedelta(hours=offset_hours) + + now_utc = datetime.now(timezone.utc) + now_local = (now_utc + offset).replace(tzinfo=None) + + cron = croniter(schedule, now_local) + next_local = cron.get_next(datetime) + + next_utc = next_local - offset + return next_utc.replace(tzinfo=timezone.utc).isoformat() + + def _write_log(self, bot_id: str, user_id: str, task: dict, + response: str, status: str, duration_ms: int): + """写入执行日志""" + logs_dir = Path("projects/robot") / bot_id / "users" / user_id / "task_logs" + logs_dir.mkdir(parents=True, exist_ok=True) + log_file = logs_dir / "execution.log" + + log_entry = { + "task_id": task["id"], + "task_name": task.get("name", ""), + "executed_at": datetime.now(timezone.utc).isoformat(), + "status": status, + "response": response[:2000] if response else "", # 截断过长响应 + "duration_ms": duration_ms, + } + + # 追加写入 YAML 列表 + existing_logs = [] + if log_file.exists(): + try: + with open(log_file, 'r', encoding='utf-8') as f: + existing_logs = yaml.safe_load(f) or [] + except Exception: + existing_logs = [] + + existing_logs.append(log_entry) + + # 保留最近 100 条日志 + if len(existing_logs) > 100: + existing_logs = existing_logs[-100:] + + with open(log_file, 'w', encoding='utf-8') as f: + yaml.dump(existing_logs, f, allow_unicode=True, default_flow_style=False, sort_keys=False) + + +# 全局单例 +_executor: Optional[ScheduleExecutor] = None + + +def get_schedule_executor() -> ScheduleExecutor: + """获取全局调度器实例""" + global _executor + if _executor is None: + from utils.settings import SCHEDULE_SCAN_INTERVAL, SCHEDULE_MAX_CONCURRENT + _executor = ScheduleExecutor( + scan_interval=SCHEDULE_SCAN_INTERVAL, + max_concurrent=SCHEDULE_MAX_CONCURRENT, + ) + return _executor diff --git a/skills/schedule-job/.claude-plugin/plugin.json b/skills/schedule-job/.claude-plugin/plugin.json new file mode 100644 index 0000000..5d8bf66 --- /dev/null +++ b/skills/schedule-job/.claude-plugin/plugin.json @@ -0,0 +1,7 @@ +{ + "hooks": { + "PrePrompt": { + "command": "python scripts/schedule_manager.py list --format brief" + } + } +} diff --git a/skills/schedule-job/SKILL.md b/skills/schedule-job/SKILL.md new file mode 100644 index 0000000..b085ded --- /dev/null +++ b/skills/schedule-job/SKILL.md @@ -0,0 +1,144 @@ +--- +name: schedule-job +description: 定时任务管理 - 为用户创建、管理和查看定时任务(支持 cron 周期任务和一次性任务) +--- + +# Schedule Job - 定时任务管理 + +管理用户的定时任务,支持 cron 周期性任务和一次性定时任务。任务到期后系统自动执行 AI 对话。 + +## Quick Start + +用户请求创建定时任务时: +1. 确认任务类型(周期 cron / 一次性 once) +2. 确定时间(cron 表达式或具体时间)和时区 +3. 确定发送给 AI 的消息内容 +4. 调用 schedule_manager.py 创建任务 + +## Instructions + +### 工具路径 + +所有操作通过 shell 命令执行: +```bash +python {skill_dir}/scripts/schedule_manager.py [options] +``` + +### 可用命令 + +#### 列出任务 +```bash +python {skill_dir}/scripts/schedule_manager.py list +python {skill_dir}/scripts/schedule_manager.py list --format brief +``` + +#### 添加 Cron 周期任务 +```bash +python {skill_dir}/scripts/schedule_manager.py add \ + --name "任务名称" \ + --type cron \ + --schedule "0 9 * * *" \ + --timezone "Asia/Tokyo" \ + --message "请帮我总结今天的科技新闻" +``` + +#### 添加一次性任务 +```bash +python {skill_dir}/scripts/schedule_manager.py add \ + --name "会议提醒" \ + --type once \ + --scheduled-at "2026-04-01T10:00:00+09:00" \ + --message "提醒我10点有会议" +``` + +#### 编辑任务 +```bash +python {skill_dir}/scripts/schedule_manager.py edit \ + --schedule "0 10 * * 1-5" \ + --message "新的消息内容" +``` + +#### 删除任务 +```bash +python {skill_dir}/scripts/schedule_manager.py delete +``` + +#### 暂停/恢复任务 +```bash +python {skill_dir}/scripts/schedule_manager.py toggle +``` + +#### 查看执行日志 +```bash +python {skill_dir}/scripts/schedule_manager.py logs --limit 10 +python {skill_dir}/scripts/schedule_manager.py logs --task-id +``` + +### 时区映射 + +根据用户语言自动推荐时区: +- 中文 (zh) → Asia/Shanghai (UTC+8) +- 日语 (ja/jp) → Asia/Tokyo (UTC+9) +- 英语 (en) → UTC + +### Cron 表达式说明 + +标准 5 字段格式:`分 时 日 月 星期` + +常用示例: +| 表达式 | 含义 | +|--------|------| +| `0 9 * * *` | 每天 9:00 | +| `0 9 * * 1-5` | 周一到周五 9:00 | +| `30 8 * * 1` | 每周一 8:30 | +| `0 */2 * * *` | 每 2 小时 | +| `0 9,18 * * *` | 每天 9:00 和 18:00 | + +**注意**: cron 表达式的时间基于 --timezone 指定的时区。 + +### 一次性任务时间格式 + +支持 ISO 8601 格式(推荐带时区偏移): +- `2026-04-01T10:00:00+09:00` (日本时间) +- `2026-04-01T01:00:00Z` (UTC) +- `2026-04-01T08:00:00+08:00` (中国时间) + +## Examples + +**用户**: "帮我设置一个每天早上9点的新闻总结任务" + +```bash +python {skill_dir}/scripts/schedule_manager.py add \ + --name "每日新闻总结" \ + --type cron \ + --schedule "0 9 * * *" \ + --timezone "Asia/Tokyo" \ + --message "请帮我搜索并总结今天的重要科技新闻,用简洁的方式列出 Top 5" +``` + +**用户**: "提醒我明天下午3点开会" + +```bash +python {skill_dir}/scripts/schedule_manager.py add \ + --name "开会提醒" \ + --type once \ + --scheduled-at "2026-03-31T15:00:00+09:00" \ + --message "提醒:你现在有一个会议要参加" +``` + +**用户**: "把每日新闻任务改到早上10点" + +```bash +# 先查看任务列表获取 task_id +python {skill_dir}/scripts/schedule_manager.py list +# 然后编辑 +python {skill_dir}/scripts/schedule_manager.py edit --schedule "0 10 * * *" +``` + +## Guidelines + +- 创建任务前先用 `list` 确认用户已有的任务,避免创建重复任务 +- 根据用户语言自动设置合适的时区 +- message 内容应该是完整的、可独立执行的指令,因为 AI 执行时没有对话上下文 +- 一次性任务的时间不能是过去的时间 +- 编辑任务时只修改用户要求改的字段,不要改动其他字段 diff --git a/skills/schedule-job/scripts/schedule_manager.py b/skills/schedule-job/scripts/schedule_manager.py new file mode 100644 index 0000000..c4536e9 --- /dev/null +++ b/skills/schedule-job/scripts/schedule_manager.py @@ -0,0 +1,421 @@ +#!/usr/bin/env python3 +""" +定时任务管理 CLI 工具 +用于增删改查用户的定时任务,数据存储在 tasks.yaml 文件中。 + +环境变量: + BOT_ID: 当前 bot ID + USER_IDENTIFIER: 当前用户标识 +""" + +import argparse +import os +import sys +import yaml +import json +import string +import random +from datetime import datetime, timezone, timedelta +from pathlib import Path + +try: + from croniter import croniter +except ImportError: + print("Error: croniter is required. Install with: pip install croniter", file=sys.stderr) + sys.exit(1) + + +# 语言到时区的映射 +LANGUAGE_TIMEZONE_MAP = { + 'zh': 'Asia/Shanghai', + 'ja': 'Asia/Tokyo', + 'jp': 'Asia/Tokyo', + 'en': 'UTC', +} + +# 时区到 UTC 偏移(小时) +TIMEZONE_OFFSET_MAP = { + 'Asia/Shanghai': 8, + 'Asia/Tokyo': 9, + 'UTC': 0, + 'America/New_York': -5, + 'America/Los_Angeles': -8, + 'Europe/London': 0, + 'Europe/Berlin': 1, +} + + +def get_tasks_dir(bot_id: str, user_id: str) -> Path: + """获取用户任务目录路径""" + base = Path(os.getenv("PROJECT_ROOT", ".")) + return base / "projects" / "robot" / bot_id / "users" / user_id + + +def get_tasks_file(bot_id: str, user_id: str) -> Path: + """获取 tasks.yaml 文件路径""" + return get_tasks_dir(bot_id, user_id) / "tasks.yaml" + + +def get_logs_dir(bot_id: str, user_id: str) -> Path: + """获取任务日志目录""" + return get_tasks_dir(bot_id, user_id) / "task_logs" + + +def load_tasks(bot_id: str, user_id: str) -> dict: + """加载 tasks.yaml""" + tasks_file = get_tasks_file(bot_id, user_id) + if not tasks_file.exists(): + return {"tasks": []} + with open(tasks_file, 'r', encoding='utf-8') as f: + data = yaml.safe_load(f) + return data if data and "tasks" in data else {"tasks": []} + + +def save_tasks(bot_id: str, user_id: str, data: dict): + """保存 tasks.yaml""" + tasks_file = get_tasks_file(bot_id, user_id) + tasks_file.parent.mkdir(parents=True, exist_ok=True) + with open(tasks_file, 'w', encoding='utf-8') as f: + yaml.dump(data, f, allow_unicode=True, default_flow_style=False, sort_keys=False) + + +def generate_task_id() -> str: + """生成唯一任务 ID""" + ts = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S") + rand = ''.join(random.choices(string.ascii_lowercase + string.digits, k=6)) + return f"task_{ts}_{rand}" + + +def parse_timezone_offset(tz: str) -> int: + """获取时区的 UTC 偏移小时数""" + return TIMEZONE_OFFSET_MAP.get(tz, 0) + + +def compute_next_run_cron(schedule: str, tz: str, after: datetime = None) -> str: + """ + 根据 cron 表达式和时区计算下次执行的 UTC 时间。 + + cron 表达式是基于用户本地时间的,需要先在本地时间计算下次触发,再转换为 UTC。 + """ + offset_hours = parse_timezone_offset(tz) + offset = timedelta(hours=offset_hours) + + # 当前 UTC 时间 + now_utc = after or datetime.now(timezone.utc) + # 转为用户本地时间(naive) + now_local = (now_utc + offset).replace(tzinfo=None) + + # 在本地时间上计算下次 cron 触发 + cron = croniter(schedule, now_local) + next_local = cron.get_next(datetime) + + # 转回 UTC + next_utc = next_local - offset + return next_utc.replace(tzinfo=timezone.utc).isoformat() + + +def parse_scheduled_at(scheduled_at_str: str) -> str: + """解析一次性任务的时间字符串,返回 UTC ISO 格式""" + # 尝试解析带时区偏移的 ISO 格式 + try: + dt = datetime.fromisoformat(scheduled_at_str) + if dt.tzinfo is None: + # 无时区信息,假设 UTC + dt = dt.replace(tzinfo=timezone.utc) + return dt.astimezone(timezone.utc).isoformat() + except ValueError: + pass + + # 尝试解析常见格式 + for fmt in ["%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M"]: + try: + dt = datetime.strptime(scheduled_at_str, fmt).replace(tzinfo=timezone.utc) + return dt.isoformat() + except ValueError: + continue + + raise ValueError(f"无法解析时间格式: {scheduled_at_str}") + + +def cmd_list(args, bot_id: str, user_id: str): + """列出所有任务""" + data = load_tasks(bot_id, user_id) + tasks = data.get("tasks", []) + + if not tasks: + print("当前没有定时任务。") + return + + if args.format == "brief": + # 简洁格式,用于 PrePrompt hook + print(f"定时任务列表 ({len(tasks)} 个):") + for t in tasks: + status_icon = {"active": "✅", "paused": "⏸️", "done": "✔️", "expired": "⏰"}.get(t["status"], "❓") + if t["type"] == "cron": + print(f" {status_icon} [{t['id']}] {t['name']} | cron: {t['schedule']} ({t.get('timezone', 'UTC')}) | 已执行{t.get('execution_count', 0)}次") + else: + print(f" {status_icon} [{t['id']}] {t['name']} | 一次性: {t.get('scheduled_at', 'N/A')}") + else: + # 详细格式 + for t in tasks: + print(f"--- 任务: {t['name']} ---") + print(f" ID: {t['id']}") + print(f" 类型: {t['type']}") + print(f" 状态: {t['status']}") + if t["type"] == "cron": + print(f" Cron: {t['schedule']}") + print(f" 时区: {t.get('timezone', 'UTC')}") + else: + print(f" 计划时间: {t.get('scheduled_at', 'N/A')}") + print(f" 消息: {t['message']}") + print(f" 下次执行: {t.get('next_run_at', 'N/A')}") + print(f" 上次执行: {t.get('last_executed_at', 'N/A')}") + print(f" 执行次数: {t.get('execution_count', 0)}") + print(f" 创建时间: {t.get('created_at', 'N/A')}") + print() + + +def cmd_add(args, bot_id: str, user_id: str): + """添加任务""" + data = load_tasks(bot_id, user_id) + + task_id = generate_task_id() + now_utc = datetime.now(timezone.utc).isoformat() + + task = { + "id": task_id, + "name": args.name, + "type": args.type, + "schedule": None, + "scheduled_at": None, + "timezone": args.timezone or "UTC", + "message": args.message, + "status": "active", + "created_at": now_utc, + "last_executed_at": None, + "next_run_at": None, + "execution_count": 0, + } + + if args.type == "cron": + if not args.schedule: + print("Error: cron 类型任务必须提供 --schedule 参数", file=sys.stderr) + sys.exit(1) + # 验证 cron 表达式 + try: + croniter(args.schedule) + except (ValueError, KeyError) as e: + print(f"Error: 无效的 cron 表达式 '{args.schedule}': {e}", file=sys.stderr) + sys.exit(1) + task["schedule"] = args.schedule + task["next_run_at"] = compute_next_run_cron(args.schedule, task["timezone"]) + + elif args.type == "once": + if not args.scheduled_at: + print("Error: once 类型任务必须提供 --scheduled-at 参数", file=sys.stderr) + sys.exit(1) + try: + utc_time = parse_scheduled_at(args.scheduled_at) + except ValueError as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + task["scheduled_at"] = utc_time + task["next_run_at"] = utc_time + + data["tasks"].append(task) + save_tasks(bot_id, user_id, data) + print(f"任务已创建: {task_id}") + print(f" 名称: {args.name}") + print(f" 类型: {args.type}") + print(f" 下次执行 (UTC): {task['next_run_at']}") + + +def cmd_edit(args, bot_id: str, user_id: str): + """编辑任务""" + data = load_tasks(bot_id, user_id) + + task = None + for t in data["tasks"]: + if t["id"] == args.task_id: + task = t + break + + if not task: + print(f"Error: 任务 {args.task_id} 不存在", file=sys.stderr) + sys.exit(1) + + if args.name: + task["name"] = args.name + if args.message: + task["message"] = args.message + if args.schedule: + try: + croniter(args.schedule) + except (ValueError, KeyError) as e: + print(f"Error: 无效的 cron 表达式 '{args.schedule}': {e}", file=sys.stderr) + sys.exit(1) + task["schedule"] = args.schedule + if args.timezone: + task["timezone"] = args.timezone + if args.scheduled_at: + try: + task["scheduled_at"] = parse_scheduled_at(args.scheduled_at) + task["next_run_at"] = task["scheduled_at"] + except ValueError as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + # 重新计算 next_run_at(如果是 cron 且修改了 schedule 或 timezone) + if task["type"] == "cron" and task.get("schedule"): + task["next_run_at"] = compute_next_run_cron(task["schedule"], task.get("timezone", "UTC")) + + save_tasks(bot_id, user_id, data) + print(f"任务已更新: {args.task_id}") + + +def cmd_delete(args, bot_id: str, user_id: str): + """删除任务""" + data = load_tasks(bot_id, user_id) + original_count = len(data["tasks"]) + data["tasks"] = [t for t in data["tasks"] if t["id"] != args.task_id] + + if len(data["tasks"]) == original_count: + print(f"Error: 任务 {args.task_id} 不存在", file=sys.stderr) + sys.exit(1) + + save_tasks(bot_id, user_id, data) + print(f"任务已删除: {args.task_id}") + + +def cmd_toggle(args, bot_id: str, user_id: str): + """暂停/恢复任务""" + data = load_tasks(bot_id, user_id) + + for t in data["tasks"]: + if t["id"] == args.task_id: + if t["status"] == "active": + t["status"] = "paused" + print(f"任务已暂停: {args.task_id}") + elif t["status"] == "paused": + t["status"] = "active" + # 恢复时重新计算 next_run_at + if t["type"] == "cron" and t.get("schedule"): + t["next_run_at"] = compute_next_run_cron(t["schedule"], t.get("timezone", "UTC")) + print(f"任务已恢复: {args.task_id}") + else: + print(f"任务状态为 {t['status']},无法切换", file=sys.stderr) + sys.exit(1) + save_tasks(bot_id, user_id, data) + return + + print(f"Error: 任务 {args.task_id} 不存在", file=sys.stderr) + sys.exit(1) + + +def cmd_logs(args, bot_id: str, user_id: str): + """查看执行日志""" + logs_dir = get_logs_dir(bot_id, user_id) + log_file = logs_dir / "execution.log" + + if not log_file.exists(): + print("暂无执行日志。") + return + + with open(log_file, 'r', encoding='utf-8') as f: + logs = yaml.safe_load(f) + + if not logs: + print("暂无执行日志。") + return + + # 按任务 ID 过滤 + if args.task_id: + logs = [l for l in logs if l.get("task_id") == args.task_id] + + # 限制数量 + limit = args.limit or 10 + logs = logs[-limit:] + + if not logs: + print("没有匹配的执行日志。") + return + + for log in logs: + status_icon = "✅" if log.get("status") == "success" else "❌" + print(f"{status_icon} [{log.get('executed_at', 'N/A')}] {log.get('task_name', 'N/A')}") + response = log.get("response", "") + # 截断过长的响应 + if len(response) > 200: + response = response[:200] + "..." + print(f" {response}") + if log.get("duration_ms"): + print(f" 耗时: {log['duration_ms']}ms") + print() + + +def main(): + bot_id = os.getenv("BOT_ID", "") + user_id = os.getenv("USER_IDENTIFIER", "") + + if not bot_id or not user_id: + print("Error: BOT_ID 和 USER_IDENTIFIER 环境变量必须设置", file=sys.stderr) + sys.exit(1) + + parser = argparse.ArgumentParser(description="定时任务管理工具") + subparsers = parser.add_subparsers(dest="command", help="可用命令") + + # list + p_list = subparsers.add_parser("list", help="列出所有任务") + p_list.add_argument("--format", choices=["brief", "detail"], default="detail", help="输出格式") + + # add + p_add = subparsers.add_parser("add", help="添加任务") + p_add.add_argument("--name", required=True, help="任务名称") + p_add.add_argument("--type", required=True, choices=["cron", "once"], help="任务类型") + p_add.add_argument("--schedule", help="Cron 表达式 (cron 类型必填)") + p_add.add_argument("--scheduled-at", help="执行时间 ISO 8601 格式 (once 类型必填)") + p_add.add_argument("--timezone", help="时区 (如 Asia/Tokyo),默认 UTC") + p_add.add_argument("--message", required=True, help="发送给 AI 的消息内容") + + # edit + p_edit = subparsers.add_parser("edit", help="编辑任务") + p_edit.add_argument("task_id", help="任务 ID") + p_edit.add_argument("--name", help="新任务名称") + p_edit.add_argument("--schedule", help="新 Cron 表达式") + p_edit.add_argument("--scheduled-at", help="新执行时间") + p_edit.add_argument("--timezone", help="新时区") + p_edit.add_argument("--message", help="新消息内容") + + # delete + p_delete = subparsers.add_parser("delete", help="删除任务") + p_delete.add_argument("task_id", help="任务 ID") + + # toggle + p_toggle = subparsers.add_parser("toggle", help="暂停/恢复任务") + p_toggle.add_argument("task_id", help="任务 ID") + + # logs + p_logs = subparsers.add_parser("logs", help="查看执行日志") + p_logs.add_argument("--task-id", help="按任务 ID 过滤") + p_logs.add_argument("--limit", type=int, default=10, help="显示条数") + + args = parser.parse_args() + + if not args.command: + parser.print_help() + sys.exit(1) + + commands = { + "list": cmd_list, + "add": cmd_add, + "edit": cmd_edit, + "delete": cmd_delete, + "toggle": cmd_toggle, + "logs": cmd_logs, + } + + commands[args.command](args, bot_id, user_id) + + +if __name__ == "__main__": + main() diff --git a/utils/settings.py b/utils/settings.py index 61c38bc..b2ce36b 100644 --- a/utils/settings.py +++ b/utils/settings.py @@ -84,4 +84,18 @@ MEM0_ENABLED = os.getenv("MEM0_ENABLED", "true") == "true" # 召回记忆数量 MEM0_SEMANTIC_SEARCH_TOP_K = int(os.getenv("MEM0_SEMANTIC_SEARCH_TOP_K", "20")) +# ============================================================ +# Schedule Job 定时任务配置 +# ============================================================ + +# 是否启用定时任务调度器 +SCHEDULE_ENABLED = os.getenv("SCHEDULE_ENABLED", "true") == "true" + +# 调度器扫描间隔(秒) +SCHEDULE_SCAN_INTERVAL = int(os.getenv("SCHEDULE_SCAN_INTERVAL", "60")) + +# 最大并发执行任务数 +SCHEDULE_MAX_CONCURRENT = int(os.getenv("SCHEDULE_MAX_CONCURRENT", "5")) + + os.environ["OPENAI_API_KEY"] = "your_api_key"