Compare commits

..

19 Commits

Author SHA1 Message Date
朱潮
a0e0c8c7b6 Merge branch 'feature/moshui20260330-schedule-job' into bot_manager 2026-03-31 20:56:54 +08:00
朱潮
85a262257e update schedule skill 2026-03-31 20:56:20 +08:00
朱潮
9d422eb3e7 Merge branch 'feature/moshui20260330-schedule-job' into bot_manager 2026-03-31 19:31:50 +08:00
朱潮
d38a737730 两个 Dockerfile 中的 npm 安装命令里添加了 nodemailer
和 dotenv:
2026-03-31 19:30:40 +08:00
朱潮
7b622d8544 Merge branch 'feature/moshui20260330-schedule-job' into bot_manager 2026-03-31 16:23:11 +08:00
朱潮
daa5bf345a update schedule skill 2026-03-31 16:23:03 +08:00
朱潮
bd91238a6e Merge branch 'feature/moshui20260330-schedule-job' into bot_manager 2026-03-31 16:11:18 +08:00
朱潮
bc59234eac update skill 2026-03-31 16:11:09 +08:00
朱潮
41b4329b5e Merge branch 'feature/moshui20260330-schedule-job' into bot_manager 2026-03-31 14:37:28 +08:00
朱潮
c2f7148f98 增加环境变量到pre prompt 2026-03-31 14:37:20 +08:00
朱潮
80559bdd7d _call_agent_v3 2026-03-31 14:29:24 +08:00
朱潮
b4cf5face0 Merge branch 'feature/moshui20260330-schedule-job' into bot_manager 2026-03-31 11:20:12 +08:00
朱潮
393c4e4138 schedule 通过 aiohttp POST 请求调用
http://127.0.0.1:8001/api/v2/chat/completions
2026-03-31 11:19:59 +08:00
朱潮
7ad51c95c0 Merge branch 'feature/moshui20260330-schedule-job' into bot_manager 2026-03-31 10:06:15 +08:00
朱潮
4090b4d734 将 users 改为 Path(users),使 / 运算符可以正确拼接路径。 2026-03-31 10:06:07 +08:00
朱潮
a665c01530 Merge branch 'feature/moshui20260330-schedule-job' into bot_manager 2026-03-31 10:00:27 +08:00
朱潮
9cc0d72430 update schedule 2026-03-31 10:00:16 +08:00
朱潮
2b749e988f merge from feature/moshui20260330-schedule-job 2026-03-31 09:46:17 +08:00
朱潮
3b9c7165a9 feat: 添加定时任务调度系统(schedule-job)
- 新增 schedule-job skill,支持 cron 周期任务和一次性定时任务
- 新增 schedule_manager.py CLI 工具(list/add/edit/delete/toggle/logs)
- 新增 ScheduleExecutor 全局异步调度器,每 60s 扫描到期任务并调用 agent 执行
- 任务数据存储在 projects/robot/{bot_id}/users/{user_id}/tasks.yaml
- 执行结果写入 task_logs/execution.log
- 集成到 FastAPI lifespan 生命周期管理
- 添加 croniter、pyyaml 依赖

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 23:17:47 +08:00
15 changed files with 1248 additions and 7 deletions

View File

@ -41,7 +41,7 @@ RUN pip install --no-cache-dir -r requirements.txt
# 安装 Playwright 并下载 Chromium
RUN pip install --no-cache-dir playwright && \
playwright install chromium
RUN npm install -g playwright sharp && \
RUN npm install -g playwright sharp nodemailer dotenv && \
npx playwright install chromium
# 复制应用代码

View File

@ -42,7 +42,7 @@ RUN pip install --no-cache-dir -i https://mirrors.aliyun.com/pypi/simple/ -r req
# 安装 Playwright 并下载 Chromium
RUN pip install --no-cache-dir -i https://mirrors.aliyun.com/pypi/simple/ playwright && \
playwright install chromium
RUN npm install -g playwright sharp && \
RUN npm install -g playwright sharp nodemailer dotenv && \
npx playwright install chromium
# 安装modelscope

View File

@ -166,12 +166,18 @@ async def _execute_command(skill_path: str, command: str, hook_type: str, config
try:
# 设置环境变量,传递给子进程
env = os.environ.copy()
env['BOT_ID'] = getattr(config, 'bot_id', '')
env['ASSISTANT_ID'] = getattr(config, 'bot_id', '')
env['USER_IDENTIFIER'] = getattr(config, 'user_identifier', '')
env['TRACE_ID'] = getattr(config, 'trace_id', '')
env['SESSION_ID'] = getattr(config, 'session_id', '')
env['LANGUAGE'] = getattr(config, 'language', '')
env['HOOK_TYPE'] = hook_type
# 合并 config 中的自定义 shell 环境变量
shell_env = getattr(config, 'shell_env', None)
if shell_env:
env.update(shell_env)
# 对于 PreSave传递 content
if hook_type == 'PreSave':
env['CONTENT'] = kwargs.get('content', '')

View File

@ -108,6 +108,7 @@ async def lifespan(app: FastAPI):
close_global_mem0
)
from utils.settings import CHECKPOINT_CLEANUP_ENABLED, MEM0_ENABLED
from utils.settings import SCHEDULE_ENABLED
# 1. 初始化共享的数据库连接池
db_pool_manager = await init_global_db_pool()
@ -155,10 +156,28 @@ async def lifespan(app: FastAPI):
db_pool_manager.start_cleanup_scheduler()
logger.info("Checkpoint cleanup scheduler started")
# 6. 启动定时任务调度器
schedule_executor = None
if SCHEDULE_ENABLED:
try:
from services.schedule_executor import get_schedule_executor
schedule_executor = get_schedule_executor()
schedule_executor.start()
logger.info("Schedule executor started")
except Exception as e:
logger.warning(f"Schedule executor start failed (non-fatal): {e}")
yield
# 关闭时清理(按相反顺序)
logger.info("Shutting down...")
# 关闭定时任务调度器
if schedule_executor:
try:
await schedule_executor.stop()
logger.info("Schedule executor stopped")
except Exception as e:
logger.warning(f"Schedule executor stop failed (non-fatal): {e}")
# 关闭 Mem0
if MEM0_ENABLED:
try:

249
plans/schedule-job.md Normal file
View File

@ -0,0 +1,249 @@
# feat: Schedule Job 定时任务系统
## 概述
为每个用户的每个 bot 提供定时任务能力,支持 cron 周期任务和一次性定时任务。到期后系统自动调用 Agent 执行任务,结果写入日志并通过 PostAgent Hook 推送通知。
## 背景
### 当前状态
系统已有的后台任务机制:
- **Huey 任务队列** (`task_queue/`): SQLite 后端,用于文件处理
- **AsyncIO 后台任务**: `asyncio.create_task()` 用于非阻塞操作
- **Checkpoint 清理调度器** (`agent/db_pool_manager.py`): asyncio loop 方式
**缺失能力**
- 无法为用户设置周期性或定时触发的 Agent 任务
- 无 cron 式调度器
- 无用户级任务管理接口
### 设计目标
- 用户通过 AI 对话即可创建/管理定时任务
- 任务数据以 YAML 文件存储在用户专属目录,便于查看和备份
- 全局调度器以 asyncio 后台任务运行,无需额外进程
- 复用现有 `create_agent_and_generate_response()` 执行任务
## 架构设计
```
┌─────────────────┐ ┌──────────────────────┐ ┌────────────────────┐
│ AI Agent 对话 │ │ 全局异步调度器 │ │ Agent 执行引擎 │
│ (Shell脚本工具) │────▶│ (asyncio loop) │────▶│ create_agent_and_ │
│ 增删改查 tasks │ │ 每60s扫描tasks.yaml │ │ generate_response │
└─────────────────┘ └──────────────────────┘ └────────────────────┘
│ │ │
▼ ▼ ▼
tasks.yaml tasks.yaml task_logs/
(用户任务数据) (更新执行时间) (执行结果日志)
```
## 数据模型
### tasks.yaml
**存储路径**: `projects/robot/{bot_id}/users/{user_id}/tasks.yaml`
```yaml
tasks:
- id: "task_20260330143000_abc123" # 自动生成
name: "每日新闻摘要" # 任务名称
type: "cron" # cron | once
schedule: "0 9 * * *" # cron 表达式type=cron 时)
scheduled_at: null # ISO 8601 UTCtype=once 时)
timezone: "Asia/Tokyo" # 用户时区,用于 cron 解析
message: "请帮我总结今天的科技新闻" # 发送给 agent 的消息
status: "active" # active | paused | done | expired
created_at: "2026-03-30T05:30:00Z"
last_executed_at: null # 上次执行时间UTC
next_run_at: "2026-03-31T00:00:00Z" # 下次执行时间UTC调度器用此判断
execution_count: 0 # 已执行次数
```
**关键设计决策**
- 所有时间统一 UTC 存储cron 表达式结合 timezone 字段在本地时间计算后转 UTC
- `next_run_at` 预计算,调度器只需简单比较时间戳即可判断是否到期
- 一次性任务执行后 status 自动变为 `done`
### execution.log
**存储路径**: `projects/robot/{bot_id}/users/{user_id}/task_logs/execution.log`
```yaml
- task_id: "task_20260330143000_abc123"
task_name: "每日新闻摘要"
executed_at: "2026-03-31T00:00:15Z"
status: "success" # success | error
response: "今天的科技新闻摘要:..."
duration_ms: 12500
```
保留最近 100 条日志,自动清理旧记录。
## 新增文件
### 1. `skills/schedule-job/scripts/schedule_manager.py`
Shell 命令行工具argparse CLIAI 通过 shell 调用来管理用户的定时任务。
**环境变量输入**(通过 plugin hook 机制传入):
- `BOT_ID`: 当前 bot ID
- `USER_IDENTIFIER`: 当前用户标识
**支持的命令**
| 命令 | 说明 | 示例 |
|------|------|------|
| `list` | 列出任务 | `list --format brief` |
| `add` | 添加任务 | `add --name "每日新闻" --type cron --schedule "0 9 * * *" --timezone "Asia/Tokyo" --message "..."` |
| `edit` | 编辑任务 | `edit <task_id> --schedule "0 10 * * 1-5"` |
| `delete` | 删除任务 | `delete <task_id>` |
| `toggle` | 暂停/恢复 | `toggle <task_id>` |
| `logs` | 查看日志 | `logs --task-id <id> --limit 10` |
**核心逻辑**
- 一次性任务 `--scheduled-at` 接受 ISO 8601 格式(带时区偏移),内部转 UTC
- cron 任务通过 `croniter` + timezone 计算 `next_run_at`
- 自动创建 `users/{user_id}/` 目录
### 2. `skills/schedule-job/SKILL.md`
Skill 描述文件,注入 AI prompt告诉 AI
- 如何使用 schedule_manager.py CLI
- cron 表达式语法说明
- 时区映射规则(语言 → 时区)
- 使用示例
### 3. `skills/schedule-job/.claude-plugin/plugin.json`
```json
{
"hooks": {
"PrePrompt": {
"command": "python scripts/schedule_manager.py list --format brief"
}
}
}
```
通过 PrePrompt hookAI 每次对话时自动看到用户当前的任务列表。
### 4. `services/schedule_executor.py`
全局异步调度器,核心类 `ScheduleExecutor`
**运行机制**:参考 `db_pool_manager.py``_cleanup_loop()` 模式
```
启动 → asyncio.create_task(_scan_loop)
每 SCAN_INTERVAL 秒
遍历 projects/robot/*/users/*/tasks.yaml
找到 status=active && next_run_at <= now 的任务
asyncio.create_task(_execute_task) → 受 Semaphore 并发控制
构建 AgentConfig → create_agent_and_generate_response(stream=False)
写入 execution.log → 更新 tasks.yaml (next_run_at / status)
```
**并发与防重复**
- `_executing_tasks: set` 记录正在执行的任务 ID防止同一任务被重复触发
- `asyncio.Semaphore(MAX_CONCURRENT)` 限制最大并发数(默认 5
**任务执行后更新**
- cron 任务:使用 croniter 计算下次 UTC 时间写入 `next_run_at`
- once 任务:将 status 设为 `done`,清空 `next_run_at`
- 失败时也更新 `next_run_at`,避免无限重试
**Agent 调用构建**
```python
bot_config = await fetch_bot_config(bot_id)
project_dir = create_project_directory(dataset_ids, bot_id, skills)
config = AgentConfig(
bot_id=bot_id,
user_identifier=user_id,
session_id=f"schedule_{task_id}", # 专用 session
stream=False,
tool_response=False,
messages=[{"role": "user", "content": task["message"]}],
... # 其余参数从 bot_config 获取
)
result = await create_agent_and_generate_response(config)
```
## 修改文件
### `pyproject.toml`
新增依赖:
```toml
"croniter (>=2.0.0,<4.0.0)",
"pyyaml (>=6.0,<7.0)",
```
### `utils/settings.py`
新增环境变量:
```python
SCHEDULE_ENABLED = os.getenv("SCHEDULE_ENABLED", "true") == "true"
SCHEDULE_SCAN_INTERVAL = int(os.getenv("SCHEDULE_SCAN_INTERVAL", "60")) # 扫描间隔(秒)
SCHEDULE_MAX_CONCURRENT = int(os.getenv("SCHEDULE_MAX_CONCURRENT", "5")) # 最大并发数
```
### `fastapi_app.py`
在 lifespan 中集成调度器生命周期:
- **startup**: `schedule_executor.start()` — 在 checkpoint 清理调度器之后启动
- **shutdown**: `await schedule_executor.stop()` — 在 Mem0 关闭之前停止
## 复用的现有组件
| 组件 | 路径 | 用途 |
|------|------|------|
| `create_agent_and_generate_response()` | `routes/chat.py:246` | 执行 agent 调用 |
| `fetch_bot_config()` | `utils/fastapi_utils.py` | 获取 bot 配置 |
| `create_project_directory()` | `utils/fastapi_utils.py` | 创建项目目录 |
| `AgentConfig` | `agent/agent_config.py` | 构建执行配置 |
| `_cleanup_loop()` 模式 | `agent/db_pool_manager.py:240` | asyncio 后台循环参考 |
| `execute_hooks('PostAgent')` | `agent/plugin_hook_loader.py` | 执行结果通知 |
## 验证方式
```bash
# 1. 启动服务
poetry run uvicorn fastapi_app:app --host 0.0.0.0 --port 8001
# 2. CLI 创建测试任务
BOT_ID=<bot_id> USER_IDENTIFIER=test_user \
poetry run python skills/schedule-job/scripts/schedule_manager.py add \
--name "测试任务" --type once \
--scheduled-at "$(date -u -v+2M '+%Y-%m-%dT%H:%M:%SZ')" \
--message "你好,这是一个定时任务测试"
# 3. 检查 tasks.yaml
cat projects/robot/<bot_id>/users/test_user/tasks.yaml
# 4. 等待调度器执行(最多 60 秒),观察应用日志
# 5. 检查执行结果
cat projects/robot/<bot_id>/users/test_user/task_logs/execution.log
# 6. 通过 API 对话测试 skill
curl -X POST http://localhost:8001/api/v2/chat/completions \
-H 'authorization: Bearer <token>' \
-H 'content-type: application/json' \
-d '{"messages":[{"role":"user","content":"帮我创建一个每天早上9点的定时任务"}],"stream":false,"bot_id":"<bot_id>","user_identifier":"test_user"}'
```
## 后续扩展方向
- **任务失败重试**: 可增加 `max_retries``retry_delay` 字段
- **任务执行超时**: 为单个任务设置超时限制
- **通知渠道**: 集成飞书/邮件等 PostAgent Hook 推送执行结果
- **Web 管理界面**: 提供 REST API 查询/管理定时任务
- **任务模板**: 预设常用任务模板(每日新闻、天气播报等)

18
poetry.lock generated
View File

@ -727,6 +727,22 @@ files = [
]
markers = {main = "platform_system == \"Windows\"", dev = "sys_platform == \"win32\""}
[[package]]
name = "croniter"
version = "3.0.4"
description = "croniter provides iteration for datetime object with cron like format"
optional = false
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,>=2.6"
groups = ["main"]
files = [
{file = "croniter-3.0.4-py2.py3-none-any.whl", hash = "sha256:96e14cdd5dcb479dd48d7db14b53d8434b188dfb9210448bef6f65663524a6f0"},
{file = "croniter-3.0.4.tar.gz", hash = "sha256:f9dcd4bdb6c97abedb6f09d6ed3495b13ede4d4544503fa580b6372a56a0c520"},
]
[package.dependencies]
python-dateutil = "*"
pytz = ">2021.1"
[[package]]
name = "cryptography"
version = "46.0.5"
@ -6991,4 +7007,4 @@ cffi = ["cffi (>=1.17,<2.0) ; platform_python_implementation != \"PyPy\" and pyt
[metadata]
lock-version = "2.1"
python-versions = ">=3.12,<3.15"
content-hash = "c9c4f80cdbf7d6bce20f65f40b9adce05c5f4a830299de148fcd8482937bddb0"
content-hash = "a92b7ea21f349d57bb145e9367235201ff22924605d5f12b8fa98f3e46889039"

View File

@ -42,6 +42,8 @@ dependencies = [
"websockets (>=15.0.0,<16.0.0)",
"setuptools (<71)",
"webrtcvad (>=2.0.10,<3.0.0)",
"croniter (>=2.0.0,<4.0.0)",
"pyyaml (>=6.0,<7.0)",
]
[tool.poetry.requires-plugins]

View File

@ -21,6 +21,7 @@ chardet==7.1.0 ; python_version >= "3.12" and python_version < "3.15"
charset-normalizer==3.4.5 ; python_version >= "3.12" and python_version < "3.15"
click==8.3.1 ; python_version >= "3.12" and python_version < "3.15"
colorama==0.4.6 ; python_version >= "3.12" and python_version < "3.15" and platform_system == "Windows"
croniter==3.0.4 ; python_version >= "3.12" and python_version < "3.15"
cryptography==46.0.5 ; python_version >= "3.12" and python_version < "3.15"
daytona-api-client-async==0.151.0 ; python_version >= "3.12" and python_version < "3.15"
daytona-api-client==0.151.0 ; python_version >= "3.12" and python_version < "3.15"

View File

@ -0,0 +1,326 @@
"""
全局定时任务调度器
扫描所有 projects/robot/{bot_id}/users/{user_id}/tasks.yaml 文件
找到到期的任务并调用 create_agent_and_generate_response 执行
"""
import asyncio
import logging
import time
import yaml
import aiohttp
import json
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Optional
logger = logging.getLogger('app')
class ScheduleExecutor:
"""定时任务调度器,以 asyncio 后台任务运行"""
def __init__(self, scan_interval: int = 60, max_concurrent: int = 5):
self._scan_interval = scan_interval
self._max_concurrent = max_concurrent
self._task: Optional[asyncio.Task] = None
self._stop_event = asyncio.Event()
self._executing_tasks: set = set() # 正在执行的任务 ID防重复
self._semaphore: Optional[asyncio.Semaphore] = None
def start(self):
"""启动调度器"""
if self._task is not None and not self._task.done():
logger.warning("Schedule executor is already running")
return
self._stop_event.clear()
self._semaphore = asyncio.Semaphore(self._max_concurrent)
self._task = asyncio.create_task(self._scan_loop())
logger.info(
f"Schedule executor started: interval={self._scan_interval}s, "
f"max_concurrent={self._max_concurrent}"
)
async def stop(self):
"""停止调度器"""
self._stop_event.set()
if self._task and not self._task.done():
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
logger.info("Schedule executor stopped")
async def _scan_loop(self):
"""主扫描循环"""
while not self._stop_event.is_set():
try:
await self._scan_and_execute()
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Schedule scan error: {e}")
# 等待下一次扫描或停止信号
try:
await asyncio.wait_for(
self._stop_event.wait(),
timeout=self._scan_interval
)
break # 收到停止信号
except asyncio.TimeoutError:
pass # 超时继续下一轮扫描
async def _scan_and_execute(self):
"""扫描所有 tasks.yaml找到到期任务并触发执行"""
now = datetime.now(timezone.utc)
robot_dir = Path("projects/robot")
if not robot_dir.exists():
return
tasks_files = list(robot_dir.glob("*/users/*/tasks.yaml"))
if not tasks_files:
return
for tasks_file in tasks_files:
try:
with open(tasks_file, 'r', encoding='utf-8') as f:
data = yaml.safe_load(f)
if not data or not data.get("tasks"):
continue
# 从路径提取 bot_id 和 user_id
parts = tasks_file.parts
# 路径格式: .../projects/robot/{bot_id}/users/{user_id}/tasks.yaml
bot_id = parts[-4]
user_id = parts[-2]
for task in data["tasks"]:
if task.get("status") != "active":
continue
if task["id"] in self._executing_tasks:
continue
next_run_str = task.get("next_run_at")
if not next_run_str:
continue
try:
next_run = datetime.fromisoformat(next_run_str)
if next_run.tzinfo is None:
next_run = next_run.replace(tzinfo=timezone.utc)
except (ValueError, TypeError):
logger.warning(f"Invalid next_run_at for task {task['id']}: {next_run_str}")
continue
if next_run <= now:
# 到期,触发执行
asyncio.create_task(
self._execute_task(bot_id, user_id, task, tasks_file)
)
except Exception as e:
logger.error(f"Error reading {tasks_file}: {e}")
async def _execute_task(self, bot_id: str, user_id: str, task: dict, tasks_file: Path):
"""执行单个到期任务"""
task_id = task["id"]
self._executing_tasks.add(task_id)
start_time = time.time()
try:
async with self._semaphore:
logger.info(f"Executing scheduled task: {task_id} ({task.get('name', '')}) for bot={bot_id} user={user_id}")
# 调用 agent
response_text = await self._call_agent_v3(bot_id, user_id, task)
# 写入日志
duration_ms = int((time.time() - start_time) * 1000)
self._write_log(bot_id, user_id, task, response_text, "success", duration_ms)
# 更新 tasks.yaml
self._update_task_after_execution(task_id, tasks_file)
logger.info(f"Task {task_id} completed in {duration_ms}ms")
except Exception as e:
duration_ms = int((time.time() - start_time) * 1000)
logger.error(f"Task {task_id} execution failed: {e}")
self._write_log(bot_id, user_id, task, f"ERROR: {e}", "error", duration_ms)
# 即使失败也更新 next_run_at避免无限重试
self._update_task_after_execution(task_id, tasks_file)
finally:
self._executing_tasks.discard(task_id)
async def _call_agent_v2(self, bot_id: str, user_id: str, task: dict) -> str:
"""通过 HTTP 调用 /api/v2/chat/completions 接口"""
from utils.fastapi_utils import generate_v2_auth_token
url = f"http://127.0.0.1:8001/api/v2/chat/completions"
auth_token = generate_v2_auth_token(bot_id)
payload = {
"messages": [{"role": "user", "content": task["message"]}],
"stream": False,
"bot_id": bot_id,
"tool_response": False,
"session_id": f"schedule_{task['id']}",
"user_identifier": user_id,
}
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {auth_token}",
}
async with aiohttp.ClientSession() as session:
async with session.post(url, json=payload, headers=headers, timeout=aiohttp.ClientTimeout(total=300)) as resp:
if resp.status != 200:
body = await resp.text()
raise RuntimeError(f"API returned {resp.status}: {body}")
data = await resp.json()
return data["choices"][0]["message"]["content"]
async def _call_agent_v3(self, bot_id: str, user_id: str, task: dict) -> str:
"""通过 HTTP 调用 /api/v3/chat/completions 接口(从数据库读取配置)"""
url = "http://127.0.0.1:8001/api/v3/chat/completions"
payload = {
"messages": [{"role": "user", "content": task["message"]}],
"stream": False,
"bot_id": bot_id,
"session_id": f"schedule_{task['id']}",
"user_identifier": user_id,
}
headers = {
"Content-Type": "application/json",
}
async with aiohttp.ClientSession() as session:
async with session.post(url, json=payload, headers=headers, timeout=aiohttp.ClientTimeout(total=300)) as resp:
if resp.status != 200:
body = await resp.text()
raise RuntimeError(f"API returned {resp.status}: {body}")
data = await resp.json()
return data["choices"][0]["message"]["content"]
def _update_task_after_execution(self, task_id: str, tasks_file: Path):
"""执行后更新 tasks.yaml"""
try:
with open(tasks_file, 'r', encoding='utf-8') as f:
data = yaml.safe_load(f)
if not data or not data.get("tasks"):
return
now_utc = datetime.now(timezone.utc).isoformat()
for task in data["tasks"]:
if task["id"] != task_id:
continue
task["last_executed_at"] = now_utc
task["execution_count"] = task.get("execution_count", 0) + 1
if task["type"] == "once":
task["status"] = "done"
task["next_run_at"] = None
elif task["type"] == "cron" and task.get("schedule"):
# 计算下次执行时间
task["next_run_at"] = self._compute_next_run(
task["schedule"],
task.get("timezone", "UTC")
)
break
with open(tasks_file, 'w', encoding='utf-8') as f:
yaml.dump(data, f, allow_unicode=True, default_flow_style=False, sort_keys=False)
except Exception as e:
logger.error(f"Failed to update task {task_id}: {e}")
def _compute_next_run(self, schedule: str, tz: str) -> str:
"""计算 cron 任务的下次执行 UTC 时间"""
from croniter import croniter
# 时区偏移映射
tz_offsets = {
'Asia/Shanghai': 8,
'Asia/Tokyo': 9,
'UTC': 0,
'America/New_York': -5,
'America/Los_Angeles': -8,
'Europe/London': 0,
'Europe/Berlin': 1,
}
offset_hours = tz_offsets.get(tz, 0)
offset = timedelta(hours=offset_hours)
now_utc = datetime.now(timezone.utc)
now_local = (now_utc + offset).replace(tzinfo=None)
cron = croniter(schedule, now_local)
next_local = cron.get_next(datetime)
next_utc = next_local - offset
return next_utc.replace(tzinfo=timezone.utc).isoformat()
def _write_log(self, bot_id: str, user_id: str, task: dict,
response: str, status: str, duration_ms: int):
"""写入执行日志"""
logs_dir = Path("projects/robot") / bot_id / "users" / user_id / "task_logs"
logs_dir.mkdir(parents=True, exist_ok=True)
log_file = logs_dir / "execution.log"
log_entry = {
"task_id": task["id"],
"task_name": task.get("name", ""),
"executed_at": datetime.now(timezone.utc).isoformat(),
"status": status,
"response": response[:2000] if response else "", # 截断过长响应
"duration_ms": duration_ms,
}
# 追加写入 YAML 列表
existing_logs = []
if log_file.exists():
try:
with open(log_file, 'r', encoding='utf-8') as f:
existing_logs = yaml.safe_load(f) or []
except Exception:
existing_logs = []
existing_logs.append(log_entry)
# 保留最近 100 条日志
if len(existing_logs) > 100:
existing_logs = existing_logs[-100:]
with open(log_file, 'w', encoding='utf-8') as f:
yaml.dump(existing_logs, f, allow_unicode=True, default_flow_style=False, sort_keys=False)
# 全局单例
_executor: Optional[ScheduleExecutor] = None
def get_schedule_executor() -> ScheduleExecutor:
"""获取全局调度器实例"""
global _executor
if _executor is None:
from utils.settings import SCHEDULE_SCAN_INTERVAL, SCHEDULE_MAX_CONCURRENT
_executor = ScheduleExecutor(
scan_interval=SCHEDULE_SCAN_INTERVAL,
max_concurrent=SCHEDULE_MAX_CONCURRENT,
)
return _executor

View File

@ -0,0 +1,10 @@
{
"hooks": {
"PrePrompt": [
{
"type": "command",
"command": "python scripts/schedule_manager.py list --format brief"
}
]
}
}

View File

@ -0,0 +1,178 @@
---
name: schedule-job
description: Scheduled Task Management - Create, manage, and view scheduled tasks for users (supports cron recurring tasks and one-time tasks)
---
# Schedule Job - Scheduled Task Management
Manage scheduled tasks for users, supporting cron recurring tasks and one-time scheduled tasks. The system automatically executes AI conversations when tasks are due.
## Quick Start
When a user requests to create a scheduled task:
1. Confirm the task type (recurring cron / one-time once)
2. Determine the time (cron expression or specific time) and timezone
3. Confirm notification method: Check the currently enabled skills in the context and recommend a suitable notification method to the user (e.g., email, Telegram, etc.) for confirmation. If no notification skills are available, proactively ask the user: "How would you like to be notified when the scheduled task completes?"
4. Compose the message content to send to the AI (refer to Message Writing Guidelines)
5. Call schedule_manager.py to create the task
## Instructions
### Tool Path
All operations are executed via shell commands:
```bash
python {skill_dir}/scripts/schedule_manager.py <command> [options]
```
### Available Commands
#### List Tasks
```bash
python {skill_dir}/scripts/schedule_manager.py list
python {skill_dir}/scripts/schedule_manager.py list --format brief
```
#### Add Cron Recurring Task
```bash
python {skill_dir}/scripts/schedule_manager.py add \
--name "Task Name" \
--type cron \
--schedule "0 9 * * *" \
--timezone "Asia/Tokyo" \
--message "[Scheduled Task Triggered] Please execute the following task immediately: Search and summarize today's important tech news, listing the Top 5 in a concise format. After completion, select an appropriate notification method based on the currently enabled skills to push the results to the user."
```
#### Add One-Time Task
```bash
python {skill_dir}/scripts/schedule_manager.py add \
--name "Meeting Reminder" \
--type once \
--scheduled-at "2026-04-01T10:00:00+09:00" \
--message "[Scheduled Task Triggered] A scheduled reminder has arrived. The schedule content is: 'Meeting at 10:00'. Please compose an appropriate reminder message based on the schedule content, then select an appropriate notification method based on the currently enabled skills to send the reminder to the user."
```
#### Edit Task
```bash
python {skill_dir}/scripts/schedule_manager.py edit <task_id> \
--schedule "0 10 * * 1-5" \
--message "New message content"
```
#### Delete Task
```bash
python {skill_dir}/scripts/schedule_manager.py delete <task_id>
```
#### Pause/Resume Task
```bash
python {skill_dir}/scripts/schedule_manager.py toggle <task_id>
```
#### View Execution Logs
```bash
python {skill_dir}/scripts/schedule_manager.py logs --limit 10
python {skill_dir}/scripts/schedule_manager.py logs --task-id <task_id>
```
### Timezone Mapping
Automatically recommend timezone based on user language:
- Chinese (zh) → Asia/Shanghai (UTC+8)
- Japanese (ja/jp) → Asia/Tokyo (UTC+9)
- English (en) → UTC
### Cron Expression Reference
Standard 5-field format: `minute hour day-of-month month day-of-week`
Common examples:
| Expression | Meaning |
|-----------|---------|
| `0 9 * * *` | Every day at 9:00 |
| `0 9 * * 1-5` | Monday to Friday at 9:00 |
| `30 8 * * 1` | Every Monday at 8:30 |
| `0 */2 * * *` | Every 2 hours |
| `0 9,18 * * *` | Every day at 9:00 and 18:00 |
**Note**: Cron expression times are based on the timezone specified by --timezone.
### One-Time Task Time Format
Supports ISO 8601 format (timezone offset recommended):
- `2026-04-01T10:00:00+09:00` (Japan Time)
- `2026-04-01T01:00:00Z` (UTC)
- `2026-04-01T08:00:00+08:00` (China Time)
## Message Writing Guidelines
The message is an execution instruction sent to the AI agent when the scheduled task triggers. The agent has no conversation context at execution time, so the message must include:
1. **Task Trigger Indicator**: Start with "[Scheduled Task Triggered]" (or equivalent in the user's language) to clearly indicate this is a triggered scheduled task execution, NOT a request to create a task
2. **Task Content**: What specifically needs to be done (use imperative verbs like "Please execute immediately", "Please do")
3. **Notification Instruction**: Based on the notification method confirmed by the user, explicitly specify the notification channel in the message (e.g., "notify the user via email"). If the user has not specified, write "select an appropriate notification method based on the currently enabled skills to push the results to the user"
4. **Language**: The message language must match the language the user is currently using in the conversation (e.g., if the user speaks Japanese, write the message in Japanese)
### Important: Avoid Ambiguity
The message will be executed by an AI agent when the scheduled time arrives. The message must clearly indicate this is an **execution instruction**, not a request to **create a scheduled task**.
### Schedule Reminder Scenario (One-Time Task)
❌ Wrong: `"Reminder: You have a meeting to attend now, don't forget!"`
❌ Wrong: `"It's tomorrow at 3 PM, please remind the user to attend the meeting"` (Agent will think this is a request to create a reminder)
✅ Correct: `"[Scheduled Task Triggered] A scheduled reminder has arrived. The schedule content is: 'Product review meeting at 3:00 PM'. Please compose an appropriate reminder message based on the schedule content, then select an appropriate notification method based on the currently enabled skills to send the reminder to the user."`
### Recurring Task Scenario (Cron Task)
❌ Wrong: `"It's 8:40 PM every day, please remind the user to write a daily report. Please send message via Lark Webhook, Webhook URL: xxx"` (Agent will think this is a request to create a daily task)
❌ Wrong: `"Execute: Get latest news"`
✅ Correct: `"[Scheduled Task Triggered] Please execute the following task immediately: Search and summarize today's important tech news, listing the Top 5 in a concise format. After completion, select an appropriate notification method based on the currently enabled skills to push the results to the user."`
✅ Correct: `"[Scheduled Task Triggered] This is a daily reminder task. Please execute immediately: Remind the user to write a daily report. Please send the message via Lark Webhook, Webhook URL: xxx"`
## Examples
**User**: "Set up a daily news summary task at 9 AM"
```bash
python {skill_dir}/scripts/schedule_manager.py add \
--name "Daily News Summary" \
--type cron \
--schedule "0 9 * * *" \
--timezone "Asia/Tokyo" \
--message "[Scheduled Task Triggered] Please execute the following task immediately: Search and summarize today's important tech news, listing the Top 5 in a concise format. After completion, select an appropriate notification method based on the currently enabled skills to push the results to the user."
```
**User**: "Remind me about the meeting tomorrow at 3 PM"
```bash
python {skill_dir}/scripts/schedule_manager.py add \
--name "Meeting Reminder" \
--type once \
--scheduled-at "2026-04-01T15:00:00+09:00" \
--message "[Scheduled Task Triggered] A scheduled reminder has arrived. The schedule content is: 'Meeting at 3:00 PM'. Please compose an appropriate reminder message based on the schedule content, then select an appropriate notification method based on the currently enabled skills to send the reminder to the user."
```
**User**: "Change the daily news task to 10 AM"
```bash
# First check the task list to get the task_id
python {skill_dir}/scripts/schedule_manager.py list
# Then edit
python {skill_dir}/scripts/schedule_manager.py edit <task_id> --schedule "0 10 * * *"
```
## Guidelines
- Before creating a task, use `list` to check existing tasks and avoid duplicates
- Automatically set the appropriate timezone based on user language
- **Must confirm notification method before creating a task**: Check the currently enabled skills, recommend available notification channels to the user and confirm. If no notification skills are available, proactively ask the user how they would like to receive results
- Message content is a complete execution instruction for the AI agent and must include both task content and notification instructions (refer to the Message Writing Guidelines above)
- The message language must match the language the user is currently using in the conversation
- One-time task times cannot be in the past
- When editing a task, only modify the fields the user requested — do not change other fields

View File

@ -0,0 +1,420 @@
#!/usr/bin/env python3
"""
定时任务管理 CLI 工具
用于增删改查用户的定时任务数据存储在 tasks.yaml 文件中
环境变量:
ASSISTANT_ID: 当前 bot ID
USER_IDENTIFIER: 当前用户标识
"""
import argparse
import os
import sys
import yaml
import json
import string
import random
from datetime import datetime, timezone, timedelta
from pathlib import Path
try:
from croniter import croniter
except ImportError:
print("Error: croniter is required. Install with: pip install croniter", file=sys.stderr)
sys.exit(1)
# 语言到时区的映射
LANGUAGE_TIMEZONE_MAP = {
'zh': 'Asia/Shanghai',
'ja': 'Asia/Tokyo',
'jp': 'Asia/Tokyo',
'en': 'UTC',
}
# 时区到 UTC 偏移(小时)
TIMEZONE_OFFSET_MAP = {
'Asia/Shanghai': 8,
'Asia/Tokyo': 9,
'UTC': 0,
'America/New_York': -5,
'America/Los_Angeles': -8,
'Europe/London': 0,
'Europe/Berlin': 1,
}
def get_tasks_dir(bot_id: str, user_id: str) -> Path:
"""获取用户任务目录路径"""
return Path("users") / user_id
def get_tasks_file(bot_id: str, user_id: str) -> Path:
"""获取 tasks.yaml 文件路径"""
return get_tasks_dir(bot_id, user_id) / "tasks.yaml"
def get_logs_dir(bot_id: str, user_id: str) -> Path:
"""获取任务日志目录"""
return get_tasks_dir(bot_id, user_id) / "task_logs"
def load_tasks(bot_id: str, user_id: str) -> dict:
"""加载 tasks.yaml"""
tasks_file = get_tasks_file(bot_id, user_id)
if not tasks_file.exists():
return {"tasks": []}
with open(tasks_file, 'r', encoding='utf-8') as f:
data = yaml.safe_load(f)
return data if data and "tasks" in data else {"tasks": []}
def save_tasks(bot_id: str, user_id: str, data: dict):
"""保存 tasks.yaml"""
tasks_file = get_tasks_file(bot_id, user_id)
tasks_file.parent.mkdir(parents=True, exist_ok=True)
with open(tasks_file, 'w', encoding='utf-8') as f:
yaml.dump(data, f, allow_unicode=True, default_flow_style=False, sort_keys=False)
def generate_task_id() -> str:
"""生成唯一任务 ID"""
ts = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
rand = ''.join(random.choices(string.ascii_lowercase + string.digits, k=6))
return f"task_{ts}_{rand}"
def parse_timezone_offset(tz: str) -> int:
"""获取时区的 UTC 偏移小时数"""
return TIMEZONE_OFFSET_MAP.get(tz, 0)
def compute_next_run_cron(schedule: str, tz: str, after: datetime = None) -> str:
"""
根据 cron 表达式和时区计算下次执行的 UTC 时间
cron 表达式是基于用户本地时间的需要先在本地时间计算下次触发再转换为 UTC
"""
offset_hours = parse_timezone_offset(tz)
offset = timedelta(hours=offset_hours)
# 当前 UTC 时间
now_utc = after or datetime.now(timezone.utc)
# 转为用户本地时间naive
now_local = (now_utc + offset).replace(tzinfo=None)
# 在本地时间上计算下次 cron 触发
cron = croniter(schedule, now_local)
next_local = cron.get_next(datetime)
# 转回 UTC
next_utc = next_local - offset
return next_utc.replace(tzinfo=timezone.utc).isoformat()
def parse_scheduled_at(scheduled_at_str: str) -> str:
"""解析一次性任务的时间字符串,返回 UTC ISO 格式"""
# 尝试解析带时区偏移的 ISO 格式
try:
dt = datetime.fromisoformat(scheduled_at_str)
if dt.tzinfo is None:
# 无时区信息,假设 UTC
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc).isoformat()
except ValueError:
pass
# 尝试解析常见格式
for fmt in ["%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M"]:
try:
dt = datetime.strptime(scheduled_at_str, fmt).replace(tzinfo=timezone.utc)
return dt.isoformat()
except ValueError:
continue
raise ValueError(f"无法解析时间格式: {scheduled_at_str}")
def cmd_list(args, bot_id: str, user_id: str):
"""列出所有任务"""
data = load_tasks(bot_id, user_id)
tasks = data.get("tasks", [])
if not tasks:
print("当前没有定时任务。")
return
if args.format == "brief":
# 简洁格式,用于 PrePrompt hook
print(f"定时任务列表 ({len(tasks)} 个):")
for t in tasks:
status_icon = {"active": "", "paused": "⏸️", "done": "✔️", "expired": ""}.get(t["status"], "")
if t["type"] == "cron":
print(f" {status_icon} [{t['id']}] {t['name']} | cron: {t['schedule']} ({t.get('timezone', 'UTC')}) | 已执行{t.get('execution_count', 0)}")
else:
print(f" {status_icon} [{t['id']}] {t['name']} | 一次性: {t.get('scheduled_at', 'N/A')}")
else:
# 详细格式
for t in tasks:
print(f"--- 任务: {t['name']} ---")
print(f" ID: {t['id']}")
print(f" 类型: {t['type']}")
print(f" 状态: {t['status']}")
if t["type"] == "cron":
print(f" Cron: {t['schedule']}")
print(f" 时区: {t.get('timezone', 'UTC')}")
else:
print(f" 计划时间: {t.get('scheduled_at', 'N/A')}")
print(f" 消息: {t['message']}")
print(f" 下次执行: {t.get('next_run_at', 'N/A')}")
print(f" 上次执行: {t.get('last_executed_at', 'N/A')}")
print(f" 执行次数: {t.get('execution_count', 0)}")
print(f" 创建时间: {t.get('created_at', 'N/A')}")
print()
def cmd_add(args, bot_id: str, user_id: str):
"""添加任务"""
data = load_tasks(bot_id, user_id)
task_id = generate_task_id()
now_utc = datetime.now(timezone.utc).isoformat()
task = {
"id": task_id,
"name": args.name,
"type": args.type,
"schedule": None,
"scheduled_at": None,
"timezone": args.timezone or "UTC",
"message": args.message,
"status": "active",
"created_at": now_utc,
"last_executed_at": None,
"next_run_at": None,
"execution_count": 0,
}
if args.type == "cron":
if not args.schedule:
print("Error: cron 类型任务必须提供 --schedule 参数", file=sys.stderr)
sys.exit(1)
# 验证 cron 表达式
try:
croniter(args.schedule)
except (ValueError, KeyError) as e:
print(f"Error: 无效的 cron 表达式 '{args.schedule}': {e}", file=sys.stderr)
sys.exit(1)
task["schedule"] = args.schedule
task["next_run_at"] = compute_next_run_cron(args.schedule, task["timezone"])
elif args.type == "once":
if not args.scheduled_at:
print("Error: once 类型任务必须提供 --scheduled-at 参数", file=sys.stderr)
sys.exit(1)
try:
utc_time = parse_scheduled_at(args.scheduled_at)
except ValueError as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
task["scheduled_at"] = utc_time
task["next_run_at"] = utc_time
data["tasks"].append(task)
save_tasks(bot_id, user_id, data)
print(f"任务已创建: {task_id}")
print(f" 名称: {args.name}")
print(f" 类型: {args.type}")
print(f" 下次执行 (UTC): {task['next_run_at']}")
def cmd_edit(args, bot_id: str, user_id: str):
"""编辑任务"""
data = load_tasks(bot_id, user_id)
task = None
for t in data["tasks"]:
if t["id"] == args.task_id:
task = t
break
if not task:
print(f"Error: 任务 {args.task_id} 不存在", file=sys.stderr)
sys.exit(1)
if args.name:
task["name"] = args.name
if args.message:
task["message"] = args.message
if args.schedule:
try:
croniter(args.schedule)
except (ValueError, KeyError) as e:
print(f"Error: 无效的 cron 表达式 '{args.schedule}': {e}", file=sys.stderr)
sys.exit(1)
task["schedule"] = args.schedule
if args.timezone:
task["timezone"] = args.timezone
if args.scheduled_at:
try:
task["scheduled_at"] = parse_scheduled_at(args.scheduled_at)
task["next_run_at"] = task["scheduled_at"]
except ValueError as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
# 重新计算 next_run_at如果是 cron 且修改了 schedule 或 timezone
if task["type"] == "cron" and task.get("schedule"):
task["next_run_at"] = compute_next_run_cron(task["schedule"], task.get("timezone", "UTC"))
save_tasks(bot_id, user_id, data)
print(f"任务已更新: {args.task_id}")
def cmd_delete(args, bot_id: str, user_id: str):
"""删除任务"""
data = load_tasks(bot_id, user_id)
original_count = len(data["tasks"])
data["tasks"] = [t for t in data["tasks"] if t["id"] != args.task_id]
if len(data["tasks"]) == original_count:
print(f"Error: 任务 {args.task_id} 不存在", file=sys.stderr)
sys.exit(1)
save_tasks(bot_id, user_id, data)
print(f"任务已删除: {args.task_id}")
def cmd_toggle(args, bot_id: str, user_id: str):
"""暂停/恢复任务"""
data = load_tasks(bot_id, user_id)
for t in data["tasks"]:
if t["id"] == args.task_id:
if t["status"] == "active":
t["status"] = "paused"
print(f"任务已暂停: {args.task_id}")
elif t["status"] == "paused":
t["status"] = "active"
# 恢复时重新计算 next_run_at
if t["type"] == "cron" and t.get("schedule"):
t["next_run_at"] = compute_next_run_cron(t["schedule"], t.get("timezone", "UTC"))
print(f"任务已恢复: {args.task_id}")
else:
print(f"任务状态为 {t['status']},无法切换", file=sys.stderr)
sys.exit(1)
save_tasks(bot_id, user_id, data)
return
print(f"Error: 任务 {args.task_id} 不存在", file=sys.stderr)
sys.exit(1)
def cmd_logs(args, bot_id: str, user_id: str):
"""查看执行日志"""
logs_dir = get_logs_dir(bot_id, user_id)
log_file = logs_dir / "execution.log"
if not log_file.exists():
print("暂无执行日志。")
return
with open(log_file, 'r', encoding='utf-8') as f:
logs = yaml.safe_load(f)
if not logs:
print("暂无执行日志。")
return
# 按任务 ID 过滤
if args.task_id:
logs = [l for l in logs if l.get("task_id") == args.task_id]
# 限制数量
limit = args.limit or 10
logs = logs[-limit:]
if not logs:
print("没有匹配的执行日志。")
return
for log in logs:
status_icon = "" if log.get("status") == "success" else ""
print(f"{status_icon} [{log.get('executed_at', 'N/A')}] {log.get('task_name', 'N/A')}")
response = log.get("response", "")
# 截断过长的响应
if len(response) > 200:
response = response[:200] + "..."
print(f" {response}")
if log.get("duration_ms"):
print(f" 耗时: {log['duration_ms']}ms")
print()
def main():
bot_id = os.getenv("ASSISTANT_ID", "")
user_id = os.getenv("USER_IDENTIFIER", "")
if not bot_id or not user_id:
print("Error: ASSISTANT_ID 和 USER_IDENTIFIER 环境变量必须设置", file=sys.stderr)
sys.exit(1)
parser = argparse.ArgumentParser(description="定时任务管理工具")
subparsers = parser.add_subparsers(dest="command", help="可用命令")
# list
p_list = subparsers.add_parser("list", help="列出所有任务")
p_list.add_argument("--format", choices=["brief", "detail"], default="detail", help="输出格式")
# add
p_add = subparsers.add_parser("add", help="添加任务")
p_add.add_argument("--name", required=True, help="任务名称")
p_add.add_argument("--type", required=True, choices=["cron", "once"], help="任务类型")
p_add.add_argument("--schedule", help="Cron 表达式 (cron 类型必填)")
p_add.add_argument("--scheduled-at", help="执行时间 ISO 8601 格式 (once 类型必填)")
p_add.add_argument("--timezone", help="时区 (如 Asia/Tokyo),默认 UTC")
p_add.add_argument("--message", required=True, help="发送给 AI 的消息内容")
# edit
p_edit = subparsers.add_parser("edit", help="编辑任务")
p_edit.add_argument("task_id", help="任务 ID")
p_edit.add_argument("--name", help="新任务名称")
p_edit.add_argument("--schedule", help="新 Cron 表达式")
p_edit.add_argument("--scheduled-at", help="新执行时间")
p_edit.add_argument("--timezone", help="新时区")
p_edit.add_argument("--message", help="新消息内容")
# delete
p_delete = subparsers.add_parser("delete", help="删除任务")
p_delete.add_argument("task_id", help="任务 ID")
# toggle
p_toggle = subparsers.add_parser("toggle", help="暂停/恢复任务")
p_toggle.add_argument("task_id", help="任务 ID")
# logs
p_logs = subparsers.add_parser("logs", help="查看执行日志")
p_logs.add_argument("--task-id", help="按任务 ID 过滤")
p_logs.add_argument("--limit", type=int, default=10, help="显示条数")
args = parser.parse_args()
if not args.command:
parser.print_help()
sys.exit(1)
commands = {
"list": cmd_list,
"add": cmd_add,
"edit": cmd_edit,
"delete": cmd_delete,
"toggle": cmd_toggle,
"logs": cmd_logs,
}
commands[args.command](args, bot_id, user_id)
if __name__ == "__main__":
main()

View File

@ -80,7 +80,7 @@ Hook 脚本通过子进程执行,通过环境变量接收参数,通过 stdou
| 环境变量 | 说明 | 适用于 |
|---------|------|--------|
| `BOT_ID` | Bot ID | 所有 hook |
| `ASSISTANT_ID` | Bot ID | 所有 hook |
| `USER_IDENTIFIER` | 用户标识 | 所有 hook |
| `SESSION_ID` | 会话 ID | 所有 hook |
| `LANGUAGE` | 语言代码 | 所有 hook |
@ -99,7 +99,7 @@ import sys
def main():
user_identifier = os.environ.get('USER_IDENTIFIER', '')
bot_id = os.environ.get('BOT_ID', '')
bot_id = os.environ.get('ASSISTANT_ID', '')
# 输出要注入到 prompt 中的内容
print(f"## User Context\n\n用户: {user_identifier}")

View File

@ -11,7 +11,7 @@ import sys
def main():
"""从环境变量读取参数并输出注入内容"""
user_identifier = os.environ.get('USER_IDENTIFIER', '')
bot_id = os.environ.get('BOT_ID', '')
bot_id = os.environ.get('ASSISTANT_ID', '')
# 示例:根据 user_identifier 查询用户上下文
# 这里只是演示,实际应该从数据库或其他服务获取

View File

@ -84,6 +84,20 @@ MEM0_ENABLED = os.getenv("MEM0_ENABLED", "true") == "true"
# 召回记忆数量
MEM0_SEMANTIC_SEARCH_TOP_K = int(os.getenv("MEM0_SEMANTIC_SEARCH_TOP_K", "20"))
# ============================================================
# Schedule Job 定时任务配置
# ============================================================
# 是否启用定时任务调度器
SCHEDULE_ENABLED = os.getenv("SCHEDULE_ENABLED", "true") == "true"
# 调度器扫描间隔(秒)
SCHEDULE_SCAN_INTERVAL = int(os.getenv("SCHEDULE_SCAN_INTERVAL", "60"))
# 最大并发执行任务数
SCHEDULE_MAX_CONCURRENT = int(os.getenv("SCHEDULE_MAX_CONCURRENT", "5"))
os.environ["OPENAI_API_KEY"] = "your_api_key"
# ============================================================