survey/excel_reader.py
2025-10-30 00:19:42 +08:00

167 lines
5.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import json
from typing import Dict, List, Any, Optional
from openpyxl import load_workbook
from datetime import datetime
import logging
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class ExcelQuestionReader:
    """Read a question bank from an Excel workbook, with an in-memory cache.

    Questions are grouped into the categories listed in ``CATEGORIES``.
    The parsed result is cached and reused until the workbook file is
    modified or the cache is older than ``cache_duration`` seconds.

    The dictionaries returned by :meth:`get_questions` are the cached
    objects themselves, so callers should treat them as read-only.
    """

    # Category names; these are the keys of every dict this reader returns.
    CATEGORIES = ("基础题", "进阶题", "竞赛题")

    def __init__(self, excel_path: str = "public/questions.xlsx",
                 cache_duration: float = 300):
        """Create a reader; no file access happens until data is requested.

        Args:
            excel_path: Path of the workbook to read.
            cache_duration: Maximum age of the cache in seconds
                (defaults to five minutes).
        """
        self.excel_path = excel_path
        self._cache: Optional[Dict[str, Any]] = None
        self._cache_timestamp: Optional[float] = None
        self._cache_duration = cache_duration

    def _is_cache_valid(self) -> bool:
        """Return True when cached data exists and is still fresh.

        The cache is fresh when the workbook has not been modified after
        the cache was filled and the cache is younger than the configured
        duration. A workbook whose modification time cannot be read
        invalidates the cache.
        """
        if self._cache is None or self._cache_timestamp is None:
            return False
        try:
            file_mtime = os.path.getmtime(self.excel_path)
        except OSError:
            return False
        if file_mtime > self._cache_timestamp:
            return False
        cache_age = datetime.now().timestamp() - self._cache_timestamp
        return cache_age < self._cache_duration

    def _load_from_excel(self) -> Dict[str, Any]:
        """Parse the workbook's active sheet into categorized questions.

        Returns:
            A dict mapping each category name to a list of question dicts.

        Raises:
            FileNotFoundError: If ``excel_path`` does not exist.
            Exception: Any error raised while opening or parsing the
                workbook is logged and re-raised unchanged.
        """
        if not os.path.exists(self.excel_path):
            raise FileNotFoundError(f"Excel题库文件不存在: {self.excel_path}")
        try:
            wb = load_workbook(self.excel_path, read_only=True)
            try:
                return self._parse_worksheet(wb.active)
            finally:
                # Close the workbook even when parsing fails, so the
                # underlying file handle is not leaked.
                wb.close()
        except Exception as e:
            logger.error("读取Excel文件失败: %s", e)
            raise

    def _parse_worksheet(self, ws: Any) -> Dict[str, Any]:
        """Convert worksheet rows into question dicts grouped by category.

        The first row supplies the column headers; every later row whose
        first cell is non-empty becomes one question.
        """
        questions: Dict[str, Any] = {name: [] for name in self.CATEGORIES}

        # A single pass over the sheet: the first row is the header and
        # the remaining rows are data.
        rows = ws.iter_rows(values_only=True)
        header_row = next(rows, None)
        if header_row is None:
            return questions
        headers = list(header_row)

        for row in rows:
            # Skip blank rows (including rows that come back empty).
            if not row or not row[0]:
                continue

            # Columns without a header are ignored; empty cells become "".
            question = {
                str(header): ("" if value is None else value)
                for header, value in zip(headers, row)
                if header is not None
            }

            # Extra fields kept for compatibility with existing consumers.
            if "标签" in question:
                question["题目标签"] = question["标签"]
                question["题目类型"] = self._determine_question_type(
                    question["标签"]
                )

            # Questions whose type is not a known category are dropped.
            q_type = question.get("题目类型", "基础题")
            if q_type in questions:
                questions[q_type].append(question)

        return questions

    def _determine_question_type(self, tag: Any) -> str:
        """Map a tag value to its category name.

        Non-string tags (for example numeric cells) are converted to text
        first, so they fall back to the basic category instead of raising.
        """
        tag_text = "" if tag is None else str(tag)
        if "竞赛题" in tag_text:
            return "竞赛题"
        if "进阶题" in tag_text:
            return "进阶题"
        return "基础题"

    def get_questions(self, force_reload: bool = False) -> Dict[str, Any]:
        """Return the categorized questions, using the cache when valid.

        Args:
            force_reload: When True, bypass the cache and re-read the file.

        Returns:
            A dict mapping each category name to a list of question dicts.

        Raises:
            Exception: If loading fails and no cached data is available.
        """
        if not force_reload and self._is_cache_valid():
            return self._cache
        try:
            questions = self._load_from_excel()
        except Exception as e:
            logger.error("加载题目数据失败: %s", e)
            # Degrade gracefully: serve stale data rather than failing.
            if self._cache is not None:
                logger.warning("使用缓存数据作为降级方案")
                return self._cache
            raise
        self._cache = questions
        self._cache_timestamp = datetime.now().timestamp()
        return questions

    def get_all_tags(self) -> List[str]:
        """Return the sorted, de-duplicated non-empty tags as strings."""
        questions = self.get_questions()
        # Tags are normalized to text so that numeric cells neither break
        # the sort nor violate the declared return type.
        all_tags = {
            str(question.get("标签", ""))
            for category_questions in questions.values()
            for question in category_questions
            if question.get("标签", "")
        }
        return sorted(all_tags)

    def get_questions_by_tag(self, selected_tag: str) -> Dict[str, Any]:
        """Return the questions whose tag contains ``selected_tag``.

        An empty tag or the special value "全部题目" returns every
        question. Matching is by substring against the tag's text form.
        """
        if not selected_tag or selected_tag == "全部题目":
            return self.get_questions()
        questions = self.get_questions()
        filtered_questions: Dict[str, Any] = {
            name: [] for name in self.CATEGORIES
        }
        for category, category_questions in questions.items():
            # setdefault keeps this safe if the data holds an extra category.
            bucket = filtered_questions.setdefault(category, [])
            for question in category_questions:
                if selected_tag in str(question.get("标签", "")):
                    bucket.append(question)
        return filtered_questions

    def clear_cache(self):
        """Discard cached data so the next request re-reads the workbook."""
        self._cache = None
        self._cache_timestamp = None
# Shared module-level reader instance used by the wrapper functions below.
_excel_reader = ExcelQuestionReader()
def get_questions_data(force_reload: bool = False) -> Dict[str, Any]:
    """Return the question data from the shared module-level reader.

    Args:
        force_reload: Forwarded to the reader's ``get_questions`` call.
    """
    reader = _excel_reader
    questions = reader.get_questions(force_reload)
    return questions
def get_all_tags() -> List[str]:
    """Return all tags as reported by the shared module-level reader."""
    reader = _excel_reader
    tags = reader.get_all_tags()
    return tags
def get_questions_by_tag(selected_tag: str) -> Dict[str, Any]:
    """Return the questions filtered by tag, via the shared module-level reader.

    Args:
        selected_tag: Forwarded to the reader's ``get_questions_by_tag`` call.
    """
    reader = _excel_reader
    filtered = reader.get_questions_by_tag(selected_tag)
    return filtered
def clear_cache():
    """Clear the cache held by the shared module-level reader."""
    reader = _excel_reader
    reader.clear_cache()