#!/usr/bin/env python3
"""
Scheduled-task management CLI.

Creates, edits, deletes, lists and toggles a user's scheduled tasks. Task
data is stored in a ``tasks.yaml`` file under ``users/<user_id>/`` (relative
to the current working directory).

Environment variables:
    ASSISTANT_ID: ID of the current bot
    USER_IDENTIFIER: identifier of the current user
"""

import argparse
import json
import os
import random
import string
import sys
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Optional

import yaml

try:
    from croniter import croniter
except ImportError:
    print("Error: croniter is required. Install with: pip install croniter", file=sys.stderr)
    sys.exit(1)


# Mapping from language code to a default timezone name.
LANGUAGE_TIMEZONE_MAP = {
    'zh': 'Asia/Shanghai',
    'ja': 'Asia/Tokyo',
    'jp': 'Asia/Tokyo',
    'en': 'UTC',
}

# Mapping from timezone name to a fixed UTC offset in whole hours.
# NOTE: the offsets are static integers, so daylight-saving changes are not
# reflected for the zones that observe them.
TIMEZONE_OFFSET_MAP = {
    'Asia/Shanghai': 8,
    'Asia/Tokyo': 9,
    'UTC': 0,
    'America/New_York': -5,
    'America/Los_Angeles': -8,
    'Europe/London': 0,
    'Europe/Berlin': 1,
}

# Icons shown next to a task's status in the brief listing.
_STATUS_ICONS = {"active": "✅", "paused": "⏸️", "done": "✔️", "expired": "⏰"}


def get_tasks_dir(bot_id: str, user_id: str) -> Path:
    """Return the directory holding the user's task data.

    ``bot_id`` is accepted for signature symmetry with the other helpers but
    does not take part in the path: the result is ``users/<user_id>``.
    """
    return Path("users") / user_id


def get_tasks_file(bot_id: str, user_id: str) -> Path:
    """Return the path of the user's ``tasks.yaml`` file."""
    return get_tasks_dir(bot_id, user_id) / "tasks.yaml"


def get_logs_dir(bot_id: str, user_id: str) -> Path:
    """Return the directory holding the user's task execution logs."""
    return get_tasks_dir(bot_id, user_id) / "task_logs"


def load_tasks(bot_id: str, user_id: str) -> dict:
    """Load ``tasks.yaml`` and return its content as a dict.

    Returns ``{"tasks": []}`` when the file does not exist, is empty, does not
    hold a mapping at the top level, or has no ``tasks`` key. A ``tasks`` key
    whose value is null is normalised to an empty list so that callers can
    always iterate over and append to ``data["tasks"]``.
    """
    tasks_file = get_tasks_file(bot_id, user_id)
    if not tasks_file.exists():
        return {"tasks": []}

    with open(tasks_file, 'r', encoding='utf-8') as f:
        data = yaml.safe_load(f)

    # Guard against a non-mapping root (list, scalar); "tasks" in data would
    # otherwise succeed on e.g. a string and crash later on data["tasks"].
    if not isinstance(data, dict) or "tasks" not in data:
        return {"tasks": []}
    if data["tasks"] is None:
        data["tasks"] = []
    return data


def save_tasks(bot_id: str, user_id: str, data: dict) -> None:
    """Write ``data`` to ``tasks.yaml``, creating parent directories if needed."""
    tasks_file = get_tasks_file(bot_id, user_id)
    tasks_file.parent.mkdir(parents=True, exist_ok=True)
    with open(tasks_file, 'w', encoding='utf-8') as f:
        yaml.dump(data, f, allow_unicode=True, default_flow_style=False, sort_keys=False)


def generate_task_id() -> str:
    """Generate a task ID of the form ``task_<UTC timestamp>_<6 random chars>``."""
    ts = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
    rand = ''.join(random.choices(string.ascii_lowercase + string.digits, k=6))
    return f"task_{ts}_{rand}"


def parse_timezone_offset(tz: str) -> int:
    """Return the UTC offset in hours for ``tz``; unknown names map to 0."""
    return TIMEZONE_OFFSET_MAP.get(tz, 0)


def compute_next_run_cron(schedule: str, tz: str, after: Optional[datetime] = None) -> str:
    """Compute the next run time of a cron schedule as a UTC ISO 8601 string.

    The cron expression is interpreted in the user's local time, so the next
    trigger is computed on the local clock and then converted back to UTC.

    Args:
        schedule: cron expression.
        tz: timezone name, looked up in ``TIMEZONE_OFFSET_MAP``.
        after: reference instant; defaults to the current time. A naive value
            is treated as UTC, an aware value is converted to UTC first.

    Returns:
        ISO 8601 string of the next trigger, with a UTC offset.
    """
    offset = timedelta(hours=parse_timezone_offset(tz))

    now_utc = datetime.now(timezone.utc) if after is None else after
    # An aware datetime in another zone must be moved to UTC before the
    # offset is applied, otherwise the local wall-clock time is wrong.
    if now_utc.tzinfo is not None:
        now_utc = now_utc.astimezone(timezone.utc)

    # User-local wall-clock time as a naive datetime.
    now_local = (now_utc + offset).replace(tzinfo=None)

    # Next cron trigger on the local clock.
    next_local = croniter(schedule, now_local).get_next(datetime)

    # Back to UTC.
    next_utc = next_local - offset
    return next_utc.replace(tzinfo=timezone.utc).isoformat()


def parse_scheduled_at(scheduled_at_str: str) -> str:
    """Parse a one-off task's time string and return it as UTC ISO 8601.

    Accepts ISO 8601 strings (with or without an offset, including a trailing
    ``Z``) and a few common ``YYYY-MM-DD HH:MM[:SS]`` variants. Values without
    timezone information are assumed to be UTC.

    Raises:
        ValueError: if the string matches none of the supported formats.
    """
    text = scheduled_at_str.strip()
    # datetime.fromisoformat() rejects the "Z" suffix before Python 3.11.
    if text[-1:] in ("Z", "z"):
        text = text[:-1] + "+00:00"

    # First try ISO format, possibly carrying a UTC offset.
    try:
        dt = datetime.fromisoformat(text)
    except ValueError:
        pass
    else:
        if dt.tzinfo is None:
            # No timezone information: assume UTC.
            dt = dt.replace(tzinfo=timezone.utc)
        return dt.astimezone(timezone.utc).isoformat()

    # Fall back to common formats (strptime also accepts non-padded fields).
    for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M"):
        try:
            dt = datetime.strptime(text, fmt).replace(tzinfo=timezone.utc)
        except ValueError:
            continue
        return dt.isoformat()

    raise ValueError(f"无法解析时间格式: {scheduled_at_str}")


def _validate_cron_or_exit(schedule: str) -> None:
    """Exit with status 1 and an error on stderr if ``schedule`` is not valid cron."""
    try:
        croniter(schedule)
    except (ValueError, KeyError) as e:
        print(f"Error: 无效的 cron 表达式 '{schedule}': {e}", file=sys.stderr)
        sys.exit(1)


def _warn_if_unknown_timezone(tz: str) -> None:
    """Warn on stderr when ``tz`` is not in ``TIMEZONE_OFFSET_MAP``.

    Unknown names are scheduled with a zero offset; the warning makes that
    fallback visible instead of silent.
    """
    if tz not in TIMEZONE_OFFSET_MAP:
        print(f"Warning: 未知时区 '{tz}',将按 UTC 处理", file=sys.stderr)


def cmd_list(args, bot_id: str, user_id: str) -> None:
    """List all tasks in brief or detailed format."""
    data = load_tasks(bot_id, user_id)
    tasks = data.get("tasks", [])

    if not tasks:
        print("当前没有定时任务。")
        return

    if args.format == "brief":
        # Compact format, used by the PrePrompt hook.
        print(f"定时任务列表 ({len(tasks)} 个):")
        for t in tasks:
            status_icon = _STATUS_ICONS.get(t["status"], "❓")
            if t["type"] == "cron":
                print(f" {status_icon} [{t['id']}] {t['name']} | cron: {t['schedule']} ({t.get('timezone', 'UTC')}) | 已执行{t.get('execution_count', 0)}次")
            else:
                print(f" {status_icon} [{t['id']}] {t['name']} | 一次性: {t.get('scheduled_at', 'N/A')}")
        return

    # Detailed format.
    for t in tasks:
        print(f"--- 任务: {t['name']} ---")
        print(f" ID: {t['id']}")
        print(f" 类型: {t['type']}")
        print(f" 状态: {t['status']}")
        if t["type"] == "cron":
            print(f" Cron: {t['schedule']}")
            print(f" 时区: {t.get('timezone', 'UTC')}")
        else:
            print(f" 计划时间: {t.get('scheduled_at', 'N/A')}")
        print(f" 消息: {t['message']}")
        print(f" 下次执行: {t.get('next_run_at', 'N/A')}")
        print(f" 上次执行: {t.get('last_executed_at', 'N/A')}")
        print(f" 执行次数: {t.get('execution_count', 0)}")
        print(f" 创建时间: {t.get('created_at', 'N/A')}")
        print()


def cmd_add(args, bot_id: str, user_id: str) -> None:
    """Create a new cron or one-off task and persist it.

    Exits with status 1 when the argument required by the task type is
    missing or invalid.
    """
    data = load_tasks(bot_id, user_id)

    task_id = generate_task_id()
    now_utc = datetime.now(timezone.utc).isoformat()

    task = {
        "id": task_id,
        "name": args.name,
        "type": args.type,
        "schedule": None,
        "scheduled_at": None,
        "timezone": args.timezone or "UTC",
        "message": args.message,
        "status": "active",
        "created_at": now_utc,
        "last_executed_at": None,
        "next_run_at": None,
        "execution_count": 0,
    }

    if args.type == "cron":
        if not args.schedule:
            print("Error: cron 类型任务必须提供 --schedule 参数", file=sys.stderr)
            sys.exit(1)
        _validate_cron_or_exit(args.schedule)
        _warn_if_unknown_timezone(task["timezone"])
        task["schedule"] = args.schedule
        task["next_run_at"] = compute_next_run_cron(args.schedule, task["timezone"])
    elif args.type == "once":
        if not args.scheduled_at:
            print("Error: once 类型任务必须提供 --scheduled-at 参数", file=sys.stderr)
            sys.exit(1)
        try:
            utc_time = parse_scheduled_at(args.scheduled_at)
        except ValueError as e:
            print(f"Error: {e}", file=sys.stderr)
            sys.exit(1)
        task["scheduled_at"] = utc_time
        task["next_run_at"] = utc_time

    data["tasks"].append(task)
    save_tasks(bot_id, user_id, data)
    print(f"任务已创建: {task_id}")
    print(f" 名称: {args.name}")
    print(f" 类型: {args.type}")
    print(f" 下次执行 (UTC): {task['next_run_at']}")


def cmd_edit(args, bot_id: str, user_id: str) -> None:
    """Edit the fields of an existing task that were given on the command line.

    Exits with status 1 when the task does not exist or a new value is
    invalid; nothing is written in that case.
    """
    data = load_tasks(bot_id, user_id)

    task = next((t for t in data["tasks"] if t["id"] == args.task_id), None)
    if not task:
        print(f"Error: 任务 {args.task_id} 不存在", file=sys.stderr)
        sys.exit(1)

    if args.name:
        task["name"] = args.name
    if args.message:
        task["message"] = args.message
    if args.schedule:
        _validate_cron_or_exit(args.schedule)
        task["schedule"] = args.schedule
    if args.timezone:
        _warn_if_unknown_timezone(args.timezone)
        task["timezone"] = args.timezone
    if args.scheduled_at:
        try:
            task["scheduled_at"] = parse_scheduled_at(args.scheduled_at)
        except ValueError as e:
            print(f"Error: {e}", file=sys.stderr)
            sys.exit(1)
        # A cron task's next run is driven by its schedule, never by
        # scheduled_at, so only non-cron tasks take the new time directly.
        if task["type"] != "cron":
            task["next_run_at"] = task["scheduled_at"]

    # Recompute next_run_at only when the schedule or timezone changed.
    # Recomputing on every edit would move an overdue (not yet executed)
    # run into the future when only the name or message was edited.
    timing_changed = bool(args.schedule or args.timezone)
    if timing_changed and task["type"] == "cron" and task.get("schedule"):
        task["next_run_at"] = compute_next_run_cron(task["schedule"], task.get("timezone", "UTC"))

    save_tasks(bot_id, user_id, data)
    print(f"任务已更新: {args.task_id}")


def cmd_delete(args, bot_id: str, user_id: str) -> None:
    """Delete the task with the given ID; exit with status 1 if it is absent."""
    data = load_tasks(bot_id, user_id)

    remaining = [t for t in data["tasks"] if t["id"] != args.task_id]
    if len(remaining) == len(data["tasks"]):
        print(f"Error: 任务 {args.task_id} 不存在", file=sys.stderr)
        sys.exit(1)

    data["tasks"] = remaining
    save_tasks(bot_id, user_id, data)
    print(f"任务已删除: {args.task_id}")


def cmd_toggle(args, bot_id: str, user_id: str) -> None:
    """Pause an active task or resume a paused one.

    Exits with status 1 when the task does not exist or is in any other
    status.
    """
    data = load_tasks(bot_id, user_id)

    for t in data["tasks"]:
        if t["id"] != args.task_id:
            continue

        if t["status"] == "active":
            t["status"] = "paused"
            print(f"任务已暂停: {args.task_id}")
        elif t["status"] == "paused":
            t["status"] = "active"
            # Recompute next_run_at on resume so a cron task continues from now.
            if t["type"] == "cron" and t.get("schedule"):
                t["next_run_at"] = compute_next_run_cron(t["schedule"], t.get("timezone", "UTC"))
            print(f"任务已恢复: {args.task_id}")
        else:
            print(f"任务状态为 {t['status']},无法切换", file=sys.stderr)
            sys.exit(1)

        save_tasks(bot_id, user_id, data)
        return

    print(f"Error: 任务 {args.task_id} 不存在", file=sys.stderr)
    sys.exit(1)


def cmd_logs(args, bot_id: str, user_id: str) -> None:
    """Print the most recent execution log entries, optionally for one task."""
    log_file = get_logs_dir(bot_id, user_id) / "execution.log"

    if not log_file.exists():
        print("暂无执行日志。")
        return

    # The log file is parsed as YAML.
    with open(log_file, 'r', encoding='utf-8') as f:
        logs = yaml.safe_load(f)

    if not logs:
        print("暂无执行日志。")
        return

    # Filter by task ID.
    if args.task_id:
        logs = [entry for entry in logs if entry.get("task_id") == args.task_id]

    # Limit the number of entries; a missing, zero or negative limit falls
    # back to 10 (a negative value would otherwise slice from the wrong end).
    limit = args.limit if args.limit and args.limit > 0 else 10
    logs = logs[-limit:]

    if not logs:
        print("没有匹配的执行日志。")
        return

    for log in logs:
        status_icon = "✅" if log.get("status") == "success" else "❌"
        print(f"{status_icon} [{log.get('executed_at', 'N/A')}] {log.get('task_name', 'N/A')}")
        # The response may be stored as null or as a non-string value.
        response = log.get("response")
        response = "" if response is None else str(response)
        # Truncate overly long responses.
        if len(response) > 200:
            response = response[:200] + "..."
        print(f" {response}")
        if log.get("duration_ms"):
            print(f" 耗时: {log['duration_ms']}ms")
        print()


def main() -> None:
    """Parse the command line and dispatch to the matching sub-command.

    Requires the ``ASSISTANT_ID`` and ``USER_IDENTIFIER`` environment
    variables; exits with status 1 if either is missing or if no sub-command
    is given.
    """
    bot_id = os.getenv("ASSISTANT_ID", "")
    user_id = os.getenv("USER_IDENTIFIER", "")

    if not bot_id or not user_id:
        print("Error: ASSISTANT_ID 和 USER_IDENTIFIER 环境变量必须设置", file=sys.stderr)
        sys.exit(1)

    parser = argparse.ArgumentParser(description="定时任务管理工具")
    subparsers = parser.add_subparsers(dest="command", help="可用命令")

    # list
    p_list = subparsers.add_parser("list", help="列出所有任务")
    p_list.add_argument("--format", choices=["brief", "detail"], default="detail", help="输出格式")

    # add
    p_add = subparsers.add_parser("add", help="添加任务")
    p_add.add_argument("--name", required=True, help="任务名称")
    p_add.add_argument("--type", required=True, choices=["cron", "once"], help="任务类型")
    p_add.add_argument("--schedule", help="Cron 表达式 (cron 类型必填)")
    p_add.add_argument("--scheduled-at", help="执行时间 ISO 8601 格式 (once 类型必填)")
    p_add.add_argument("--timezone", help="时区 (如 Asia/Tokyo),默认 UTC")
    p_add.add_argument("--message", required=True, help="发送给 AI 的消息内容")

    # edit
    p_edit = subparsers.add_parser("edit", help="编辑任务")
    p_edit.add_argument("task_id", help="任务 ID")
    p_edit.add_argument("--name", help="新任务名称")
    p_edit.add_argument("--schedule", help="新 Cron 表达式")
    p_edit.add_argument("--scheduled-at", help="新执行时间")
    p_edit.add_argument("--timezone", help="新时区")
    p_edit.add_argument("--message", help="新消息内容")

    # delete
    p_delete = subparsers.add_parser("delete", help="删除任务")
    p_delete.add_argument("task_id", help="任务 ID")

    # toggle
    p_toggle = subparsers.add_parser("toggle", help="暂停/恢复任务")
    p_toggle.add_argument("task_id", help="任务 ID")

    # logs
    p_logs = subparsers.add_parser("logs", help="查看执行日志")
    p_logs.add_argument("--task-id", help="按任务 ID 过滤")
    p_logs.add_argument("--limit", type=int, default=10, help="显示条数")

    args = parser.parse_args()

    if not args.command:
        parser.print_help()
        sys.exit(1)

    commands = {
        "list": cmd_list,
        "add": cmd_add,
        "edit": cmd_edit,
        "delete": cmd_delete,
        "toggle": cmd_toggle,
        "logs": cmd_logs,
    }

    commands[args.command](args, bot_id, user_id)


if __name__ == "__main__":
    main()