catalog-agent/db_manager.py
2025-10-18 09:20:59 +08:00

168 lines
5.8 KiB
Python
Executable File

#!/usr/bin/env python3
"""
SQLite任务状态数据库管理工具
"""
import sqlite3
import json
import time
from task_queue.task_status import task_status_store
def view_database():
"""查看数据库内容"""
print("📊 SQLite任务状态数据库内容")
print("=" * 40)
print(f"数据库路径: {task_status_store.db_path}")
# 连接数据库
conn = sqlite3.connect(task_status_store.db_path)
cursor = conn.cursor()
# 查看表结构
print(f"\n📋 表结构:")
cursor.execute("PRAGMA table_info(task_status)")
columns = cursor.fetchall()
for col in columns:
print(f" {col[1]} ({col[2]})")
# 查看所有记录
print(f"\n📝 所有记录:")
cursor.execute("SELECT * FROM task_status ORDER BY updated_at DESC")
rows = cursor.fetchall()
if not rows:
print(" (空数据库)")
else:
print(f"{len(rows)} 条记录:")
for i, row in enumerate(rows):
task_id, unique_id, status, created_at, updated_at, result, error = row
created_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(created_at))
updated_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(updated_at))
print(f" {i+1}. {task_id}")
print(f" 项目ID: {unique_id}")
print(f" 状态: {status}")
print(f" 创建: {created_str}")
print(f" 更新: {updated_str}")
if result:
try:
result_data = json.loads(result)
print(f" 结果: {result_data.get('message', 'N/A')}")
except:
print(f" 结果: {result[:50]}...")
if error:
print(f" 错误: {error}")
print()
conn.close()
def run_query(sql_query: str):
"""执行自定义查询"""
print(f"🔍 执行查询: {sql_query}")
try:
conn = sqlite3.connect(task_status_store.db_path)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute(sql_query)
rows = cursor.fetchall()
if not rows:
print(" (无结果)")
else:
print(f" {len(rows)} 条结果:")
for row in rows:
print(f" {dict(row)}")
conn.close()
except Exception as e:
print(f"❌ 查询失败: {e}")
def interactive_shell():
"""交互式数据库管理"""
print("\n🖥️ 交互式数据库管理")
print("输入 'help' 查看可用命令,输入 'quit' 退出")
while True:
try:
command = input("\n> ").strip()
if command.lower() in ['quit', 'exit', 'q']:
break
elif command.lower() == 'help':
print("""
可用命令:
view - 查看所有记录
stats - 查看统计信息
pending - 查看待处理任务
completed - 查看已完成任务
failed - 查看失败任务
sql <查询> - 执行SQL查询
cleanup <天数> - 清理N天前的记录
count - 统计总任务数
help - 显示帮助
quit/exit/q - 退出
""")
elif command.lower() == 'view':
view_database()
elif command.lower() == 'stats':
stats = task_status_store.get_statistics()
print(f"统计信息:")
print(f" 总任务数: {stats['total_tasks']}")
print(f" 状态分布: {stats['status_breakdown']}")
print(f" 最近24小时: {stats['recent_24h']}")
elif command.lower() == 'pending':
tasks = task_status_store.search_tasks(status="pending")
print(f"待处理任务 ({len(tasks)} 个):")
for task in tasks:
print(f" - {task['task_id']}: {task['unique_id']}")
elif command.lower() == 'completed':
tasks = task_status_store.search_tasks(status="completed")
print(f"已完成任务 ({len(tasks)} 个):")
for task in tasks:
print(f" - {task['task_id']}: {task['unique_id']}")
elif command.lower() == 'failed':
tasks = task_status_store.search_tasks(status="failed")
print(f"失败任务 ({len(tasks)} 个):")
for task in tasks:
print(f" - {task['task_id']}: {task['unique_id']}")
elif command.lower().startswith('sql '):
sql_query = command[4:]
run_query(sql_query)
elif command.lower().startswith('cleanup '):
try:
days = int(command[8:])
count = task_status_store.cleanup_old_tasks(days)
print(f"✅ 已清理 {count}{days} 天前的记录")
except ValueError:
print("❌ 请输入有效的天数")
elif command.lower() == 'count':
all_tasks = task_status_store.list_all()
print(f"总任务数: {len(all_tasks)}")
else:
print("❌ 未知命令,输入 'help' 查看帮助")
except KeyboardInterrupt:
print("\n👋 再见!")
break
except Exception as e:
print(f"❌ 执行错误: {e}")
def main():
"""主函数"""
import sys
if len(sys.argv) > 1:
if sys.argv[1] == 'view':
view_database()
elif sys.argv[1] == 'interactive':
interactive_shell()
else:
print("用法: python db_manager.py [view|interactive]")
else:
view_database()
interactive_shell()
if __name__ == "__main__":
main()