#!/usr/bin/env python3 # -*- coding: utf-8 -*- import os import json from typing import Dict, List, Any, Optional from openpyxl import load_workbook from datetime import datetime import logging logger = logging.getLogger(__name__) class ExcelQuestionReader: """Excel题库文件读取器,支持缓存机制""" def __init__(self, excel_path: str = "public/questions.xlsx"): self.excel_path = excel_path self._cache: Optional[Dict[str, Any]] = None self._cache_timestamp: Optional[float] = None self._cache_duration = 300 # 缓存5分钟 def _is_cache_valid(self) -> bool: """检查缓存是否有效""" if not self._cache or not self._cache_timestamp: return False # 检查文件修改时间 try: file_mtime = os.path.getmtime(self.excel_path) return file_mtime <= self._cache_timestamp and \ (datetime.now().timestamp() - self._cache_timestamp) < self._cache_duration except OSError: return False def _load_from_excel(self) -> Dict[str, Any]: """从Excel文件加载数据""" if not os.path.exists(self.excel_path): raise FileNotFoundError(f"Excel题库文件不存在: {self.excel_path}") try: wb = load_workbook(self.excel_path, read_only=True) ws = wb.active # 使用第一个工作表 # 获取表头 headers = [] for cell in ws[1]: headers.append(cell.value) # 读取数据并按标签分类 questions = { "基础题": [], "进阶题": [], "竞赛题": [] } for row in ws.iter_rows(min_row=2, values_only=True): if not row[0]: # 跳过空行 continue # 构建题目字典 question = {} for i, header in enumerate(headers): if i < len(row) and header is not None: # 处理空值 value = row[i] if row[i] is not None else "" question[str(header)] = value # 添加额外的字段以保持兼容性 if "标签" in question: question["题目标签"] = question["标签"] question["题目类型"] = self._determine_question_type(question["标签"]) # 根据题目类型分类 q_type = question.get("题目类型", "基础题") if q_type in questions: questions[q_type].append(question) wb.close() return questions except Exception as e: logger.error(f"读取Excel文件失败: {e}") raise def _determine_question_type(self, tag: str) -> str: """根据标签确定题目类型""" if "竞赛题" in tag: return "竞赛题" elif "进阶题" in tag: return "进阶题" else: return "基础题" def get_questions(self, force_reload: bool = False) -> Dict[str, Any]: """获取题目数据,支持缓存""" if not force_reload and self._is_cache_valid(): return self._cache try: questions = self._load_from_excel() self._cache = questions self._cache_timestamp = datetime.now().timestamp() return questions except Exception as e: logger.error(f"加载题目数据失败: {e}") # 如果有缓存数据,降级返回缓存 if self._cache: logger.warning("使用缓存数据作为降级方案") return self._cache raise def get_all_tags(self) -> List[str]: """获取所有标签""" questions = self.get_questions() all_tags = set() for category_questions in questions.values(): for question in category_questions: tag = question.get("标签", "") if tag: all_tags.add(tag) return sorted(list(all_tags)) def get_questions_by_tag(self, selected_tag: str) -> Dict[str, Any]: """根据标签筛选题目""" if not selected_tag or selected_tag == "全部题目": return self.get_questions() questions = self.get_questions() filtered_questions = { "基础题": [], "进阶题": [], "竞赛题": [] } for category, category_questions in questions.items(): for question in category_questions: question_tag = question.get("标签", "") if selected_tag in question_tag: filtered_questions[category].append(question) return filtered_questions def clear_cache(self): """清除缓存""" self._cache = None self._cache_timestamp = None # 全局实例 _excel_reader = ExcelQuestionReader() def get_questions_data(force_reload: bool = False) -> Dict[str, Any]: """获取题目数据(全局函数)""" return _excel_reader.get_questions(force_reload) def get_all_tags() -> List[str]: """获取所有标签(全局函数)""" return _excel_reader.get_all_tags() def get_questions_by_tag(selected_tag: str) -> Dict[str, Any]: """根据标签筛选题目(全局函数)""" return _excel_reader.get_questions_by_tag(selected_tag) def clear_cache(): """清除缓存(全局函数)""" _excel_reader.clear_cache()