survey/excel_reader.py
2025-11-15 23:51:08 +08:00

289 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import json
from typing import Dict, List, Any, Optional
from openpyxl import load_workbook
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
class ExcelQuestionReader:
"""Excel题库文件读取器支持缓存机制"""
def __init__(self, excel_path: str = "public/questions.xlsx"):
self.excel_path = excel_path
self._cache: Optional[Dict[str, Any]] = None
self._cache_timestamp: Optional[float] = None
self._cache_duration = 300 # 缓存5分钟
def _is_cache_valid(self) -> bool:
"""检查缓存是否有效"""
if not self._cache or not self._cache_timestamp:
return False
# 检查文件修改时间
try:
file_mtime = os.path.getmtime(self.excel_path)
return file_mtime <= self._cache_timestamp and \
(datetime.now().timestamp() - self._cache_timestamp) < self._cache_duration
except OSError:
return False
def _load_from_excel(self) -> Dict[str, Any]:
"""从Excel文件加载数据"""
if not os.path.exists(self.excel_path):
raise FileNotFoundError(f"Excel题库文件不存在: {self.excel_path}")
try:
wb = load_workbook(self.excel_path, read_only=True)
ws = wb.active # 使用第一个工作表
# 获取表头
headers = []
for cell in ws[1]:
headers.append(cell.value)
# 读取数据并按标签分类
questions = {
"基础题": [],
"进阶题": [],
"竞赛题": []
}
for row in ws.iter_rows(min_row=2, values_only=True):
if not row[0]: # 跳过空行
continue
# 构建题目字典
question = {}
for i, header in enumerate(headers):
if i < len(row) and header is not None:
# 处理空值
value = row[i] if row[i] is not None else ""
question[str(header)] = value
# 添加额外的字段以保持兼容性
if "标签" in question:
question["题目标签"] = question["标签"]
# 使用题型字段确定题目类型,如果题型字段为空则使用标签
question_type = question.get("题型", "")
if not question_type and "标签" in question:
question_type = self._determine_question_type(question["标签"])
question["题目类型"] = question_type or "基础题"
# 根据题目类型分类
q_type = question.get("题目类型", "基础题")
if q_type in questions:
questions[q_type].append(question)
wb.close()
return questions
except Exception as e:
logger.error(f"读取Excel文件失败: {e}")
raise
def _determine_question_type(self, tag: str) -> str:
"""根据标签确定题目类型"""
if "竞赛题" in tag:
return "竞赛题"
elif "进阶题" in tag:
return "进阶题"
else:
return "基础题"
def get_questions(self, force_reload: bool = False) -> Dict[str, Any]:
"""获取题目数据,支持缓存"""
if not force_reload and self._is_cache_valid():
return self._cache
try:
questions = self._load_from_excel()
self._cache = questions
self._cache_timestamp = datetime.now().timestamp()
return questions
except Exception as e:
logger.error(f"加载题目数据失败: {e}")
# 如果有缓存数据,降级返回缓存
if self._cache:
logger.warning("使用缓存数据作为降级方案")
return self._cache
raise
def get_all_tags(self) -> List[str]:
"""获取所有标签"""
questions = self.get_questions()
all_tags = set()
for category_questions in questions.values():
for question in category_questions:
tag = question.get("标签", "")
if tag:
all_tags.add(tag)
return sorted(list(all_tags))
def get_questions_by_filters(
self,
subject: str = "",
grade: str = "", # 完整的年级册次信息(如:一年级上册)
unit: str = ""
) -> Dict[str, Any]:
"""根据学科、年级册次、单元筛选题目"""
questions = self.get_questions()
filtered_questions = {
"基础题": [],
"进阶题": [],
"竞赛题": []
}
# 如果没有提供任何筛选条件,返回全部题目
if not any([subject, grade, unit]):
return questions
for category_name, category_questions in questions.items():
for question in category_questions:
# 检查是否符合所有筛选条件
match = True
# 学科筛选
if subject and question.get("学科", "") != subject:
match = False
# 年级册次筛选
if grade and question.get("年级", "") != grade:
match = False
# 单元筛选(包括期中、期末的特殊处理)
if unit:
if unit == "期中":
# 期中:选择前一半的单元
if not self._is_in_midterm_units(question, subject, grade):
match = False
elif unit == "期末":
# 期末:选择所有单元(已经通过了年级册次筛选)
pass # 不需要额外筛选
elif question.get("单元", "") != unit:
# 具体单元筛选
match = False
if match:
filtered_questions[category_name].append(question)
return filtered_questions
def _is_in_midterm_units(self, question: Dict[str, Any], subject: str, grade: str) -> bool:
"""判断题目是否属于期中考试范围(前一半单元)"""
unit = question.get("单元", "")
if not unit:
return False
# 获取当前年级册次的所有单元
all_questions = self.get_questions()
grade_units = set()
for category_questions in all_questions.values():
for q in category_questions:
if q.get("学科", "") == subject and q.get("年级", "") == grade:
unit_name = q.get("单元", "")
if unit_name:
grade_units.add(unit_name)
# 按单元数字排序
sorted_units = sorted(list(grade_units), key=lambda x: int(x.split('-')[0]) if x.split('-')[0].isdigit() else 0)
# 确定期中包含的单元(前一半)
mid_term_count = (len(sorted_units) + 1) // 2 # 向上取整
mid_term_units = set(sorted_units[:mid_term_count])
# 检查当前题目的单元是否在期中范围内
return unit in mid_term_units
def clear_cache(self):
"""清除缓存"""
self._cache = None
self._cache_timestamp = None
# 全局实例
_excel_reader = ExcelQuestionReader()
def get_questions_data(force_reload: bool = False) -> Dict[str, Any]:
"""获取题目数据(全局函数)"""
return _excel_reader.get_questions(force_reload)
def get_all_tags() -> List[str]:
"""获取所有标签(全局函数)"""
return _excel_reader.get_all_tags()
def get_questions_by_tag(selected_tag: str) -> Dict[str, Any]:
"""根据标签筛选题目(全局函数)"""
return _excel_reader.get_questions_by_tag(selected_tag)
def get_questions_by_filters(
subject: str = "",
grade: str = "", # 完整的年级册次信息(如:一年级上册)
unit: str = ""
) -> Dict[str, Any]:
"""根据学科、年级册次、单元筛选题目(全局函数)"""
return _excel_reader.get_questions_by_filters(
subject=subject,
grade=grade,
unit=unit
)
def get_available_filters() -> Dict[str, List[str]]:
"""获取所有可用的筛选条件(全局函数)"""
questions = _excel_reader.get_questions()
subjects = set()
grades = set()
units = set()
for category_questions in questions.values():
for question in category_questions:
# 收集所有学科
subject = question.get("学科", "")
if subject:
subjects.add(subject)
# 收集所有年级
grade = question.get("年级", "")
if grade:
grades.add(grade)
# 收集所有单元
unit = question.get("单元", "")
if unit:
units.add(unit)
return {
"subjects": sorted(list(subjects)),
"grades": _sort_grades_by_chinese_numbers(list(grades)),
"units": sorted(list(units))
}
def _sort_grades_by_chinese_numbers(grades):
"""按中文数字顺序排序年级"""
chinese_number_map = {
'': 1, '': 2, '': 3, '': 4, '': 5, '': 6, '': 7, '': 8,
'': 9, '': 10, '十一': 11, '十二': 12
}
import re
def extract_chinese_number(grade_text):
"""从年级文本中提取中文数字"""
match = re.match(r'^([一二三四五六七八九十]+)', grade_text)
if match:
return chinese_number_map.get(match.group(1), 999)
return 999
return sorted(grades, key=extract_chinese_number)
def clear_cache():
"""清除缓存(全局函数)"""
_excel_reader.clear_cache()