schedule 优化

This commit is contained in:
朱潮 2026-04-01 10:37:03 +08:00
parent fd0fbc422d
commit d6ee567758

View File

@ -1,11 +1,11 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
定时任务管理 CLI 工具 Scheduled Task Manager CLI Tool
用于增删改查用户的定时任务数据存储在 tasks.yaml 文件中 Add, delete, modify, and query user scheduled tasks. Data is stored in tasks.yaml.
环境变量: Environment variables:
ASSISTANT_ID: 当前 bot ID ASSISTANT_ID: Current bot ID
USER_IDENTIFIER: 当前用户标识 USER_IDENTIFIER: Current user identifier
""" """
import argparse import argparse
@ -25,7 +25,7 @@ except ImportError:
sys.exit(1) sys.exit(1)
# 语言到时区的映射 # Language to timezone mapping
LANGUAGE_TIMEZONE_MAP = { LANGUAGE_TIMEZONE_MAP = {
'zh': 'Asia/Shanghai', 'zh': 'Asia/Shanghai',
'ja': 'Asia/Tokyo', 'ja': 'Asia/Tokyo',
@ -33,7 +33,7 @@ LANGUAGE_TIMEZONE_MAP = {
'en': 'UTC', 'en': 'UTC',
} }
# 时区到 UTC 偏移(小时) # Timezone to UTC offset (hours)
TIMEZONE_OFFSET_MAP = { TIMEZONE_OFFSET_MAP = {
'Asia/Shanghai': 8, 'Asia/Shanghai': 8,
'Asia/Tokyo': 9, 'Asia/Tokyo': 9,
@ -46,22 +46,22 @@ TIMEZONE_OFFSET_MAP = {
def get_tasks_dir(bot_id: str, user_id: str) -> Path: def get_tasks_dir(bot_id: str, user_id: str) -> Path:
"""获取用户任务目录路径""" """Get user task directory path"""
return Path("users") / user_id return Path("users") / user_id
def get_tasks_file(bot_id: str, user_id: str) -> Path: def get_tasks_file(bot_id: str, user_id: str) -> Path:
"""获取 tasks.yaml 文件路径""" """Get tasks.yaml file path"""
return get_tasks_dir(bot_id, user_id) / "tasks.yaml" return get_tasks_dir(bot_id, user_id) / "tasks.yaml"
def get_logs_dir(bot_id: str, user_id: str) -> Path: def get_logs_dir(bot_id: str, user_id: str) -> Path:
"""获取任务日志目录""" """Get task logs directory"""
return get_tasks_dir(bot_id, user_id) / "task_logs" return get_tasks_dir(bot_id, user_id) / "task_logs"
def load_tasks(bot_id: str, user_id: str) -> dict: def load_tasks(bot_id: str, user_id: str) -> dict:
"""加载 tasks.yaml""" """Load tasks.yaml"""
tasks_file = get_tasks_file(bot_id, user_id) tasks_file = get_tasks_file(bot_id, user_id)
if not tasks_file.exists(): if not tasks_file.exists():
return {"tasks": []} return {"tasks": []}
@ -71,7 +71,7 @@ def load_tasks(bot_id: str, user_id: str) -> dict:
def save_tasks(bot_id: str, user_id: str, data: dict): def save_tasks(bot_id: str, user_id: str, data: dict):
"""保存 tasks.yaml""" """Save tasks.yaml"""
tasks_file = get_tasks_file(bot_id, user_id) tasks_file = get_tasks_file(bot_id, user_id)
tasks_file.parent.mkdir(parents=True, exist_ok=True) tasks_file.parent.mkdir(parents=True, exist_ok=True)
with open(tasks_file, 'w', encoding='utf-8') as f: with open(tasks_file, 'w', encoding='utf-8') as f:
@ -79,53 +79,54 @@ def save_tasks(bot_id: str, user_id: str, data: dict):
def generate_task_id() -> str: def generate_task_id() -> str:
"""生成唯一任务 ID""" """Generate unique task ID"""
ts = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S") ts = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
rand = ''.join(random.choices(string.ascii_lowercase + string.digits, k=6)) rand = ''.join(random.choices(string.ascii_lowercase + string.digits, k=6))
return f"task_{ts}_{rand}" return f"task_{ts}_{rand}"
def parse_timezone_offset(tz: str) -> int: def parse_timezone_offset(tz: str) -> int:
"""获取时区的 UTC 偏移小时数""" """Get UTC offset hours for a timezone"""
return TIMEZONE_OFFSET_MAP.get(tz, 0) return TIMEZONE_OFFSET_MAP.get(tz, 0)
def compute_next_run_cron(schedule: str, tz: str, after: datetime = None) -> str: def compute_next_run_cron(schedule: str, tz: str, after: datetime = None) -> str:
""" """
根据 cron 表达式和时区计算下次执行的 UTC 时间 Calculate next execution UTC time based on cron expression and timezone.
cron 表达式是基于用户本地时间的需要先在本地时间计算下次触发再转换为 UTC The cron expression is based on user's local time, so we first calculate
the next trigger in local time, then convert to UTC.
""" """
offset_hours = parse_timezone_offset(tz) offset_hours = parse_timezone_offset(tz)
offset = timedelta(hours=offset_hours) offset = timedelta(hours=offset_hours)
# 当前 UTC 时间 # Current UTC time
now_utc = after or datetime.now(timezone.utc) now_utc = after or datetime.now(timezone.utc)
# 转为用户本地时间naive # Convert to user's local time (naive)
now_local = (now_utc + offset).replace(tzinfo=None) now_local = (now_utc + offset).replace(tzinfo=None)
# 在本地时间上计算下次 cron 触发 # Calculate next cron trigger in local time
cron = croniter(schedule, now_local) cron = croniter(schedule, now_local)
next_local = cron.get_next(datetime) next_local = cron.get_next(datetime)
# 转回 UTC # Convert back to UTC
next_utc = next_local - offset next_utc = next_local - offset
return next_utc.replace(tzinfo=timezone.utc).isoformat() return next_utc.replace(tzinfo=timezone.utc).isoformat()
def parse_scheduled_at(scheduled_at_str: str) -> str: def parse_scheduled_at(scheduled_at_str: str) -> str:
"""解析一次性任务的时间字符串,返回 UTC ISO 格式""" """Parse one-time task time string, return UTC ISO format"""
# 尝试解析带时区偏移的 ISO 格式 # Try to parse ISO format with timezone offset
try: try:
dt = datetime.fromisoformat(scheduled_at_str) dt = datetime.fromisoformat(scheduled_at_str)
if dt.tzinfo is None: if dt.tzinfo is None:
# 无时区信息,假设 UTC # No timezone info, assume UTC
dt = dt.replace(tzinfo=timezone.utc) dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc).isoformat() return dt.astimezone(timezone.utc).isoformat()
except ValueError: except ValueError:
pass pass
# 尝试解析常见格式 # Try to parse common formats
for fmt in ["%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M"]: for fmt in ["%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M"]:
try: try:
dt = datetime.strptime(scheduled_at_str, fmt).replace(tzinfo=timezone.utc) dt = datetime.strptime(scheduled_at_str, fmt).replace(tzinfo=timezone.utc)
@ -133,49 +134,49 @@ def parse_scheduled_at(scheduled_at_str: str) -> str:
except ValueError: except ValueError:
continue continue
raise ValueError(f"无法解析时间格式: {scheduled_at_str}") raise ValueError(f"Cannot parse time format: {scheduled_at_str}")
def cmd_list(args, bot_id: str, user_id: str): def cmd_list(args, bot_id: str, user_id: str):
"""列出所有任务""" """List all tasks"""
data = load_tasks(bot_id, user_id) data = load_tasks(bot_id, user_id)
tasks = data.get("tasks", []) tasks = data.get("tasks", [])
if not tasks: if not tasks:
print("当前没有定时任务。") print("No scheduled tasks.")
return return
if args.format == "brief": if args.format == "brief":
# 简洁格式,用于 PrePrompt hook # Brief format for PrePrompt hook
print(f"定时任务列表 ({len(tasks)}):") print(f"Scheduled tasks ({len(tasks)}):")
for t in tasks: for t in tasks:
status_icon = {"active": "", "paused": "⏸️", "done": "✔️", "expired": ""}.get(t["status"], "") status_icon = {"active": "", "paused": "⏸️", "done": "✔️", "expired": ""}.get(t["status"], "")
if t["type"] == "cron": if t["type"] == "cron":
print(f" {status_icon} [{t['id']}] {t['name']} | cron: {t['schedule']} ({t.get('timezone', 'UTC')}) | 已执行{t.get('execution_count', 0)}") print(f" {status_icon} [{t['id']}] {t['name']} | cron: {t['schedule']} ({t.get('timezone', 'UTC')}) | executed {t.get('execution_count', 0)} times")
else: else:
print(f" {status_icon} [{t['id']}] {t['name']} | 一次性: {t.get('scheduled_at', 'N/A')}") print(f" {status_icon} [{t['id']}] {t['name']} | once: {t.get('scheduled_at', 'N/A')}")
else: else:
# 详细格式 # Detail format
for t in tasks: for t in tasks:
print(f"--- 任务: {t['name']} ---") print(f"--- Task: {t['name']} ---")
print(f" ID: {t['id']}") print(f" ID: {t['id']}")
print(f" 类型: {t['type']}") print(f" Type: {t['type']}")
print(f" 状态: {t['status']}") print(f" Status: {t['status']}")
if t["type"] == "cron": if t["type"] == "cron":
print(f" Cron: {t['schedule']}") print(f" Cron: {t['schedule']}")
print(f" 时区: {t.get('timezone', 'UTC')}") print(f" Timezone: {t.get('timezone', 'UTC')}")
else: else:
print(f" 计划时间: {t.get('scheduled_at', 'N/A')}") print(f" Scheduled at: {t.get('scheduled_at', 'N/A')}")
print(f" 消息: {t['message']}") print(f" Message: {t['message']}")
print(f" 下次执行: {t.get('next_run_at', 'N/A')}") print(f" Next run: {t.get('next_run_at', 'N/A')}")
print(f" 上次执行: {t.get('last_executed_at', 'N/A')}") print(f" Last run: {t.get('last_executed_at', 'N/A')}")
print(f" 执行次数: {t.get('execution_count', 0)}") print(f" Executions: {t.get('execution_count', 0)}")
print(f" 创建时间: {t.get('created_at', 'N/A')}") print(f" Created at: {t.get('created_at', 'N/A')}")
print() print()
def cmd_add(args, bot_id: str, user_id: str): def cmd_add(args, bot_id: str, user_id: str):
"""添加任务""" """Add a task"""
data = load_tasks(bot_id, user_id) data = load_tasks(bot_id, user_id)
task_id = generate_task_id() task_id = generate_task_id()
@ -198,20 +199,20 @@ def cmd_add(args, bot_id: str, user_id: str):
if args.type == "cron": if args.type == "cron":
if not args.schedule: if not args.schedule:
print("Error: cron 类型任务必须提供 --schedule 参数", file=sys.stderr) print("Error: --schedule is required for cron type tasks", file=sys.stderr)
sys.exit(1) sys.exit(1)
# 验证 cron 表达式 # Validate cron expression
try: try:
croniter(args.schedule) croniter(args.schedule)
except (ValueError, KeyError) as e: except (ValueError, KeyError) as e:
print(f"Error: 无效的 cron 表达式 '{args.schedule}': {e}", file=sys.stderr) print(f"Error: Invalid cron expression '{args.schedule}': {e}", file=sys.stderr)
sys.exit(1) sys.exit(1)
task["schedule"] = args.schedule task["schedule"] = args.schedule
task["next_run_at"] = compute_next_run_cron(args.schedule, task["timezone"]) task["next_run_at"] = compute_next_run_cron(args.schedule, task["timezone"])
elif args.type == "once": elif args.type == "once":
if not args.scheduled_at: if not args.scheduled_at:
print("Error: once 类型任务必须提供 --scheduled-at 参数", file=sys.stderr) print("Error: --scheduled-at is required for once type tasks", file=sys.stderr)
sys.exit(1) sys.exit(1)
try: try:
utc_time = parse_scheduled_at(args.scheduled_at) utc_time = parse_scheduled_at(args.scheduled_at)
@ -223,14 +224,14 @@ def cmd_add(args, bot_id: str, user_id: str):
data["tasks"].append(task) data["tasks"].append(task)
save_tasks(bot_id, user_id, data) save_tasks(bot_id, user_id, data)
print(f"任务已创建: {task_id}") print(f"Task created: {task_id}")
print(f" 名称: {args.name}") print(f" Name: {args.name}")
print(f" 类型: {args.type}") print(f" Type: {args.type}")
print(f" 下次执行 (UTC): {task['next_run_at']}") print(f" Next run (UTC): {task['next_run_at']}")
def cmd_edit(args, bot_id: str, user_id: str): def cmd_edit(args, bot_id: str, user_id: str):
"""编辑任务""" """Edit a task"""
data = load_tasks(bot_id, user_id) data = load_tasks(bot_id, user_id)
task = None task = None
@ -240,7 +241,7 @@ def cmd_edit(args, bot_id: str, user_id: str):
break break
if not task: if not task:
print(f"Error: 任务 {args.task_id} 不存在", file=sys.stderr) print(f"Error: Task {args.task_id} not found", file=sys.stderr)
sys.exit(1) sys.exit(1)
if args.name: if args.name:
@ -251,7 +252,7 @@ def cmd_edit(args, bot_id: str, user_id: str):
try: try:
croniter(args.schedule) croniter(args.schedule)
except (ValueError, KeyError) as e: except (ValueError, KeyError) as e:
print(f"Error: 无效的 cron 表达式 '{args.schedule}': {e}", file=sys.stderr) print(f"Error: Invalid cron expression '{args.schedule}': {e}", file=sys.stderr)
sys.exit(1) sys.exit(1)
task["schedule"] = args.schedule task["schedule"] = args.schedule
if args.timezone: if args.timezone:
@ -264,91 +265,91 @@ def cmd_edit(args, bot_id: str, user_id: str):
print(f"Error: {e}", file=sys.stderr) print(f"Error: {e}", file=sys.stderr)
sys.exit(1) sys.exit(1)
# 重新计算 next_run_at如果是 cron 且修改了 schedule 或 timezone # Recalculate next_run_at (if cron and schedule or timezone changed)
if task["type"] == "cron" and task.get("schedule"): if task["type"] == "cron" and task.get("schedule"):
task["next_run_at"] = compute_next_run_cron(task["schedule"], task.get("timezone", "UTC")) task["next_run_at"] = compute_next_run_cron(task["schedule"], task.get("timezone", "UTC"))
save_tasks(bot_id, user_id, data) save_tasks(bot_id, user_id, data)
print(f"任务已更新: {args.task_id}") print(f"Task updated: {args.task_id}")
def cmd_delete(args, bot_id: str, user_id: str): def cmd_delete(args, bot_id: str, user_id: str):
"""删除任务""" """Delete a task"""
data = load_tasks(bot_id, user_id) data = load_tasks(bot_id, user_id)
original_count = len(data["tasks"]) original_count = len(data["tasks"])
data["tasks"] = [t for t in data["tasks"] if t["id"] != args.task_id] data["tasks"] = [t for t in data["tasks"] if t["id"] != args.task_id]
if len(data["tasks"]) == original_count: if len(data["tasks"]) == original_count:
print(f"Error: 任务 {args.task_id} 不存在", file=sys.stderr) print(f"Error: Task {args.task_id} not found", file=sys.stderr)
sys.exit(1) sys.exit(1)
save_tasks(bot_id, user_id, data) save_tasks(bot_id, user_id, data)
print(f"任务已删除: {args.task_id}") print(f"Task deleted: {args.task_id}")
def cmd_toggle(args, bot_id: str, user_id: str): def cmd_toggle(args, bot_id: str, user_id: str):
"""暂停/恢复任务""" """Pause/resume task"""
data = load_tasks(bot_id, user_id) data = load_tasks(bot_id, user_id)
for t in data["tasks"]: for t in data["tasks"]:
if t["id"] == args.task_id: if t["id"] == args.task_id:
if t["status"] == "active": if t["status"] == "active":
t["status"] = "paused" t["status"] = "paused"
print(f"任务已暂停: {args.task_id}") print(f"Task paused: {args.task_id}")
elif t["status"] == "paused": elif t["status"] == "paused":
t["status"] = "active" t["status"] = "active"
# 恢复时重新计算 next_run_at # Recalculate next_run_at when resuming
if t["type"] == "cron" and t.get("schedule"): if t["type"] == "cron" and t.get("schedule"):
t["next_run_at"] = compute_next_run_cron(t["schedule"], t.get("timezone", "UTC")) t["next_run_at"] = compute_next_run_cron(t["schedule"], t.get("timezone", "UTC"))
print(f"任务已恢复: {args.task_id}") print(f"Task resumed: {args.task_id}")
else: else:
print(f"任务状态为 {t['status']},无法切换", file=sys.stderr) print(f"Task status is {t['status']}, cannot toggle", file=sys.stderr)
sys.exit(1) sys.exit(1)
save_tasks(bot_id, user_id, data) save_tasks(bot_id, user_id, data)
return return
print(f"Error: 任务 {args.task_id} 不存在", file=sys.stderr) print(f"Error: Task {args.task_id} not found", file=sys.stderr)
sys.exit(1) sys.exit(1)
def cmd_logs(args, bot_id: str, user_id: str): def cmd_logs(args, bot_id: str, user_id: str):
"""查看执行日志""" """View execution logs"""
logs_dir = get_logs_dir(bot_id, user_id) logs_dir = get_logs_dir(bot_id, user_id)
log_file = logs_dir / "execution.log" log_file = logs_dir / "execution.log"
if not log_file.exists(): if not log_file.exists():
print("暂无执行日志。") print("No execution logs yet.")
return return
with open(log_file, 'r', encoding='utf-8') as f: with open(log_file, 'r', encoding='utf-8') as f:
logs = yaml.safe_load(f) logs = yaml.safe_load(f)
if not logs: if not logs:
print("暂无执行日志。") print("No execution logs yet.")
return return
# 按任务 ID 过滤 # Filter by task ID
if args.task_id: if args.task_id:
logs = [l for l in logs if l.get("task_id") == args.task_id] logs = [l for l in logs if l.get("task_id") == args.task_id]
# 限制数量 # Limit count
limit = args.limit or 10 limit = args.limit or 10
logs = logs[-limit:] logs = logs[-limit:]
if not logs: if not logs:
print("没有匹配的执行日志。") print("No matching logs found.")
return return
for log in logs: for log in logs:
status_icon = "" if log.get("status") == "success" else "" status_icon = "" if log.get("status") == "success" else ""
print(f"{status_icon} [{log.get('executed_at', 'N/A')}] {log.get('task_name', 'N/A')}") print(f"{status_icon} [{log.get('executed_at', 'N/A')}] {log.get('task_name', 'N/A')}")
response = log.get("response", "") response = log.get("response", "")
# 截断过长的响应 # Truncate long responses
if len(response) > 200: if len(response) > 200:
response = response[:200] + "..." response = response[:200] + "..."
print(f" {response}") print(f" {response}")
if log.get("duration_ms"): if log.get("duration_ms"):
print(f" 耗时: {log['duration_ms']}ms") print(f" Duration: {log['duration_ms']}ms")
print() print()
@ -357,46 +358,46 @@ def main():
user_id = os.getenv("USER_IDENTIFIER", "") user_id = os.getenv("USER_IDENTIFIER", "")
if not bot_id or not user_id: if not bot_id or not user_id:
print("Error: ASSISTANT_ID 和 USER_IDENTIFIER 环境变量必须设置", file=sys.stderr) print("Error: ASSISTANT_ID and USER_IDENTIFIER environment variables must be set", file=sys.stderr)
sys.exit(1) sys.exit(1)
parser = argparse.ArgumentParser(description="定时任务管理工具") parser = argparse.ArgumentParser(description="Scheduled task manager")
subparsers = parser.add_subparsers(dest="command", help="可用命令") subparsers = parser.add_subparsers(dest="command", help="Available commands")
# list # list
p_list = subparsers.add_parser("list", help="列出所有任务") p_list = subparsers.add_parser("list", help="List all tasks")
p_list.add_argument("--format", choices=["brief", "detail"], default="detail", help="输出格式") p_list.add_argument("--format", choices=["brief", "detail"], default="detail", help="Output format")
# add # add
p_add = subparsers.add_parser("add", help="添加任务") p_add = subparsers.add_parser("add", help="Add a task")
p_add.add_argument("--name", required=True, help="任务名称") p_add.add_argument("--name", required=True, help="Task name")
p_add.add_argument("--type", required=True, choices=["cron", "once"], help="任务类型") p_add.add_argument("--type", required=True, choices=["cron", "once"], help="Task type")
p_add.add_argument("--schedule", help="Cron 表达式 (cron 类型必填)") p_add.add_argument("--schedule", help="Cron expression (required for cron type)")
p_add.add_argument("--scheduled-at", help="执行时间 ISO 8601 格式 (once 类型必填)") p_add.add_argument("--scheduled-at", help="Execution time in ISO 8601 format (required for once type)")
p_add.add_argument("--timezone", help="时区 (如 Asia/Tokyo),默认 UTC") p_add.add_argument("--timezone", help="Timezone (e.g. Asia/Tokyo), default UTC")
p_add.add_argument("--message", required=True, help="发送给 AI 的消息内容") p_add.add_argument("--message", required=True, help="Message content to send to AI")
# edit # edit
p_edit = subparsers.add_parser("edit", help="编辑任务") p_edit = subparsers.add_parser("edit", help="Edit a task")
p_edit.add_argument("task_id", help="任务 ID") p_edit.add_argument("task_id", help="Task ID")
p_edit.add_argument("--name", help="新任务名称") p_edit.add_argument("--name", help="New task name")
p_edit.add_argument("--schedule", help="新 Cron 表达式") p_edit.add_argument("--schedule", help="New cron expression")
p_edit.add_argument("--scheduled-at", help="新执行时间") p_edit.add_argument("--scheduled-at", help="New execution time")
p_edit.add_argument("--timezone", help="新时区") p_edit.add_argument("--timezone", help="New timezone")
p_edit.add_argument("--message", help="新消息内容") p_edit.add_argument("--message", help="New message content")
# delete # delete
p_delete = subparsers.add_parser("delete", help="删除任务") p_delete = subparsers.add_parser("delete", help="Delete a task")
p_delete.add_argument("task_id", help="任务 ID") p_delete.add_argument("task_id", help="Task ID")
# toggle # toggle
p_toggle = subparsers.add_parser("toggle", help="暂停/恢复任务") p_toggle = subparsers.add_parser("toggle", help="Pause/resume a task")
p_toggle.add_argument("task_id", help="任务 ID") p_toggle.add_argument("task_id", help="Task ID")
# logs # logs
p_logs = subparsers.add_parser("logs", help="查看执行日志") p_logs = subparsers.add_parser("logs", help="View execution logs")
p_logs.add_argument("--task-id", help="按任务 ID 过滤") p_logs.add_argument("--task-id", help="Filter by task ID")
p_logs.add_argument("--limit", type=int, default=10, help="显示条数") p_logs.add_argument("--limit", type=int, default=10, help="Number of entries to show")
args = parser.parse_args() args = parser.parse_args()