Merge branch 'onprem-dev' into onprem-release

This commit is contained in:
朱潮 2026-04-11 10:38:56 +08:00
commit 675d7d3e12
20 changed files with 1289 additions and 42 deletions

View File

@ -40,7 +40,7 @@ RUN pip install --no-cache-dir -r requirements.txt
# 安装 Playwright 并下载 Chromium
RUN pip install --no-cache-dir playwright && \
playwright install chromium
RUN npm install -g playwright sharp && \
RUN npm install -g playwright sharp nodemailer dotenv && \
npx playwright install chromium
# 复制应用代码

View File

@ -41,7 +41,7 @@ RUN pip install --no-cache-dir -i https://mirrors.aliyun.com/pypi/simple/ -r req
# 安装 Playwright 并下载 Chromium
RUN pip install --no-cache-dir -i https://mirrors.aliyun.com/pypi/simple/ playwright && \
playwright install chromium
RUN npm install -g playwright sharp && \
RUN npm install -g playwright sharp nodemailer dotenv && \
npx playwright install chromium
# 安装modelscope

View File

@ -165,21 +165,30 @@ async def _execute_command(skill_path: str, command: str, hook_type: str, config
"""
try:
# 设置环境变量,传递给子进程
# 注意subprocess 要求所有 env 值必须是 str 类型,
# getattr 可能返回 None属性存在但值为 None需要确保转换为 str
env = os.environ.copy()
env['BOT_ID'] = getattr(config, 'bot_id', '')
env['USER_IDENTIFIER'] = getattr(config, 'user_identifier', '')
env['SESSION_ID'] = getattr(config, 'session_id', '')
env['LANGUAGE'] = getattr(config, 'language', '')
env['ASSISTANT_ID'] = str(getattr(config, 'bot_id', ''))
env['USER_IDENTIFIER'] = str(getattr(config, 'user_identifier', ''))
env['TRACE_ID'] = str(getattr(config, 'trace_id', ''))
env['SESSION_ID'] = str(getattr(config, 'session_id', ''))
env['LANGUAGE'] = str(getattr(config, 'language', ''))
env['HOOK_TYPE'] = hook_type
# 合并 config 中的自定义 shell 环境变量
shell_env = getattr(config, 'shell_env', None)
if shell_env:
# 确保所有自定义环境变量值也是字符串
env.update({k: str(v) if v is not None else '' for k, v in shell_env.items()})
# 对于 PreSave传递 content
if hook_type == 'PreSave':
env['CONTENT'] = kwargs.get('content', '')
env['ROLE'] = kwargs.get('role', '')
env['CONTENT'] = str(kwargs.get('content', '') or '')
env['ROLE'] = str(kwargs.get('role', '') or '')
# 对于 PostAgent传递 response
if hook_type == 'PostAgent':
env['RESPONSE'] = kwargs.get('response', '')
env['RESPONSE'] = str(kwargs.get('response', '') or '')
metadata = kwargs.get('metadata', {})
env['METADATA'] = json.dumps(metadata) if metadata else ''

View File

@ -31,7 +31,6 @@ services:
# 应用配置
- BACKEND_HOST=http://api-dev.gbase.ai
- MAX_CONTEXT_TOKENS=262144
- DEFAULT_THINKING_ENABLE=true
# PostgreSQL 配置
- CHECKPOINT_DB_URL=postgresql://postgres:E5ACJo6zJub4QS@postgres:5432/agent_db
- R2_UPLOAD_CONFIG=/app/config/local-upload.yaml

View File

@ -12,7 +12,6 @@ services:
# 应用配置
- BACKEND_HOST=http://api-dev.gbase.ai
- MAX_CONTEXT_TOKENS=262144
- DEFAULT_THINKING_ENABLE=true
- R2_UPLOAD_CONFIG=/app/config/s3-upload-sparticle.yaml
volumes:
# 挂载项目数据目录

View File

@ -108,6 +108,7 @@ async def lifespan(app: FastAPI):
close_global_mem0
)
from utils.settings import CHECKPOINT_CLEANUP_ENABLED, MEM0_ENABLED
from utils.settings import SCHEDULE_ENABLED
# 1. 初始化共享的数据库连接池
db_pool_manager = await init_global_db_pool()
@ -141,10 +142,28 @@ async def lifespan(app: FastAPI):
db_pool_manager.start_cleanup_scheduler()
logger.info("Checkpoint cleanup scheduler started")
# 6. 启动定时任务调度器
schedule_executor = None
if SCHEDULE_ENABLED:
try:
from services.schedule_executor import get_schedule_executor
schedule_executor = get_schedule_executor()
schedule_executor.start()
logger.info("Schedule executor started")
except Exception as e:
logger.warning(f"Schedule executor start failed (non-fatal): {e}")
yield
# 关闭时清理(按相反顺序)
logger.info("Shutting down...")
# 关闭定时任务调度器
if schedule_executor:
try:
await schedule_executor.stop()
logger.info("Schedule executor stopped")
except Exception as e:
logger.warning(f"Schedule executor stop failed (non-fatal): {e}")
# 关闭 Mem0
if MEM0_ENABLED:
try:

249
plans/schedule-job.md Normal file
View File

@ -0,0 +1,249 @@
# feat: Schedule Job 定时任务系统
## 概述
为每个用户的每个 bot 提供定时任务能力,支持 cron 周期任务和一次性定时任务。到期后系统自动调用 Agent 执行任务,结果写入日志并通过 PostAgent Hook 推送通知。
## 背景
### 当前状态
系统已有的后台任务机制:
- **Huey 任务队列** (`task_queue/`): SQLite 后端,用于文件处理
- **AsyncIO 后台任务**: `asyncio.create_task()` 用于非阻塞操作
- **Checkpoint 清理调度器** (`agent/db_pool_manager.py`): asyncio loop 方式
**缺失能力**
- 无法为用户设置周期性或定时触发的 Agent 任务
- 无 cron 式调度器
- 无用户级任务管理接口
### 设计目标
- 用户通过 AI 对话即可创建/管理定时任务
- 任务数据以 YAML 文件存储在用户专属目录,便于查看和备份
- 全局调度器以 asyncio 后台任务运行,无需额外进程
- 复用现有 `create_agent_and_generate_response()` 执行任务
## 架构设计
```
┌─────────────────┐ ┌──────────────────────┐ ┌────────────────────┐
│ AI Agent 对话 │ │ 全局异步调度器 │ │ Agent 执行引擎 │
│ (Shell脚本工具) │────▶│ (asyncio loop) │────▶│ create_agent_and_ │
│ 增删改查 tasks │ │ 每60s扫描tasks.yaml │ │ generate_response │
└─────────────────┘ └──────────────────────┘ └────────────────────┘
│ │ │
▼ ▼ ▼
tasks.yaml tasks.yaml task_logs/
(用户任务数据) (更新执行时间) (执行结果日志)
```
## 数据模型
### tasks.yaml
**存储路径**: `projects/robot/{bot_id}/users/{user_id}/tasks.yaml`
```yaml
tasks:
- id: "task_20260330143000_abc123" # 自动生成
name: "每日新闻摘要" # 任务名称
type: "cron" # cron | once
schedule: "0 9 * * *" # cron 表达式type=cron 时)
scheduled_at: null # ISO 8601 UTCtype=once 时)
timezone: "Asia/Tokyo" # 用户时区,用于 cron 解析
message: "请帮我总结今天的科技新闻" # 发送给 agent 的消息
status: "active" # active | paused | done | expired
created_at: "2026-03-30T05:30:00Z"
last_executed_at: null # 上次执行时间UTC
next_run_at: "2026-03-31T00:00:00Z" # 下次执行时间UTC调度器用此判断
execution_count: 0 # 已执行次数
```
**关键设计决策**
- 所有时间统一 UTC 存储cron 表达式结合 timezone 字段在本地时间计算后转 UTC
- `next_run_at` 预计算,调度器只需简单比较时间戳即可判断是否到期
- 一次性任务执行后 status 自动变为 `done`
### execution.log
**存储路径**: `projects/robot/{bot_id}/users/{user_id}/task_logs/execution.log`
```yaml
- task_id: "task_20260330143000_abc123"
task_name: "每日新闻摘要"
executed_at: "2026-03-31T00:00:15Z"
status: "success" # success | error
response: "今天的科技新闻摘要:..."
duration_ms: 12500
```
保留最近 100 条日志,自动清理旧记录。
## 新增文件
### 1. `skills/schedule-job/scripts/schedule_manager.py`
Shell 命令行工具argparse CLIAI 通过 shell 调用来管理用户的定时任务。
**环境变量输入**(通过 plugin hook 机制传入):
- `BOT_ID`: 当前 bot ID
- `USER_IDENTIFIER`: 当前用户标识
**支持的命令**
| 命令 | 说明 | 示例 |
|------|------|------|
| `list` | 列出任务 | `list --format brief` |
| `add` | 添加任务 | `add --name "每日新闻" --type cron --schedule "0 9 * * *" --timezone "Asia/Tokyo" --message "..."` |
| `edit` | 编辑任务 | `edit <task_id> --schedule "0 10 * * 1-5"` |
| `delete` | 删除任务 | `delete <task_id>` |
| `toggle` | 暂停/恢复 | `toggle <task_id>` |
| `logs` | 查看日志 | `logs --task-id <id> --limit 10` |
**核心逻辑**
- 一次性任务 `--scheduled-at` 接受 ISO 8601 格式(带时区偏移),内部转 UTC
- cron 任务通过 `croniter` + timezone 计算 `next_run_at`
- 自动创建 `users/{user_id}/` 目录
### 2. `skills/schedule-job/SKILL.md`
Skill 描述文件,注入 AI prompt告诉 AI
- 如何使用 schedule_manager.py CLI
- cron 表达式语法说明
- 时区映射规则(语言 → 时区)
- 使用示例
### 3. `skills/schedule-job/.claude-plugin/plugin.json`
```json
{
"hooks": {
"PrePrompt": {
"command": "python scripts/schedule_manager.py list --format brief"
}
}
}
```
通过 PrePrompt hookAI 每次对话时自动看到用户当前的任务列表。
### 4. `services/schedule_executor.py`
全局异步调度器,核心类 `ScheduleExecutor`
**运行机制**:参考 `db_pool_manager.py``_cleanup_loop()` 模式
```
启动 → asyncio.create_task(_scan_loop)
每 SCAN_INTERVAL 秒
遍历 projects/robot/*/users/*/tasks.yaml
找到 status=active && next_run_at <= now 的任务
asyncio.create_task(_execute_task) → 受 Semaphore 并发控制
构建 AgentConfig → create_agent_and_generate_response(stream=False)
写入 execution.log → 更新 tasks.yaml (next_run_at / status)
```
**并发与防重复**
- `_executing_tasks: set` 记录正在执行的任务 ID防止同一任务被重复触发
- `asyncio.Semaphore(MAX_CONCURRENT)` 限制最大并发数(默认 5
**任务执行后更新**
- cron 任务:使用 croniter 计算下次 UTC 时间写入 `next_run_at`
- once 任务:将 status 设为 `done`,清空 `next_run_at`
- 失败时也更新 `next_run_at`,避免无限重试
**Agent 调用构建**
```python
bot_config = await fetch_bot_config(bot_id)
project_dir = create_project_directory(dataset_ids, bot_id, skills)
config = AgentConfig(
bot_id=bot_id,
user_identifier=user_id,
session_id=f"schedule_{task_id}", # 专用 session
stream=False,
tool_response=False,
messages=[{"role": "user", "content": task["message"]}],
... # 其余参数从 bot_config 获取
)
result = await create_agent_and_generate_response(config)
```
## 修改文件
### `pyproject.toml`
新增依赖:
```toml
"croniter (>=2.0.0,<4.0.0)",
"pyyaml (>=6.0,<7.0)",
```
### `utils/settings.py`
新增环境变量:
```python
SCHEDULE_ENABLED = os.getenv("SCHEDULE_ENABLED", "true") == "true"
SCHEDULE_SCAN_INTERVAL = int(os.getenv("SCHEDULE_SCAN_INTERVAL", "60")) # 扫描间隔(秒)
SCHEDULE_MAX_CONCURRENT = int(os.getenv("SCHEDULE_MAX_CONCURRENT", "5")) # 最大并发数
```
### `fastapi_app.py`
在 lifespan 中集成调度器生命周期:
- **startup**: `schedule_executor.start()` — 在 checkpoint 清理调度器之后启动
- **shutdown**: `await schedule_executor.stop()` — 在 Mem0 关闭之前停止
## 复用的现有组件
| 组件 | 路径 | 用途 |
|------|------|------|
| `create_agent_and_generate_response()` | `routes/chat.py:246` | 执行 agent 调用 |
| `fetch_bot_config()` | `utils/fastapi_utils.py` | 获取 bot 配置 |
| `create_project_directory()` | `utils/fastapi_utils.py` | 创建项目目录 |
| `AgentConfig` | `agent/agent_config.py` | 构建执行配置 |
| `_cleanup_loop()` 模式 | `agent/db_pool_manager.py:240` | asyncio 后台循环参考 |
| `execute_hooks('PostAgent')` | `agent/plugin_hook_loader.py` | 执行结果通知 |
## 验证方式
```bash
# 1. 启动服务
poetry run uvicorn fastapi_app:app --host 0.0.0.0 --port 8001
# 2. CLI 创建测试任务
BOT_ID=<bot_id> USER_IDENTIFIER=test_user \
poetry run python skills/schedule-job/scripts/schedule_manager.py add \
--name "测试任务" --type once \
--scheduled-at "$(date -u -v+2M '+%Y-%m-%dT%H:%M:%SZ')" \
--message "你好,这是一个定时任务测试"
# 3. 检查 tasks.yaml
cat projects/robot/<bot_id>/users/test_user/tasks.yaml
# 4. 等待调度器执行(最多 60 秒),观察应用日志
# 5. 检查执行结果
cat projects/robot/<bot_id>/users/test_user/task_logs/execution.log
# 6. 通过 API 对话测试 skill
curl -X POST http://localhost:8001/api/v2/chat/completions \
-H 'authorization: Bearer <token>' \
-H 'content-type: application/json' \
-d '{"messages":[{"role":"user","content":"帮我创建一个每天早上9点的定时任务"}],"stream":false,"bot_id":"<bot_id>","user_identifier":"test_user"}'
```
## 后续扩展方向
- **任务失败重试**: 可增加 `max_retries``retry_delay` 字段
- **任务执行超时**: 为单个任务设置超时限制
- **通知渠道**: 集成飞书/邮件等 PostAgent Hook 推送执行结果
- **Web 管理界面**: 提供 REST API 查询/管理定时任务
- **任务模板**: 预设常用任务模板(每日新闻、天气播报等)

18
poetry.lock generated
View File

@ -677,6 +677,22 @@ files = [
]
markers = {main = "platform_system == \"Windows\"", dev = "sys_platform == \"win32\""}
[[package]]
name = "croniter"
version = "3.0.4"
description = "croniter provides iteration for datetime object with cron like format"
optional = false
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,>=2.6"
groups = ["main"]
files = [
{file = "croniter-3.0.4-py2.py3-none-any.whl", hash = "sha256:96e14cdd5dcb479dd48d7db14b53d8434b188dfb9210448bef6f65663524a6f0"},
{file = "croniter-3.0.4.tar.gz", hash = "sha256:f9dcd4bdb6c97abedb6f09d6ed3495b13ede4d4544503fa580b6372a56a0c520"},
]
[package.dependencies]
python-dateutil = "*"
pytz = ">2021.1"
[[package]]
name = "cryptography"
version = "46.0.5"
@ -6619,4 +6635,4 @@ cffi = ["cffi (>=1.17,<2.0) ; platform_python_implementation != \"PyPy\" and pyt
[metadata]
lock-version = "2.1"
python-versions = ">=3.12,<4.0"
content-hash = "dbe40b78bc1b7796331da5e14512ae6992f783cadca977bc66b098642d1e8cd9"
content-hash = "3ed06e25ab7936d04d523544c24df6e8678eda0e99388ed1e4de0acbb8e3e63e"

View File

@ -148,21 +148,6 @@ Output: {{"facts" : []}}
Input: DR1の照明状態を教えて
Output: {{"facts" : []}}
Input: 私は林檎好きです
Output: {{"facts" : ["林檎が好き"]}}
Input: コーヒー飲みたい、毎朝
Output: {{"facts" : ["毎朝コーヒーを飲みたい"]}}
Input: 昨日映画見た、すごくよかった
Output: {{"facts" : ["昨日映画を見た", "映画がすごくよかった"]}}
Input: 我喜欢吃苹果
Output: {{"facts" : ["喜欢吃苹果"]}}
Input: 나는 사과를 좋아해
Output: {{"facts" : ["사과를 좋아함"]}}
Return the facts and preferences in a json format as shown above.
Remember the following:

View File

@ -74,6 +74,13 @@
- dxcore_update_device_status(device_id="[B设备id]",running_control=0) → 灯光亮度调整为0
**响应**"已为您关闭Define Room4的灯光"
### 位置降级搜索场景
**用户**"3階執務スペース、フォーラム側窓側の照明をつけて"
- find_device_by_area(description="3階執務スペース、フォーラム側窓側", device_type="light") → 返回无结果
- find_device_by_area(description="3階執務スペース", device_type="light") → 降级搜索,找到设备
- 告知用户是基于"3階執務スペース"范围搜索到的结果,并确认是否操作
**响应**"「3階執務スペース、フォーラム側窓側」では見つかりませんでしたが、3階執務スペースエリアで照明が見つかりました。こちらの照明を操作しますか"
</scenarios>
@ -92,6 +99,17 @@
▪ 主动向用户确认:向用户列出所有候选房间,并提示用户选择或明确具体是哪一个。确认提示语可参考:“请问您想查询的是以下哪个房间?[列出候选房间列表]”。
▪ 理解用户二次确认:等待用户回复后,根据其选择再次调用查询工具获取最终信息。用户对候选房间的指明(如回复“第一个”或重复房间名)应视为对该房间的确认。
4. 处理无匹配结果:如果工具返回未找到任何相关房间,应明确告知用户这一情况,并建议用户检查房间名称是否正确或提供更多线索。
5. **位置粒度降级搜索(詳細な位置指定で見つからない場合)**
用户指定了详细的位置信息(如包含方位、区域细节),但工具返回无匹配结果时,自动执行降级搜索:
- **第1步**:从位置描述中去除方位修饰语(側、付近、奥、手前、寄り等)和细节描述,保留核心区域名重新搜索
- 例: "3階執務スペース、フォーラム側窓側" → find_device_by_area(description="3階執務スペース")
- 例: "2階会議室A、入口付近" → find_device_by_area(description="2階会議室A")
- **第2步**:如果仍无结果,进一步简化到楼层+大区域级别
- 例: "3階執務スペース" → find_device_by_area(description="3階")
- **降级成功时的回复**:告知用户是基于更广范围的搜索结果,让用户确认
- 回复格式: "「{元の位置}」では見つかりませんでしたが、{簡略化した位置}エリアで以下の設備が見つかりました。こちらでよろしいですか?"
- **全部失败时**:告知用户未找到设备,建议提供其他位置信息或直接指定房间名
- 回复格式: "申し訳ございません、該当エリアでは操作可能な設備が見つかりませんでした。お部屋の名前をお教えいただけますか?"
3. 更新设备(此操作需要确认)
- **条件**:用户意图为控制设备或调节参数(如开关、温度、风速), 需要进行确认。
@ -105,6 +123,7 @@
- 通过 find_employee_location(name="[当前用户名字/邮箱]") 获取用户的sensor_id
- 然后通过 find_iot_device(target_sensor_id="[当前用户的sensor_id]", device_type="[目标设备类型]") 查找他附近的设备
- 找到设备后告知用户找到的设备信息,并确认是否执行操作
- **位置指定但匹配失败时**:如果用户指定了详细位置(如"3階執務スペース、フォーラム側窓側の照明をつけて"),但 find_device_by_area 返回无匹配结果,应按照规则 2 第 5 点的**位置粒度降级搜索**策略执行,而不是直接回复"找不到设备"
3. **空调温度调节确认方式**
- 如果用户说"有点热"、"调低点"、"太热了"等,表示要降温:
1. 先查询当前室温
@ -142,8 +161,8 @@
- 如果用户指定了具体档位(如"调到强"),直接使用指定档位
- **边界情况**:如果已达到最高档(强)或最低档(弱)无法继续调整,告知用户并主动建议调整温度
- 回复格式:"風量は既に『強/弱』になっていますので、これ以上調整できません。代わりに温度を調整しますか?"
6. **若用户已明确确认**直接调用【设备控制】工具执行操作
7. **若用户未确认且为新请求**:向用户发送确认提示:"即将为您 [操作内容] [设备名称] [具体参数],是否确认?",待用户确认后再执行。
6. **若用户已明确确认****立即**调用【设备控制】工具执行操作,不做任何额外确认或复述。确认后的唯一动作就是调用工具
7. **若用户未确认且为新请求**:向用户发送确认提示:"即将为您 [操作内容] [设备名称] [具体参数],是否确认?",待用户确认后再执行。每个操作只确认一次。
4. 查询人员信息/wowtalk账号/人员位置
- **条件**:用户意图为查找某人、员工、同事或房间位置。
@ -190,15 +209,43 @@
- 影响范围大的操作:影响整个房间或楼层的设备控制
### 用户确认意图推理
- 用户明确确认:如回复“确认”、“好的”、“是的”、“拜托了”、“よろしく”、“请”、“please”等肯定性语气的内容。
- 用户意图重申:用户完整或核心重复当前待执行的操作指令。(例如,提示“room302の照明1台を明るさ50%に調整してもよろしいですか?”,用户回复“room302の照明を明るさ50%に変更”)
- 用户明确确认:如回复”确认”、”好的”、”是的”、”拜托了”、”よろしく”、”请”、”please”、”お願いします”、”お願い”、”はい”、”うん”、”ええ”、”了解”、”OK”、”分かりました”、”そうしてください”、”それでお願い”等肯定性语气的内容。
- 用户意图重申:用户完整或核心重复当前待执行的操作指令。(例如,提示”room302の照明1台を明るさ50%に調整してもよろしいですか?”,用户回复”room302の照明を明るさ50%に変更”)
- 同一设备免重复确认:如果用户在当前会话中已经对某个设备的操作进行过确认,后续针对**同一设备**的操作可直接执行,无需再次确认。判定标准为:
1. **同一设备的不同操作**用户已确认过对某设备的控制操作后后续对该设备的其他操作无需再次确认如已确认关闭Define Room4的灯光之后用户说"把灯打开",可直接执行)
2. **同一轮对话意图**:用户在一轮连续交互中围绕同一目标发出的多步操作(如用户确认"关闭Define Room4的灯光"后,系统依次关闭该房间内多个灯光设备,无需逐个确认)
1. **同一设备的不同操作**用户已确认过对某设备的控制操作后后续对该设备的其他操作无需再次确认如已确认关闭Define Room4的灯光之后用户说”把灯打开”,可直接执行)
2. **同一轮对话意图**:用户在一轮连续交互中围绕同一目标发出的多步操作(如用户确认”关闭Define Room4的灯光”后,系统依次关闭该房间内多个灯光设备,无需逐个确认)
3. **同一指令的延续执行**:用户确认某操作后,该操作因技术原因需要分步执行的后续步骤(如批量控制多个设备时,确认一次即可全部执行)
4. **上下文明确的追加操作**用户在已确认的操作基础上追加相同类型的操作且目标明确无歧义如已确认打开A房间空调后用户说"B房间也一样",可直接执行)
4. **上下文明确的追加操作**用户在已确认的操作基础上追加相同类型的操作且目标明确无歧义如已确认打开A房间空调后用户说”B房间也一样”,可直接执行)
- 不同事项仍需确认:当操作涉及**未曾确认过的新设备**,或操作类型发生本质变化时(如从设备控制切换到消息通知),仍需重新确认
### ⚠️ 禁止二次确认(最高优先级规则)
**对于同一个操作请求,最多只能向用户确认一次。用户确认后,必须立即调用工具执行,绝对禁止再次询问确认。**
核心规则:
1. **一次确认,立即执行**:当你向用户发出确认提示后,用户回复确认,你的下一步动作**必须且只能是**调用对应的工具(如 dxcore_update_device_status执行操作。不允许生成任何额外的确认、复述或再次询问。
2. **禁止循环确认**:如果聊天记录中已经存在你发出的确认提示和用户的确认回复,则该操作已被确认,不得以任何理由再次要求确认。
3. **确认后禁止的行为**
- ❌ 再次询问”もう一度確認いただけますか?”
- ❌ 再次复述操作内容并要求确认
- ❌ 以不同措辞重新询问同一操作的确认
- ❌ 生成过渡性文字后再次要求确认
**正确流程示例**
```
用户: “Dr3の照明を30%にして”
AI: “ディファインルーム3の照明を30%に調整してもよろしいですか?”
用户: “お願いします”
AI: [立即调用 dxcore_update_device_status 执行] → “照明を30%に調整しました。”
```
**错误流程(绝对禁止)**
```
用户: “Dr3の照明を30%にして”
AI: “ディファインルーム3の照明を30%に調整してもよろしいですか?”
用户: “お願いします”
AI: “もう一度確認いただければ実行いたします” ← ❌ 禁止!
```
## 上下文推理示例
### 设备控制场景

View File

@ -37,6 +37,8 @@ dependencies = [
"json-repair (>=0.29.0,<0.30.0)",
"tiktoken (>=0.5.0,<1.0.0)",
"wsgidav (>=4.3.3,<5.0.0)",
"croniter (>=2.0.0,<4.0.0)",
"pyyaml (>=6.0,<7.0)",
]
[tool.poetry.requires-plugins]

View File

@ -19,6 +19,7 @@ chardet==5.2.0 ; python_version >= "3.12" and python_version < "4.0"
charset-normalizer==3.4.4 ; python_version >= "3.12" and python_version < "4.0"
click==8.3.0 ; python_version >= "3.12" and python_version < "4.0"
colorama==0.4.6 ; python_version >= "3.12" and python_version < "4.0" and platform_system == "Windows"
croniter==3.0.4 ; python_version >= "3.12" and python_version < "4.0"
cryptography==46.0.5 ; python_version >= "3.12" and python_version < "4.0"
daytona-api-client-async==0.127.0 ; python_version >= "3.12" and python_version < "4.0"
daytona-api-client==0.127.0 ; python_version >= "3.12" and python_version < "4.0"

View File

@ -0,0 +1,301 @@
"""
全局定时任务调度器
扫描所有 projects/robot/{bot_id}/users/{user_id}/tasks.yaml 文件
找到到期的任务并调用 create_agent_and_generate_response 执行
"""
import asyncio
import logging
import time
import yaml
import aiohttp
import json
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Optional
logger = logging.getLogger('app')
class ScheduleExecutor:
"""定时任务调度器,以 asyncio 后台任务运行"""
def __init__(self, scan_interval: int = 60, max_concurrent: int = 5):
self._scan_interval = scan_interval
self._max_concurrent = max_concurrent
self._task: Optional[asyncio.Task] = None
self._stop_event = asyncio.Event()
self._executing_tasks: set = set() # 正在执行的任务 ID防重复
self._semaphore: Optional[asyncio.Semaphore] = None
def start(self):
"""启动调度器"""
if self._task is not None and not self._task.done():
logger.warning("Schedule executor is already running")
return
self._stop_event.clear()
self._semaphore = asyncio.Semaphore(self._max_concurrent)
self._task = asyncio.create_task(self._scan_loop())
logger.info(
f"Schedule executor started: interval={self._scan_interval}s, "
f"max_concurrent={self._max_concurrent}"
)
async def stop(self):
"""停止调度器"""
self._stop_event.set()
if self._task and not self._task.done():
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
logger.info("Schedule executor stopped")
async def _scan_loop(self):
"""主扫描循环"""
while not self._stop_event.is_set():
try:
await self._scan_and_execute()
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Schedule scan error: {e}")
# 等待下一次扫描或停止信号
try:
await asyncio.wait_for(
self._stop_event.wait(),
timeout=self._scan_interval
)
break # 收到停止信号
except asyncio.TimeoutError:
pass # 超时继续下一轮扫描
async def _scan_and_execute(self):
"""扫描所有 tasks.yaml找到到期任务并触发执行"""
now = datetime.now(timezone.utc)
robot_dir = Path("projects/robot")
if not robot_dir.exists():
return
tasks_files = list(robot_dir.glob("*/users/*/tasks.yaml"))
if not tasks_files:
return
for tasks_file in tasks_files:
try:
with open(tasks_file, 'r', encoding='utf-8') as f:
data = yaml.safe_load(f)
if not data or not data.get("tasks"):
continue
# 从路径提取 bot_id 和 user_id
parts = tasks_file.parts
# 路径格式: .../projects/robot/{bot_id}/users/{user_id}/tasks.yaml
bot_id = parts[-4]
user_id = parts[-2]
for task in data["tasks"]:
if task.get("status") != "active":
continue
if task["id"] in self._executing_tasks:
continue
next_run_str = task.get("next_run_at")
if not next_run_str:
continue
try:
next_run = datetime.fromisoformat(next_run_str)
if next_run.tzinfo is None:
next_run = next_run.replace(tzinfo=timezone.utc)
except (ValueError, TypeError):
logger.warning(f"Invalid next_run_at for task {task['id']}: {next_run_str}")
continue
if next_run <= now:
# 到期,触发执行
asyncio.create_task(
self._execute_task(bot_id, user_id, task, tasks_file)
)
except Exception as e:
logger.error(f"Error reading {tasks_file}: {e}")
async def _execute_task(self, bot_id: str, user_id: str, task: dict, tasks_file: Path):
"""执行单个到期任务"""
task_id = task["id"]
self._executing_tasks.add(task_id)
start_time = time.time()
try:
async with self._semaphore:
logger.info(f"Executing scheduled task: {task_id} ({task.get('name', '')}) for bot={bot_id} user={user_id}")
# 调用 agent
response_text = await self._call_agent_v2(bot_id, user_id, task)
# 写入日志
duration_ms = int((time.time() - start_time) * 1000)
self._write_log(bot_id, user_id, task, response_text, "success", duration_ms)
# 更新 tasks.yaml
self._update_task_after_execution(task_id, tasks_file)
logger.info(f"Task {task_id} completed in {duration_ms}ms")
except Exception as e:
duration_ms = int((time.time() - start_time) * 1000)
logger.error(f"Task {task_id} execution failed: {e}")
self._write_log(bot_id, user_id, task, f"ERROR: {e}", "error", duration_ms)
# 即使失败也更新 next_run_at避免无限重试
self._update_task_after_execution(task_id, tasks_file)
finally:
self._executing_tasks.discard(task_id)
async def _call_agent_v2(self, bot_id: str, user_id: str, task: dict) -> str:
"""通过 HTTP 调用 /api/v2/chat/completions 接口"""
from utils.fastapi_utils import generate_v2_auth_token
url = f"http://127.0.0.1:8001/api/v2/chat/completions"
auth_token = generate_v2_auth_token(bot_id)
payload = {
"messages": [{"role": "user", "content": task["message"]}],
"stream": False,
"bot_id": bot_id,
"tool_response": False,
"session_id": f"schedule_{task['id']}",
"user_identifier": user_id,
}
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {auth_token}",
}
async with aiohttp.ClientSession() as session:
async with session.post(url, json=payload, headers=headers, timeout=aiohttp.ClientTimeout(total=300)) as resp:
if resp.status != 200:
body = await resp.text()
raise RuntimeError(f"API returned {resp.status}: {body}")
data = await resp.json()
return data["choices"][0]["message"]["content"]
def _update_task_after_execution(self, task_id: str, tasks_file: Path):
"""执行后更新 tasks.yaml"""
try:
with open(tasks_file, 'r', encoding='utf-8') as f:
data = yaml.safe_load(f)
if not data or not data.get("tasks"):
return
now_utc = datetime.now(timezone.utc).isoformat()
for task in data["tasks"]:
if task["id"] != task_id:
continue
task["last_executed_at"] = now_utc
task["execution_count"] = task.get("execution_count", 0) + 1
if task["type"] == "once":
task["status"] = "done"
task["next_run_at"] = None
elif task["type"] == "cron" and task.get("schedule"):
# 计算下次执行时间
task["next_run_at"] = self._compute_next_run(
task["schedule"],
task.get("timezone", "UTC")
)
break
with open(tasks_file, 'w', encoding='utf-8') as f:
yaml.dump(data, f, allow_unicode=True, default_flow_style=False, sort_keys=False)
except Exception as e:
logger.error(f"Failed to update task {task_id}: {e}")
def _compute_next_run(self, schedule: str, tz: str) -> str:
"""计算 cron 任务的下次执行 UTC 时间"""
from croniter import croniter
# 时区偏移映射
tz_offsets = {
'Asia/Shanghai': 8,
'Asia/Tokyo': 9,
'UTC': 0,
'America/New_York': -5,
'America/Los_Angeles': -8,
'Europe/London': 0,
'Europe/Berlin': 1,
}
offset_hours = tz_offsets.get(tz, 0)
offset = timedelta(hours=offset_hours)
now_utc = datetime.now(timezone.utc)
now_local = (now_utc + offset).replace(tzinfo=None)
cron = croniter(schedule, now_local)
next_local = cron.get_next(datetime)
next_utc = next_local - offset
return next_utc.replace(tzinfo=timezone.utc).isoformat()
def _write_log(self, bot_id: str, user_id: str, task: dict,
response: str, status: str, duration_ms: int):
"""写入执行日志"""
logs_dir = Path("projects/robot") / bot_id / "users" / user_id / "task_logs"
logs_dir.mkdir(parents=True, exist_ok=True)
log_file = logs_dir / "execution.log"
log_entry = {
"task_id": task["id"],
"task_name": task.get("name", ""),
"executed_at": datetime.now(timezone.utc).isoformat(),
"status": status,
"response": response[:2000] if response else "", # 截断过长响应
"duration_ms": duration_ms,
}
# 追加写入 YAML 列表
existing_logs = []
if log_file.exists():
try:
with open(log_file, 'r', encoding='utf-8') as f:
existing_logs = yaml.safe_load(f) or []
except Exception:
existing_logs = []
existing_logs.append(log_entry)
# 保留最近 100 条日志
if len(existing_logs) > 100:
existing_logs = existing_logs[-100:]
with open(log_file, 'w', encoding='utf-8') as f:
yaml.dump(existing_logs, f, allow_unicode=True, default_flow_style=False, sort_keys=False)
# 全局单例
_executor: Optional[ScheduleExecutor] = None
def get_schedule_executor() -> ScheduleExecutor:
"""获取全局调度器实例"""
global _executor
if _executor is None:
from utils.settings import SCHEDULE_SCAN_INTERVAL, SCHEDULE_MAX_CONCURRENT
_executor = ScheduleExecutor(
scan_interval=SCHEDULE_SCAN_INTERVAL,
max_concurrent=SCHEDULE_MAX_CONCURRENT,
)
return _executor

View File

@ -0,0 +1,10 @@
{
"hooks": {
"PrePrompt": [
{
"type": "command",
"command": "python scripts/schedule_manager.py list --format brief"
}
]
}
}

View File

@ -0,0 +1,178 @@
---
name: schedule-job
description: Scheduled Task Management - Create, manage, and view scheduled tasks for users (supports cron recurring tasks and one-time tasks)
---
# Schedule Job - Scheduled Task Management
Manage scheduled tasks for users, supporting cron recurring tasks and one-time scheduled tasks. The system automatically executes AI conversations when tasks are due.
## Quick Start
When a user requests to create a scheduled task:
1. Confirm the task type (recurring cron / one-time once)
2. Determine the time (cron expression or specific time) and timezone
3. Confirm notification method: Check the currently enabled skills in the context and recommend a suitable notification method to the user (e.g., email, Telegram, etc.) for confirmation. If no notification skills are available, proactively ask the user: "How would you like to be notified when the scheduled task completes?"
4. Compose the message content to send to the AI (refer to Message Writing Guidelines)
5. Call schedule_manager.py to create the task
## Instructions
### Tool Path
All operations are executed via shell commands:
```bash
python {skill_dir}/scripts/schedule_manager.py <command> [options]
```
### Available Commands
#### List Tasks
```bash
python {skill_dir}/scripts/schedule_manager.py list
python {skill_dir}/scripts/schedule_manager.py list --format brief
```
#### Add Cron Recurring Task
```bash
python {skill_dir}/scripts/schedule_manager.py add \
--name "Task Name" \
--type cron \
--schedule "0 9 * * *" \
--timezone "Asia/Tokyo" \
--message "[Scheduled Task Triggered] Please execute the following task immediately: Search and summarize today's important tech news, listing the Top 5 in a concise format. After completion, select an appropriate notification method based on the currently enabled skills to push the results to the user."
```
#### Add One-Time Task
```bash
python {skill_dir}/scripts/schedule_manager.py add \
--name "Meeting Reminder" \
--type once \
--scheduled-at "2026-04-01T10:00:00+09:00" \
--message "[Scheduled Task Triggered] A scheduled reminder has arrived. The schedule content is: 'Meeting at 10:00'. Please compose an appropriate reminder message based on the schedule content, then select an appropriate notification method based on the currently enabled skills to send the reminder to the user."
```
#### Edit Task
```bash
python {skill_dir}/scripts/schedule_manager.py edit <task_id> \
--schedule "0 10 * * 1-5" \
--message "New message content"
```
#### Delete Task
```bash
python {skill_dir}/scripts/schedule_manager.py delete <task_id>
```
#### Pause/Resume Task
```bash
python {skill_dir}/scripts/schedule_manager.py toggle <task_id>
```
#### View Execution Logs
```bash
python {skill_dir}/scripts/schedule_manager.py logs --limit 10
python {skill_dir}/scripts/schedule_manager.py logs --task-id <task_id>
```
### Timezone Mapping
Automatically recommend timezone based on user language:
- Chinese (zh) → Asia/Shanghai (UTC+8)
- Japanese (ja/jp) → Asia/Tokyo (UTC+9)
- English (en) → UTC
### Cron Expression Reference
Standard 5-field format: `minute hour day-of-month month day-of-week`
Common examples:
| Expression | Meaning |
|-----------|---------|
| `0 9 * * *` | Every day at 9:00 |
| `0 9 * * 1-5` | Monday to Friday at 9:00 |
| `30 8 * * 1` | Every Monday at 8:30 |
| `0 */2 * * *` | Every 2 hours |
| `0 9,18 * * *` | Every day at 9:00 and 18:00 |
**Note**: Cron expression times are based on the timezone specified by --timezone.
### One-Time Task Time Format
Supports ISO 8601 format (timezone offset recommended):
- `2026-04-01T10:00:00+09:00` (Japan Time)
- `2026-04-01T01:00:00Z` (UTC)
- `2026-04-01T08:00:00+08:00` (China Time)
## Message Writing Guidelines
The message is an execution instruction sent to the AI agent when the scheduled task triggers. The agent has no conversation context at execution time, so the message must include:
1. **Task Trigger Indicator**: Start with "[Scheduled Task Triggered]" (or equivalent in the user's language) to clearly indicate this is a triggered scheduled task execution, NOT a request to create a task
2. **Task Content**: What specifically needs to be done (use imperative verbs like "Please execute immediately", "Please do")
3. **Notification Instruction**: Based on the notification method confirmed by the user, explicitly specify the notification channel in the message (e.g., "notify the user via email"). If the user has not specified, write "select an appropriate notification method based on the currently enabled skills to push the results to the user"
4. **Language**: The message language must match the language the user is currently using in the conversation (e.g., if the user speaks Japanese, write the message in Japanese)
### Important: Avoid Ambiguity
The message will be executed by an AI agent when the scheduled time arrives. The message must clearly indicate this is an **execution instruction**, not a request to **create a scheduled task**.
### Schedule Reminder Scenario (One-Time Task)
❌ Wrong: `"Reminder: You have a meeting to attend now, don't forget!"`
❌ Wrong: `"It's tomorrow at 3 PM, please remind the user to attend the meeting"` (Agent will think this is a request to create a reminder)
✅ Correct: `"[Scheduled Task Triggered] A scheduled reminder has arrived. The schedule content is: 'Product review meeting at 3:00 PM'. Please compose an appropriate reminder message based on the schedule content, then select an appropriate notification method based on the currently enabled skills to send the reminder to the user."`
### Recurring Task Scenario (Cron Task)
❌ Wrong: `"It's 8:40 PM every day, please remind the user to write a daily report. Please send message via Lark Webhook, Webhook URL: xxx"` (Agent will think this is a request to create a daily task)
❌ Wrong: `"Execute: Get latest news"`
✅ Correct: `"[Scheduled Task Triggered] Please execute the following task immediately: Search and summarize today's important tech news, listing the Top 5 in a concise format. After completion, select an appropriate notification method based on the currently enabled skills to push the results to the user."`
✅ Correct: `"[Scheduled Task Triggered] This is a daily reminder task. Please execute immediately: Remind the user to write a daily report. Please send the message via Lark Webhook, Webhook URL: xxx"`
## Examples
**User**: "Set up a daily news summary task at 9 AM"
```bash
python {skill_dir}/scripts/schedule_manager.py add \
--name "Daily News Summary" \
--type cron \
--schedule "0 9 * * *" \
--timezone "Asia/Tokyo" \
--message "[Scheduled Task Triggered] Please execute the following task immediately: Search and summarize today's important tech news, listing the Top 5 in a concise format. After completion, select an appropriate notification method based on the currently enabled skills to push the results to the user."
```
**User**: "Remind me about the meeting tomorrow at 3 PM"
```bash
python {skill_dir}/scripts/schedule_manager.py add \
--name "Meeting Reminder" \
--type once \
--scheduled-at "2026-04-01T15:00:00+09:00" \
--message "[Scheduled Task Triggered] A scheduled reminder has arrived. The schedule content is: 'Meeting at 3:00 PM'. Please compose an appropriate reminder message based on the schedule content, then select an appropriate notification method based on the currently enabled skills to send the reminder to the user."
```
**User**: "Change the daily news task to 10 AM"
```bash
# First check the task list to get the task_id
python {skill_dir}/scripts/schedule_manager.py list
# Then edit
python {skill_dir}/scripts/schedule_manager.py edit <task_id> --schedule "0 10 * * *"
```
## Guidelines
- Before creating a task, use `list` to check existing tasks and avoid duplicates
- Automatically set the appropriate timezone based on user language
- **Must confirm notification method before creating a task**: Check the currently enabled skills, recommend available notification channels to the user and confirm. If no notification skills are available, proactively ask the user how they would like to receive results
- Message content is a complete execution instruction for the AI agent and must include both task content and notification instructions (refer to the Message Writing Guidelines above)
- The message language must match the language the user is currently using in the conversation
- One-time task times cannot be in the past
- When editing a task, only modify the fields the user requested — do not change other fields

View File

@ -0,0 +1,421 @@
#!/usr/bin/env python3
"""
Scheduled Task Manager CLI Tool
Add, delete, modify, and query user scheduled tasks. Data is stored in tasks.yaml.
Environment variables:
ASSISTANT_ID: Current bot ID
USER_IDENTIFIER: Current user identifier
"""
import argparse
import os
import sys
import yaml
import json
import string
import random
from datetime import datetime, timezone, timedelta
from pathlib import Path
try:
from croniter import croniter
except ImportError:
print("Error: croniter is required. Install with: pip install croniter", file=sys.stderr)
sys.exit(1)
# Language to timezone mapping
LANGUAGE_TIMEZONE_MAP = {
'zh': 'Asia/Shanghai',
'ja': 'Asia/Tokyo',
'jp': 'Asia/Tokyo',
'en': 'UTC',
}
# Timezone to UTC offset (hours)
TIMEZONE_OFFSET_MAP = {
'Asia/Shanghai': 8,
'Asia/Tokyo': 9,
'UTC': 0,
'America/New_York': -5,
'America/Los_Angeles': -8,
'Europe/London': 0,
'Europe/Berlin': 1,
}
def get_tasks_dir(bot_id: str, user_id: str) -> Path:
"""Get user task directory path"""
return Path("users") / user_id
def get_tasks_file(bot_id: str, user_id: str) -> Path:
"""Get tasks.yaml file path"""
return get_tasks_dir(bot_id, user_id) / "tasks.yaml"
def get_logs_dir(bot_id: str, user_id: str) -> Path:
"""Get task logs directory"""
return get_tasks_dir(bot_id, user_id) / "task_logs"
def load_tasks(bot_id: str, user_id: str) -> dict:
"""Load tasks.yaml"""
tasks_file = get_tasks_file(bot_id, user_id)
if not tasks_file.exists():
return {"tasks": []}
with open(tasks_file, 'r', encoding='utf-8') as f:
data = yaml.safe_load(f)
return data if data and "tasks" in data else {"tasks": []}
def save_tasks(bot_id: str, user_id: str, data: dict):
"""Save tasks.yaml"""
tasks_file = get_tasks_file(bot_id, user_id)
tasks_file.parent.mkdir(parents=True, exist_ok=True)
with open(tasks_file, 'w', encoding='utf-8') as f:
yaml.dump(data, f, allow_unicode=True, default_flow_style=False, sort_keys=False)
def generate_task_id() -> str:
"""Generate unique task ID"""
ts = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
rand = ''.join(random.choices(string.ascii_lowercase + string.digits, k=6))
return f"task_{ts}_{rand}"
def parse_timezone_offset(tz: str) -> int:
"""Get UTC offset hours for a timezone"""
return TIMEZONE_OFFSET_MAP.get(tz, 0)
def compute_next_run_cron(schedule: str, tz: str, after: datetime = None) -> str:
"""
Calculate next execution UTC time based on cron expression and timezone.
The cron expression is based on user's local time, so we first calculate
the next trigger in local time, then convert to UTC.
"""
offset_hours = parse_timezone_offset(tz)
offset = timedelta(hours=offset_hours)
# Current UTC time
now_utc = after or datetime.now(timezone.utc)
# Convert to user's local time (naive)
now_local = (now_utc + offset).replace(tzinfo=None)
# Calculate next cron trigger in local time
cron = croniter(schedule, now_local)
next_local = cron.get_next(datetime)
# Convert back to UTC
next_utc = next_local - offset
return next_utc.replace(tzinfo=timezone.utc).isoformat()
def parse_scheduled_at(scheduled_at_str: str) -> str:
"""Parse one-time task time string, return UTC ISO format"""
# Try to parse ISO format with timezone offset
try:
dt = datetime.fromisoformat(scheduled_at_str)
if dt.tzinfo is None:
# No timezone info, assume UTC
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc).isoformat()
except ValueError:
pass
# Try to parse common formats
for fmt in ["%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M"]:
try:
dt = datetime.strptime(scheduled_at_str, fmt).replace(tzinfo=timezone.utc)
return dt.isoformat()
except ValueError:
continue
raise ValueError(f"Cannot parse time format: {scheduled_at_str}")
def cmd_list(args, bot_id: str, user_id: str):
"""List all tasks"""
data = load_tasks(bot_id, user_id)
tasks = data.get("tasks", [])
if not tasks:
print("No scheduled tasks.")
return
if args.format == "brief":
# Brief format for PrePrompt hook
print(f"Scheduled tasks ({len(tasks)}):")
for t in tasks:
status_icon = {"active": "", "paused": "⏸️", "done": "✔️", "expired": ""}.get(t["status"], "")
if t["type"] == "cron":
print(f" {status_icon} [{t['id']}] {t['name']} | cron: {t['schedule']} ({t.get('timezone', 'UTC')}) | executed {t.get('execution_count', 0)} times")
else:
print(f" {status_icon} [{t['id']}] {t['name']} | once: {t.get('scheduled_at', 'N/A')}")
else:
# Detail format
for t in tasks:
print(f"--- Task: {t['name']} ---")
print(f" ID: {t['id']}")
print(f" Type: {t['type']}")
print(f" Status: {t['status']}")
if t["type"] == "cron":
print(f" Cron: {t['schedule']}")
print(f" Timezone: {t.get('timezone', 'UTC')}")
else:
print(f" Scheduled at: {t.get('scheduled_at', 'N/A')}")
print(f" Message: {t['message']}")
print(f" Next run: {t.get('next_run_at', 'N/A')}")
print(f" Last run: {t.get('last_executed_at', 'N/A')}")
print(f" Executions: {t.get('execution_count', 0)}")
print(f" Created at: {t.get('created_at', 'N/A')}")
print()
def cmd_add(args, bot_id: str, user_id: str):
"""Add a task"""
data = load_tasks(bot_id, user_id)
task_id = generate_task_id()
now_utc = datetime.now(timezone.utc).isoformat()
task = {
"id": task_id,
"name": args.name,
"type": args.type,
"schedule": None,
"scheduled_at": None,
"timezone": args.timezone or "UTC",
"message": args.message,
"status": "active",
"created_at": now_utc,
"last_executed_at": None,
"next_run_at": None,
"execution_count": 0,
}
if args.type == "cron":
if not args.schedule:
print("Error: --schedule is required for cron type tasks", file=sys.stderr)
sys.exit(1)
# Validate cron expression
try:
croniter(args.schedule)
except (ValueError, KeyError) as e:
print(f"Error: Invalid cron expression '{args.schedule}': {e}", file=sys.stderr)
sys.exit(1)
task["schedule"] = args.schedule
task["next_run_at"] = compute_next_run_cron(args.schedule, task["timezone"])
elif args.type == "once":
if not args.scheduled_at:
print("Error: --scheduled-at is required for once type tasks", file=sys.stderr)
sys.exit(1)
try:
utc_time = parse_scheduled_at(args.scheduled_at)
except ValueError as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
task["scheduled_at"] = utc_time
task["next_run_at"] = utc_time
data["tasks"].append(task)
save_tasks(bot_id, user_id, data)
print(f"Task created: {task_id}")
print(f" Name: {args.name}")
print(f" Type: {args.type}")
print(f" Next run (UTC): {task['next_run_at']}")
def cmd_edit(args, bot_id: str, user_id: str):
"""Edit a task"""
data = load_tasks(bot_id, user_id)
task = None
for t in data["tasks"]:
if t["id"] == args.task_id:
task = t
break
if not task:
print(f"Error: Task {args.task_id} not found", file=sys.stderr)
sys.exit(1)
if args.name:
task["name"] = args.name
if args.message:
task["message"] = args.message
if args.schedule:
try:
croniter(args.schedule)
except (ValueError, KeyError) as e:
print(f"Error: Invalid cron expression '{args.schedule}': {e}", file=sys.stderr)
sys.exit(1)
task["schedule"] = args.schedule
if args.timezone:
task["timezone"] = args.timezone
if args.scheduled_at:
try:
task["scheduled_at"] = parse_scheduled_at(args.scheduled_at)
task["next_run_at"] = task["scheduled_at"]
except ValueError as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
# Recalculate next_run_at (if cron and schedule or timezone changed)
if task["type"] == "cron" and task.get("schedule"):
task["next_run_at"] = compute_next_run_cron(task["schedule"], task.get("timezone", "UTC"))
save_tasks(bot_id, user_id, data)
print(f"Task updated: {args.task_id}")
def cmd_delete(args, bot_id: str, user_id: str):
"""Delete a task"""
data = load_tasks(bot_id, user_id)
original_count = len(data["tasks"])
data["tasks"] = [t for t in data["tasks"] if t["id"] != args.task_id]
if len(data["tasks"]) == original_count:
print(f"Error: Task {args.task_id} not found", file=sys.stderr)
sys.exit(1)
save_tasks(bot_id, user_id, data)
print(f"Task deleted: {args.task_id}")
def cmd_toggle(args, bot_id: str, user_id: str):
"""Pause/resume task"""
data = load_tasks(bot_id, user_id)
for t in data["tasks"]:
if t["id"] == args.task_id:
if t["status"] == "active":
t["status"] = "paused"
print(f"Task paused: {args.task_id}")
elif t["status"] == "paused":
t["status"] = "active"
# Recalculate next_run_at when resuming
if t["type"] == "cron" and t.get("schedule"):
t["next_run_at"] = compute_next_run_cron(t["schedule"], t.get("timezone", "UTC"))
print(f"Task resumed: {args.task_id}")
else:
print(f"Task status is {t['status']}, cannot toggle", file=sys.stderr)
sys.exit(1)
save_tasks(bot_id, user_id, data)
return
print(f"Error: Task {args.task_id} not found", file=sys.stderr)
sys.exit(1)
def cmd_logs(args, bot_id: str, user_id: str):
"""View execution logs"""
logs_dir = get_logs_dir(bot_id, user_id)
log_file = logs_dir / "execution.log"
if not log_file.exists():
print("No execution logs yet.")
return
with open(log_file, 'r', encoding='utf-8') as f:
logs = yaml.safe_load(f)
if not logs:
print("No execution logs yet.")
return
# Filter by task ID
if args.task_id:
logs = [l for l in logs if l.get("task_id") == args.task_id]
# Limit count
limit = args.limit or 10
logs = logs[-limit:]
if not logs:
print("No matching logs found.")
return
for log in logs:
status_icon = "" if log.get("status") == "success" else ""
print(f"{status_icon} [{log.get('executed_at', 'N/A')}] {log.get('task_name', 'N/A')}")
response = log.get("response", "")
# Truncate long responses
if len(response) > 200:
response = response[:200] + "..."
print(f" {response}")
if log.get("duration_ms"):
print(f" Duration: {log['duration_ms']}ms")
print()
def main():
bot_id = os.getenv("ASSISTANT_ID", "")
user_id = os.getenv("USER_IDENTIFIER", "")
if not bot_id or not user_id:
print("Error: ASSISTANT_ID and USER_IDENTIFIER environment variables must be set", file=sys.stderr)
sys.exit(1)
parser = argparse.ArgumentParser(description="Scheduled task manager")
subparsers = parser.add_subparsers(dest="command", help="Available commands")
# list
p_list = subparsers.add_parser("list", help="List all tasks")
p_list.add_argument("--format", choices=["brief", "detail"], default="detail", help="Output format")
# add
p_add = subparsers.add_parser("add", help="Add a task")
p_add.add_argument("--name", required=True, help="Task name")
p_add.add_argument("--type", required=True, choices=["cron", "once"], help="Task type")
p_add.add_argument("--schedule", help="Cron expression (required for cron type)")
p_add.add_argument("--scheduled-at", help="Execution time in ISO 8601 format (required for once type)")
p_add.add_argument("--timezone", help="Timezone (e.g. Asia/Tokyo), default UTC")
p_add.add_argument("--message", required=True, help="Message content to send to AI")
# edit
p_edit = subparsers.add_parser("edit", help="Edit a task")
p_edit.add_argument("task_id", help="Task ID")
p_edit.add_argument("--name", help="New task name")
p_edit.add_argument("--schedule", help="New cron expression")
p_edit.add_argument("--scheduled-at", help="New execution time")
p_edit.add_argument("--timezone", help="New timezone")
p_edit.add_argument("--message", help="New message content")
# delete
p_delete = subparsers.add_parser("delete", help="Delete a task")
p_delete.add_argument("task_id", help="Task ID")
# toggle
p_toggle = subparsers.add_parser("toggle", help="Pause/resume a task")
p_toggle.add_argument("task_id", help="Task ID")
# logs
p_logs = subparsers.add_parser("logs", help="View execution logs")
p_logs.add_argument("--task-id", help="Filter by task ID")
p_logs.add_argument("--limit", type=int, default=10, help="Number of entries to show")
args = parser.parse_args()
if not args.command:
parser.print_help()
sys.exit(1)
commands = {
"list": cmd_list,
"add": cmd_add,
"edit": cmd_edit,
"delete": cmd_delete,
"toggle": cmd_toggle,
"logs": cmd_logs,
}
commands[args.command](args, bot_id, user_id)
if __name__ == "__main__":
main()

View File

@ -80,7 +80,7 @@ Hook 脚本通过子进程执行,通过环境变量接收参数,通过 stdou
| 环境变量 | 说明 | 适用于 |
|---------|------|--------|
| `BOT_ID` | Bot ID | 所有 hook |
| `ASSISTANT_ID` | Bot ID | 所有 hook |
| `USER_IDENTIFIER` | 用户标识 | 所有 hook |
| `SESSION_ID` | 会话 ID | 所有 hook |
| `LANGUAGE` | 语言代码 | 所有 hook |
@ -99,7 +99,7 @@ import sys
def main():
user_identifier = os.environ.get('USER_IDENTIFIER', '')
bot_id = os.environ.get('BOT_ID', '')
bot_id = os.environ.get('ASSISTANT_ID', '')
# 输出要注入到 prompt 中的内容
print(f"## User Context\n\n用户: {user_identifier}")

View File

@ -11,7 +11,7 @@ import sys
def main():
"""从环境变量读取参数并输出注入内容"""
user_identifier = os.environ.get('USER_IDENTIFIER', '')
bot_id = os.environ.get('BOT_ID', '')
bot_id = os.environ.get('ASSISTANT_ID', '')
# 示例:根据 user_identifier 查询用户上下文
# 这里只是演示,实际应该从数据库或其他服务获取

View File

@ -5,7 +5,6 @@ API data models and response schemas.
from typing import Dict, List, Optional, Any, AsyncGenerator
from pydantic import BaseModel, Field, field_validator, ConfigDict
from utils.settings import DEFAULT_THINKING_ENABLE
class Message(BaseModel):
role: str
@ -52,7 +51,7 @@ class ChatRequest(BaseModel):
mcp_settings: Optional[List[Dict]] = None
user_identifier: Optional[str] = ""
session_id: Optional[str] = None
enable_thinking: Optional[bool] = DEFAULT_THINKING_ENABLE
enable_thinking: Optional[bool] = False
skills: Optional[List[str]] = None
enable_memory: Optional[bool] = False
shell_env: Optional[Dict[str, str]] = None

View File

@ -35,8 +35,6 @@ SENTENCE_TRANSFORMER_MODEL = os.getenv("SENTENCE_TRANSFORMER_MODEL", "TaylorAI/g
TOOL_OUTPUT_MAX_LENGTH = int(SUMMARIZATION_MAX_TOKENS/4)
TOOL_OUTPUT_TRUNCATION_STRATEGY = os.getenv("TOOL_OUTPUT_TRUNCATION_STRATEGY", "smart")
# THINKING ENABLE
DEFAULT_THINKING_ENABLE = os.getenv("DEFAULT_THINKING_ENABLE", "true") == "true"
# WebDAV Authentication
@ -59,7 +57,7 @@ CHECKPOINT_DB_URL = os.getenv("CHECKPOINT_DB_URL", "postgresql://moshui:@localho
# 连接池大小
# 同时可以持有的最大连接数
CHECKPOINT_POOL_SIZE = int(os.getenv("CHECKPOINT_POOL_SIZE", "20"))
MEM0_POOL_SIZE = int(os.getenv("MEM0_POOL_SIZE", "20"))
MEM0_POOL_SIZE = int(os.getenv("MEM0_POOL_SIZE", "50"))
# Checkpoint 自动清理配置
# 是否启用自动清理旧 session
@ -84,4 +82,18 @@ MEM0_ENABLED = os.getenv("MEM0_ENABLED", "true") == "true"
# 召回记忆数量
MEM0_SEMANTIC_SEARCH_TOP_K = int(os.getenv("MEM0_SEMANTIC_SEARCH_TOP_K", "20"))
# ============================================================
# Schedule Job 定时任务配置
# ============================================================
# 是否启用定时任务调度器
SCHEDULE_ENABLED = os.getenv("SCHEDULE_ENABLED", "true") == "true"
# 调度器扫描间隔(秒)
SCHEDULE_SCAN_INTERVAL = int(os.getenv("SCHEDULE_SCAN_INTERVAL", "60"))
# 最大并发执行任务数
SCHEDULE_MAX_CONCURRENT = int(os.getenv("SCHEDULE_MAX_CONCURRENT", "5"))
os.environ["OPENAI_API_KEY"] = "your_api_key"