diff --git a/README.md b/README.md index 9b13af5..4c3d397 100644 --- a/README.md +++ b/README.md @@ -132,14 +132,23 @@ A: 确保服务器正在运行,检查Docker容器状态或使用本地开发 A: 检查网络连接和外部API是否可访问 **Q: 题目加载失败** -A: 确保`public/questions.json`文件存在且格式正确 +A: 确保`public/questions.xlsx`文件存在且格式正确,系统已升级为直接读取Excel文件 ### 开发环境要求 - Python 3.12+ - Poetry依赖管理 +- openpyxl库(用于读取Excel文件) - 网络连接(用于AI API调用) +### 题库管理 + +系统现在直接使用Excel文件作为题库: +- **题库文件**: `public/questions.xlsx` +- **工作表名称**: "单选题" +- **字段**: 序号、题干、选项A-H、解析、分数、答案、标签等 +- **自动分类**: 系统会根据标签自动将题目分为基础题、进阶题、竞赛题 + --- 🎉 现在您可以开始使用完整的学科能力测评系统了! \ No newline at end of file diff --git a/enhanced_survey_system.py b/enhanced_survey_system.py index c8fd1dc..5e700a5 100644 --- a/enhanced_survey_system.py +++ b/enhanced_survey_system.py @@ -113,16 +113,16 @@ class ReportGenerator: analysis_text = "# 考试答题情况分析\n\n" # 添加总分信息 - total_score = student_info.get('total_score', 0) if student_info else 0 + total_score = student_info['total_score'] if student_info and student_info['total_score'] is not None else 0 analysis_text += f"## 总分:{total_score} 分\n\n" analysis_text += "| 题目 | 题型 | 用户答案 | 正确答案 | 是否正确 | 得分 |\n" analysis_text += "|------|------|----------|----------|----------|------|\n" # 使用独立字段中的用户信息 - student_name = result['student_name'] if result else student_info.get('name', '未知') - student_school = result['student_school'] if result else student_info.get('school', '未知') - student_grade = result['student_grade'] if result else student_info.get('grade', '未知') + student_name = result['student_name'] if result else (student_info['name'] if student_info and student_info['name'] is not None else '未知') + student_school = result['student_school'] if result else (student_info['school'] if student_info and student_info['school'] is not None else '未知') + student_grade = result['student_grade'] if result else (student_info['grade'] if student_info and student_info['grade'] is not None else '未知') selected_tag = result['selected_tag'] if result else '未指定' # 添加基本信息 @@ -176,6 +176,7 @@ class ReportGenerator: async def generate_report(self, session_id): """生成测评报告""" + analysis_data = None try: # 获取会话数据 analysis_data = self.generate_analysis_text({'id': session_id}) @@ -201,7 +202,8 @@ class ReportGenerator: except Exception as e: print(f"生成报告失败: {e}") # 保存分析数据到数据库,允许后续重新生成 - self.save_analysis_data_for_regeneration(session_id, analysis_data) + if analysis_data: + self.save_analysis_data_for_regeneration(session_id, analysis_data) return { 'success': False, 'error': str(e), diff --git a/excel_reader.py b/excel_reader.py new file mode 100644 index 0000000..20d8ddf --- /dev/null +++ b/excel_reader.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import os +import json +from typing import Dict, List, Any, Optional +from openpyxl import load_workbook +from datetime import datetime +import logging + +logger = logging.getLogger(__name__) + +class ExcelQuestionReader: + """Excel题库文件读取器,支持缓存机制""" + + def __init__(self, excel_path: str = "public/questions.xlsx"): + self.excel_path = excel_path + self._cache: Optional[Dict[str, Any]] = None + self._cache_timestamp: Optional[float] = None + self._cache_duration = 300 # 缓存5分钟 + + def _is_cache_valid(self) -> bool: + """检查缓存是否有效""" + if not self._cache or not self._cache_timestamp: + return False + + # 检查文件修改时间 + try: + file_mtime = os.path.getmtime(self.excel_path) + return file_mtime <= self._cache_timestamp and \ + (datetime.now().timestamp() - self._cache_timestamp) < self._cache_duration + except OSError: + return False + + def _load_from_excel(self) -> Dict[str, Any]: + """从Excel文件加载数据""" + if not os.path.exists(self.excel_path): + raise FileNotFoundError(f"Excel题库文件不存在: {self.excel_path}") + + try: + wb = load_workbook(self.excel_path, read_only=True) + ws = wb.active # 使用第一个工作表 + + # 获取表头 + headers = [] + for cell in ws[1]: + headers.append(cell.value) + + # 读取数据并按标签分类 + questions = { + "基础题": [], + "进阶题": [], + "竞赛题": [] + } + + for row in ws.iter_rows(min_row=2, values_only=True): + if not row[0]: # 跳过空行 + continue + + # 构建题目字典 + question = {} + for i, header in enumerate(headers): + if i < len(row) and header is not None: + # 处理空值 + value = row[i] if row[i] is not None else "" + question[str(header)] = value + + # 添加额外的字段以保持兼容性 + if "标签" in question: + question["题目标签"] = question["标签"] + question["题目类型"] = self._determine_question_type(question["标签"]) + + # 根据题目类型分类 + q_type = question.get("题目类型", "基础题") + if q_type in questions: + questions[q_type].append(question) + + wb.close() + return questions + + except Exception as e: + logger.error(f"读取Excel文件失败: {e}") + raise + + def _determine_question_type(self, tag: str) -> str: + """根据标签确定题目类型""" + if "竞赛题" in tag: + return "竞赛题" + elif "进阶题" in tag: + return "进阶题" + else: + return "基础题" + + def get_questions(self, force_reload: bool = False) -> Dict[str, Any]: + """获取题目数据,支持缓存""" + if not force_reload and self._is_cache_valid(): + return self._cache + + try: + questions = self._load_from_excel() + self._cache = questions + self._cache_timestamp = datetime.now().timestamp() + return questions + except Exception as e: + logger.error(f"加载题目数据失败: {e}") + # 如果有缓存数据,降级返回缓存 + if self._cache: + logger.warning("使用缓存数据作为降级方案") + return self._cache + raise + + def get_all_tags(self) -> List[str]: + """获取所有标签""" + questions = self.get_questions() + all_tags = set() + + for category_questions in questions.values(): + for question in category_questions: + tag = question.get("标签", "") + if tag: + all_tags.add(tag) + + return sorted(list(all_tags)) + + def get_questions_by_tag(self, selected_tag: str) -> Dict[str, Any]: + """根据标签筛选题目""" + if not selected_tag or selected_tag == "全部题目": + return self.get_questions() + + questions = self.get_questions() + filtered_questions = { + "基础题": [], + "进阶题": [], + "竞赛题": [] + } + + for category, category_questions in questions.items(): + for question in category_questions: + question_tag = question.get("标签", "") + if selected_tag in question_tag: + filtered_questions[category].append(question) + + return filtered_questions + + def clear_cache(self): + """清除缓存""" + self._cache = None + self._cache_timestamp = None + +# 全局实例 +_excel_reader = ExcelQuestionReader() + +def get_questions_data(force_reload: bool = False) -> Dict[str, Any]: + """获取题目数据(全局函数)""" + return _excel_reader.get_questions(force_reload) + +def get_all_tags() -> List[str]: + """获取所有标签(全局函数)""" + return _excel_reader.get_all_tags() + +def get_questions_by_tag(selected_tag: str) -> Dict[str, Any]: + """根据标签筛选题目(全局函数)""" + return _excel_reader.get_questions_by_tag(selected_tag) + +def clear_cache(): + """清除缓存(全局函数)""" + _excel_reader.clear_cache() \ No newline at end of file diff --git a/generate/analyze_tags.py b/generate/analyze_tags.py index ea42185..e60a2ce 100644 --- a/generate/analyze_tags.py +++ b/generate/analyze_tags.py @@ -2,11 +2,16 @@ # -*- coding: utf-8 -*- import json +import sys +import os + +# 添加父目录到路径以便导入模块 +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +from excel_reader import get_questions_data def analyze_tags(): """分析题库中的所有标签""" - with open('/Users/moshui/Documents/survey/public/questions.json', 'r', encoding='utf-8') as f: - questions = json.load(f) + questions = get_questions_data() all_tags = set() tag_counts = {} diff --git a/main.py b/main.py index 9b9df60..c7f2070 100644 --- a/main.py +++ b/main.py @@ -15,6 +15,7 @@ from typing import Dict, List, Optional, Any import asyncio import threading from enhanced_survey_system import enhanced_system, get_east8_time_string, get_east8_time +from excel_reader import get_questions_data, get_all_tags, get_questions_by_tag app = FastAPI(title="Enhanced Survey System") @@ -314,8 +315,7 @@ async def delete_report(report_id: str): async def get_questions(): """获取题库数据""" try: - with open('public/questions.json', 'r', encoding='utf-8') as f: - questions_data = json.load(f) + questions_data = get_questions_data() return questions_data except FileNotFoundError: raise HTTPException(status_code=404, detail="题库文件不存在") @@ -330,22 +330,12 @@ async def get_tags(): tags_data = json.load(f) return tags_data except FileNotFoundError: - # 如果tags.json不存在,生成备用标签数据 + # 如果tags.json不存在,从Excel生成备用标签数据 try: - with open('public/questions.json', 'r', encoding='utf-8') as f: - questions_data = json.load(f) - - all_tags = set() - for questions in questions_data.values(): - for question in questions: - tags = question.get('题目标签', '') or question.get('标签', '') - if tags: - for tag in tags.split(r'[\s,,]+'): - if tag.strip(): - all_tags.add(tag.strip()) + all_tags = get_all_tags() backup_tags = { - "tags": sorted(list(all_tags)), + "tags": all_tags, "tag_counts": {}, "total_unique_tags": len(all_tags) } @@ -381,12 +371,8 @@ async def get_filtered_questions(session_id: str): selected_tag = session_data['selected_tag'] or '' questions_config = json.loads(session_data['questions_config']) - # 加载所有题目 - with open('public/questions.json', 'r', encoding='utf-8') as f: - all_questions = json.load(f) - - # 根据标签筛选题目 - filtered_questions = filter_questions_by_tag(all_questions, selected_tag) + # 根据标签筛选题目(直接从Excel读取) + filtered_questions = get_questions_by_tag(selected_tag) # 根据配置选择题目 selected_questions = select_questions_by_config(filtered_questions, questions_config) @@ -404,28 +390,6 @@ async def get_filtered_questions(session_id: str): except Exception as e: raise HTTPException(status_code=500, detail=f"获取筛选题目失败: {str(e)}") -def filter_questions_by_tag(all_questions: Dict, selected_tag: str) -> Dict: - """根据标签筛选题目""" - if not selected_tag: - return all_questions - - filtered = { - "基础题": [], - "进阶题": [], - "竞赛题": [] - } - - for question_type in ["基础题", "进阶题", "竞赛题"]: - for question in all_questions.get(question_type, []): - question_tags = question.get('题目标签', '') or question.get('标签', '') - # 正确分割标签(按空格、逗号等分隔符) - if isinstance(question_tags, str): - import re - tag_list = re.split(r'[\s,,]+', question_tags.strip()) - if selected_tag in tag_list: - filtered[question_type].append(question) - - return filtered def select_questions_by_config(filtered_questions: Dict, questions_config: Dict) -> List[Dict]: """根据配置从筛选后的题目中选择题目""" diff --git a/public/questions.json b/public/questions.json.backup similarity index 100% rename from public/questions.json rename to public/questions.json.backup diff --git a/public/questions.xlsx b/public/questions.xlsx new file mode 100644 index 0000000..9f35265 Binary files /dev/null and b/public/questions.xlsx differ diff --git a/public/tags.json b/public/tags.json index 91b38f2..5dd5982 100644 --- a/public/tags.json +++ b/public/tags.json @@ -1,42 +1,42 @@ { "tags": [ "进阶题", - "竞赛题", "基础题", + "竞赛题", "知识记忆", "探究思维", "生活关联", "科学态度", "科学视野", "创新应用", - "一年级上册1-周围的植物", - "四年级上册1-声音", - "四年级上册3-运动和力", - "八年级上册4-水与人类", - "六年级上册1-微小世界", - "三年级上册2-水", - "二年级上册1-造房子", - "六年级上册4-能量", - "五年级上册4-健康生活", - "五年级上册3-计量时间", "六年级上册3-工具与技术", - "七年级上册2-丰富多彩的生命世界", - "三年级上册3-物体的运动", - "八年级上册2-力与空间探索", - "二年级上册2-地球家园", "七年级上册5-探索技术与工程的世界", + "三年级上册2-水", + "七年级上册2-丰富多彩的生命世界", "八年级上册1-对环境的察觉", - "四年级上册2-呼吸与消化", + "八年级上册4-水与人类", "七年级上册1-探索自然科学", - "五年级上册1-光", - "八年级上册5-建筑机构与工程", - "五年级上册2-地球表面的变化", - "八年级上册3-电路探秘", - "七年级上册3-广袤浩瀚的宇宙", - "六年级上册2-地球的运动", - "三年级上册1-天气", + "八年级上册2-力与空间探索", "一年级上册2-我们自己", + "五年级上册4-健康生活", + "六年级上册1-微小世界", + "六年级上册2-地球的运动", + "四年级上册1-声音", "七年级上册4-多种多样的运动", + "五年级上册1-光", + "三年级上册3-物体的运动", + "八年级上册5-建筑机构与工程", + "二年级上册2-地球家园", + "七年级上册3-广袤浩瀚的宇宙", + "八年级上册3-电路探秘", + "四年级上册2-呼吸与消化", + "四年级上册3-运动和力", + "五年级上册2-地球表面的变化", + "六年级上册4-能量", + "五年级上册3-计量时间", + "一年级上册1-周围的植物", + "三年级上册1-天气", + "二年级上册1-造房子", "观察能力", "创新设计", "操作技能", @@ -54,8 +54,8 @@ "数据处理", "实验技能", "空间思维", - "数据分析", - "辩证思维" + "辩证思维", + "数据分析" ], "tag_counts": { "基础题": 560, diff --git a/validate_system.py b/validate_system.py index 837c367..99fa8de 100644 --- a/validate_system.py +++ b/validate_system.py @@ -33,19 +33,19 @@ def validate_system(): except Exception as e: tests.append(("❌ 测评配置页", str(e))) - # 3. 测试题库文件 + # 3. 测试题库API(通过API接口验证Excel数据) try: - response = requests.get(f"{base_url}/public/questions.json", timeout=5) + response = requests.get(f"{base_url}/api/questions", timeout=5) if response.status_code == 200: data = response.json() basic_count = len(data.get("基础题", [])) advanced_count = len(data.get("进阶题", [])) contest_count = len(data.get("竞赛题", [])) - tests.append(("✅ 题库文件", f"基础题:{basic_count} 进阶题:{advanced_count} 竞赛题:{contest_count}")) + tests.append(("✅ 题库API", f"基础题:{basic_count} 进阶题:{advanced_count} 竞赛题:{contest_count}")) else: - tests.append(("❌ 题库文件", f"HTTP {response.status_code}")) + tests.append(("❌ 题库API", f"HTTP {response.status_code}")) except Exception as e: - tests.append(("❌ 题库文件", str(e))) + tests.append(("❌ 题库API", str(e))) # 4. 测试创建会话API try: