survey/main.py
2025-11-30 23:12:06 +08:00

815 lines
29 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sqlite3
import json
import uuid
import os
from datetime import datetime
from fastapi import FastAPI, HTTPException, Request, Response
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Dict, List, Optional, Any
import asyncio
import threading
from enhanced_survey_system import enhanced_system, get_east8_time_string, get_east8_time
from excel_reader import get_questions_data, get_questions_by_tag, get_questions_by_filters, get_available_filters
# Application instance that every route decorator in this module attaches to.
app = FastAPI(title="Enhanced Survey System")

# CORS: any origin, method and header is accepted, with credentials enabled.
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive — confirm this is intended outside of development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Static assets are served from ./public only when that directory exists.
if os.path.exists("public"):
    app.mount("/public", StaticFiles(directory="public"), name="public")

# Templates live in ./public when present, otherwise in the working directory.
if os.path.exists("public"):
    templates = Jinja2Templates(directory="public")
else:
    templates = Jinja2Templates(directory=".")
# 数据模型
class CreateSessionRequest(BaseModel):
    """Request body for ``POST /api/create-session``.

    Carries the student's identity, the selected filters and the number of
    questions requested per difficulty label.
    """

    # Student identity. The endpoint rejects the request when any of
    # name / school / grade / phone is blank after stripping.
    name: str
    school: str
    grade: str
    phone: str = ""

    # Selected filters.
    selectedSubject: str = ""       # subject
    selectedSemester: str = ""      # volume / semester (kept for compatibility)
    selectedExamType: str = ""      # exam type (kept for compatibility)
    selectedUnit: str = ""          # unit
    selectedCategory: str = ""      # category
    selectedQuestionType: str = ""  # question type
    selectedTag: str = ""
    selectedTagsList: List[str] = []

    # Question count per difficulty label.
    questionsConfig: Dict[str, int]
    totalQuestions: int = 0  # total number of questions; 0 -> derived by the endpoint
    totalScore: int = 0      # maximum score; 0 -> derived by the endpoint
class SaveAnswersRequest(BaseModel):
    """Request body for ``POST /api/save-answers``."""

    # Quiz session the answers belong to.
    sessionId: str
    # One dict per answered question. The endpoint accepts camelCase or
    # snake_case keys (e.g. userAnswer / user_answer).
    answers: List[Dict[str, Any]]
class GenerateReportRequest(BaseModel):
    """Request body shared by the generate-report and regenerate-report endpoints."""

    # Quiz session whose report should be (re)generated.
    sessionId: str
class ReportCallbackRequest(BaseModel):
    """Request body for ``POST /api/report-callback``."""

    # Quiz session the generated report belongs to.
    session_id: str
    # Report payload; the endpoint reads its 'studentInfo' and 'report' keys.
    data: Dict[str, Any]
class SessionResponse(BaseModel):
    """Response returned after a quiz session has been created."""

    success: bool
    sessionId: str  # id of the new quiz_sessions row
    studentId: str  # id of the new students row
class SaveAnswersResponse(BaseModel):
    """Response returned after a set of answers has been stored."""

    success: bool
    totalScore: int  # sum of the scores of the correctly answered questions
    sessionId: str
class ApiResponse(BaseModel):
    """Generic success flag plus message envelope used by several endpoints."""

    success: bool
    message: str
# API端点
@app.post("/api/create-session", response_model=SessionResponse)
async def create_session(request: CreateSessionRequest):
    """Create a student record and a quiz session for that student.

    The subject / grade / unit filters are folded into a descriptive tag that
    is stored on the student row. When the client does not send both totals,
    the question count and maximum score are derived from ``questionsConfig``.

    Returns:
        SessionResponse with the newly generated session and student ids.

    Raises:
        HTTPException: 400 when name, school, grade or phone is blank.
    """
    name = request.name.strip()
    school = request.school.strip()
    grade = request.grade.strip()
    phone = request.phone.strip()
    if not name or not school or not grade or not phone:
        raise HTTPException(status_code=400, detail="姓名、学校、年级和手机号不能为空")

    selected_subject = request.selectedSubject.strip()
    selected_semester = request.selectedSemester.strip()
    selected_unit = request.selectedUnit.strip()

    # Build the descriptive tag; the question endpoints parse it back later.
    filters = []
    if selected_subject:
        filters.append(f"学科:{selected_subject}")
    if selected_semester:
        # selectedSemester actually carries the complete grade label.
        filters.append(f"年级:{selected_semester}")
    if selected_unit:
        filters.append(f"单元:{selected_unit}")
    selected_tag = " | ".join(filters) if filters else "全部题目"

    questions_config = request.questionsConfig
    if request.totalQuestions == 0 or request.totalScore == 0:
        # Totals not supplied by the client: derive them from the config.
        # Points per question: 基础题 = 5, 进阶题 = 10, 竞赛题 = 15.
        total_questions = sum(questions_config.values())
        total_score = (questions_config.get('基础题', 0) * 5 +
                       questions_config.get('进阶题', 0) * 10 +
                       questions_config.get('竞赛题', 0) * 15)
    else:
        total_questions = request.totalQuestions
        total_score = request.totalScore

    student_id = str(uuid.uuid4())
    session_id = str(uuid.uuid4())

    conn = sqlite3.connect('data/survey.db')
    try:
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO students (id, name, school, grade, phone, selected_tag)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', (student_id, name, school, grade, phone, selected_tag))
        cursor.execute('''
            INSERT INTO quiz_sessions (id, student_id, questions_config, status, total_questions, max_score)
            VALUES (?, ?, ?, 'created', ?, ?)
        ''', (session_id, student_id, json.dumps(questions_config), total_questions, total_score))
        conn.commit()
    finally:
        # Always release the connection, even when an INSERT fails.
        conn.close()

    return SessionResponse(
        success=True,
        sessionId=session_id,
        studentId=student_id
    )
def async_generate_report(session_id: str):
    """Generate the report for ``session_id`` on a private event loop.

    Used as a background-thread target by ``save_answers``. Any exception is
    caught and printed, so a failure never propagates out of the thread.
    """
    try:
        report_job = enhanced_system.auto_generate_report(session_id)
        asyncio.run(report_job)
        print(f"报告生成成功: {session_id}")
    except Exception as exc:
        print(f"报告生成失败: {exc}")
@app.post("/api/save-answers", response_model=SaveAnswersResponse)
async def save_answers(request: SaveAnswersRequest):
    """Grade and store the submitted answers, then start report generation.

    Each answer is marked correct when the user's answer equals the correct
    answer; only correct answers contribute their ``score`` to the total. The
    graded answers are stored as one JSON document together with the student's
    name, school, grade and tag, and the session is marked ``completed``.

    Returns:
        SaveAnswersResponse with the computed total score.

    Raises:
        HTTPException: 400 when the session id or the answer list is empty,
            404 when the session does not exist.
    """
    session_id = request.sessionId
    answers = request.answers
    if not session_id or not answers:
        raise HTTPException(status_code=400, detail="会话ID和答题结果不能为空")

    conn = sqlite3.connect('data/survey.db')
    try:
        conn.row_factory = sqlite3.Row  # allow column access by name
        cursor = conn.cursor()

        # Student details are copied into the answer record.
        cursor.execute('''
            SELECT s.name, s.school, s.grade, s.selected_tag
            FROM students s
            JOIN quiz_sessions qs ON s.id = qs.student_id
            WHERE qs.id = ?
        ''', (session_id,))
        session_info = cursor.fetchone()
        if not session_info:
            raise HTTPException(status_code=404, detail="会话不存在")

        student_name = session_info['name']
        student_school = session_info['school']
        student_grade = session_info['grade']
        selected_tag = session_info['selected_tag']

        total_score = 0
        processed_answers = []
        for answer in answers:
            # Accept both camelCase and snake_case keys.
            user_answer = answer.get('userAnswer') or answer.get('user_answer', '')
            correct_answer = answer.get('correctAnswer') or answer.get('correct_answer', '')
            question_id = answer.get('questionId') or answer.get('question_id', '')
            question_text = answer.get('questionText') or answer.get('question_text', '')
            question_type = answer.get('questionType') or answer.get('question_type', '')
            options = answer.get('options', {})

            is_correct = user_answer == correct_answer
            score = answer.get('score', 0) if is_correct else 0
            total_score += score

            processed_answers.append({
                'questionId': question_id,
                'questionText': question_text,
                'questionType': question_type,
                'userAnswer': user_answer,
                'correctAnswer': correct_answer,
                'isCorrect': is_correct,
                'score': score,
                'options': options  # kept so results can be rendered later
            })

        answers_json = json.dumps(processed_answers, ensure_ascii=False)
        answer_id = str(uuid.uuid4())

        # INSERT OR REPLACE is meant to keep a single record per session.
        cursor.execute('''
            INSERT OR REPLACE INTO quiz_answers
            (id, session_id, student_name, student_school, student_grade, selected_tag, answers_data, total_score)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ''', (answer_id, session_id, student_name, student_school, student_grade,
              selected_tag, answers_json, total_score))

        cursor.execute('''
            UPDATE quiz_sessions
            SET status = 'completed', total_score = ?, completed_at = ?
            WHERE id = ?
        ''', (total_score, get_east8_time_string(), session_id))
        conn.commit()
    finally:
        # Release the connection on every path, including the 404 above.
        conn.close()

    # Report generation runs in a daemon thread so the response is not delayed.
    threading.Thread(target=async_generate_report, args=(session_id,), daemon=True).start()

    return SaveAnswersResponse(
        success=True,
        totalScore=total_score,
        sessionId=session_id
    )
@app.post("/api/generate-report", response_model=ApiResponse)
async def generate_report(request: GenerateReportRequest):
    """Generate the report for a session and wait for the result.

    Returns:
        ApiResponse whose ``success`` mirrors the generator's result; on
        failure ``message`` carries the generator's error text.

    Raises:
        HTTPException: 400 when the session id is empty, 500 when report
            generation raises.
    """
    session_id = request.sessionId
    if not session_id:
        raise HTTPException(status_code=400, detail="会话ID不能为空")

    try:
        # Await the coroutine directly. This handler already runs inside the
        # server's event loop, and asyncio refuses to drive a second loop with
        # run_until_complete() while another loop is running (RuntimeError).
        result = await enhanced_system.auto_generate_report(session_id)
        return ApiResponse(
            success=result['success'],
            message='报告生成成功' if result['success'] else result['error']
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/regenerate-report", response_model=ApiResponse)
async def regenerate_report(request: GenerateReportRequest):
    """Regenerate the report for a session and wait for the result.

    Returns:
        ApiResponse whose ``success`` mirrors the generator's result; on
        failure ``message`` carries the generator's error text.

    Raises:
        HTTPException: 400 when the session id is empty, 500 when report
            regeneration raises.
    """
    session_id = request.sessionId
    if not session_id:
        raise HTTPException(status_code=400, detail="会话ID不能为空")

    try:
        # Await the coroutine directly. This handler already runs inside the
        # server's event loop, and asyncio refuses to drive a second loop with
        # run_until_complete() while another loop is running (RuntimeError).
        result = await enhanced_system.regenerate_report(session_id)

        # NOTE(review): the previous code assigned `response.extra` to expose
        # result['report_id']. ApiResponse declares no such field, so pydantic
        # rejected the assignment and every successful call became a 500.
        # To return the report id, add a field to the response model.
        return ApiResponse(
            success=result['success'],
            message='报告重新生成成功' if result['success'] else result['error']
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/sessions-can-regenerate")
async def get_sessions_can_regenerate():
    """Return the sessions whose report can be regenerated.

    Raises:
        HTTPException: 500 when the lookup raises.
    """
    try:
        regenerable = enhanced_system.get_sessions_can_regenerate()
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return {'success': True, 'sessions': regenerable}
@app.get("/api/reports")
async def get_reports(page: int = 1, page_size: int = 10):
    """Return one page of the report list, passed through unchanged."""
    return enhanced_system.get_reports_list(page, page_size)
@app.get("/api/report/{report_id}")
async def get_report(report_id: str):
    """Return a single stored report in the same shape as the external API.

    Raises:
        HTTPException: 404 when the report does not exist, 500 when its stored
            JSON cannot be parsed.
    """
    stored = enhanced_system.get_report_by_id(report_id)
    if not stored:
        raise HTTPException(status_code=404, detail="报告不存在")

    try:
        parsed_report = json.loads(stored['report_data'])
        # Parsed only so that malformed analysis JSON is reported as an error;
        # the value itself is not part of the response.
        json.loads(stored['analysis_data'])

        student_info = parsed_report.get('studentInfo', {})
        report_body = parsed_report.get('report', {})
        return {
            'studentInfo': student_info,
            'report': report_body,
            'footer': {
                'copyright': '© 2024 尚逸基石教育科技有限公司 版权所有'
            }
        }
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"解析报告数据失败: {str(exc)}")
@app.delete("/api/reports/{report_id}", response_model=ApiResponse)
async def delete_report(report_id: str):
    """Delete a report.

    Raises:
        HTTPException: 400 carrying the backend's message when deletion fails.
    """
    outcome = enhanced_system.delete_report(report_id)
    if not outcome['success']:
        raise HTTPException(status_code=400, detail=outcome['message'])
    return ApiResponse(success=True, message=outcome['message'])
@app.get("/api/questions")
async def get_questions():
    """Return the whole question bank.

    Raises:
        HTTPException: 404 when the question bank file is missing, 500 on any
            other loading error.
    """
    try:
        return get_questions_data()
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="题库文件不存在")
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"加载题库失败: {str(exc)}")
@app.get("/api/filters")
async def get_filters():
    """Return every available filter value.

    Raises:
        HTTPException: 500 when the filters cannot be loaded.
    """
    try:
        available = get_available_filters()
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"获取筛选条件失败: {str(exc)}")
    return {"success": True, "data": available}
@app.get("/api/questions-by-filters")
async def get_questions_by_filters_api(
    subject: str = "",
    grade: str = "",
    unit: str = ""
):
    """Return the questions matching the given subject / grade / unit filters.

    Raises:
        HTTPException: 500 when the lookup raises.
    """
    criteria = {"subject": subject, "grade": grade, "unit": unit}
    try:
        matching = get_questions_by_filters(**criteria)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"获取筛选题目失败: {str(exc)}")
    return {"success": True, "data": matching}
@app.get("/api/questions/{session_id}")
async def get_filtered_questions(session_id: str):
    """Return the questions selected for a session.

    The filters stored in the student's tag are parsed back, the matching
    questions are loaded, and the requested number per difficulty is drawn
    from them.

    Raises:
        HTTPException: 404 when the session does not exist, 500 on any other
            failure.
    """
    try:
        conn = sqlite3.connect('data/survey.db')
        try:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            cursor.execute('''
                SELECT s.*, qs.questions_config
                FROM students s
                JOIN quiz_sessions qs ON s.id = qs.student_id
                WHERE qs.id = ?
            ''', (session_id,))
            session_data = cursor.fetchone()
        finally:
            # Release the connection even when the query itself fails.
            conn.close()

        if not session_data:
            raise HTTPException(status_code=404, detail="会话不存在")

        selected_tag = session_data['selected_tag'] or ''
        questions_config = json.loads(session_data['questions_config'])

        # Tag format: "学科:科学 | 年级:一年级上册 | 单元:1-周围的植物".
        # It is only parsed when it carries a subject entry.
        criteria = {"subject": "", "grade": "", "unit": ""}
        prefixes = (("学科:", "subject"), ("年级:", "grade"), ("单元:", "unit"))
        if selected_tag and "学科:" in selected_tag:
            for filter_item in selected_tag.split(' | '):
                for prefix, field in prefixes:
                    if filter_item.startswith(prefix):
                        criteria[field] = filter_item[len(prefix):]
                        break

        filtered_questions = get_questions_by_filters(
            subject=criteria["subject"],
            grade=criteria["grade"],
            unit=criteria["unit"]
        )
        selected_questions = select_questions_by_config(filtered_questions, questions_config)

        return {
            'questions': selected_questions,
            'selectedTag': selected_tag,
            'questionsConfig': questions_config
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"获取筛选题目失败: {str(e)}")
@app.get("/api/quiz-results/{session_id}")
async def get_quiz_results(session_id: str):
    """Return the graded answers of a session together with student info.

    Options saved with each answer are used when present. Otherwise they are
    looked up in the question bank by question id.

    Raises:
        HTTPException: 404 when no answers are stored for the session, 500 on
            any other failure.
    """
    import re  # function-scope import: `re` is not imported at module level

    try:
        conn = sqlite3.connect('data/survey.db')
        try:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            # The student is joined through quiz_sessions.student_id.
            # Matching on name/school/grade is not unique and stops matching
            # if the student row is edited after the answers are saved.
            cursor.execute('''
                SELECT qa.*, s.name, s.school, s.grade, qs.questions_config, qs.status,
                       qs.total_questions, qs.max_score,
                       s.selected_tag AS student_selected_tag
                FROM quiz_answers qa
                JOIN quiz_sessions qs ON qa.session_id = qs.id
                JOIN students s ON s.id = qs.student_id
                WHERE qa.session_id = ?
            ''', (session_id,))
            result = cursor.fetchone()
        finally:
            # Release the connection on every path, including errors.
            conn.close()

        if not result:
            raise HTTPException(status_code=404, detail="答题结果不存在")

        answers_data = json.loads(result['answers_data'])
        selected_tag = result['student_selected_tag']

        # Parse "学科:... | 年级:... | 单元:..." back into filter values.
        subject = ""
        grade = ""
        unit = ""
        if selected_tag and "学科:" in selected_tag:
            for filter_item in selected_tag.split(' | '):
                if filter_item.startswith("学科:"):
                    subject = filter_item[3:]
                elif filter_item.startswith("年级:"):
                    grade = filter_item[3:]
                elif filter_item.startswith("单元:"):
                    unit = filter_item[3:]

        filtered_questions = get_questions_by_filters(
            subject=subject,
            grade=grade,
            unit=unit
        )

        # Index the WHOLE filtered pool by question id. Selection is random,
        # so re-drawing with the session's counts would generally return
        # different questions than the ones actually answered.
        full_pool_config = {
            question_type: len(items)
            for question_type, items in filtered_questions.items()
            if isinstance(items, (list, tuple))
        }
        questions_by_id = {}
        for candidate in select_questions_by_config(filtered_questions, full_pool_config):
            questions_by_id.setdefault(candidate['questionId'], candidate)

        option_labels = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H']
        detailed_results = []
        for question_number, answer in enumerate(answers_data, start=1):
            # Prefer the options saved with the answer; tolerate a stored null.
            options = answer.get('options') or {}
            if not options:
                question = questions_by_id.get(answer['questionId'])
                if question:
                    # Column names may contain stray whitespace; strip it
                    # before looking for "选项A", "选项B", ...
                    normalized_columns = [
                        (re.sub(r'\s+', '', str(column)), value)
                        for column, value in question.items()
                    ]
                    for label in option_labels:
                        marker = f'选项{label}'
                        for column_name, value in normalized_columns:
                            if marker in column_name:
                                options[label] = value
                                break

            detailed_results.append({
                'questionNumber': question_number,
                'questionText': answer['questionText'],
                'questionType': answer['questionType'],
                'options': options,
                'userAnswer': answer['userAnswer'],
                'correctAnswer': answer['correctAnswer'],
                'isCorrect': answer['isCorrect'],
                'score': answer['score'],
                'questionId': answer['questionId']
            })

        return {
            'success': True,
            'studentInfo': {
                'name': result['name'],
                'school': result['school'],
                'grade': result['grade']
            },
            'totalScore': result['total_score'],
            'maxScore': result['max_score'],
            'totalQuestions': result['total_questions'],
            'selectedTag': selected_tag,
            'results': detailed_results,
            'sessionStatus': result['status']
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"获取答题结果失败: {str(e)}")
def select_questions_by_config(filtered_questions: Dict, questions_config: Dict) -> List[Dict]:
    """Randomly draw the configured number of questions per difficulty.

    Difficulties are processed in the fixed order 基础题 -> 进阶题 -> 竞赛题;
    other keys in either mapping are ignored. When fewer questions are
    available than requested, a warning is printed and all available ones are
    used. The input lists are never modified.

    Args:
        filtered_questions: Mapping of difficulty label to a list of question
            dicts; each question must have a '序号' entry.
        questions_config: Mapping of difficulty label to the number of
            questions wanted. Counts below zero are treated as zero.

    Returns:
        Copies of the drawn questions, each extended with 'questionType' and a
        'questionId' of the form "<difficulty>_<序号>".
    """
    import random  # function-scope import: `random` is not imported at module level

    selected = []
    for question_type in ("基础题", "进阶题", "竞赛题"):
        if question_type not in questions_config:
            continue
        count = questions_config[question_type]
        available_questions = filtered_questions.get(question_type, [])
        if len(available_questions) < count:
            print(f"警告:{question_type}可用题目不足 ({len(available_questions)}/{count})")

        # sample() draws from a copy, so the caller's list keeps its order.
        take = max(0, min(count, len(available_questions)))
        for question in random.sample(list(available_questions), take):
            selected.append({
                **question,
                'questionType': question_type,
                'questionId': f"{question_type}_{question['序号']}"
            })
    return selected
# HTML页面路由
@app.get("/")
async def index_page(request: Request):
    """Serve ``public/index.html`` as the landing page (404 when the file is missing)."""
    try:
        page_file = open('public/index.html', 'r', encoding='utf-8')
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="主页文件不存在")
    with page_file:
        html = page_file.read()
    return Response(content=html, media_type="text/html; charset=utf-8")
@app.get("/login.html")
async def login_page(request: Request):
    """Serve ``public/login.html`` (404 when the file is missing)."""
    try:
        page_file = open('public/login.html', 'r', encoding='utf-8')
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="登录页面文件不存在")
    with page_file:
        html = page_file.read()
    return Response(content=html, media_type="text/html; charset=utf-8")
@app.get("/survey.html")
async def survey_page(request: Request):
    """Serve ``public/survey.html``, the assessment setup page (404 when missing)."""
    try:
        page_file = open('public/survey.html', 'r', encoding='utf-8')
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="测评页面文件不存在")
    with page_file:
        html = page_file.read()
    return Response(content=html, media_type="text/html; charset=utf-8")
@app.get("/report.html")
async def report_page(request: Request):
    """Serve ``public/report.html``, the report viewer page (404 when missing)."""
    try:
        page_file = open('public/report.html', 'r', encoding='utf-8')
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="报告页面文件不存在")
    with page_file:
        html = page_file.read()
    return Response(content=html, media_type="text/html; charset=utf-8")
@app.get("/quiz-results/{session_id}")
async def quiz_results_page(session_id: str, request: Request):
    """Render the quiz results page for a session.

    Raises:
        HTTPException: 404 when the session does not exist, 500 on any other
            failure.
    """
    try:
        conn = sqlite3.connect('data/survey.db')
        try:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            cursor.execute('''
                SELECT s.*, qs.questions_config, qs.status, qs.total_score, qs.completed_at
                FROM students s
                JOIN quiz_sessions qs ON s.id = qs.student_id
                WHERE qs.id = ?
            ''', (session_id,))
            session_data = cursor.fetchone()
        finally:
            # Release the connection even when the query itself fails.
            conn.close()

        if not session_data:
            raise HTTPException(status_code=404, detail="会话不存在")

        return templates.TemplateResponse("quiz-results.html", {
            "request": request,
            "session_id": session_id,
            "student_name": session_data['name'],
            "school": session_data['school'],
            "grade": session_data['grade']
        })
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"生成答题结果页面失败: {str(e)}")
@app.get("/quiz/{session_id}")
async def quiz_page(session_id: str, request: Request):
    """Render the quiz page, or the completion page once the quiz is done.

    Sessions whose status is 'completed' or 'can_regenerate' get
    ``completion.html``; every other status gets ``quiz.html``.

    Raises:
        HTTPException: 404 when the session does not exist.
    """
    conn = sqlite3.connect('data/survey.db')
    try:
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        cursor.execute('''
            SELECT s.*, qs.questions_config, qs.status, qs.total_score, qs.completed_at
            FROM students s
            JOIN quiz_sessions qs ON s.id = qs.student_id
            WHERE qs.id = ?
        ''', (session_id,))
        session_data = cursor.fetchone()
    finally:
        # Release the connection even when the query itself fails.
        conn.close()

    if not session_data:
        raise HTTPException(status_code=404, detail="Session not found")

    if session_data['status'] in ['completed', 'can_regenerate']:
        return templates.TemplateResponse("completion.html", {
            "request": request,
            "session_id": session_id,
            "student_name": session_data['name'],
            "school": session_data['school'],
            "grade": session_data['grade'],
            "total_score": session_data['total_score'] or 0,
            "completed_at": session_data['completed_at'] or '未知时间'
        })

    return templates.TemplateResponse("quiz.html", {
        "request": request,
        "session_id": session_id,
        "student_name": session_data['name'],
        "school": session_data['school'],
        "grade": session_data['grade']
    })
@app.post("/api/report-callback", response_model=ApiResponse)
async def report_callback(request: ReportCallbackRequest):
    """Store a report delivered by the asynchronous report generator.

    The payload's 'studentInfo' and 'report' are combined with the most recent
    saved analysis data for the session (if any) and inserted into ``reports``.
    The session is then marked ``completed``.

    Raises:
        HTTPException: 400 when the session id or payload is empty, 404 when
            the session does not exist, 500 on any other failure.
    """
    session_id = request.session_id
    report_data = request.data
    if not session_id or not report_data:
        raise HTTPException(status_code=400, detail="session_id和data不能为空")

    try:
        conn = sqlite3.connect('data/survey.db')
        try:
            cursor = conn.cursor()

            # The session must exist before a report is attached to it.
            cursor.execute('''
                SELECT id FROM quiz_sessions WHERE id = ?
            ''', (session_id,))
            if not cursor.fetchone():
                raise HTTPException(status_code=404, detail="会话不存在")

            # Latest saved analysis data, if any; a NULL column counts as none.
            cursor.execute('''
                SELECT analysis_data FROM temp_analysis_data
                WHERE session_id = ?
                ORDER BY created_at DESC
                LIMIT 1
            ''', (session_id,))
            analysis_result = cursor.fetchone()
            if analysis_result and analysis_result[0]:
                analysis_data = json.loads(analysis_result[0])
            else:
                analysis_data = {}

            full_report_data = {
                'studentInfo': report_data.get('studentInfo', {}),
                'report': report_data.get('report', {}),
                'generated_at': get_east8_time().isoformat(),
                'session_id': session_id,
                'analysis_data': analysis_data
            }

            report_id = str(uuid.uuid4())
            cursor.execute('''
                INSERT INTO reports (id, session_id, report_data, analysis_data, generated_at)
                VALUES (?, ?, ?, ?, ?)
            ''', (report_id, session_id, json.dumps(full_report_data),
                  json.dumps(analysis_data), get_east8_time_string()))

            cursor.execute('''
                UPDATE quiz_sessions
                SET status = 'completed', completed_at = ?
                WHERE id = ?
            ''', (get_east8_time_string(), session_id))
            conn.commit()
        finally:
            # Release the connection on every path, including errors.
            conn.close()

        print(f"✅ 回调处理成功: {session_id}")
        print(f" 报告ID: {report_id}")
        return ApiResponse(
            success=True,
            message='回调数据处理成功'
        )
    except HTTPException:
        raise
    except Exception as e:
        print(f"❌ 回调处理失败: {e}")
        raise HTTPException(status_code=500, detail=f"回调处理失败: {str(e)}")
if __name__ == "__main__":
    # Script entry point: start uvicorn on all interfaces, port 8000, with
    # auto-reload enabled.
    import uvicorn

    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
    )