168 lines
5.8 KiB
Python
Executable File
168 lines
5.8 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
SQLite任务状态数据库管理工具
|
|
"""
|
|
|
|
import sqlite3
|
|
import json
|
|
import time
|
|
from task_queue.task_status import task_status_store
|
|
|
|
def view_database():
|
|
"""查看数据库内容"""
|
|
print("📊 SQLite任务状态数据库内容")
|
|
print("=" * 40)
|
|
print(f"数据库路径: {task_status_store.db_path}")
|
|
|
|
# 连接数据库
|
|
conn = sqlite3.connect(task_status_store.db_path)
|
|
cursor = conn.cursor()
|
|
|
|
# 查看表结构
|
|
print(f"\n📋 表结构:")
|
|
cursor.execute("PRAGMA table_info(task_status)")
|
|
columns = cursor.fetchall()
|
|
for col in columns:
|
|
print(f" {col[1]} ({col[2]})")
|
|
|
|
# 查看所有记录
|
|
print(f"\n📝 所有记录:")
|
|
cursor.execute("SELECT * FROM task_status ORDER BY updated_at DESC")
|
|
rows = cursor.fetchall()
|
|
|
|
if not rows:
|
|
print(" (空数据库)")
|
|
else:
|
|
print(f" 共 {len(rows)} 条记录:")
|
|
for i, row in enumerate(rows):
|
|
task_id, unique_id, status, created_at, updated_at, result, error = row
|
|
created_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(created_at))
|
|
updated_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(updated_at))
|
|
|
|
print(f" {i+1}. {task_id}")
|
|
print(f" 项目ID: {unique_id}")
|
|
print(f" 状态: {status}")
|
|
print(f" 创建: {created_str}")
|
|
print(f" 更新: {updated_str}")
|
|
if result:
|
|
try:
|
|
result_data = json.loads(result)
|
|
print(f" 结果: {result_data.get('message', 'N/A')}")
|
|
except:
|
|
print(f" 结果: {result[:50]}...")
|
|
if error:
|
|
print(f" 错误: {error}")
|
|
print()
|
|
|
|
conn.close()
|
|
|
|
def run_query(sql_query: str):
|
|
"""执行自定义查询"""
|
|
print(f"🔍 执行查询: {sql_query}")
|
|
|
|
try:
|
|
conn = sqlite3.connect(task_status_store.db_path)
|
|
conn.row_factory = sqlite3.Row
|
|
cursor = conn.cursor()
|
|
cursor.execute(sql_query)
|
|
rows = cursor.fetchall()
|
|
|
|
if not rows:
|
|
print(" (无结果)")
|
|
else:
|
|
print(f" {len(rows)} 条结果:")
|
|
for row in rows:
|
|
print(f" {dict(row)}")
|
|
|
|
conn.close()
|
|
|
|
except Exception as e:
|
|
print(f"❌ 查询失败: {e}")
|
|
|
|
def interactive_shell():
|
|
"""交互式数据库管理"""
|
|
print("\n🖥️ 交互式数据库管理")
|
|
print("输入 'help' 查看可用命令,输入 'quit' 退出")
|
|
|
|
while True:
|
|
try:
|
|
command = input("\n> ").strip()
|
|
|
|
if command.lower() in ['quit', 'exit', 'q']:
|
|
break
|
|
elif command.lower() == 'help':
|
|
print("""
|
|
可用命令:
|
|
view - 查看所有记录
|
|
stats - 查看统计信息
|
|
pending - 查看待处理任务
|
|
completed - 查看已完成任务
|
|
failed - 查看失败任务
|
|
sql <查询> - 执行SQL查询
|
|
cleanup <天数> - 清理N天前的记录
|
|
count - 统计总任务数
|
|
help - 显示帮助
|
|
quit/exit/q - 退出
|
|
""")
|
|
elif command.lower() == 'view':
|
|
view_database()
|
|
elif command.lower() == 'stats':
|
|
stats = task_status_store.get_statistics()
|
|
print(f"统计信息:")
|
|
print(f" 总任务数: {stats['total_tasks']}")
|
|
print(f" 状态分布: {stats['status_breakdown']}")
|
|
print(f" 最近24小时: {stats['recent_24h']}")
|
|
elif command.lower() == 'pending':
|
|
tasks = task_status_store.search_tasks(status="pending")
|
|
print(f"待处理任务 ({len(tasks)} 个):")
|
|
for task in tasks:
|
|
print(f" - {task['task_id']}: {task['unique_id']}")
|
|
elif command.lower() == 'completed':
|
|
tasks = task_status_store.search_tasks(status="completed")
|
|
print(f"已完成任务 ({len(tasks)} 个):")
|
|
for task in tasks:
|
|
print(f" - {task['task_id']}: {task['unique_id']}")
|
|
elif command.lower() == 'failed':
|
|
tasks = task_status_store.search_tasks(status="failed")
|
|
print(f"失败任务 ({len(tasks)} 个):")
|
|
for task in tasks:
|
|
print(f" - {task['task_id']}: {task['unique_id']}")
|
|
elif command.lower().startswith('sql '):
|
|
sql_query = command[4:]
|
|
run_query(sql_query)
|
|
elif command.lower().startswith('cleanup '):
|
|
try:
|
|
days = int(command[8:])
|
|
count = task_status_store.cleanup_old_tasks(days)
|
|
print(f"✅ 已清理 {count} 条 {days} 天前的记录")
|
|
except ValueError:
|
|
print("❌ 请输入有效的天数")
|
|
elif command.lower() == 'count':
|
|
all_tasks = task_status_store.list_all()
|
|
print(f"总任务数: {len(all_tasks)}")
|
|
else:
|
|
print("❌ 未知命令,输入 'help' 查看帮助")
|
|
|
|
except KeyboardInterrupt:
|
|
print("\n👋 再见!")
|
|
break
|
|
except Exception as e:
|
|
print(f"❌ 执行错误: {e}")
|
|
|
|
def main():
|
|
"""主函数"""
|
|
import sys
|
|
|
|
if len(sys.argv) > 1:
|
|
if sys.argv[1] == 'view':
|
|
view_database()
|
|
elif sys.argv[1] == 'interactive':
|
|
interactive_shell()
|
|
else:
|
|
print("用法: python db_manager.py [view|interactive]")
|
|
else:
|
|
view_database()
|
|
interactive_shell()
|
|
|
|
if __name__ == "__main__":
|
|
main() |