survey/main.py
2025-10-29 23:49:44 +08:00

1136 lines
40 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sqlite3
import json
import uuid
import os
from datetime import datetime
from fastapi import FastAPI, HTTPException, Request, Response
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Dict, List, Optional, Any
import asyncio
import threading
from enhanced_survey_system import enhanced_system, get_east8_time_string, get_east8_time
# Application object that every route below is registered on.
app = FastAPI(title="Enhanced Survey System")

# CORS configuration: any origin, method and header is accepted, and
# credentials are allowed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)

# Static files are mounted under /public only when a "public" directory
# exists relative to the current working directory.
if os.path.exists("public"):
    app.mount("/public", StaticFiles(directory="public"), name="public")

# Template lookup uses "public" when it exists and falls back to ".".
if os.path.exists("public"):
    templates = Jinja2Templates(directory="public")
else:
    templates = Jinja2Templates(directory=".")
# 数据模型
class CreateSessionRequest(BaseModel):
    """Request body for ``POST /api/create-session``."""

    # Student identity; create_session strips each value and rejects the
    # request when any of them is empty.
    name: str
    school: str
    grade: str
    # Optional tag used to narrow the question pool; empty means no filter.
    selectedTag: str = ""
    # Number of questions requested per question type; stored as JSON on the
    # quiz session and later read by select_questions_by_config.
    questionsConfig: Dict[str, int]
class SaveAnswersRequest(BaseModel):
    """Request body for ``POST /api/save-answers``."""

    # Id of the quiz session the answers belong to.
    sessionId: str
    # One dict per answered question; save_answers reads the keys in either
    # camelCase or snake_case (e.g. userAnswer / user_answer) plus "score".
    answers: List[Dict[str, Any]]
class GenerateReportRequest(BaseModel):
    """Request body shared by the generate-report and regenerate-report endpoints."""

    # Id of the quiz session whose report should be (re)generated.
    sessionId: str
class ReportCallbackRequest(BaseModel):
    """Request body for ``POST /api/report-callback``."""

    # Id of the quiz session the report was generated for.
    session_id: str
    # Report payload; report_callback reads its "studentInfo" and "report" keys.
    data: Dict[str, Any]
class SessionResponse(BaseModel):
    """Response returned by ``POST /api/create-session``."""

    success: bool
    # Ids of the newly inserted quiz session and student rows (UUID strings).
    sessionId: str
    studentId: str
class SaveAnswersResponse(BaseModel):
    """Response returned by ``POST /api/save-answers``."""

    success: bool
    # Sum of the scores of all answers that were judged correct.
    totalScore: int
    # Echo of the session id the answers were stored under.
    sessionId: str
class ApiResponse(BaseModel):
    """Generic success/message envelope used by the report endpoints."""

    success: bool
    # Human-readable outcome, or the error text when success is False.
    message: str
# API端点
@app.post("/api/create-session", response_model=SessionResponse)
async def create_session(request: CreateSessionRequest):
    """Create a student row and a quiz session row for that student.

    The name, school, grade and selected tag are stripped of surrounding
    whitespace before use. Both rows get freshly generated UUID4 ids and the
    session is stored with status ``'created'`` and the requested question
    configuration serialized as JSON.

    Raises:
        HTTPException: 400 when name, school or grade is empty after stripping.
    """
    name = request.name.strip()
    school = request.school.strip()
    grade = request.grade.strip()
    selected_tag = request.selectedTag.strip()
    questions_config = request.questionsConfig

    if not name or not school or not grade:
        raise HTTPException(status_code=400, detail="姓名、学校和年级不能为空")

    student_id = str(uuid.uuid4())
    session_id = str(uuid.uuid4())

    conn = sqlite3.connect('data/survey.db')
    try:
        cursor = conn.cursor()
        # Student record.
        cursor.execute('''
            INSERT INTO students (id, name, school, grade, selected_tag)
            VALUES (?, ?, ?, ?, ?)
        ''', (student_id, name, school, grade, selected_tag))
        # Quiz session linked to that student.
        cursor.execute('''
            INSERT INTO quiz_sessions (id, student_id, questions_config, status)
            VALUES (?, ?, ?, 'created')
        ''', (session_id, student_id, json.dumps(questions_config)))
        conn.commit()
    finally:
        # Always release the connection; if an INSERT failed, closing without
        # a commit discards the partial transaction.
        conn.close()

    return SessionResponse(
        success=True,
        sessionId=session_id,
        studentId=student_id
    )
def async_generate_report(session_id: str):
    """Generate the report for ``session_id``, blocking until it finishes.

    The coroutine returned by ``enhanced_system.auto_generate_report`` is
    driven with ``asyncio.run``. Any exception is caught and printed rather
    than propagated; save_answers uses this function as a thread target.
    """
    try:
        pending = enhanced_system.auto_generate_report(session_id)
        asyncio.run(pending)
        print(f"报告生成成功: {session_id}")
    except Exception as exc:
        print(f"报告生成失败: {exc}")
@app.post("/api/save-answers", response_model=SaveAnswersResponse)
async def save_answers(request: SaveAnswersRequest):
    """Score and persist the answers submitted for a quiz session.

    Each answer dict may use camelCase or snake_case keys. An answer counts
    as correct when its user answer equals its correct answer (both taken
    from the request), and only correct answers contribute their ``score``
    to the total. The processed answers are stored as one JSON document
    together with the student's details, the session is marked
    ``'completed'``, and report generation is started on a daemon thread.

    Raises:
        HTTPException: 400 when the session id or answer list is empty,
            404 when no session with that id exists.
    """
    session_id = request.sessionId
    answers = request.answers

    if not session_id or not answers:
        raise HTTPException(status_code=400, detail="会话ID和答题结果不能为空")

    conn = sqlite3.connect('data/survey.db')
    try:
        conn.row_factory = sqlite3.Row  # rows support access by column name
        cursor = conn.cursor()

        # Student details and session configuration for this session.
        cursor.execute('''
            SELECT s.name, s.school, s.grade, s.selected_tag, qs.questions_config
            FROM students s
            JOIN quiz_sessions qs ON s.id = qs.student_id
            WHERE qs.id = ?
        ''', (session_id,))
        session_info = cursor.fetchone()
        if not session_info:
            raise HTTPException(status_code=404, detail="会话不存在")

        student_name = session_info['name']
        student_school = session_info['school']
        student_grade = session_info['grade']
        selected_tag = session_info['selected_tag']

        total_score = 0
        processed_answers = []
        for answer in answers:
            # Accept both camelCase and snake_case keys.
            user_answer = answer.get('userAnswer') or answer.get('user_answer', '')
            correct_answer = answer.get('correctAnswer') or answer.get('correct_answer', '')
            question_id = answer.get('questionId') or answer.get('question_id', '')
            question_text = answer.get('questionText') or answer.get('question_text', '')
            question_type = answer.get('questionType') or answer.get('question_type', '')

            is_correct = user_answer == correct_answer
            score = answer.get('score', 0) if is_correct else 0
            total_score += score

            processed_answers.append({
                'questionId': question_id,
                'questionText': question_text,
                'questionType': question_type,
                'userAnswer': user_answer,
                'correctAnswer': correct_answer,
                'isCorrect': is_correct,
                'score': score
            })

        # The whole answer sheet is stored as a single JSON document.
        answers_json = json.dumps(processed_answers, ensure_ascii=False)
        answer_id = str(uuid.uuid4())

        # NOTE(review): INSERT OR REPLACE only replaces an earlier row when a
        # uniqueness constraint matches; the id is new on every call, so this
        # presumably relies on a UNIQUE constraint on session_id - confirm
        # against the schema.
        cursor.execute('''
            INSERT OR REPLACE INTO quiz_answers
            (id, session_id, student_name, student_school, student_grade, selected_tag, answers_data, total_score)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ''', (answer_id, session_id, student_name, student_school, student_grade, selected_tag, answers_json, total_score))

        # Mark the session as completed.
        cursor.execute('''
            UPDATE quiz_sessions
            SET status = 'completed', total_score = ?, completed_at = ?
            WHERE id = ?
        ''', (total_score, get_east8_time_string(), session_id))

        conn.commit()
    finally:
        # Release the connection on every path, including the 404 above.
        conn.close()

    # Report generation runs in the background so the response is not delayed.
    threading.Thread(target=async_generate_report, args=(session_id,), daemon=True).start()

    return SaveAnswersResponse(
        success=True,
        totalScore=total_score,
        sessionId=session_id
    )
@app.post("/api/generate-report", response_model=ApiResponse)
async def generate_report(request: GenerateReportRequest):
    """Generate the report for a session on demand and wait for the result.

    Returns an ``ApiResponse`` whose ``success`` mirrors the generator's
    result and whose ``message`` is either a fixed success text or the
    generator's ``error`` value.

    Raises:
        HTTPException: 400 when the session id is empty, 500 when report
            generation raises.
    """
    session_id = request.sessionId
    if not session_id:
        raise HTTPException(status_code=400, detail="会话ID不能为空")

    try:
        # This handler already runs on the server's event loop, so the
        # coroutine is awaited directly. Creating a second loop and calling
        # run_until_complete() here raises RuntimeError because a loop is
        # already running in this thread.
        result = await enhanced_system.auto_generate_report(session_id)
        return ApiResponse(
            success=result['success'],
            message='报告生成成功' if result['success'] else result['error']
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/regenerate-report", response_model=ApiResponse)
async def regenerate_report(request: GenerateReportRequest):
    """Regenerate the report for a session and wait for the result.

    On failure the response is an ``ApiResponse`` carrying the generator's
    ``error`` text. On success the response carries the same ``success`` and
    ``message`` fields plus a ``reportId`` field holding the generator's
    ``report_id``.

    Raises:
        HTTPException: 400 when the session id is empty, 500 when report
            regeneration raises.
    """
    # Imported locally so this handler does not depend on a file-level import.
    from fastapi.responses import JSONResponse

    session_id = request.sessionId
    if not session_id:
        raise HTTPException(status_code=400, detail="会话ID不能为空")

    try:
        # Awaited directly: this handler already runs on the server's event
        # loop, and run_until_complete() on a second loop raises RuntimeError
        # while another loop is running in the same thread.
        result = await enhanced_system.regenerate_report(session_id)

        if not result['success']:
            return ApiResponse(success=result['success'], message=result['error'])

        # ApiResponse declares no field for the report id, and assigning an
        # undeclared attribute on the model raises. A response object returned
        # directly is not filtered through response_model, so the extra
        # field reaches the client.
        return JSONResponse(content={
            'success': True,
            'message': '报告重新生成成功',
            'reportId': result.get('report_id'),
        })
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/sessions-can-regenerate")
async def get_sessions_can_regenerate():
    """Return the sessions for which a report can be regenerated.

    The list comes from ``enhanced_system.get_sessions_can_regenerate()`` and
    is wrapped as ``{'success': True, 'sessions': [...]}``.

    Raises:
        HTTPException: 500 when the lookup raises.
    """
    try:
        return {
            'success': True,
            'sessions': enhanced_system.get_sessions_can_regenerate(),
        }
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@app.get("/api/reports")
async def get_reports(page: int = 1, page_size: int = 10):
    """Return one page of the report list.

    ``page`` and ``page_size`` are passed straight through to
    ``enhanced_system.get_reports_list`` and its result is returned as is.
    """
    return enhanced_system.get_reports_list(page, page_size)
@app.get("/api/report/{report_id}")
async def get_report(report_id: str):
    """Return a single stored report.

    The stored ``report_data`` JSON is decoded and its ``studentInfo`` and
    ``report`` entries are returned together with a fixed copyright footer.

    Raises:
        HTTPException: 404 when no report has this id, 500 when the stored
            JSON cannot be decoded.
    """
    report = enhanced_system.get_report_by_id(report_id)
    if not report:
        raise HTTPException(status_code=404, detail="报告不存在")

    try:
        stored = json.loads(report['report_data'])
        # The decoded analysis payload is not part of the response; the
        # decode is kept so malformed analysis_data still yields the 500 below.
        json.loads(report['analysis_data'])

        student_info = stored.get('studentInfo', {})
        report_body = stored.get('report', {})
        footer = {'copyright': '© 2024 尚逸基石教育科技有限公司 版权所有'}
        return {
            'studentInfo': student_info,
            'report': report_body,
            'footer': footer,
        }
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"解析报告数据失败: {str(exc)}")
@app.delete("/api/reports/{report_id}", response_model=ApiResponse)
async def delete_report(report_id: str):
    """Delete a report by id.

    Raises:
        HTTPException: 400 carrying the backend's message when deletion is
            reported as unsuccessful.
    """
    outcome = enhanced_system.delete_report(report_id)
    if not outcome['success']:
        raise HTTPException(status_code=400, detail=outcome['message'])
    return ApiResponse(success=True, message=outcome['message'])
@app.get("/api/questions")
async def get_questions():
    """Return the full question bank decoded from ``public/questions.json``.

    Raises:
        HTTPException: 404 when the file is missing, 500 for any other
            failure while reading or decoding it.
    """
    try:
        with open('public/questions.json', 'r', encoding='utf-8') as bank_file:
            return json.load(bank_file)
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="题库文件不存在")
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"加载题库失败: {str(exc)}")
@app.get("/api/tags")
async def get_tags():
    """Return tag data, preferring ``public/tags.json``.

    When ``tags.json`` is missing, a fallback payload is built from
    ``public/questions.json``: every question's tag string (``题目标签`` or
    ``标签``) is split on whitespace and commas and the unique tags are
    returned sorted, with an empty ``tag_counts`` mapping.

    Raises:
        HTTPException: 500 when ``tags.json`` cannot be loaded for a reason
            other than being missing, or when the fallback cannot be built.
    """
    import re

    try:
        with open('public/tags.json', 'r', encoding='utf-8') as f:
            tags_data = json.load(f)
        return tags_data
    except FileNotFoundError:
        # tags.json is absent: derive the tag list from the question bank.
        try:
            with open('public/questions.json', 'r', encoding='utf-8') as f:
                questions_data = json.load(f)

            all_tags = set()
            for questions in questions_data.values():
                for question in questions:
                    tags = question.get('题目标签', '') or question.get('标签', '')
                    # Same guard as filter_questions_by_tag: only string tag
                    # fields are considered.
                    if not isinstance(tags, str):
                        continue
                    # str.split() would treat the pattern as a literal
                    # separator and never split; re.split applies it as a
                    # regex, matching filter_questions_by_tag.
                    for tag in re.split(r'[\s,]+', tags.strip()):
                        if tag:
                            all_tags.add(tag)

            backup_tags = {
                "tags": sorted(all_tags),
                "tag_counts": {},
                "total_unique_tags": len(all_tags)
            }
            return backup_tags
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"生成标签数据失败: {str(e)}")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"加载标签数据失败: {str(e)}")
@app.get("/api/questions/{session_id}")
async def get_filtered_questions(session_id: str):
    """Return the questions selected for one quiz session.

    The session's selected tag and question configuration are read from the
    database, the question bank in ``public/questions.json`` is filtered by
    that tag, and the configured number of questions per type is picked.

    Returns:
        A dict with ``questions``, ``selectedTag`` and ``questionsConfig``.

    Raises:
        HTTPException: 404 when the session does not exist, 500 for any
            other failure.
    """
    try:
        conn = sqlite3.connect('data/survey.db')
        try:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            cursor.execute('''
                SELECT s.selected_tag, qs.questions_config
                FROM students s
                JOIN quiz_sessions qs ON s.id = qs.student_id
                WHERE qs.id = ?
            ''', (session_id,))
            session_data = cursor.fetchone()
        finally:
            # Release the connection even when the query raises.
            conn.close()

        if not session_data:
            raise HTTPException(status_code=404, detail="会话不存在")

        selected_tag = session_data['selected_tag'] or ''
        questions_config = json.loads(session_data['questions_config'])

        # Load the complete question bank.
        with open('public/questions.json', 'r', encoding='utf-8') as f:
            all_questions = json.load(f)

        # Narrow by tag, then pick the configured number per question type.
        filtered_questions = filter_questions_by_tag(all_questions, selected_tag)
        selected_questions = select_questions_by_config(filtered_questions, questions_config)

        return {
            'questions': selected_questions,
            'selectedTag': selected_tag,
            'questionsConfig': questions_config
        }
    except HTTPException:
        # Keep the 404 above from being rewrapped as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"获取筛选题目失败: {str(e)}")
def filter_questions_by_tag(all_questions: Dict, selected_tag: str) -> Dict:
    """Keep only the questions that carry ``selected_tag``.

    An empty ``selected_tag`` returns ``all_questions`` itself, unfiltered.
    Otherwise a new dict is built with exactly the three question types
    ``基础题``, ``进阶题`` and ``竞赛题``. A question's tags are read from its
    ``题目标签`` entry (falling back to ``标签``), split on whitespace and
    commas, and the question is kept when ``selected_tag`` equals one of the
    pieces. Questions whose tag field is not a string are dropped.
    """
    import re

    if not selected_tag:
        return all_questions

    def carries_tag(question) -> bool:
        raw_tags = question.get('题目标签', '') or question.get('标签', '')
        if not isinstance(raw_tags, str):
            return False
        return selected_tag in re.split(r'[\s,]+', raw_tags.strip())

    return {
        level: [q for q in all_questions.get(level, []) if carries_tag(q)]
        for level in ("基础题", "进阶题", "竞赛题")
    }
def select_questions_by_config(filtered_questions: Dict, questions_config: Dict) -> List[Dict]:
    """Randomly pick the configured number of questions per question type.

    Question types are processed in the fixed order ``基础题``, ``进阶题``,
    ``竞赛题``; types missing from ``questions_config`` are skipped. When a
    type has fewer questions than requested, a warning is printed and all
    of them are used. A non-positive count selects nothing.

    The input lists are not modified. Each returned question is a copy of
    the original dict with two added keys: ``questionType`` and
    ``questionId`` (``"<type>_<序号>"``).

    Raises:
        KeyError: when a selected question has no ``序号`` entry.
    """
    import random

    selected = []
    for question_type in ("基础题", "进阶题", "竞赛题"):
        if question_type not in questions_config:
            continue

        count = questions_config[question_type]
        available_questions = filtered_questions.get(question_type, [])

        if len(available_questions) < count:
            print(f"警告:{question_type}可用题目不足 ({len(available_questions)}/{count})")

        # Clamp to [0, available]: a negative count would otherwise slice
        # from the end of the list and return almost the whole pool.
        take = max(0, min(count, len(available_questions)))

        # random.sample draws without replacement and leaves the caller's
        # list in its original order, unlike shuffling it in place.
        for question in random.sample(available_questions, take):
            selected.append({
                **question,
                'questionType': question_type,
                'questionId': f"{question_type}_{question['序号']}"
            })

    return selected
# HTML页面路由
@app.get("/")
async def index_page(request: Request):
    """Serve ``public/index.html`` as the landing page.

    Raises:
        HTTPException: 404 when the file does not exist.
    """
    try:
        with open('public/index.html', 'r', encoding='utf-8') as page:
            markup = page.read()
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="主页文件不存在")
    return Response(content=markup, media_type="text/html; charset=utf-8")
@app.get("/login.html")
async def login_page(request: Request):
    """Serve ``public/login.html`` as the login page.

    Raises:
        HTTPException: 404 when the file does not exist.
    """
    try:
        with open('public/login.html', 'r', encoding='utf-8') as page:
            markup = page.read()
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="登录页面文件不存在")
    return Response(content=markup, media_type="text/html; charset=utf-8")
@app.get("/survey.html")
async def survey_page(request: Request):
    """Serve ``public/survey.html``, the assessment configuration page.

    Raises:
        HTTPException: 404 when the file does not exist.
    """
    try:
        with open('public/survey.html', 'r', encoding='utf-8') as page:
            markup = page.read()
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="测评页面文件不存在")
    return Response(content=markup, media_type="text/html; charset=utf-8")
@app.get("/report.html")
async def report_page(request: Request):
    """Serve ``public/report.html``, the report viewer page.

    Raises:
        HTTPException: 404 when the file does not exist.
    """
    try:
        with open('public/report.html', 'r', encoding='utf-8') as page:
            markup = page.read()
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="报告页面文件不存在")
    return Response(content=markup, media_type="text/html; charset=utf-8")
@app.get("/quiz/{session_id}")
async def quiz_page(session_id: str, request: Request):
    """Render the quiz page for an existing session.

    The student's row plus the session's question configuration and status
    are loaded from the database and passed to ``generate_quiz_page``.

    Raises:
        HTTPException: 404 when no session with this id exists.
    """
    conn = sqlite3.connect('data/survey.db')
    try:
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        cursor.execute('''
            SELECT s.*, qs.questions_config, qs.status
            FROM students s
            JOIN quiz_sessions qs ON s.id = qs.student_id
            WHERE qs.id = ?
        ''', (session_id,))
        session_data = cursor.fetchone()
    finally:
        # Release the connection even when the query raises.
        conn.close()

    if not session_data:
        raise HTTPException(status_code=404, detail="Session not found")

    # Build the quiz page HTML for this student and session.
    html_content = generate_quiz_page(session_data, session_id)
    return Response(content=html_content, media_type="text/html; charset=utf-8")
def generate_quiz_page(session_data: sqlite3.Row, session_id: str) -> str:
    """Build the complete HTML document for the quiz page.

    The student's name, school and grade are read from ``session_data`` and
    HTML-escaped before being placed in the page, because they originate
    from user input. ``session_id`` is embedded in the page's script, which:

    * fetches the questions from ``/api/questions/<session_id>``,
    * posts the answers to ``/api/save-answers``,
    * then polls ``/api/reports`` every 2 seconds (at most 30 times) until a
      report for this session appears.

    Args:
        session_data: Mapping-like row providing ``name``, ``school`` and
            ``grade``.
        session_id: Id of the quiz session the page belongs to.

    Returns:
        The page markup as a single string.
    """
    from html import escape

    # str() keeps the previous rendering for non-string values (e.g. None).
    student_name = escape(str(session_data['name']))
    school = escape(str(session_data['school']))
    grade = escape(str(session_data['grade']))

    # NOTE(review): session_id is interpolated into JavaScript string
    # literals unescaped. quiz_page only calls this after the id was found in
    # the database - confirm that every caller does the same.
    # Literal braces in the CSS/JS below are doubled because this is an f-string.
    return f'''<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{student_name} - 学科能力测评</title>
<style>
* {{
margin: 0;
padding: 0;
box-sizing: border-box;
font-family: 'Microsoft YaHei', sans-serif;
}}
body {{
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
min-height: 100vh;
padding: 20px;
}}
.container {{
max-width: 900px;
margin: 0 auto;
background: white;
border-radius: 15px;
box-shadow: 0 10px 30px rgba(0, 0, 0, 0.2);
overflow: hidden;
}}
.header {{
background: linear-gradient(135deg, #4facfe 0%, #00f2fe 100%);
color: white;
padding: 30px;
text-align: center;
}}
.student-info {{
background: #f8f9fa;
padding: 20px;
border-bottom: 1px solid #e9ecef;
}}
.info-grid {{
display: grid;
grid-template-columns: repeat(auto-fit, minmax(180px, 1fr));
gap: 15px;
}}
.info-item {{
background: white;
padding: 15px;
border-radius: 8px;
border-left: 4px solid #4facfe;
}}
.info-label {{
font-weight: 600;
color: #2d5a3d;
margin-bottom: 5px;
}}
.info-value {{
color: #333;
font-size: 16px;
}}
.progress-bar {{
background: #e9ecef;
height: 8px;
border-radius: 4px;
margin: 20px 30px;
overflow: hidden;
}}
.progress-fill {{
background: linear-gradient(90deg, #4facfe 0%, #00f2fe 100%);
height: 100%;
border-radius: 4px;
transition: width 0.3s ease;
width: 0%;
}}
.questions-container {{
padding: 30px;
}}
.question-card {{
background: white;
border: 2px solid #e9ecef;
border-radius: 12px;
padding: 25px;
margin-bottom: 20px;
box-shadow: 0 2px 10px rgba(0, 0, 0, 0.05);
}}
.question-header {{
display: flex;
align-items: center;
margin-bottom: 20px;
}}
.question-number {{
background: linear-gradient(135deg, #4facfe 0%, #00f2fe 100%);
color: white;
width: 35px;
height: 35px;
border-radius: 50%;
display: flex;
align-items: center;
justify-content: center;
font-weight: 600;
margin-right: 15px;
}}
.question-type {{
background: #e8f5e8;
color: #2e7d32;
padding: 4px 12px;
border-radius: 20px;
font-size: 12px;
font-weight: 500;
margin-left: auto;
border: 1px solid #2e7d32;
}}
.question-type.基础题 {{
background: #e8f5e8;
color: #2e7d32;
border: 1px solid #2e7d32;
}}
.question-type.进阶题 {{
background: #fff3e0;
color: #f57c00;
border: 1px solid #f57c00;
}}
.question-type.竞赛题 {{
background: #fce4ec;
color: #c2185b;
border: 1px solid #c2185b;
}}
.question-text {{
font-size: 18px;
color: #333;
line-height: 1.6;
margin-bottom: 20px;
font-weight: 500;
}}
.options-list {{
list-style: none;
}}
.option-item {{
background: #f8f9fa;
border: 2px solid #e9ecef;
border-radius: 8px;
padding: 15px 20px;
margin-bottom: 10px;
cursor: pointer;
transition: all 0.3s;
display: flex;
align-items: center;
}}
.option-item:hover {{
background: #e3f2fd;
border-color: #4facfe;
transform: translateY(-1px);
}}
.option-item.selected {{
background: #e3f2fd;
border-color: #4facfe;
}}
.option-item input[type="radio"] {{
margin-right: 12px;
transform: scale(1.2);
}}
.option-text {{
flex: 1;
font-size: 16px;
color: #333;
}}
.submit-btn {{
background: linear-gradient(135deg, #4CAF50, #66BB6A);
color: white;
border: none;
padding: 15px 30px;
border-radius: 8px;
font-size: 18px;
font-weight: 600;
cursor: pointer;
transition: all 0.3s;
display: block;
margin: 30px auto;
}}
.submit-btn:hover {{
transform: translateY(-2px);
box-shadow: 0 5px 15px rgba(76, 175, 80, 0.4);
}}
.submit-btn:disabled {{
background: #ccc;
cursor: not-allowed;
transform: none;
}}
.result-section {{
text-align: center;
padding: 50px 30px;
display: none;
}}
.score-display {{
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
padding: 30px;
border-radius: 15px;
margin: 20px 0;
}}
.score-value {{
font-size: 48px;
font-weight: 700;
margin-bottom: 10px;
}}
.report-link {{
display: inline-block;
background: linear-gradient(135deg, #ff7e5f, #feb47b);
color: white;
padding: 12px 24px;
border-radius: 8px;
text-decoration: none;
font-weight: 600;
margin-top: 20px;
transition: all 0.3s;
}}
.report-link:hover {{
transform: translateY(-2px);
box-shadow: 0 5px 15px rgba(255, 126, 95, 0.3);
}}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>🎯 学科能力测评</h1>
<p>请认真回答以下问题</p>
</div>
<div class="student-info">
<div class="info-grid">
<div class="info-item">
<div class="info-label">姓名</div>
<div class="info-value">{student_name}</div>
</div>
<div class="info-item">
<div class="info-label">学校</div>
<div class="info-value">{school}</div>
</div>
<div class="info-item">
<div class="info-label">年级</div>
<div class="info-value">{grade}</div>
</div>
<div class="info-item">
<div class="info-label">选题范围</div>
<div class="info-value" id="selectedTagDisplay">正在加载...</div>
</div>
</div>
</div>
<div class="progress-bar">
<div class="progress-fill" id="progressFill"></div>
</div>
<div class="questions-container" id="questionsContainer">
<div class="loading">正在加载题目...</div>
</div>
<div class="result-section" id="resultSection">
<div class="score-display">
<div class="score-value" id="scoreValue">0分</div>
<div>测评完成!正在生成报告...</div>
</div>
<div id="reportStatus">请稍候,系统正在为您生成详细的测评报告...</div>
</div>
</div>
<script>
class QuizSystem {{
constructor() {{
this.sessionId = '{session_id}';
this.questions = [];
this.answers = {{}};
this.init();
}}
async init() {{
try {{
await this.loadQuestions();
this.renderQuestions();
}} catch (error) {{
this.showError('系统初始化失败: ' + error.message);
}}
}}
async loadQuestions() {{
const response = await fetch(`/api/questions/{session_id}`);
if (!response.ok) {{
throw new Error('题目加载失败');
}}
const data = await response.json();
this.questions = data.questions;
this.selectedTag = data.selectedTag;
this.questionsConfig = data.questionsConfig;
// 更新选题范围显示
const selectedTagDisplay = document.getElementById('selectedTagDisplay');
if (selectedTagDisplay) {{
selectedTagDisplay.textContent = this.selectedTag || '全部题目';
}}
if (this.questions.length === 0) {{
throw new Error('没有可用的题目');
}}
}}
renderQuestions() {{
const container = document.getElementById('questionsContainer');
container.innerHTML = '';
this.questions.forEach((question, index) => {{
const questionCard = document.createElement('div');
questionCard.className = 'question-card';
questionCard.innerHTML = `
<div class="question-header">
<div class="question-number">${{index + 1}}</div>
<div class="question-type ${{question.questionType}}">${{question.questionType}}</div>
</div>
<div class="question-text">${{question['题干']}}</div>
<ul class="options-list">
${{this.renderOptions(question, index)}}
</ul>
`;
container.appendChild(questionCard);
}});
this.bindOptionEvents();
}}
renderOptions(question, questionIndex) {{
const options = [];
const labels = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H'];
// 通过遍历所有键来找到选项
for (let i = 0; i < labels.length; i++) {{
let optionText = null;
// 遍历问题的所有键,寻找匹配的选项
for (const key in question) {{
// 检查键是否包含当前选项字母(忽略空格数量)
if (key.replace(/\\s+/g, '').includes(`选项${{labels[i]}}`)) {{
optionText = question[key];
break;
}}
}}
if (optionText && optionText.trim()) {{
options.push(`
<li class="option-item" data-question="${{questionIndex}}" data-option="${{labels[i]}}">
<input type="radio" name="question_${{questionIndex}}" value="${{labels[i]}}" id="option_${{questionIndex}}_${{i}}">
<label for="option_${{questionIndex}}_${{i}}" class="option-text">${{labels[i]}}. ${{optionText}}</label>
</li>
`);
}}
}}
return options.join('');
}}
bindOptionEvents() {{
document.querySelectorAll('.option-item').forEach(item => {{
item.addEventListener('click', function() {{
const questionIndex = this.dataset.question;
const option = this.dataset.option;
const radio = this.querySelector('input[type="radio"]');
document.querySelectorAll(`.option-item[data-question="${{questionIndex}}"]`).forEach(otherItem => {{
otherItem.classList.remove('selected');
}});
this.classList.add('selected');
radio.checked = true;
window.quiz.answers[questionIndex] = option;
window.quiz.updateProgress();
window.quiz.checkCompletion();
}});
}});
const container = document.getElementById('questionsContainer');
const submitBtn = document.createElement('button');
submitBtn.className = 'submit-btn';
submitBtn.textContent = '提交答题';
submitBtn.disabled = true;
submitBtn.onclick = () => this.submitQuiz();
container.appendChild(submitBtn);
}}
updateProgress() {{
const total = this.questions.length;
const answered = Object.keys(this.answers).length;
const percentage = total > 0 ? (answered / total) * 100 : 0;
document.getElementById('progressFill').style.width = percentage + '%';
}}
checkCompletion() {{
const total = this.questions.length;
const answered = Object.keys(this.answers).length;
const submitBtn = document.querySelector('.submit-btn');
submitBtn.disabled = answered !== total;
}}
async submitQuiz() {{
try {{
const submitBtn = document.querySelector('.submit-btn');
submitBtn.textContent = '正在提交...';
submitBtn.disabled = true;
const answersData = this.questions.map((question, index) => ({{
questionId: question.questionId,
questionText: question['题干'],
questionType: question.questionType,
userAnswer: this.answers[index] || '',
correctAnswer: question['答案'],
score: parseInt(question['分数']) || 0
}}));
const response = await fetch('/api/save-answers', {{
method: 'POST',
headers: {{
'Content-Type': 'application/json'
}},
body: JSON.stringify({{
sessionId: this.sessionId,
answers: answersData
}})
}});
if (response.ok) {{
const result = await response.json();
this.showResults(result.totalScore);
this.checkReportStatus();
}} else {{
throw new Error('提交失败');
}}
}} catch (error) {{
this.showError('提交失败: ' + error.message);
const submitBtn = document.querySelector('.submit-btn');
submitBtn.textContent = '提交答题';
submitBtn.disabled = false;
}}
}}
showResults(totalScore) {{
document.getElementById('questionsContainer').style.display = 'none';
document.getElementById('resultSection').style.display = 'block';
document.getElementById('scoreValue').textContent = totalScore + '';
}}
checkReportStatus() {{
let checkCount = 0;
const maxChecks = 30;
const checkInterval = setInterval(async () => {{
checkCount++;
try {{
const response = await fetch('/api/reports');
const data = await response.json();
const latestReport = data.reports.find(r => r.session_id === this.sessionId);
if (latestReport) {{
clearInterval(checkInterval);
this.showReportLink(latestReport.id);
}} else if (checkCount >= maxChecks) {{
clearInterval(checkInterval);
document.getElementById('reportStatus').innerHTML =
'报告生成时间较长,请稍后前往报告列表查看。' +
'<a href="/" style="color: #4facfe; text-decoration: none;">返回首页</a>';
}}
}} catch (error) {{
console.error('检查报告状态失败:', error);
}}
}}, 2000);
}}
showReportLink(reportId) {{
document.getElementById('reportStatus').innerHTML =
'✅ 测评报告已生成!' +
`<a href="/report.html?id=${{reportId}}" class="report-link">查看详细报告</a>`;
}}
showError(message) {{
const container = document.getElementById('questionsContainer');
container.innerHTML = `<div style="text-align: center; padding: 50px; color: #e74c3c; background: #ffebee; border-radius: 8px; margin: 20px;">${{message}}</div>`;
}}
}}
window.quiz = new QuizSystem();
</script>
</body>
</html>'''
@app.post("/api/report-callback", response_model=ApiResponse)
async def report_callback(request: ReportCallbackRequest):
    """Store a report delivered by the asynchronous report generator.

    After checking that the session exists, the most recent analysis data
    saved for it in ``temp_analysis_data`` (if any) is combined with the
    ``studentInfo`` and ``report`` entries of the callback payload, the
    result is inserted into ``reports`` under a new UUID4 id, and the
    session is marked ``'completed'``.

    Raises:
        HTTPException: 400 when ``session_id`` or ``data`` is empty, 404
            when the session does not exist, 500 for any other failure.
    """
    session_id = request.session_id
    report_data = request.data

    if not session_id or not report_data:
        raise HTTPException(status_code=400, detail="session_id和data不能为空")

    try:
        conn = sqlite3.connect('data/survey.db')
        try:
            cursor = conn.cursor()

            # Verify that the session exists.
            cursor.execute('''
                SELECT id FROM quiz_sessions WHERE id = ?
            ''', (session_id,))
            if not cursor.fetchone():
                raise HTTPException(status_code=404, detail="会话不存在")

            # Latest saved analysis data for this session, if any.
            cursor.execute('''
                SELECT analysis_data FROM temp_analysis_data
                WHERE session_id = ?
                ORDER BY created_at DESC
                LIMIT 1
            ''', (session_id,))
            analysis_result = cursor.fetchone()
            # A row holding NULL/empty analysis_data is treated like no row.
            if analysis_result and analysis_result[0]:
                analysis_data = json.loads(analysis_result[0])
            else:
                analysis_data = {}

            # Assemble the full report document.
            full_report_data = {
                'studentInfo': report_data.get('studentInfo', {}),
                'report': report_data.get('report', {}),
                'generated_at': get_east8_time().isoformat(),
                'session_id': session_id,
                'analysis_data': analysis_data
            }

            # Persist the report.
            report_id = str(uuid.uuid4())
            cursor.execute('''
                INSERT INTO reports (id, session_id, report_data, analysis_data, generated_at)
                VALUES (?, ?, ?, ?, ?)
            ''', (report_id, session_id, json.dumps(full_report_data), json.dumps(analysis_data), get_east8_time_string()))

            # Mark the session as completed.
            cursor.execute('''
                UPDATE quiz_sessions
                SET status = 'completed', completed_at = ?
                WHERE id = ?
            ''', (get_east8_time_string(), session_id))

            conn.commit()
        finally:
            # Release the connection on every path; without a commit the
            # pending changes are discarded.
            conn.close()

        print(f"✅ 回调处理成功: {session_id}")
        print(f" 报告ID: {report_id}")

        return ApiResponse(
            success=True,
            message='回调数据处理成功'
        )
    except HTTPException:
        # Keep the 404 above from being rewrapped as a 500.
        raise
    except Exception as e:
        print(f"❌ 回调处理失败: {e}")
        raise HTTPException(status_code=500, detail=f"回调处理失败: {str(e)}")
if __name__ == "__main__":
    # Script entry point: serve the app defined in this module with uvicorn
    # on all interfaces, port 8000, with auto-reload enabled.
    import uvicorn

    uvicorn.run(
        "main:app",
        reload=True,
        port=8000,
        host="0.0.0.0",
    )