#!/usr/bin/env python3
"""SQLite-backed storage for task status records."""

import json
import os
import sqlite3
import time
from contextlib import closing, contextmanager
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Sequence


class TaskStatusStore:
    """Persist task status records in a single SQLite table.

    Each record is keyed by ``task_id`` and carries a ``unique_id``, a
    ``status`` string, ``created_at`` / ``updated_at`` timestamps taken from
    ``time.time()``, an optional JSON-encoded ``result`` and an optional
    ``error`` string.

    Every public method opens its own connection and closes it before
    returning.
    """

    def __init__(self, db_path: str = "projects/queue_data/task_status.db"):
        """Create the parent directory and the table if they do not exist.

        Args:
            db_path: Location of the SQLite database file.
        """
        self.db_path = db_path
        # Make sure the directory that will hold the database file exists.
        Path(db_path).parent.mkdir(parents=True, exist_ok=True)
        self._init_database()

    @contextmanager
    def _connect(self) -> Iterator[sqlite3.Connection]:
        """Yield a connection that is committed (or rolled back) and closed.

        ``sqlite3.Connection`` used directly as a context manager only ends
        the transaction; it does not close the connection. ``closing`` makes
        sure the handle is released on every exit path.
        """
        with closing(sqlite3.connect(self.db_path)) as conn:
            with conn:  # commit on success, roll back on exception
                yield conn

    @staticmethod
    def _encode_result(result: Optional[Dict]) -> Optional[str]:
        """Serialize ``result`` to JSON, mapping only ``None`` to NULL."""
        return None if result is None else json.dumps(result)

    @staticmethod
    def _decode_row(row: sqlite3.Row) -> Dict[str, Any]:
        """Convert a table row to a dict, decoding the JSON ``result`` column."""
        record = dict(row)
        if record['result']:
            record['result'] = json.loads(record['result'])
        return record

    def _query_tasks(self, query: str, params: Sequence[Any] = ()) -> List[Dict]:
        """Run a SELECT over ``task_status`` and return the decoded rows."""
        with self._connect() as conn:
            conn.row_factory = sqlite3.Row
            return [self._decode_row(row) for row in conn.execute(query, params)]

    def _init_database(self) -> None:
        """Create the ``task_status`` table if it is missing."""
        with self._connect() as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS task_status (
                    task_id TEXT PRIMARY KEY,
                    unique_id TEXT NOT NULL,
                    status TEXT NOT NULL,
                    created_at REAL NOT NULL,
                    updated_at REAL NOT NULL,
                    result TEXT,
                    error TEXT
                )
            ''')

    def set_status(self, task_id: str, unique_id: str, status: str,
                   result: Optional[Dict] = None,
                   error: Optional[str] = None) -> None:
        """Insert a task record, or overwrite the one with the same ``task_id``.

        When the task already exists its original ``created_at`` is kept;
        every other column is replaced with the values given here.

        Args:
            task_id: Primary key of the record.
            unique_id: Identifier stored in the ``unique_id`` column.
            status: New status string.
            result: Optional JSON-serializable payload; ``None`` stores NULL.
            error: Optional error message.
        """
        now = time.time()
        with self._connect() as conn:
            # The sub-select is evaluated before REPLACE removes the old row,
            # so an existing creation time survives the overwrite.
            conn.execute('''
                INSERT OR REPLACE INTO task_status
                (task_id, unique_id, status, created_at, updated_at, result, error)
                VALUES (
                    ?, ?, ?,
                    COALESCE(
                        (SELECT created_at FROM task_status WHERE task_id = ?),
                        ?
                    ),
                    ?, ?, ?
                )
            ''', (
                task_id,
                unique_id,
                status,
                task_id,
                now,
                now,
                self._encode_result(result),
                error,
            ))

    def get_status(self, task_id: str) -> Optional[Dict]:
        """Return the record for ``task_id``, or ``None`` if there is none."""
        matches = self._query_tasks(
            'SELECT * FROM task_status WHERE task_id = ?', (task_id,)
        )
        # task_id is the primary key, so there is at most one match.
        return matches[0] if matches else None

    def update_status(self, task_id: str, status: str,
                      result: Optional[Dict] = None,
                      error: Optional[str] = None) -> bool:
        """Update status, result and error of an existing task.

        ``result`` and ``error`` are always written, so leaving them at their
        ``None`` defaults clears any previously stored value.

        Returns:
            ``True`` if a record with ``task_id`` existed and was updated,
            ``False`` otherwise.
        """
        with self._connect() as conn:
            cursor = conn.execute('''
                UPDATE task_status
                SET status = ?, updated_at = ?, result = ?, error = ?
                WHERE task_id = ?
            ''', (
                status,
                time.time(),
                self._encode_result(result),
                error,
                task_id,
            ))
            # rowcount is 0 when no row matched, which replaces a separate
            # existence query.
            return cursor.rowcount > 0

    def delete_status(self, task_id: str) -> bool:
        """Delete the record for ``task_id``; return whether one was removed."""
        with self._connect() as conn:
            cursor = conn.execute(
                'DELETE FROM task_status WHERE task_id = ?', (task_id,)
            )
            return cursor.rowcount > 0

    def list_all(self) -> Dict[str, Dict]:
        """Return every record keyed by ``task_id``, most recently updated first."""
        tasks = self._query_tasks(
            'SELECT * FROM task_status ORDER BY updated_at DESC'
        )
        return {task['task_id']: task for task in tasks}

    def get_by_unique_id(self, unique_id: str) -> List[Dict]:
        """Return all records with the given ``unique_id``, newest update first."""
        return self._query_tasks(
            'SELECT * FROM task_status WHERE unique_id = ? ORDER BY updated_at DESC',
            (unique_id,),
        )

    def cleanup_old_tasks(self, older_than_days: int = 7) -> int:
        """Delete records whose ``updated_at`` is older than the cutoff.

        Args:
            older_than_days: Age threshold in days.

        Returns:
            Number of deleted records.
        """
        cutoff_time = time.time() - (older_than_days * 24 * 3600)
        with self._connect() as conn:
            cursor = conn.execute(
                'DELETE FROM task_status WHERE updated_at < ?', (cutoff_time,)
            )
            return cursor.rowcount

    def get_statistics(self) -> Dict[str, Any]:
        """Return aggregate counts over the stored records.

        Returns:
            A dict with ``total_tasks``, ``status_breakdown`` (status ->
            count), ``recent_24h`` (records updated in the last 24 hours) and
            ``database_path``.
        """
        with self._connect() as conn:
            total = conn.execute('SELECT COUNT(*) FROM task_status').fetchone()[0]

            status_stats = conn.execute('''
                SELECT status, COUNT(*) as count
                FROM task_status
                GROUP BY status
            ''').fetchall()

            recent = time.time() - (24 * 3600)
            recent_tasks = conn.execute(
                'SELECT COUNT(*) FROM task_status WHERE updated_at > ?',
                (recent,)
            ).fetchone()[0]

        return {
            'total_tasks': total,
            'status_breakdown': dict(status_stats),
            'recent_24h': recent_tasks,
            'database_path': self.db_path,
        }

    def search_tasks(self, status: Optional[str] = None,
                     unique_id: Optional[str] = None,
                     limit: int = 100) -> List[Dict]:
        """Return records matching the given filters, newest update first.

        Args:
            status: If truthy, only records with this status are returned.
            unique_id: If truthy, only records with this unique_id are returned.
            limit: Maximum number of records to return.
        """
        query = 'SELECT * FROM task_status WHERE 1=1'
        params: List[Any] = []

        if status:
            query += ' AND status = ?'
            params.append(status)

        if unique_id:
            query += ' AND unique_id = ?'
            params.append(unique_id)

        query += ' ORDER BY updated_at DESC LIMIT ?'
        params.append(limit)

        return self._query_tasks(query, params)


# Module-level shared store instance.
task_status_store = TaskStatusStore()