421 lines
14 KiB
Python
421 lines
14 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
定时任务管理 CLI 工具
|
||
用于增删改查用户的定时任务,数据存储在 tasks.yaml 文件中。
|
||
|
||
环境变量:
|
||
ASSISTANT_ID: 当前 bot ID
|
||
USER_IDENTIFIER: 当前用户标识
|
||
"""
|
||
|
||
import argparse
|
||
import os
|
||
import sys
|
||
import yaml
|
||
import json
|
||
import string
|
||
import random
|
||
from datetime import datetime, timezone, timedelta
|
||
from pathlib import Path
|
||
|
||
try:
|
||
from croniter import croniter
|
||
except ImportError:
|
||
print("Error: croniter is required. Install with: pip install croniter", file=sys.stderr)
|
||
sys.exit(1)
|
||
|
||
|
||
# Maps a language code to the timezone name used for that language.
LANGUAGE_TIMEZONE_MAP = {
    "zh": "Asia/Shanghai",
    "ja": "Asia/Tokyo",
    "jp": "Asia/Tokyo",
    "en": "UTC",
}
|
||
|
||
# Maps a timezone name to its UTC offset in whole hours.
# NOTE(review): the offsets are fixed integers, so daylight saving time is
# not represented here — confirm that this is acceptable for the callers.
TIMEZONE_OFFSET_MAP = {
    "Asia/Shanghai": 8,
    "Asia/Tokyo": 9,
    "UTC": 0,
    "America/New_York": -5,
    "America/Los_Angeles": -8,
    "Europe/London": 0,
    "Europe/Berlin": 1,
}
|
||
|
||
|
||
def get_tasks_dir(bot_id: str, user_id: str) -> Path:
    """Return the directory that holds a user's task data.

    The result is the relative path ``users/<user_id>``. ``bot_id`` is
    accepted for signature parity with the other helpers but does not take
    part in building the path.
    """
    users_root = Path("users")
    return users_root.joinpath(user_id)
|
||
|
||
|
||
def get_tasks_file(bot_id: str, user_id: str) -> Path:
    """Return the path of the user's ``tasks.yaml`` file."""
    user_dir = get_tasks_dir(bot_id, user_id)
    return user_dir.joinpath("tasks.yaml")
|
||
|
||
|
||
def get_logs_dir(bot_id: str, user_id: str) -> Path:
    """Return the ``task_logs`` directory inside the user's task directory."""
    user_dir = get_tasks_dir(bot_id, user_id)
    return user_dir.joinpath("task_logs")
|
||
|
||
|
||
def load_tasks(bot_id: str, user_id: str) -> dict:
    """Load the user's task document from ``tasks.yaml``.

    Args:
        bot_id: Bot identifier, forwarded to ``get_tasks_file``.
        user_id: User identifier, forwarded to ``get_tasks_file``.

    Returns:
        The parsed mapping. ``{"tasks": []}`` is returned when the file is
        missing, empty, not a mapping, or has no ``"tasks"`` key. A
        ``"tasks"`` key whose value is null is normalized to an empty list
        so callers can append to it or take its length.
    """
    tasks_file = get_tasks_file(bot_id, user_id)
    if not tasks_file.exists():
        return {"tasks": []}

    with open(tasks_file, 'r', encoding='utf-8') as f:
        data = yaml.safe_load(f)

    # A YAML document may parse to a scalar or a list; a plain
    # `"tasks" in data` check would accept e.g. a string containing "tasks".
    if not isinstance(data, dict) or "tasks" not in data:
        return {"tasks": []}

    # An empty `tasks:` entry parses to None.
    if data["tasks"] is None:
        data["tasks"] = []
    return data
|
||
|
||
|
||
def save_tasks(bot_id: str, user_id: str, data: dict):
    """Write the task document to the user's ``tasks.yaml`` file.

    Missing parent directories are created first. The document is dumped in
    block style, with non-ASCII characters kept as-is and keys left in
    their existing order.
    """
    target = get_tasks_file(bot_id, user_id)
    target.parent.mkdir(parents=True, exist_ok=True)

    dump_options = {
        "allow_unicode": True,
        "default_flow_style": False,
        "sort_keys": False,
    }
    with target.open("w", encoding="utf-8") as stream:
        yaml.dump(data, stream, **dump_options)
|
||
|
||
|
||
def generate_task_id() -> str:
    """Build a task ID of the form ``task_<UTC timestamp>_<6 random chars>``.

    The timestamp is ``YYYYmmddHHMMSS`` in UTC; the suffix is drawn from
    lowercase ASCII letters and digits.
    """
    alphabet = string.ascii_lowercase + string.digits
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
    suffix = "".join(random.choices(alphabet, k=6))
    return "task_" + stamp + "_" + suffix
|
||
|
||
|
||
def parse_timezone_offset(tz: str) -> int:
    """Return the UTC offset, in whole hours, configured for a timezone name.

    The name is looked up in ``TIMEZONE_OFFSET_MAP``.

    Args:
        tz: Timezone name such as ``"Asia/Tokyo"``.

    Returns:
        The mapped offset in hours. An unknown name yields 0 (UTC), and a
        warning is written to stderr so that the fallback is not silent.
    """
    try:
        return TIMEZONE_OFFSET_MAP[tz]
    except KeyError:
        # Keep the historical fallback value, but tell the user that the
        # schedule will be evaluated in UTC rather than the requested zone.
        print(f"Warning: 未知时区 '{tz}',将按 UTC 处理", file=sys.stderr)
        return 0
|
||
|
||
|
||
def compute_next_run_cron(schedule: str, tz: str, after: datetime = None) -> str:
    """Compute the next UTC run time of a cron expression in a user timezone.

    The cron expression is interpreted in the user's local wall-clock time,
    so the reference instant is shifted into local time, the next trigger is
    found there, and the result is shifted back to UTC.

    Args:
        schedule: Cron expression handed to ``croniter``.
        tz: Timezone name, resolved to a fixed hour offset through
            ``parse_timezone_offset``.
        after: Reference instant; defaults to the current UTC time. An aware
            value is converted to UTC first, a naive value is taken to be
            UTC already.

    Returns:
        ISO 8601 string of the next trigger time, expressed in UTC.
    """
    offset = timedelta(hours=parse_timezone_offset(tz))

    if after is None:
        reference_utc = datetime.now(timezone.utc)
    elif after.tzinfo is not None:
        # Without this conversion, an aware datetime in another zone would
        # have the user offset added on top of its own wall-clock time.
        reference_utc = after.astimezone(timezone.utc)
    else:
        reference_utc = after

    # Shift to the user's local time and drop tzinfo: the cron iteration is
    # done on naive local datetimes.
    local_reference = (reference_utc + offset).replace(tzinfo=None)
    next_local = croniter(schedule, local_reference).get_next(datetime)

    # Shift back to UTC and label the result as such.
    next_utc = next_local - offset
    return next_utc.replace(tzinfo=timezone.utc).isoformat()
|
||
|
||
|
||
def parse_scheduled_at(scheduled_at_str: str) -> str:
    """Parse a one-off task's time string into a UTC ISO 8601 string.

    Accepted inputs are ISO 8601 strings (with or without a UTC offset, and
    with a trailing ``Z``) plus the ``YYYY-mm-dd HH:MM[:SS]`` forms with
    either a space or ``T`` separator. Leading and trailing whitespace is
    ignored. A value without timezone information is taken to be UTC.

    Args:
        scheduled_at_str: The user-supplied time string.

    Returns:
        The same instant converted to UTC, in ISO 8601 format.

    Raises:
        ValueError: If the string matches none of the supported formats.
    """
    text = scheduled_at_str.strip()

    # datetime.fromisoformat() only understands a trailing "Z" from
    # Python 3.11 on, so spell it as an explicit zero offset.
    if text[-1:] in ("Z", "z"):
        iso_text = text[:-1] + "+00:00"
    else:
        iso_text = text

    try:
        dt = datetime.fromisoformat(iso_text)
    except ValueError:
        dt = None

    if dt is not None:
        if dt.tzinfo is None:
            # No timezone information: assume UTC.
            dt = dt.replace(tzinfo=timezone.utc)
        return dt.astimezone(timezone.utc).isoformat()

    # Fall back to a few common explicit formats.
    fallback_formats = (
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%d %H:%M",
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%dT%H:%M",
    )
    for fmt in fallback_formats:
        try:
            parsed = datetime.strptime(text, fmt)
        except ValueError:
            continue
        return parsed.replace(tzinfo=timezone.utc).isoformat()

    raise ValueError(f"无法解析时间格式: {scheduled_at_str}")
|
||
|
||
|
||
def cmd_list(args, bot_id: str, user_id: str):
    """Print the user's scheduled tasks.

    When ``args.format`` is ``"brief"`` one line is printed per task (the
    compact form is intended for the PrePrompt hook); any other value prints
    a multi-line section per task.
    """
    tasks = load_tasks(bot_id, user_id).get("tasks", [])
    if not tasks:
        print("当前没有定时任务。")
        return

    if args.format != "brief":
        # Detailed format: one section per task.
        for task in tasks:
            print(f"--- 任务: {task['name']} ---")
            print(f" ID: {task['id']}")
            print(f" 类型: {task['type']}")
            print(f" 状态: {task['status']}")
            if task["type"] != "cron":
                print(f" 计划时间: {task.get('scheduled_at', 'N/A')}")
            else:
                print(f" Cron: {task['schedule']}")
                print(f" 时区: {task.get('timezone', 'UTC')}")
            print(f" 消息: {task['message']}")
            print(f" 下次执行: {task.get('next_run_at', 'N/A')}")
            print(f" 上次执行: {task.get('last_executed_at', 'N/A')}")
            print(f" 执行次数: {task.get('execution_count', 0)}")
            print(f" 创建时间: {task.get('created_at', 'N/A')}")
            print()
        return

    # Brief format: a header followed by one line per task.
    icons = {"active": "✅", "paused": "⏸️", "done": "✔️", "expired": "⏰"}
    print(f"定时任务列表 ({len(tasks)} 个):")
    for task in tasks:
        icon = icons.get(task["status"], "❓")
        if task["type"] == "cron":
            line = f" {icon} [{task['id']}] {task['name']} | cron: {task['schedule']} ({task.get('timezone', 'UTC')}) | 已执行{task.get('execution_count', 0)}次"
        else:
            line = f" {icon} [{task['id']}] {task['name']} | 一次性: {task.get('scheduled_at', 'N/A')}"
        print(line)
|
||
|
||
|
||
def cmd_add(args, bot_id: str, user_id: str):
    """Create a new task from the parsed CLI arguments and persist it.

    A ``cron`` task requires ``args.schedule`` (validated with ``croniter``);
    a ``once`` task requires ``args.scheduled_at``. On invalid input an error
    is printed to stderr and the process exits with status 1 before anything
    is saved.
    """

    def abort(message: str):
        # Report the problem on stderr and stop with a failure status.
        print(message, file=sys.stderr)
        sys.exit(1)

    store = load_tasks(bot_id, user_id)

    new_id = generate_task_id()
    created_at = datetime.now(timezone.utc).isoformat()

    # Key order is kept as-is when the document is written out.
    task = {
        "id": new_id,
        "name": args.name,
        "type": args.type,
        "schedule": None,
        "scheduled_at": None,
        "timezone": args.timezone or "UTC",
        "message": args.message,
        "status": "active",
        "created_at": created_at,
        "last_executed_at": None,
        "next_run_at": None,
        "execution_count": 0,
    }

    kind = args.type
    if kind == "cron":
        expression = args.schedule
        if not expression:
            abort("Error: cron 类型任务必须提供 --schedule 参数")
        # Validate the cron expression before storing it.
        try:
            croniter(expression)
        except (ValueError, KeyError) as exc:
            abort(f"Error: 无效的 cron 表达式 '{expression}': {exc}")
        task["schedule"] = expression
        task["next_run_at"] = compute_next_run_cron(expression, task["timezone"])
    elif kind == "once":
        if not args.scheduled_at:
            abort("Error: once 类型任务必须提供 --scheduled-at 参数")
        try:
            run_at_utc = parse_scheduled_at(args.scheduled_at)
        except ValueError as exc:
            abort(f"Error: {exc}")
        task["scheduled_at"] = run_at_utc
        task["next_run_at"] = run_at_utc

    store["tasks"].append(task)
    save_tasks(bot_id, user_id, store)

    print(f"任务已创建: {new_id}")
    print(f" 名称: {args.name}")
    print(f" 类型: {args.type}")
    print(f" 下次执行 (UTC): {task['next_run_at']}")
|
||
|
||
|
||
def cmd_edit(args, bot_id: str, user_id: str):
    """Update fields of an existing task and persist the change.

    Only the options actually supplied on the command line are applied. For
    a cron task ``next_run_at`` is recomputed only when a timing option
    (``--schedule``, ``--timezone`` or ``--scheduled-at``) was given, or when
    the task has no ``next_run_at`` yet; editing just the name or message
    leaves a pending run untouched.

    On an unknown task ID or invalid input an error is printed to stderr and
    the process exits with status 1 without saving.
    """
    data = load_tasks(bot_id, user_id)

    task = next((t for t in data["tasks"] if t["id"] == args.task_id), None)
    if task is None:
        print(f"Error: 任务 {args.task_id} 不存在", file=sys.stderr)
        sys.exit(1)

    if args.name:
        task["name"] = args.name
    if args.message:
        task["message"] = args.message

    if args.schedule:
        # Validate the new cron expression before storing it.
        try:
            croniter(args.schedule)
        except (ValueError, KeyError) as e:
            print(f"Error: 无效的 cron 表达式 '{args.schedule}': {e}", file=sys.stderr)
            sys.exit(1)
        task["schedule"] = args.schedule

    if args.timezone:
        task["timezone"] = args.timezone

    if args.scheduled_at:
        try:
            utc_time = parse_scheduled_at(args.scheduled_at)
        except ValueError as e:
            print(f"Error: {e}", file=sys.stderr)
            sys.exit(1)
        task["scheduled_at"] = utc_time
        task["next_run_at"] = utc_time

    # Recompute next_run_at for a cron task only when its timing was edited
    # (or it has none yet). Recomputing on every edit would move a run that
    # is already due into the future just because the task was renamed.
    timing_edited = bool(args.schedule or args.timezone or args.scheduled_at)
    if task["type"] == "cron" and task.get("schedule"):
        if timing_edited or not task.get("next_run_at"):
            task["next_run_at"] = compute_next_run_cron(
                task["schedule"], task.get("timezone", "UTC")
            )

    save_tasks(bot_id, user_id, data)
    print(f"任务已更新: {args.task_id}")
|
||
|
||
|
||
def cmd_delete(args, bot_id: str, user_id: str):
    """Remove the task whose ID equals ``args.task_id`` and persist the result.

    If no task has that ID, an error is printed to stderr and the process
    exits with status 1 without saving.
    """
    store = load_tasks(bot_id, user_id)
    existing = store["tasks"]
    remaining = [entry for entry in existing if entry["id"] != args.task_id]

    # Nothing was filtered out, so the ID is unknown.
    if len(remaining) == len(existing):
        print(f"Error: 任务 {args.task_id} 不存在", file=sys.stderr)
        sys.exit(1)

    store["tasks"] = remaining
    save_tasks(bot_id, user_id, store)
    print(f"任务已删除: {args.task_id}")
|
||
|
||
|
||
def cmd_toggle(args, bot_id: str, user_id: str):
    """Pause an active task or resume a paused one, then persist the change.

    Resuming a cron task recomputes its ``next_run_at``. A task in any other
    status, or an unknown task ID, prints an error to stderr and exits with
    status 1 without saving.
    """
    store = load_tasks(bot_id, user_id)

    target = None
    for candidate in store["tasks"]:
        if candidate["id"] == args.task_id:
            target = candidate
            break

    if target is None:
        print(f"Error: 任务 {args.task_id} 不存在", file=sys.stderr)
        sys.exit(1)

    current = target["status"]
    if current not in ("active", "paused"):
        print(f"任务状态为 {target['status']},无法切换", file=sys.stderr)
        sys.exit(1)

    if current == "active":
        target["status"] = "paused"
        print(f"任务已暂停: {args.task_id}")
    else:
        target["status"] = "active"
        # Refresh next_run_at when a cron task is resumed.
        if target["type"] == "cron" and target.get("schedule"):
            target["next_run_at"] = compute_next_run_cron(
                target["schedule"], target.get("timezone", "UTC")
            )
        print(f"任务已恢复: {args.task_id}")

    save_tasks(bot_id, user_id, store)
|
||
|
||
|
||
def cmd_logs(args, bot_id: str, user_id: str):
    """Print the most recent entries of the user's execution log.

    The log is read from ``execution.log`` in the user's log directory and
    parsed as a YAML list of mappings. Entries can be filtered by
    ``args.task_id``; at most ``args.limit`` of the latest entries are shown
    (10 when the limit is missing or not positive). Responses longer than
    200 characters are truncated.

    A log document that is not a list prints an error to stderr and exits
    with status 1.
    """
    log_file = get_logs_dir(bot_id, user_id) / "execution.log"

    if not log_file.exists():
        print("暂无执行日志。")
        return

    with open(log_file, 'r', encoding='utf-8') as f:
        logs = yaml.safe_load(f)

    if not logs:
        print("暂无执行日志。")
        return

    if not isinstance(logs, list):
        print("Error: 执行日志格式无效", file=sys.stderr)
        sys.exit(1)

    # Ignore entries that are not mappings; they cannot be displayed.
    entries = [entry for entry in logs if isinstance(entry, dict)]

    # Filter by task ID.
    if args.task_id:
        entries = [entry for entry in entries if entry.get("task_id") == args.task_id]

    # Keep only the newest entries. A non-positive limit would otherwise
    # turn the negative-index slice into a slice from the front.
    limit = args.limit if args.limit and args.limit > 0 else 10
    entries = entries[-limit:]

    if not entries:
        print("没有匹配的执行日志。")
        return

    for log in entries:
        status_icon = "✅" if log.get("status") == "success" else "❌"
        print(f"{status_icon} [{log.get('executed_at', 'N/A')}] {log.get('task_name', 'N/A')}")

        # The response may be stored as null or as a non-string value.
        response = log.get("response")
        if response is None:
            response = ""
        elif not isinstance(response, str):
            response = str(response)
        # Truncate overly long responses.
        if len(response) > 200:
            response = response[:200] + "..."
        print(f" {response}")

        if log.get("duration_ms"):
            print(f" 耗时: {log['duration_ms']}ms")
        print()
|
||
|
||
|
||
def main():
    """Parse the command line and dispatch to the matching ``cmd_*`` handler.

    The bot ID and user ID are read from the ``ASSISTANT_ID`` and
    ``USER_IDENTIFIER`` environment variables. If either is missing, or no
    subcommand is given, the process exits with status 1.
    """
    bot_id = os.environ.get("ASSISTANT_ID", "")
    user_id = os.environ.get("USER_IDENTIFIER", "")
    if not (bot_id and user_id):
        print("Error: ASSISTANT_ID 和 USER_IDENTIFIER 环境变量必须设置", file=sys.stderr)
        sys.exit(1)

    parser = argparse.ArgumentParser(description="定时任务管理工具")
    subcommands = parser.add_subparsers(dest="command", help="可用命令")

    # list
    list_parser = subcommands.add_parser("list", help="列出所有任务")
    list_parser.add_argument(
        "--format", choices=["brief", "detail"], default="detail", help="输出格式"
    )

    # add
    add_parser = subcommands.add_parser("add", help="添加任务")
    add_parser.add_argument("--name", required=True, help="任务名称")
    add_parser.add_argument(
        "--type", required=True, choices=["cron", "once"], help="任务类型"
    )
    add_parser.add_argument("--schedule", help="Cron 表达式 (cron 类型必填)")
    add_parser.add_argument("--scheduled-at", help="执行时间 ISO 8601 格式 (once 类型必填)")
    add_parser.add_argument("--timezone", help="时区 (如 Asia/Tokyo),默认 UTC")
    add_parser.add_argument("--message", required=True, help="发送给 AI 的消息内容")

    # edit
    edit_parser = subcommands.add_parser("edit", help="编辑任务")
    edit_parser.add_argument("task_id", help="任务 ID")
    edit_parser.add_argument("--name", help="新任务名称")
    edit_parser.add_argument("--schedule", help="新 Cron 表达式")
    edit_parser.add_argument("--scheduled-at", help="新执行时间")
    edit_parser.add_argument("--timezone", help="新时区")
    edit_parser.add_argument("--message", help="新消息内容")

    # delete
    delete_parser = subcommands.add_parser("delete", help="删除任务")
    delete_parser.add_argument("task_id", help="任务 ID")

    # toggle
    toggle_parser = subcommands.add_parser("toggle", help="暂停/恢复任务")
    toggle_parser.add_argument("task_id", help="任务 ID")

    # logs
    logs_parser = subcommands.add_parser("logs", help="查看执行日志")
    logs_parser.add_argument("--task-id", help="按任务 ID 过滤")
    logs_parser.add_argument("--limit", type=int, default=10, help="显示条数")

    args = parser.parse_args()
    if not args.command:
        parser.print_help()
        sys.exit(1)

    # Map each subcommand name to its handler.
    handlers = {
        "list": cmd_list,
        "add": cmd_add,
        "edit": cmd_edit,
        "delete": cmd_delete,
        "toggle": cmd_toggle,
        "logs": cmd_logs,
    }
    handler = handlers[args.command]
    handler(args, bot_id, user_id)
|
||
|
||
|
||
# Run the CLI only when this file is executed as a script.
if __name__ == "__main__":
    main()
|