815 lines
29 KiB
Python
815 lines
29 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
import sqlite3
|
||
import json
|
||
import uuid
|
||
import os
|
||
from datetime import datetime
|
||
from fastapi import FastAPI, HTTPException, Request, Response
|
||
from fastapi.staticfiles import StaticFiles
|
||
from fastapi.templating import Jinja2Templates
|
||
from fastapi.middleware.cors import CORSMiddleware
|
||
from pydantic import BaseModel
|
||
from typing import Dict, List, Optional, Any
|
||
import asyncio
|
||
import threading
|
||
from enhanced_survey_system import enhanced_system, get_east8_time_string, get_east8_time
|
||
from excel_reader import get_questions_data, get_questions_by_tag, get_questions_by_filters, get_available_filters
|
||
|
||
app = FastAPI(title="Enhanced Survey System")

# Configure CORS: every origin, method and header is accepted.
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive -- confirm this is intended outside of development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Serve static assets under /public, but only when the directory exists.
if os.path.exists("public"):
    app.mount("/public", StaticFiles(directory="public"), name="public")

# Templates live in ./public when it exists, otherwise in the working directory.
if os.path.exists("public"):
    templates = Jinja2Templates(directory="public")
else:
    templates = Jinja2Templates(directory=".")
|
||
|
||
# 数据模型
|
||
class CreateSessionRequest(BaseModel):
    """Request body of POST /api/create-session."""

    # Student identity.
    name: str
    school: str
    grade: str
    phone: str = ""

    # Question-bank filters chosen by the client.
    selectedSubject: str = ""       # subject
    selectedSemester: str = ""      # volume / semester (kept for compatibility)
    selectedExamType: str = ""      # exam type (kept for compatibility)
    selectedUnit: str = ""          # unit
    selectedCategory: str = ""      # category
    selectedQuestionType: str = ""  # question type
    selectedTag: str = ""
    selectedTagsList: List[str] = []

    # Number of questions requested, keyed by question-type name.
    questionsConfig: Dict[str, int]
    totalQuestions: int = 0  # total number of questions
    totalScore: int = 0      # total achievable score
|
||
|
||
class SaveAnswersRequest(BaseModel):
    """Request body of POST /api/save-answers."""

    sessionId: str
    # One free-form dict per answered question.
    answers: List[Dict[str, Any]]
|
||
|
||
class GenerateReportRequest(BaseModel):
    """Request body of the report generation and regeneration endpoints."""

    sessionId: str
|
||
|
||
class ReportCallbackRequest(BaseModel):
    """Request body of POST /api/report-callback."""

    session_id: str
    # Report payload delivered by the callback.
    data: Dict[str, Any]
|
||
|
||
class SessionResponse(BaseModel):
    """Response body of POST /api/create-session."""

    success: bool
    sessionId: str
    studentId: str
|
||
|
||
class SaveAnswersResponse(BaseModel):
    """Response body of POST /api/save-answers."""

    success: bool
    totalScore: int
    sessionId: str
|
||
|
||
class ApiResponse(BaseModel):
    """Generic success flag plus message returned by several endpoints."""

    success: bool
    message: str
|
||
|
||
# API端点
|
||
@app.post("/api/create-session", response_model=SessionResponse)
async def create_session(request: CreateSessionRequest):
    """Create a student record and a quiz session that belongs to it.

    The student's name, school, grade and phone are stripped and must all be
    non-empty. The chosen subject / grade / unit filters are folded into a
    single descriptive tag stored on the student row; the per-session question
    endpoints parse that tag back into filters.

    Raises:
        HTTPException: 400 when any of the required student fields is empty.
    """
    name = request.name.strip()
    school = request.school.strip()
    grade = request.grade.strip()
    phone = request.phone.strip()

    # Validate first so no other work is done for a bad request.
    if not name or not school or not grade or not phone:
        raise HTTPException(status_code=400, detail="姓名、学校、年级和手机号不能为空")

    selected_subject = request.selectedSubject.strip()
    selected_semester = request.selectedSemester.strip()
    selected_unit = request.selectedUnit.strip()

    # Build the descriptive tag, e.g. "学科:X | 年级:Y | 单元:Z".
    filters = []
    if selected_subject:
        filters.append(f"学科:{selected_subject}")
    if selected_semester:
        # The "semester" field actually carries the full grade description.
        filters.append(f"年级:{selected_semester}")
    if selected_unit:
        filters.append(f"单元:{selected_unit}")
    selected_tag = " | ".join(filters) if filters else "全部题目"

    questions_config = request.questionsConfig

    # When the client did not send totals, derive them from the configuration:
    # basic questions score 5, advanced 10 and competition 15 points each.
    if request.totalQuestions == 0 or request.totalScore == 0:
        total_questions = sum(questions_config.values())
        total_score = (questions_config.get('基础题', 0) * 5 +
                       questions_config.get('进阶题', 0) * 10 +
                       questions_config.get('竞赛题', 0) * 15)
    else:
        total_questions = request.totalQuestions
        total_score = request.totalScore

    student_id = str(uuid.uuid4())
    session_id = str(uuid.uuid4())

    conn = sqlite3.connect('data/survey.db')
    try:
        cursor = conn.cursor()

        # Student record.
        cursor.execute('''
            INSERT INTO students (id, name, school, grade, phone, selected_tag)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', (student_id, name, school, grade, phone, selected_tag))

        # Quiz session owned by that student.
        cursor.execute('''
            INSERT INTO quiz_sessions (id, student_id, questions_config, status, total_questions, max_score)
            VALUES (?, ?, ?, 'created', ?, ?)
        ''', (session_id, student_id, json.dumps(questions_config), total_questions, total_score))

        conn.commit()
    finally:
        # Always release the connection, including when an INSERT fails.
        conn.close()

    return SessionResponse(
        success=True,
        sessionId=session_id,
        studentId=student_id
    )
|
||
|
||
def async_generate_report(session_id: str):
    """Generate the report for *session_id* on a private event loop.

    Intended to be the target of a background thread: ``asyncio.run`` creates
    its own loop, so it must not be called from a thread that already runs one.
    Failures are only printed, never raised.
    """
    try:
        result = asyncio.run(enhanced_system.auto_generate_report(session_id))
    except Exception as e:
        print(f"报告生成失败: {e}")
        return

    # The generator reports failures through its result dict (the manual
    # endpoint reads 'success' / 'error' from it), so check before announcing success.
    if isinstance(result, dict) and not result.get('success', True):
        print(f"报告生成失败: {result.get('error')}")
    else:
        print(f"报告生成成功: {session_id}")
|
||
|
||
@app.post("/api/save-answers", response_model=SaveAnswersResponse)
async def save_answers(request: SaveAnswersRequest):
    """Grade and persist the submitted answers of a quiz session.

    Each answer is marked correct when the submitted answer equals the correct
    answer carried in the same payload; a correct answer earns the payload's
    ``score``. All answers are stored as one JSON document, the session is
    marked completed, and report generation is started on a background thread.

    Raises:
        HTTPException: 400 when the session id or the answers are empty,
            404 when the session does not exist.
    """
    session_id = request.sessionId
    answers = request.answers

    if not session_id or not answers:
        raise HTTPException(status_code=400, detail="会话ID和答题结果不能为空")

    conn = sqlite3.connect('data/survey.db')
    try:
        conn.row_factory = sqlite3.Row  # rows can be indexed by column name
        cursor = conn.cursor()

        # Student and session information for this session id.
        cursor.execute('''
            SELECT s.name, s.school, s.grade, s.selected_tag, qs.questions_config
            FROM students s
            JOIN quiz_sessions qs ON s.id = qs.student_id
            WHERE qs.id = ?
        ''', (session_id,))

        session_info = cursor.fetchone()
        if not session_info:
            raise HTTPException(status_code=404, detail="会话不存在")

        student_name = session_info['name']
        student_school = session_info['school']
        student_grade = session_info['grade']
        selected_tag = session_info['selected_tag']

        total_score = 0
        processed_answers = []
        for answer in answers:
            # Accept both camelCase and snake_case keys.
            user_answer = answer.get('userAnswer') or answer.get('user_answer', '')
            correct_answer = answer.get('correctAnswer') or answer.get('correct_answer', '')
            question_id = answer.get('questionId') or answer.get('question_id', '')
            question_text = answer.get('questionText') or answer.get('question_text', '')
            question_type = answer.get('questionType') or answer.get('question_type', '')
            options = answer.get('options', {})

            # NOTE(review): the correct answer and the score both come from the
            # client payload -- confirm that trusting them is acceptable.
            is_correct = user_answer == correct_answer
            score = answer.get('score', 0) if is_correct else 0
            total_score += score

            processed_answers.append({
                'questionId': question_id,
                'questionText': question_text,
                'questionType': question_type,
                'userAnswer': user_answer,
                'correctAnswer': correct_answer,
                'isCorrect': is_correct,
                'score': score,
                'options': options  # kept so results can be shown without the question bank
            })

        # The whole answer sheet is stored as a single JSON document.
        answers_json = json.dumps(processed_answers, ensure_ascii=False)
        answer_id = str(uuid.uuid4())

        # NOTE(review): the row id is a fresh UUID, so OR REPLACE only keeps a
        # single row per session if quiz_answers.session_id is UNIQUE -- verify
        # against the schema.
        cursor.execute('''
            INSERT OR REPLACE INTO quiz_answers
            (id, session_id, student_name, student_school, student_grade, selected_tag, answers_data, total_score)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ''', (answer_id, session_id, student_name, student_school, student_grade, selected_tag, answers_json, total_score))

        # Mark the session as completed.
        cursor.execute('''
            UPDATE quiz_sessions
            SET status = 'completed', total_score = ?, completed_at = ?
            WHERE id = ?
        ''', (total_score, get_east8_time_string(), session_id))

        conn.commit()
    finally:
        # Release the connection on every path, including the 404 above.
        conn.close()

    # Generate the report in the background so the response is not delayed.
    threading.Thread(target=async_generate_report, args=(session_id,), daemon=True).start()

    return SaveAnswersResponse(
        success=True,
        totalScore=total_score,
        sessionId=session_id
    )
|
||
|
||
@app.post("/api/generate-report", response_model=ApiResponse)
async def generate_report(request: GenerateReportRequest):
    """Generate the report for a session and wait for the outcome.

    Raises:
        HTTPException: 400 when the session id is empty, 500 when report
            generation raises.
    """
    session_id = request.sessionId
    if not session_id:
        raise HTTPException(status_code=400, detail="会话ID不能为空")

    try:
        # This handler already runs inside the server's event loop, so the
        # coroutine is awaited directly. Creating a second loop and calling
        # run_until_complete() here raises "Cannot run the event loop while
        # another loop is running" and replaces the thread's current loop.
        result = await enhanced_system.auto_generate_report(session_id)

        return ApiResponse(
            success=result['success'],
            message='报告生成成功' if result['success'] else result['error']
        )

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
|
||
|
||
@app.post("/api/regenerate-report", response_model=ApiResponse)
async def regenerate_report(request: GenerateReportRequest):
    """Regenerate the report of a session and wait for the outcome.

    On success the response carries ``success``, ``message`` and an ``extra``
    object holding the new ``reportId``; on failure it is a plain ApiResponse
    with the error message.

    Raises:
        HTTPException: 400 when the session id is empty, 500 when
            regeneration raises.
    """
    # Imported locally; the module header does not import it.
    from fastapi.responses import JSONResponse

    session_id = request.sessionId
    if not session_id:
        raise HTTPException(status_code=400, detail="会话ID不能为空")

    try:
        # Await directly: a nested event loop cannot be run from inside the
        # loop that is executing this handler.
        result = await enhanced_system.regenerate_report(session_id)

        if not result['success']:
            return ApiResponse(success=False, message=result['error'])

        # ApiResponse declares no "extra" field, so assigning one on the model
        # raises. Return the JSON directly instead; a Response object bypasses
        # response_model filtering, which keeps the report id in the body.
        return JSONResponse(content={
            'success': True,
            'message': '报告重新生成成功',
            'extra': {"reportId": result.get('report_id')}
        })

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
|
||
|
||
@app.get("/api/sessions-can-regenerate")
async def get_sessions_can_regenerate():
    """Return the sessions whose report can be regenerated.

    Raises:
        HTTPException: 500 when the lookup raises.
    """
    try:
        regenerable = enhanced_system.get_sessions_can_regenerate()
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

    return {'success': True, 'sessions': regenerable}
|
||
|
||
@app.get("/api/reports")
async def get_reports(page: int = 1, page_size: int = 10):
    """Return one page of the report list, as produced by the survey system."""
    return enhanced_system.get_reports_list(page, page_size)
|
||
|
||
@app.get("/api/report/{report_id}")
async def get_report(report_id: str):
    """Return a single stored report in the shape used by the external API.

    Raises:
        HTTPException: 404 when the report does not exist, 500 when its
            stored JSON cannot be parsed.
    """
    report = enhanced_system.get_report_by_id(report_id)
    if not report:
        raise HTTPException(status_code=404, detail="报告不存在")

    try:
        # Only report_data feeds the response. The analysis_data column is
        # deliberately not parsed: it was never used, and a missing or invalid
        # value there made an otherwise readable report fail.
        report_data = json.loads(report['report_data'])

        return {
            'studentInfo': report_data.get('studentInfo', {}),
            'report': report_data.get('report', {}),
            'footer': {
                'copyright': '© 2024 尚逸基石教育科技有限公司 版权所有'
            }
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"解析报告数据失败: {str(e)}") from e
|
||
|
||
@app.delete("/api/reports/{report_id}", response_model=ApiResponse)
async def delete_report(report_id: str):
    """Delete a report.

    Raises:
        HTTPException: 400 carrying the survey system's message when the
            deletion is reported as unsuccessful.
    """
    outcome = enhanced_system.delete_report(report_id)

    if not outcome['success']:
        raise HTTPException(status_code=400, detail=outcome['message'])

    return ApiResponse(success=True, message=outcome['message'])
|
||
|
||
@app.get("/api/questions")
async def get_questions():
    """Return the whole question bank.

    Raises:
        HTTPException: 404 when the question bank file is missing, 500 on any
            other loading error.
    """
    try:
        return get_questions_data()
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="题库文件不存在")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"加载题库失败: {str(e)}")
|
||
|
||
@app.get("/api/filters")
async def get_filters():
    """Return every filter value available for selecting questions.

    Raises:
        HTTPException: 500 when the filters cannot be loaded.
    """
    try:
        available = get_available_filters()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"获取筛选条件失败: {str(e)}")

    return {"success": True, "data": available}
|
||
|
||
@app.get("/api/questions-by-filters")
async def get_questions_by_filters_api(
    subject: str = "",
    grade: str = "",
    unit: str = ""
):
    """Return the questions matching the given subject / grade / unit filters.

    Raises:
        HTTPException: 500 when the lookup raises.
    """
    try:
        matching = get_questions_by_filters(subject=subject, grade=grade, unit=unit)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"获取筛选题目失败: {str(e)}")

    return {"success": True, "data": matching}
|
||
|
||
@app.get("/api/questions/{session_id}")
async def get_filtered_questions(session_id: str):
    """Return the questions selected for a session.

    The student's stored tag is parsed back into subject / grade / unit
    filters, the question bank is filtered with them, and questions are then
    drawn according to the session's per-type configuration.

    Raises:
        HTTPException: 404 when the session does not exist, 500 on any other
            failure.
    """
    try:
        conn = sqlite3.connect('data/survey.db')
        try:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            cursor.execute('''
                SELECT s.*, qs.questions_config
                FROM students s
                JOIN quiz_sessions qs ON s.id = qs.student_id
                WHERE qs.id = ?
            ''', (session_id,))
            session_data = cursor.fetchone()
        finally:
            # Release the connection even when the query fails.
            conn.close()

        if not session_data:
            raise HTTPException(status_code=404, detail="会话不存在")

        selected_tag = session_data['selected_tag'] or ''
        questions_config = json.loads(session_data['questions_config'])

        # Tag format: "学科:科学 | 年级:一年级上册 | 单元:1-周围的植物".
        # Any subset of the three parts may be present, so each part is
        # matched on its own prefix; requiring the subject part would drop
        # the grade and unit filters of a tag that has no subject.
        criteria = {"学科:": "", "年级:": "", "单元:": ""}
        for filter_item in selected_tag.split(' | '):
            for prefix in criteria:
                if filter_item.startswith(prefix):
                    criteria[prefix] = filter_item[len(prefix):]
                    break

        filtered_questions = get_questions_by_filters(
            subject=criteria["学科:"],
            grade=criteria["年级:"],
            unit=criteria["单元:"]
        )

        # Draw the configured number of questions per type.
        selected_questions = select_questions_by_config(filtered_questions, questions_config)

        return {
            'questions': selected_questions,
            'selectedTag': selected_tag,
            'questionsConfig': questions_config
        }

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"获取筛选题目失败: {str(e)}")
|
||
|
||
|
||
@app.get("/api/quiz-results/{session_id}")
async def get_quiz_results(session_id: str):
    """Return the graded answers of a session together with student info.

    Options saved with each answer are used as-is. Only when an answer has no
    saved options are they looked up in the question bank, matching on the
    ``<question type>_<序号>`` id format produced by select_questions_by_config.

    Raises:
        HTTPException: 404 when no answers are stored for the session, 500 on
            any other failure.
    """
    import re

    try:
        conn = sqlite3.connect('data/survey.db')
        try:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()

            # The student is reached through the session's student_id rather
            # than by matching name/school/grade, which is ambiguous when two
            # student rows share those values.
            cursor.execute('''
                SELECT qa.*, s.name, s.school, s.grade,
                       s.selected_tag AS session_selected_tag,
                       qs.questions_config, qs.status,
                       qs.total_questions, qs.max_score
                FROM quiz_answers qa
                JOIN quiz_sessions qs ON qa.session_id = qs.id
                JOIN students s ON s.id = qs.student_id
                WHERE qa.session_id = ?
            ''', (session_id,))
            result = cursor.fetchone()
        finally:
            # Release the connection on every path, including errors.
            conn.close()

        if not result:
            raise HTTPException(status_code=404, detail="答题结果不存在")

        answers_data = json.loads(result['answers_data'])
        selected_tag = result['session_selected_tag']

        # Load the question bank only if some answer lacks saved options.
        question_lookup = {}
        if any(not answer.get('options') for answer in answers_data):
            # Each tag part is matched on its own prefix, so a tag without a
            # subject still contributes its grade / unit filters.
            criteria = {"学科:": "", "年级:": "", "单元:": ""}
            for filter_item in (selected_tag or '').split(' | '):
                for prefix in criteria:
                    if filter_item.startswith(prefix):
                        criteria[prefix] = filter_item[len(prefix):]
                        break

            filtered_questions = get_questions_by_filters(
                subject=criteria["学科:"],
                grade=criteria["年级:"],
                unit=criteria["单元:"]
            )

            # Index every candidate question by id. Re-running the random
            # selection would only cover a random subset, which may not
            # contain the question that was actually answered.
            for bank_type, bank_questions in filtered_questions.items():
                if not isinstance(bank_questions, list):
                    continue
                for bank_question in bank_questions:
                    if isinstance(bank_question, dict) and '序号' in bank_question:
                        question_lookup[f"{bank_type}_{bank_question['序号']}"] = bank_question

        whitespace = re.compile(r'\s+')
        option_labels = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H']

        # Merge the stored answers with their options.
        detailed_results = []
        for number, answer in enumerate(answers_data, start=1):
            options = answer.get('options') or {}

            if not options:
                question = question_lookup.get(answer['questionId'])
                if question:
                    for label in option_labels:
                        marker = f'选项{label}'
                        for key in question:
                            # Column names may contain stray whitespace.
                            if marker in whitespace.sub('', str(key)):
                                options[label] = question[key]
                                break

            detailed_results.append({
                'questionNumber': number,
                'questionText': answer['questionText'],
                'questionType': answer['questionType'],
                'options': options,
                'userAnswer': answer['userAnswer'],
                'correctAnswer': answer['correctAnswer'],
                'isCorrect': answer['isCorrect'],
                'score': answer['score'],
                'questionId': answer['questionId']
            })

        return {
            'success': True,
            'studentInfo': {
                'name': result['name'],
                'school': result['school'],
                'grade': result['grade']
            },
            'totalScore': result['total_score'],
            'maxScore': result['max_score'],
            'totalQuestions': result['total_questions'],
            'selectedTag': selected_tag,
            'results': detailed_results,
            'sessionStatus': result['status']
        }

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"获取答题结果失败: {str(e)}")
|
||
|
||
def select_questions_by_config(filtered_questions: Dict, questions_config: Dict) -> List[Dict]:
    """Randomly pick the configured number of questions of each type.

    Types are processed in the fixed order 基础题 -> 进阶题 -> 竞赛题; types
    missing from *questions_config* are skipped. When fewer questions are
    available than requested, a warning is printed and all available ones are
    returned. The input lists are never modified.

    Args:
        filtered_questions: maps a question-type name to a list of question
            dicts; each question dict must contain the key '序号'.
        questions_config: maps a question-type name to the number of
            questions wanted.

    Returns:
        Copies of the chosen questions, each extended with 'questionType' and
        a 'questionId' of the form "<type>_<序号>".
    """
    import random

    selected = []

    for question_type in ("基础题", "进阶题", "竞赛题"):
        if question_type not in questions_config:
            continue

        count = questions_config[question_type]
        available_questions = filtered_questions.get(question_type, [])

        if len(available_questions) < count:
            print(f"警告:{question_type}可用题目不足 ({len(available_questions)}/{count})")

        # sample() leaves the caller's list untouched (an in-place shuffle would
        # reorder shared question-bank data). Clamping at 0 keeps a negative
        # count from selecting questions.
        pick = max(0, min(count, len(available_questions)))
        for question in random.sample(available_questions, pick):
            selected.append({
                **question,
                'questionType': question_type,
                'questionId': f"{question_type}_{question['序号']}"
            })

    return selected
|
||
|
||
# HTML页面路由
|
||
@app.get("/")
async def index_page(request: Request):
    """Serve the main page from public/index.html.

    Raises:
        HTTPException: 404 when the file does not exist.
    """
    try:
        with open('public/index.html', 'r', encoding='utf-8') as page:
            html = page.read()
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="主页文件不存在")
    else:
        return Response(content=html, media_type="text/html; charset=utf-8")
|
||
|
||
@app.get("/login.html")
async def login_page(request: Request):
    """Serve the login page from public/login.html.

    Raises:
        HTTPException: 404 when the file does not exist.
    """
    try:
        with open('public/login.html', 'r', encoding='utf-8') as page:
            html = page.read()
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="登录页面文件不存在")
    else:
        return Response(content=html, media_type="text/html; charset=utf-8")
|
||
|
||
@app.get("/survey.html")
async def survey_page(request: Request):
    """Serve the assessment configuration page from public/survey.html.

    Raises:
        HTTPException: 404 when the file does not exist.
    """
    try:
        with open('public/survey.html', 'r', encoding='utf-8') as page:
            html = page.read()
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="测评页面文件不存在")
    else:
        return Response(content=html, media_type="text/html; charset=utf-8")
|
||
|
||
@app.get("/report.html")
async def report_page(request: Request):
    """Serve the report viewer page from public/report.html.

    Raises:
        HTTPException: 404 when the file does not exist.
    """
    try:
        with open('public/report.html', 'r', encoding='utf-8') as page:
            html = page.read()
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="报告页面文件不存在")
    else:
        return Response(content=html, media_type="text/html; charset=utf-8")
|
||
|
||
@app.get("/quiz-results/{session_id}")
async def quiz_results_page(session_id: str, request: Request):
    """Render the quiz results page for a session.

    Raises:
        HTTPException: 404 when the session does not exist, 500 on any other
            failure.
    """
    try:
        conn = sqlite3.connect('data/survey.db')
        try:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            cursor.execute('''
                SELECT s.*, qs.questions_config, qs.status, qs.total_score, qs.completed_at
                FROM students s
                JOIN quiz_sessions qs ON s.id = qs.student_id
                WHERE qs.id = ?
            ''', (session_id,))
            session_data = cursor.fetchone()
        finally:
            # Release the connection even when the query fails.
            conn.close()

        if not session_data:
            raise HTTPException(status_code=404, detail="会话不存在")

        # Render the results page with the student's details.
        return templates.TemplateResponse("quiz-results.html", {
            "request": request,
            "session_id": session_id,
            "student_name": session_data['name'],
            "school": session_data['school'],
            "grade": session_data['grade']
        })

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"生成答题结果页面失败: {str(e)}")
|
||
|
||
@app.get("/quiz/{session_id}")
async def quiz_page(session_id: str, request: Request):
    """Render the quiz page, or the completion page for a finished session.

    Sessions whose status is 'completed' or 'can_regenerate' get the
    completion page; every other status gets the quiz itself.

    Raises:
        HTTPException: 404 when the session does not exist.
    """
    conn = sqlite3.connect('data/survey.db')
    try:
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        cursor.execute('''
            SELECT s.*, qs.questions_config, qs.status, qs.total_score, qs.completed_at
            FROM students s
            JOIN quiz_sessions qs ON s.id = qs.student_id
            WHERE qs.id = ?
        ''', (session_id,))
        session_data = cursor.fetchone()
    finally:
        # Release the connection even when the query fails.
        conn.close()

    if not session_data:
        raise HTTPException(status_code=404, detail="Session not found")

    if session_data['status'] in ['completed', 'can_regenerate']:
        # Finished session: show the completion summary.
        return templates.TemplateResponse("completion.html", {
            "request": request,
            "session_id": session_id,
            "student_name": session_data['name'],
            "school": session_data['school'],
            "grade": session_data['grade'],
            "total_score": session_data['total_score'] or 0,
            "completed_at": session_data['completed_at'] or '未知时间'
        })

    # Session still open: show the quiz.
    return templates.TemplateResponse("quiz.html", {
        "request": request,
        "session_id": session_id,
        "student_name": session_data['name'],
        "school": session_data['school'],
        "grade": session_data['grade']
    })
|
||
|
||
|
||
@app.post("/api/report-callback", response_model=ApiResponse)
async def report_callback(request: ReportCallbackRequest):
    """Store a report delivered by the asynchronous report-generation callback.

    The report is combined with the most recent temporary analysis data of the
    session (if any), inserted into ``reports``, and the session is marked as
    completed.

    Raises:
        HTTPException: 400 when the session id or the data is empty, 404 when
            the session does not exist, 500 on any other failure.
    """
    session_id = request.session_id
    report_data = request.data

    if not session_id or not report_data:
        raise HTTPException(status_code=400, detail="session_id和data不能为空")

    try:
        conn = sqlite3.connect('data/survey.db')
        try:
            cursor = conn.cursor()

            # Make sure the session exists.
            cursor.execute('''
                SELECT id FROM quiz_sessions WHERE id = ?
            ''', (session_id,))
            if not cursor.fetchone():
                raise HTTPException(status_code=404, detail="会话不存在")

            # Latest saved analysis data for the session, if there is any.
            cursor.execute('''
                SELECT analysis_data FROM temp_analysis_data
                WHERE session_id = ?
                ORDER BY created_at DESC
                LIMIT 1
            ''', (session_id,))
            analysis_result = cursor.fetchone()
            # A NULL or empty column is treated like a missing row instead of
            # failing inside json.loads.
            if analysis_result and analysis_result[0]:
                analysis_data = json.loads(analysis_result[0])
            else:
                analysis_data = {}

            # Assemble the full report document.
            full_report_data = {
                'studentInfo': report_data.get('studentInfo', {}),
                'report': report_data.get('report', {}),
                'generated_at': get_east8_time().isoformat(),
                'session_id': session_id,
                'analysis_data': analysis_data
            }

            # Store the report.
            report_id = str(uuid.uuid4())
            cursor.execute('''
                INSERT INTO reports (id, session_id, report_data, analysis_data, generated_at)
                VALUES (?, ?, ?, ?, ?)
            ''', (report_id, session_id, json.dumps(full_report_data), json.dumps(analysis_data), get_east8_time_string()))

            # Mark the session as completed.
            cursor.execute('''
                UPDATE quiz_sessions
                SET status = 'completed', completed_at = ?
                WHERE id = ?
            ''', (get_east8_time_string(), session_id))

            conn.commit()
        finally:
            # Release the connection on every path, including errors.
            conn.close()

        print(f"✅ 回调处理成功: {session_id}")
        print(f" 报告ID: {report_id}")

        return ApiResponse(
            success=True,
            message='回调数据处理成功'
        )

    except HTTPException:
        raise
    except Exception as e:
        print(f"❌ 回调处理失败: {e}")
        raise HTTPException(status_code=500, detail=f"回调处理失败: {str(e)}")
|
||
|
||
|
||
|
||
if __name__ == "__main__":
    # Development entry point: serve "main:app" on all interfaces, port 8000,
    # with auto-reload enabled.
    import uvicorn

    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
    )
|