#!/usr/bin/env python3
"""
SQLite task status database management tool

Provides a one-shot dump of the ``task_status`` table (``view``) and a small
interactive shell (``interactive``) built on top of ``task_status_store``.
"""

import json
import sqlite3
import sys
import time
from contextlib import closing

from task_queue.task_status import task_status_store

# Format used when rendering the created/updated timestamps of a record.
_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

# Shell commands that list tasks filtered by the status of the same name.
_STATUS_COMMANDS = ('pending', 'completed', 'failed')

_HELP_TEXT = """
Available commands:
  view         - View all records
  stats        - View statistics
  pending      - View pending tasks
  completed    - View completed tasks
  failed       - View failed tasks
  sql          - Run an SQL query
  cleanup      - Clean up records older than N days
  count        - Count total tasks
  help         - Show help
  quit/exit/q  - Exit
"""


def _format_timestamp(epoch_seconds):
    """Render a timestamp as local time using ``_TIME_FORMAT``.

    The value is passed straight to ``time.localtime``.
    """
    return time.strftime(_TIME_FORMAT, time.localtime(epoch_seconds))


def _summarize_result(result):
    """Return a short, printable summary of a stored result value.

    The value is parsed as JSON and its ``message`` entry is returned
    (``'N/A'`` when absent). If the value is not valid JSON, or the decoded
    value is not a mapping, the first 50 characters followed by ``...`` are
    returned instead.
    """
    try:
        return str(json.loads(result).get('message', 'N/A'))
    except (ValueError, TypeError, AttributeError):
        # ValueError covers json.JSONDecodeError; AttributeError covers JSON
        # that decodes to a non-dict (list, number, string, ...).
        return f"{result[:50]}..."


def _print_record(index, row):
    """Print one ``task_status`` row as a numbered, multi-line entry.

    ``row`` is unpacked positionally, so it must contain exactly seven
    values in the order used below (the order ``SELECT *`` returned them).
    """
    task_id, unique_id, status, created_at, updated_at, result, error = row

    print(f" {index}. {task_id}")
    print(f" Project ID: {unique_id}")
    print(f" Status: {status}")
    print(f" Created: {_format_timestamp(created_at)}")
    print(f" Updated: {_format_timestamp(updated_at)}")
    if result:
        print(f" Result: {_summarize_result(result)}")
    if error:
        print(f" Error: {error}")
    print()


def view_database():
    """View database contents

    Prints the database path, the column names and types of the
    ``task_status`` table, and every record ordered by ``updated_at``
    descending.
    """
    print("SQLite task status database contents")
    print("=" * 40)
    print(f"Database path: {task_status_store.db_path}")

    # closing() guarantees the connection is released even if a statement
    # raises; sqlite3's own context manager only manages transactions.
    with closing(sqlite3.connect(task_status_store.db_path)) as conn:
        cursor = conn.cursor()

        # View table schema
        print("\nTable schema:")
        cursor.execute("PRAGMA table_info(task_status)")
        for col in cursor.fetchall():
            # PRAGMA table_info rows: index 1 is the column name, 2 its type.
            print(f" {col[1]} ({col[2]})")

        # View all records
        print("\nAll records:")
        cursor.execute("SELECT * FROM task_status ORDER BY updated_at DESC")
        rows = cursor.fetchall()

    if not rows:
        print(" (empty database)")
        return

    print(f" Total {len(rows)} records:")
    for index, row in enumerate(rows, start=1):
        _print_record(index, row)


def run_query(sql_query: str):
    """Run a custom query

    Executes ``sql_query`` verbatim against the task status database and
    prints each resulting row as a dict. Failures are reported on stdout
    rather than raised. No commit is issued here.

    Args:
        sql_query: SQL text typed by the operator of this tool.
    """
    print(f"Running query: {sql_query}")

    try:
        with closing(sqlite3.connect(task_status_store.db_path)) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            cursor.execute(sql_query)
            rows = cursor.fetchall()

            if not rows:
                print(" (no results)")
            else:
                print(f" {len(rows)} results:")
                for row in rows:
                    print(f" {dict(row)}")
    except Exception as e:
        # Deliberate catch-all: this is the error boundary for ad-hoc SQL
        # entered by the operator, so any failure is reported, not raised.
        print(f"Query failed: {e}")


def _print_tasks_by_status(status):
    """Print the id and project id of every task with the given status."""
    tasks = task_status_store.search_tasks(status=status)
    print(f"{status.capitalize()} tasks ({len(tasks)}):")
    for task in tasks:
        print(f" - {task['task_id']}: {task['unique_id']}")


def _print_statistics():
    """Print the summary returned by ``task_status_store.get_statistics``."""
    stats = task_status_store.get_statistics()
    print("Statistics:")
    print(f" Total tasks: {stats['total_tasks']}")
    print(f" Status breakdown: {stats['status_breakdown']}")
    print(f" Last 24 hours: {stats['recent_24h']}")


def _cleanup(days_text):
    """Parse ``days_text`` as an integer and clean up older records."""
    try:
        days = int(days_text)
    except ValueError:
        print("Please enter a valid number of days")
        return
    # Kept outside the try so a ValueError raised by the store itself is not
    # misreported as a bad number of days.
    count = task_status_store.cleanup_old_tasks(days)
    print(f"Cleaned up {count} records older than {days} days")


def _handle_command(command):
    """Execute one shell command; return False when the shell should exit."""
    lowered = command.lower()

    if lowered in ('quit', 'exit', 'q'):
        return False

    if lowered == 'help':
        print(_HELP_TEXT)
    elif lowered == 'view':
        view_database()
    elif lowered == 'stats':
        _print_statistics()
    elif lowered in _STATUS_COMMANDS:
        _print_tasks_by_status(lowered)
    elif lowered.startswith('sql '):
        # Slice the original text so the query keeps its own casing.
        run_query(command[4:])
    elif lowered.startswith('cleanup '):
        _cleanup(command[8:])
    elif lowered == 'count':
        all_tasks = task_status_store.list_all()
        print(f"Total tasks: {len(all_tasks)}")
    else:
        print("Unknown command. Type 'help' for help")
    return True


def interactive_shell():
    """Interactive database management

    Reads commands from stdin until the user quits, presses Ctrl-C, or
    stdin reaches end-of-file. Errors raised by a command are printed and
    the loop continues.
    """
    print("\n🖥️ Interactive database management")
    print("Type 'help' to view available commands, or 'quit' to exit")

    while True:
        try:
            command = input("\n> ").strip()
            if not _handle_command(command):
                break
        except (KeyboardInterrupt, EOFError):
            # EOFError must end the loop: with stdin closed, input() raises
            # it on every call and the shell would otherwise spin forever.
            print("\nGoodbye!")
            break
        except Exception as e:
            # Shell-level boundary: report the failure and keep prompting.
            print(f"Execution error: {e}")


def main():
    """Main function

    With no argument, dumps the database and then starts the interactive
    shell. ``view`` only dumps, ``interactive`` only starts the shell, and
    any other argument prints the usage line.
    """
    args = sys.argv[1:]

    if not args:
        view_database()
        interactive_shell()
    elif args[0] == 'view':
        view_database()
    elif args[0] == 'interactive':
        interactive_shell()
    else:
        print("Usage: python db_manager.py [view|interactive]")


if __name__ == "__main__":
    main()