#!/usr/bin/env python3 # -*- coding: utf-8 -*- import os import json from typing import Dict, List, Any, Optional from openpyxl import load_workbook from datetime import datetime import logging logger = logging.getLogger(__name__) class ExcelQuestionReader: """Excel题库文件读取器,支持缓存机制""" def __init__(self, excel_path: str = "public/questions.xlsx"): self.excel_path = excel_path self._cache: Optional[Dict[str, Any]] = None self._cache_timestamp: Optional[float] = None self._cache_duration = 300 # 缓存5分钟 def _is_cache_valid(self) -> bool: """检查缓存是否有效""" if not self._cache or not self._cache_timestamp: return False # 检查文件修改时间 try: file_mtime = os.path.getmtime(self.excel_path) return file_mtime <= self._cache_timestamp and \ (datetime.now().timestamp() - self._cache_timestamp) < self._cache_duration except OSError: return False def _load_from_excel(self) -> Dict[str, Any]: """从Excel文件加载数据""" if not os.path.exists(self.excel_path): raise FileNotFoundError(f"Excel题库文件不存在: {self.excel_path}") try: wb = load_workbook(self.excel_path, read_only=True) ws = wb.active # 使用第一个工作表 # 获取表头 headers = [] for cell in ws[1]: headers.append(cell.value) # 读取数据并按标签分类 questions = { "基础题": [], "进阶题": [], "竞赛题": [] } for row in ws.iter_rows(min_row=2, values_only=True): if not row[0]: # 跳过空行 continue # 构建题目字典 question = {} for i, header in enumerate(headers): if i < len(row) and header is not None: # 处理空值 value = row[i] if row[i] is not None else "" question[str(header)] = value # 添加额外的字段以保持兼容性 if "标签" in question: question["题目标签"] = question["标签"] # 使用题型字段确定题目类型,如果题型字段为空则使用标签 question_type = question.get("题型", "") if not question_type and "标签" in question: question_type = self._determine_question_type(question["标签"]) question["题目类型"] = question_type or "基础题" # 根据题目类型分类 q_type = question.get("题目类型", "基础题") if q_type in questions: questions[q_type].append(question) wb.close() return questions except Exception as e: logger.error(f"读取Excel文件失败: {e}") raise def _determine_question_type(self, tag: str) -> str: """根据标签确定题目类型""" if "竞赛题" in tag: return "竞赛题" elif "进阶题" in tag: return "进阶题" else: return "基础题" def get_questions(self, force_reload: bool = False) -> Dict[str, Any]: """获取题目数据,支持缓存""" if not force_reload and self._is_cache_valid(): return self._cache try: questions = self._load_from_excel() self._cache = questions self._cache_timestamp = datetime.now().timestamp() return questions except Exception as e: logger.error(f"加载题目数据失败: {e}") # 如果有缓存数据,降级返回缓存 if self._cache: logger.warning("使用缓存数据作为降级方案") return self._cache raise def get_all_tags(self) -> List[str]: """获取所有标签""" questions = self.get_questions() all_tags = set() for category_questions in questions.values(): for question in category_questions: tag = question.get("标签", "") if tag: all_tags.add(tag) return sorted(list(all_tags)) def get_questions_by_filters( self, subject: str = "", grade: str = "", # 完整的年级册次信息(如:一年级上册) unit: str = "" ) -> Dict[str, Any]: """根据学科、年级册次、单元筛选题目""" questions = self.get_questions() filtered_questions = { "基础题": [], "进阶题": [], "竞赛题": [] } # 如果没有提供任何筛选条件,返回全部题目 if not any([subject, grade, unit]): return questions for category_name, category_questions in questions.items(): for question in category_questions: # 检查是否符合所有筛选条件 match = True # 学科筛选 if subject and question.get("学科", "") != subject: match = False # 年级册次筛选 if grade and question.get("年级", "") != grade: match = False # 单元筛选(包括期中、期末的特殊处理) if unit: if unit == "期中": # 期中:选择前一半的单元 if not self._is_in_midterm_units(question, subject, grade): match = False elif unit == "期末": # 期末:选择所有单元(已经通过了年级册次筛选) pass # 不需要额外筛选 elif question.get("单元", "") != unit: # 具体单元筛选 match = False if match: filtered_questions[category_name].append(question) return filtered_questions def _is_in_midterm_units(self, question: Dict[str, Any], subject: str, grade: str) -> bool: """判断题目是否属于期中考试范围(前一半单元)""" unit = question.get("单元", "") if not unit: return False # 获取当前年级册次的所有单元 all_questions = self.get_questions() grade_units = set() for category_questions in all_questions.values(): for q in category_questions: if q.get("学科", "") == subject and q.get("年级", "") == grade: unit_name = q.get("单元", "") if unit_name: grade_units.add(unit_name) # 按单元数字排序 sorted_units = sorted(list(grade_units), key=lambda x: int(x.split('-')[0]) if x.split('-')[0].isdigit() else 0) # 确定期中包含的单元(前一半) mid_term_count = (len(sorted_units) + 1) // 2 # 向上取整 mid_term_units = set(sorted_units[:mid_term_count]) # 检查当前题目的单元是否在期中范围内 return unit in mid_term_units def clear_cache(self): """清除缓存""" self._cache = None self._cache_timestamp = None # 全局实例 _excel_reader = ExcelQuestionReader() def get_questions_data(force_reload: bool = False) -> Dict[str, Any]: """获取题目数据(全局函数)""" return _excel_reader.get_questions(force_reload) def get_all_tags() -> List[str]: """获取所有标签(全局函数)""" return _excel_reader.get_all_tags() def get_questions_by_tag(selected_tag: str) -> Dict[str, Any]: """根据标签筛选题目(全局函数)""" return _excel_reader.get_questions_by_tag(selected_tag) def get_questions_by_filters( subject: str = "", grade: str = "", # 完整的年级册次信息(如:一年级上册) unit: str = "" ) -> Dict[str, Any]: """根据学科、年级册次、单元筛选题目(全局函数)""" return _excel_reader.get_questions_by_filters( subject=subject, grade=grade, unit=unit ) def get_available_filters() -> Dict[str, List[str]]: """获取所有可用的筛选条件(全局函数)""" questions = _excel_reader.get_questions() subjects = set() grades = set() units = set() for category_questions in questions.values(): for question in category_questions: # 收集所有学科 subject = question.get("学科", "") if subject: subjects.add(subject) # 收集所有年级 grade = question.get("年级", "") if grade: grades.add(grade) # 收集所有单元 unit = question.get("单元", "") if unit: units.add(unit) return { "subjects": sorted(list(subjects)), "grades": _sort_grades_by_chinese_numbers(list(grades)), "units": sorted(list(units)) } def _sort_grades_by_chinese_numbers(grades): """按中文数字顺序排序年级""" chinese_number_map = { '一': 1, '二': 2, '三': 3, '四': 4, '五': 5, '六': 6, '七': 7, '八': 8, '九': 9, '十': 10, '十一': 11, '十二': 12 } import re def extract_chinese_number(grade_text): """从年级文本中提取中文数字""" match = re.match(r'^([一二三四五六七八九十]+)', grade_text) if match: return chinese_number_map.get(match.group(1), 999) return 999 return sorted(grades, key=extract_chinese_number) def clear_cache(): """清除缓存(全局函数)""" _excel_reader.clear_cache()