Merge branch 'feature/moshui20260330-schedule-job' into dev

This commit is contained in:
朱潮 2026-04-01 10:37:16 +08:00
commit 44db634cbb
2 changed files with 105 additions and 104 deletions

View File

@ -166,11 +166,11 @@ async def _execute_command(skill_path: str, command: str, hook_type: str, config
try:
# 设置环境变量,传递给子进程
env = os.environ.copy()
env['ASSISTANT_ID'] = getattr(config, 'bot_id', '')
env['USER_IDENTIFIER'] = getattr(config, 'user_identifier', '')
env['TRACE_ID'] = getattr(config, 'trace_id', '')
env['SESSION_ID'] = getattr(config, 'session_id', '')
env['LANGUAGE'] = getattr(config, 'language', '')
env['ASSISTANT_ID'] = str(getattr(config, 'bot_id', ''))
env['USER_IDENTIFIER'] = str(getattr(config, 'user_identifier', ''))
env['TRACE_ID'] = str(getattr(config, 'trace_id', ''))
env['SESSION_ID'] = str(getattr(config, 'session_id', ''))
env['LANGUAGE'] = str(getattr(config, 'language', ''))
env['HOOK_TYPE'] = hook_type
# 合并 config 中的自定义 shell 环境变量

View File

@ -1,11 +1,11 @@
#!/usr/bin/env python3
"""
定时任务管理 CLI 工具
用于增删改查用户的定时任务数据存储在 tasks.yaml 文件中
Scheduled Task Manager CLI Tool
Add, delete, modify, and query user scheduled tasks. Data is stored in tasks.yaml.
环境变量:
ASSISTANT_ID: 当前 bot ID
USER_IDENTIFIER: 当前用户标识
Environment variables:
ASSISTANT_ID: Current bot ID
USER_IDENTIFIER: Current user identifier
"""
import argparse
@ -25,7 +25,7 @@ except ImportError:
sys.exit(1)
# 语言到时区的映射
# Language to timezone mapping
LANGUAGE_TIMEZONE_MAP = {
'zh': 'Asia/Shanghai',
'ja': 'Asia/Tokyo',
@ -33,7 +33,7 @@ LANGUAGE_TIMEZONE_MAP = {
'en': 'UTC',
}
# 时区到 UTC 偏移(小时)
# Timezone to UTC offset (hours)
TIMEZONE_OFFSET_MAP = {
'Asia/Shanghai': 8,
'Asia/Tokyo': 9,
@ -46,22 +46,22 @@ TIMEZONE_OFFSET_MAP = {
def get_tasks_dir(bot_id: str, user_id: str) -> Path:
"""获取用户任务目录路径"""
"""Get user task directory path"""
return Path("users") / user_id
def get_tasks_file(bot_id: str, user_id: str) -> Path:
"""获取 tasks.yaml 文件路径"""
"""Get tasks.yaml file path"""
return get_tasks_dir(bot_id, user_id) / "tasks.yaml"
def get_logs_dir(bot_id: str, user_id: str) -> Path:
"""获取任务日志目录"""
"""Get task logs directory"""
return get_tasks_dir(bot_id, user_id) / "task_logs"
def load_tasks(bot_id: str, user_id: str) -> dict:
"""加载 tasks.yaml"""
"""Load tasks.yaml"""
tasks_file = get_tasks_file(bot_id, user_id)
if not tasks_file.exists():
return {"tasks": []}
@ -71,7 +71,7 @@ def load_tasks(bot_id: str, user_id: str) -> dict:
def save_tasks(bot_id: str, user_id: str, data: dict):
"""保存 tasks.yaml"""
"""Save tasks.yaml"""
tasks_file = get_tasks_file(bot_id, user_id)
tasks_file.parent.mkdir(parents=True, exist_ok=True)
with open(tasks_file, 'w', encoding='utf-8') as f:
@ -79,53 +79,54 @@ def save_tasks(bot_id: str, user_id: str, data: dict):
def generate_task_id() -> str:
"""生成唯一任务 ID"""
"""Generate unique task ID"""
ts = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
rand = ''.join(random.choices(string.ascii_lowercase + string.digits, k=6))
return f"task_{ts}_{rand}"
def parse_timezone_offset(tz: str) -> int:
"""获取时区的 UTC 偏移小时数"""
"""Get UTC offset hours for a timezone"""
return TIMEZONE_OFFSET_MAP.get(tz, 0)
def compute_next_run_cron(schedule: str, tz: str, after: datetime = None) -> str:
"""
根据 cron 表达式和时区计算下次执行的 UTC 时间
Calculate next execution UTC time based on cron expression and timezone.
cron 表达式是基于用户本地时间的需要先在本地时间计算下次触发再转换为 UTC
The cron expression is based on user's local time, so we first calculate
the next trigger in local time, then convert to UTC.
"""
offset_hours = parse_timezone_offset(tz)
offset = timedelta(hours=offset_hours)
# 当前 UTC 时间
# Current UTC time
now_utc = after or datetime.now(timezone.utc)
# 转为用户本地时间naive
# Convert to user's local time (naive)
now_local = (now_utc + offset).replace(tzinfo=None)
# 在本地时间上计算下次 cron 触发
# Calculate next cron trigger in local time
cron = croniter(schedule, now_local)
next_local = cron.get_next(datetime)
# 转回 UTC
# Convert back to UTC
next_utc = next_local - offset
return next_utc.replace(tzinfo=timezone.utc).isoformat()
def parse_scheduled_at(scheduled_at_str: str) -> str:
"""解析一次性任务的时间字符串,返回 UTC ISO 格式"""
# 尝试解析带时区偏移的 ISO 格式
"""Parse one-time task time string, return UTC ISO format"""
# Try to parse ISO format with timezone offset
try:
dt = datetime.fromisoformat(scheduled_at_str)
if dt.tzinfo is None:
# 无时区信息,假设 UTC
# No timezone info, assume UTC
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc).isoformat()
except ValueError:
pass
# 尝试解析常见格式
# Try to parse common formats
for fmt in ["%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M"]:
try:
dt = datetime.strptime(scheduled_at_str, fmt).replace(tzinfo=timezone.utc)
@ -133,49 +134,49 @@ def parse_scheduled_at(scheduled_at_str: str) -> str:
except ValueError:
continue
raise ValueError(f"无法解析时间格式: {scheduled_at_str}")
raise ValueError(f"Cannot parse time format: {scheduled_at_str}")
def cmd_list(args, bot_id: str, user_id: str):
"""列出所有任务"""
"""List all tasks"""
data = load_tasks(bot_id, user_id)
tasks = data.get("tasks", [])
if not tasks:
print("当前没有定时任务。")
print("No scheduled tasks.")
return
if args.format == "brief":
# 简洁格式,用于 PrePrompt hook
print(f"定时任务列表 ({len(tasks)}):")
# Brief format for PrePrompt hook
print(f"Scheduled tasks ({len(tasks)}):")
for t in tasks:
status_icon = {"active": "", "paused": "⏸️", "done": "✔️", "expired": ""}.get(t["status"], "")
if t["type"] == "cron":
print(f" {status_icon} [{t['id']}] {t['name']} | cron: {t['schedule']} ({t.get('timezone', 'UTC')}) | 已执行{t.get('execution_count', 0)}")
print(f" {status_icon} [{t['id']}] {t['name']} | cron: {t['schedule']} ({t.get('timezone', 'UTC')}) | executed {t.get('execution_count', 0)} times")
else:
print(f" {status_icon} [{t['id']}] {t['name']} | 一次性: {t.get('scheduled_at', 'N/A')}")
print(f" {status_icon} [{t['id']}] {t['name']} | once: {t.get('scheduled_at', 'N/A')}")
else:
# 详细格式
# Detail format
for t in tasks:
print(f"--- 任务: {t['name']} ---")
print(f"--- Task: {t['name']} ---")
print(f" ID: {t['id']}")
print(f" 类型: {t['type']}")
print(f" 状态: {t['status']}")
print(f" Type: {t['type']}")
print(f" Status: {t['status']}")
if t["type"] == "cron":
print(f" Cron: {t['schedule']}")
print(f" 时区: {t.get('timezone', 'UTC')}")
print(f" Timezone: {t.get('timezone', 'UTC')}")
else:
print(f" 计划时间: {t.get('scheduled_at', 'N/A')}")
print(f" 消息: {t['message']}")
print(f" 下次执行: {t.get('next_run_at', 'N/A')}")
print(f" 上次执行: {t.get('last_executed_at', 'N/A')}")
print(f" 执行次数: {t.get('execution_count', 0)}")
print(f" 创建时间: {t.get('created_at', 'N/A')}")
print(f" Scheduled at: {t.get('scheduled_at', 'N/A')}")
print(f" Message: {t['message']}")
print(f" Next run: {t.get('next_run_at', 'N/A')}")
print(f" Last run: {t.get('last_executed_at', 'N/A')}")
print(f" Executions: {t.get('execution_count', 0)}")
print(f" Created at: {t.get('created_at', 'N/A')}")
print()
def cmd_add(args, bot_id: str, user_id: str):
"""添加任务"""
"""Add a task"""
data = load_tasks(bot_id, user_id)
task_id = generate_task_id()
@ -198,20 +199,20 @@ def cmd_add(args, bot_id: str, user_id: str):
if args.type == "cron":
if not args.schedule:
print("Error: cron 类型任务必须提供 --schedule 参数", file=sys.stderr)
print("Error: --schedule is required for cron type tasks", file=sys.stderr)
sys.exit(1)
# 验证 cron 表达式
# Validate cron expression
try:
croniter(args.schedule)
except (ValueError, KeyError) as e:
print(f"Error: 无效的 cron 表达式 '{args.schedule}': {e}", file=sys.stderr)
print(f"Error: Invalid cron expression '{args.schedule}': {e}", file=sys.stderr)
sys.exit(1)
task["schedule"] = args.schedule
task["next_run_at"] = compute_next_run_cron(args.schedule, task["timezone"])
elif args.type == "once":
if not args.scheduled_at:
print("Error: once 类型任务必须提供 --scheduled-at 参数", file=sys.stderr)
print("Error: --scheduled-at is required for once type tasks", file=sys.stderr)
sys.exit(1)
try:
utc_time = parse_scheduled_at(args.scheduled_at)
@ -223,14 +224,14 @@ def cmd_add(args, bot_id: str, user_id: str):
data["tasks"].append(task)
save_tasks(bot_id, user_id, data)
print(f"任务已创建: {task_id}")
print(f" 名称: {args.name}")
print(f" 类型: {args.type}")
print(f" 下次执行 (UTC): {task['next_run_at']}")
print(f"Task created: {task_id}")
print(f" Name: {args.name}")
print(f" Type: {args.type}")
print(f" Next run (UTC): {task['next_run_at']}")
def cmd_edit(args, bot_id: str, user_id: str):
"""编辑任务"""
"""Edit a task"""
data = load_tasks(bot_id, user_id)
task = None
@ -240,7 +241,7 @@ def cmd_edit(args, bot_id: str, user_id: str):
break
if not task:
print(f"Error: 任务 {args.task_id} 不存在", file=sys.stderr)
print(f"Error: Task {args.task_id} not found", file=sys.stderr)
sys.exit(1)
if args.name:
@ -251,7 +252,7 @@ def cmd_edit(args, bot_id: str, user_id: str):
try:
croniter(args.schedule)
except (ValueError, KeyError) as e:
print(f"Error: 无效的 cron 表达式 '{args.schedule}': {e}", file=sys.stderr)
print(f"Error: Invalid cron expression '{args.schedule}': {e}", file=sys.stderr)
sys.exit(1)
task["schedule"] = args.schedule
if args.timezone:
@ -264,91 +265,91 @@ def cmd_edit(args, bot_id: str, user_id: str):
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
# 重新计算 next_run_at如果是 cron 且修改了 schedule 或 timezone
# Recalculate next_run_at (if cron and schedule or timezone changed)
if task["type"] == "cron" and task.get("schedule"):
task["next_run_at"] = compute_next_run_cron(task["schedule"], task.get("timezone", "UTC"))
save_tasks(bot_id, user_id, data)
print(f"任务已更新: {args.task_id}")
print(f"Task updated: {args.task_id}")
def cmd_delete(args, bot_id: str, user_id: str):
"""删除任务"""
"""Delete a task"""
data = load_tasks(bot_id, user_id)
original_count = len(data["tasks"])
data["tasks"] = [t for t in data["tasks"] if t["id"] != args.task_id]
if len(data["tasks"]) == original_count:
print(f"Error: 任务 {args.task_id} 不存在", file=sys.stderr)
print(f"Error: Task {args.task_id} not found", file=sys.stderr)
sys.exit(1)
save_tasks(bot_id, user_id, data)
print(f"任务已删除: {args.task_id}")
print(f"Task deleted: {args.task_id}")
def cmd_toggle(args, bot_id: str, user_id: str):
"""暂停/恢复任务"""
"""Pause/resume task"""
data = load_tasks(bot_id, user_id)
for t in data["tasks"]:
if t["id"] == args.task_id:
if t["status"] == "active":
t["status"] = "paused"
print(f"任务已暂停: {args.task_id}")
print(f"Task paused: {args.task_id}")
elif t["status"] == "paused":
t["status"] = "active"
# 恢复时重新计算 next_run_at
# Recalculate next_run_at when resuming
if t["type"] == "cron" and t.get("schedule"):
t["next_run_at"] = compute_next_run_cron(t["schedule"], t.get("timezone", "UTC"))
print(f"任务已恢复: {args.task_id}")
print(f"Task resumed: {args.task_id}")
else:
print(f"任务状态为 {t['status']},无法切换", file=sys.stderr)
print(f"Task status is {t['status']}, cannot toggle", file=sys.stderr)
sys.exit(1)
save_tasks(bot_id, user_id, data)
return
print(f"Error: 任务 {args.task_id} 不存在", file=sys.stderr)
print(f"Error: Task {args.task_id} not found", file=sys.stderr)
sys.exit(1)
def cmd_logs(args, bot_id: str, user_id: str):
"""查看执行日志"""
"""View execution logs"""
logs_dir = get_logs_dir(bot_id, user_id)
log_file = logs_dir / "execution.log"
if not log_file.exists():
print("暂无执行日志。")
print("No execution logs yet.")
return
with open(log_file, 'r', encoding='utf-8') as f:
logs = yaml.safe_load(f)
if not logs:
print("暂无执行日志。")
print("No execution logs yet.")
return
# 按任务 ID 过滤
# Filter by task ID
if args.task_id:
logs = [l for l in logs if l.get("task_id") == args.task_id]
# 限制数量
# Limit count
limit = args.limit or 10
logs = logs[-limit:]
if not logs:
print("没有匹配的执行日志。")
print("No matching logs found.")
return
for log in logs:
status_icon = "" if log.get("status") == "success" else ""
print(f"{status_icon} [{log.get('executed_at', 'N/A')}] {log.get('task_name', 'N/A')}")
response = log.get("response", "")
# 截断过长的响应
# Truncate long responses
if len(response) > 200:
response = response[:200] + "..."
print(f" {response}")
if log.get("duration_ms"):
print(f" 耗时: {log['duration_ms']}ms")
print(f" Duration: {log['duration_ms']}ms")
print()
@ -357,46 +358,46 @@ def main():
user_id = os.getenv("USER_IDENTIFIER", "")
if not bot_id or not user_id:
print("Error: ASSISTANT_ID 和 USER_IDENTIFIER 环境变量必须设置", file=sys.stderr)
print("Error: ASSISTANT_ID and USER_IDENTIFIER environment variables must be set", file=sys.stderr)
sys.exit(1)
parser = argparse.ArgumentParser(description="定时任务管理工具")
subparsers = parser.add_subparsers(dest="command", help="可用命令")
parser = argparse.ArgumentParser(description="Scheduled task manager")
subparsers = parser.add_subparsers(dest="command", help="Available commands")
# list
p_list = subparsers.add_parser("list", help="列出所有任务")
p_list.add_argument("--format", choices=["brief", "detail"], default="detail", help="输出格式")
p_list = subparsers.add_parser("list", help="List all tasks")
p_list.add_argument("--format", choices=["brief", "detail"], default="detail", help="Output format")
# add
p_add = subparsers.add_parser("add", help="添加任务")
p_add.add_argument("--name", required=True, help="任务名称")
p_add.add_argument("--type", required=True, choices=["cron", "once"], help="任务类型")
p_add.add_argument("--schedule", help="Cron 表达式 (cron 类型必填)")
p_add.add_argument("--scheduled-at", help="执行时间 ISO 8601 格式 (once 类型必填)")
p_add.add_argument("--timezone", help="时区 (如 Asia/Tokyo),默认 UTC")
p_add.add_argument("--message", required=True, help="发送给 AI 的消息内容")
p_add = subparsers.add_parser("add", help="Add a task")
p_add.add_argument("--name", required=True, help="Task name")
p_add.add_argument("--type", required=True, choices=["cron", "once"], help="Task type")
p_add.add_argument("--schedule", help="Cron expression (required for cron type)")
p_add.add_argument("--scheduled-at", help="Execution time in ISO 8601 format (required for once type)")
p_add.add_argument("--timezone", help="Timezone (e.g. Asia/Tokyo), default UTC")
p_add.add_argument("--message", required=True, help="Message content to send to AI")
# edit
p_edit = subparsers.add_parser("edit", help="编辑任务")
p_edit.add_argument("task_id", help="任务 ID")
p_edit.add_argument("--name", help="新任务名称")
p_edit.add_argument("--schedule", help="新 Cron 表达式")
p_edit.add_argument("--scheduled-at", help="新执行时间")
p_edit.add_argument("--timezone", help="新时区")
p_edit.add_argument("--message", help="新消息内容")
p_edit = subparsers.add_parser("edit", help="Edit a task")
p_edit.add_argument("task_id", help="Task ID")
p_edit.add_argument("--name", help="New task name")
p_edit.add_argument("--schedule", help="New cron expression")
p_edit.add_argument("--scheduled-at", help="New execution time")
p_edit.add_argument("--timezone", help="New timezone")
p_edit.add_argument("--message", help="New message content")
# delete
p_delete = subparsers.add_parser("delete", help="删除任务")
p_delete.add_argument("task_id", help="任务 ID")
p_delete = subparsers.add_parser("delete", help="Delete a task")
p_delete.add_argument("task_id", help="Task ID")
# toggle
p_toggle = subparsers.add_parser("toggle", help="暂停/恢复任务")
p_toggle.add_argument("task_id", help="任务 ID")
p_toggle = subparsers.add_parser("toggle", help="Pause/resume a task")
p_toggle.add_argument("task_id", help="Task ID")
# logs
p_logs = subparsers.add_parser("logs", help="查看执行日志")
p_logs.add_argument("--task-id", help="按任务 ID 过滤")
p_logs.add_argument("--limit", type=int, default=10, help="显示条数")
p_logs = subparsers.add_parser("logs", help="View execution logs")
p_logs.add_argument("--task-id", help="Filter by task ID")
p_logs.add_argument("--limit", type=int, default=10, help="Number of entries to show")
args = parser.parse_args()