289 lines
10 KiB
Python
289 lines
10 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
import os
|
||
import json
|
||
from typing import Dict, List, Any, Optional
|
||
from openpyxl import load_workbook
|
||
from datetime import datetime
|
||
import logging
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
class ExcelQuestionReader:
|
||
"""Excel题库文件读取器,支持缓存机制"""
|
||
|
||
def __init__(self, excel_path: str = "public/questions.xlsx"):
|
||
self.excel_path = excel_path
|
||
self._cache: Optional[Dict[str, Any]] = None
|
||
self._cache_timestamp: Optional[float] = None
|
||
self._cache_duration = 300 # 缓存5分钟
|
||
|
||
def _is_cache_valid(self) -> bool:
|
||
"""检查缓存是否有效"""
|
||
if not self._cache or not self._cache_timestamp:
|
||
return False
|
||
|
||
# 检查文件修改时间
|
||
try:
|
||
file_mtime = os.path.getmtime(self.excel_path)
|
||
return file_mtime <= self._cache_timestamp and \
|
||
(datetime.now().timestamp() - self._cache_timestamp) < self._cache_duration
|
||
except OSError:
|
||
return False
|
||
|
||
def _load_from_excel(self) -> Dict[str, Any]:
|
||
"""从Excel文件加载数据"""
|
||
if not os.path.exists(self.excel_path):
|
||
raise FileNotFoundError(f"Excel题库文件不存在: {self.excel_path}")
|
||
|
||
try:
|
||
wb = load_workbook(self.excel_path, read_only=True)
|
||
ws = wb.active # 使用第一个工作表
|
||
|
||
# 获取表头
|
||
headers = []
|
||
for cell in ws[1]:
|
||
headers.append(cell.value)
|
||
|
||
# 读取数据并按标签分类
|
||
questions = {
|
||
"基础题": [],
|
||
"进阶题": [],
|
||
"竞赛题": []
|
||
}
|
||
|
||
for row in ws.iter_rows(min_row=2, values_only=True):
|
||
if not row[0]: # 跳过空行
|
||
continue
|
||
|
||
# 构建题目字典
|
||
question = {}
|
||
for i, header in enumerate(headers):
|
||
if i < len(row) and header is not None:
|
||
# 处理空值
|
||
value = row[i] if row[i] is not None else ""
|
||
question[str(header)] = value
|
||
|
||
# 添加额外的字段以保持兼容性
|
||
if "标签" in question:
|
||
question["题目标签"] = question["标签"]
|
||
|
||
# 使用题型字段确定题目类型,如果题型字段为空则使用标签
|
||
question_type = question.get("题型", "")
|
||
if not question_type and "标签" in question:
|
||
question_type = self._determine_question_type(question["标签"])
|
||
question["题目类型"] = question_type or "基础题"
|
||
|
||
# 根据题目类型分类
|
||
q_type = question.get("题目类型", "基础题")
|
||
if q_type in questions:
|
||
questions[q_type].append(question)
|
||
|
||
wb.close()
|
||
return questions
|
||
|
||
except Exception as e:
|
||
logger.error(f"读取Excel文件失败: {e}")
|
||
raise
|
||
|
||
def _determine_question_type(self, tag: str) -> str:
|
||
"""根据标签确定题目类型"""
|
||
if "竞赛题" in tag:
|
||
return "竞赛题"
|
||
elif "进阶题" in tag:
|
||
return "进阶题"
|
||
else:
|
||
return "基础题"
|
||
|
||
def get_questions(self, force_reload: bool = False) -> Dict[str, Any]:
|
||
"""获取题目数据,支持缓存"""
|
||
if not force_reload and self._is_cache_valid():
|
||
return self._cache
|
||
|
||
try:
|
||
questions = self._load_from_excel()
|
||
self._cache = questions
|
||
self._cache_timestamp = datetime.now().timestamp()
|
||
return questions
|
||
except Exception as e:
|
||
logger.error(f"加载题目数据失败: {e}")
|
||
# 如果有缓存数据,降级返回缓存
|
||
if self._cache:
|
||
logger.warning("使用缓存数据作为降级方案")
|
||
return self._cache
|
||
raise
|
||
|
||
def get_all_tags(self) -> List[str]:
|
||
"""获取所有标签"""
|
||
questions = self.get_questions()
|
||
all_tags = set()
|
||
|
||
for category_questions in questions.values():
|
||
for question in category_questions:
|
||
tag = question.get("标签", "")
|
||
if tag:
|
||
all_tags.add(tag)
|
||
|
||
return sorted(list(all_tags))
|
||
|
||
def get_questions_by_filters(
|
||
self,
|
||
subject: str = "",
|
||
grade: str = "", # 完整的年级册次信息(如:一年级上册)
|
||
unit: str = ""
|
||
) -> Dict[str, Any]:
|
||
"""根据学科、年级册次、单元筛选题目"""
|
||
questions = self.get_questions()
|
||
filtered_questions = {
|
||
"基础题": [],
|
||
"进阶题": [],
|
||
"竞赛题": []
|
||
}
|
||
|
||
# 如果没有提供任何筛选条件,返回全部题目
|
||
if not any([subject, grade, unit]):
|
||
return questions
|
||
|
||
for category_name, category_questions in questions.items():
|
||
for question in category_questions:
|
||
# 检查是否符合所有筛选条件
|
||
match = True
|
||
|
||
# 学科筛选
|
||
if subject and question.get("学科", "") != subject:
|
||
match = False
|
||
|
||
# 年级册次筛选
|
||
if grade and question.get("年级", "") != grade:
|
||
match = False
|
||
|
||
# 单元筛选(包括期中、期末的特殊处理)
|
||
if unit:
|
||
if unit == "期中":
|
||
# 期中:选择前一半的单元
|
||
if not self._is_in_midterm_units(question, subject, grade):
|
||
match = False
|
||
elif unit == "期末":
|
||
# 期末:选择所有单元(已经通过了年级册次筛选)
|
||
pass # 不需要额外筛选
|
||
elif question.get("单元", "") != unit:
|
||
# 具体单元筛选
|
||
match = False
|
||
|
||
if match:
|
||
filtered_questions[category_name].append(question)
|
||
|
||
return filtered_questions
|
||
|
||
def _is_in_midterm_units(self, question: Dict[str, Any], subject: str, grade: str) -> bool:
|
||
"""判断题目是否属于期中考试范围(前一半单元)"""
|
||
unit = question.get("单元", "")
|
||
if not unit:
|
||
return False
|
||
|
||
# 获取当前年级册次的所有单元
|
||
all_questions = self.get_questions()
|
||
grade_units = set()
|
||
|
||
for category_questions in all_questions.values():
|
||
for q in category_questions:
|
||
if q.get("学科", "") == subject and q.get("年级", "") == grade:
|
||
unit_name = q.get("单元", "")
|
||
if unit_name:
|
||
grade_units.add(unit_name)
|
||
|
||
# 按单元数字排序
|
||
sorted_units = sorted(list(grade_units), key=lambda x: int(x.split('-')[0]) if x.split('-')[0].isdigit() else 0)
|
||
|
||
# 确定期中包含的单元(前一半)
|
||
mid_term_count = (len(sorted_units) + 1) // 2 # 向上取整
|
||
mid_term_units = set(sorted_units[:mid_term_count])
|
||
|
||
# 检查当前题目的单元是否在期中范围内
|
||
return unit in mid_term_units
|
||
|
||
def clear_cache(self):
|
||
"""清除缓存"""
|
||
self._cache = None
|
||
self._cache_timestamp = None
|
||
|
||
# 全局实例
|
||
_excel_reader = ExcelQuestionReader()
|
||
|
||
def get_questions_data(force_reload: bool = False) -> Dict[str, Any]:
|
||
"""获取题目数据(全局函数)"""
|
||
return _excel_reader.get_questions(force_reload)
|
||
|
||
def get_all_tags() -> List[str]:
|
||
"""获取所有标签(全局函数)"""
|
||
return _excel_reader.get_all_tags()
|
||
|
||
def get_questions_by_tag(selected_tag: str) -> Dict[str, Any]:
|
||
"""根据标签筛选题目(全局函数)"""
|
||
return _excel_reader.get_questions_by_tag(selected_tag)
|
||
|
||
def get_questions_by_filters(
|
||
subject: str = "",
|
||
grade: str = "", # 完整的年级册次信息(如:一年级上册)
|
||
unit: str = ""
|
||
) -> Dict[str, Any]:
|
||
"""根据学科、年级册次、单元筛选题目(全局函数)"""
|
||
return _excel_reader.get_questions_by_filters(
|
||
subject=subject,
|
||
grade=grade,
|
||
unit=unit
|
||
)
|
||
|
||
def get_available_filters() -> Dict[str, List[str]]:
|
||
"""获取所有可用的筛选条件(全局函数)"""
|
||
questions = _excel_reader.get_questions()
|
||
|
||
subjects = set()
|
||
grades = set()
|
||
units = set()
|
||
|
||
for category_questions in questions.values():
|
||
for question in category_questions:
|
||
# 收集所有学科
|
||
subject = question.get("学科", "")
|
||
if subject:
|
||
subjects.add(subject)
|
||
|
||
# 收集所有年级
|
||
grade = question.get("年级", "")
|
||
if grade:
|
||
grades.add(grade)
|
||
|
||
# 收集所有单元
|
||
unit = question.get("单元", "")
|
||
if unit:
|
||
units.add(unit)
|
||
|
||
return {
|
||
"subjects": sorted(list(subjects)),
|
||
"grades": _sort_grades_by_chinese_numbers(list(grades)),
|
||
"units": sorted(list(units))
|
||
}
|
||
|
||
def _sort_grades_by_chinese_numbers(grades):
|
||
"""按中文数字顺序排序年级"""
|
||
chinese_number_map = {
|
||
'一': 1, '二': 2, '三': 3, '四': 4, '五': 5, '六': 6, '七': 7, '八': 8,
|
||
'九': 9, '十': 10, '十一': 11, '十二': 12
|
||
}
|
||
|
||
import re
|
||
|
||
def extract_chinese_number(grade_text):
|
||
"""从年级文本中提取中文数字"""
|
||
match = re.match(r'^([一二三四五六七八九十]+)', grade_text)
|
||
if match:
|
||
return chinese_number_map.get(match.group(1), 999)
|
||
return 999
|
||
|
||
return sorted(grades, key=extract_chinese_number)
|
||
|
||
def clear_cache():
|
||
"""清除缓存(全局函数)"""
|
||
_excel_reader.clear_cache() |