survey/main.py
2025-10-29 15:14:27 +08:00

436 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sqlite3
import json
import uuid
import os
import asyncio
from datetime import datetime
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional, List, Dict, Any
from enhanced_survey_system import enhanced_system, get_east8_time_string
app = FastAPI(title="Survey System", version="1.0.0")

# CORS middleware: every origin, method and header is accepted.
# NOTE(review): a wildcard origin combined with allow_credentials=True is very
# permissive -- confirm this is intended before exposing the service publicly.
_CORS_OPTIONS = dict(
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
app.add_middleware(CORSMiddleware, **_CORS_OPTIONS)

# Static file service: mounted under /static only when a "public" path exists.
if os.path.exists("public"):
    app.mount("/static", StaticFiles(directory="public"), name="static")
# 请求模型
class CreateSessionRequest(BaseModel):
    # Request body for POST /api/create-session.
    # name / school / grade are written to the students table as-is.
    name: str
    school: str
    grade: str
    # Optional tag stored with the student and later used to filter questions.
    selectedTag: Optional[str] = ""
    # Maps a question type to how many questions of that type to draw.
    questionsConfig: Dict[str, int]
class SaveAnswersRequest(BaseModel):
    # Request body for POST /api/save-answers.
    sessionId: str
    # One dict per answered question; scoring reads the 'userAnswer',
    # 'correctAnswer' and 'score' keys of each entry.
    answers: List[Dict[str, Any]]
class GenerateReportRequest(BaseModel):
    # Request body for POST /api/generate-report: the session to report on.
    sessionId: str
class RegenerateReportRequest(BaseModel):
    # Request body for POST /api/regenerate-report: the session to re-report.
    sessionId: str
# API端点
@app.post("/api/create-session")
async def create_session(request: CreateSessionRequest):
    """Create a new assessment session.

    Inserts one row into ``students`` and one into ``quiz_sessions`` (status
    ``'created'``) inside a single transaction, then returns the freshly
    generated student and session ids.

    Raises:
        HTTPException: 500 with the error text if anything fails.
    """
    try:
        # Generate the student id and the session id.
        student_id = str(uuid.uuid4())
        session_id = str(uuid.uuid4())

        conn = sqlite3.connect('data/survey.db')
        try:
            cursor = conn.cursor()
            # Persist the student record.
            cursor.execute('''
                INSERT INTO students (id, name, school, grade, selected_tag, created_at)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (student_id, request.name, request.school, request.grade,
                  request.selectedTag, get_east8_time_string()))
            # Create the session that belongs to that student.
            cursor.execute('''
                INSERT INTO quiz_sessions (id, student_id, questions_config, status, created_at)
                VALUES (?, ?, ?, ?, ?)
            ''', (session_id, student_id, json.dumps(request.questionsConfig),
                  'created', get_east8_time_string()))
            conn.commit()
        finally:
            # Always release the connection; closing without a commit discards
            # a half-finished transaction, so no orphan student row is left.
            conn.close()

        return JSONResponse(content={
            'success': True,
            'sessionId': session_id,
            'studentId': student_id,
            'message': '会话创建成功'
        })
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# Strong references to in-flight report tasks. The event loop only keeps weak
# references to tasks, so a task nobody else references may be garbage
# collected before it finishes.
_background_report_tasks = set()


@app.post("/api/save-answers")
async def save_answers(request: SaveAnswersRequest):
    """Store the submitted answers and start report generation in the background.

    The total score is the sum of ``score`` over every answer whose
    ``userAnswer`` equals its ``correctAnswer``. The session row is marked
    ``'completed'`` and report generation is scheduled as a background task.

    Raises:
        HTTPException: 404 if no session has the given id; 500 with the error
            text for any other failure.
    """
    try:
        # Compute the total score.
        total_score = sum(
            answer.get('score', 0)
            for answer in request.answers
            if answer.get('userAnswer') == answer.get('correctAnswer')
        )

        # Persist the answers.
        conn = sqlite3.connect('data/survey.db')
        try:
            cursor = conn.cursor()
            cursor.execute('''
                UPDATE quiz_sessions
                SET answers = ?, status = 'completed', completed_at = ?, total_score = ?
                WHERE id = ?
            ''', (json.dumps(request.answers), get_east8_time_string(),
                  total_score, request.sessionId))
            updated_rows = cursor.rowcount
            conn.commit()
        finally:
            conn.close()

        # An UPDATE that matched nothing means the session id is unknown;
        # report that instead of claiming success.
        if updated_rows == 0:
            raise HTTPException(status_code=404, detail="会话不存在")

        # Generate the report asynchronously, keeping the task alive until done.
        task = asyncio.create_task(generate_report_async(request.sessionId))
        _background_report_tasks.add(task)
        task.add_done_callback(_background_report_tasks.discard)

        return JSONResponse(content={
            'success': True,
            'totalScore': total_score,
            'message': '答题结果已保存,正在生成报告'
        })
    except HTTPException:
        # Keep deliberate HTTP errors (e.g. the 404 above) intact.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
async def generate_report_async(session_id: str):
    """Generate the report for ``session_id`` as a best-effort background job.

    Any exception is printed and swallowed, so the coroutine always completes
    with ``None``.
    """
    try:
        await enhanced_system.generate_report(session_id)
    except Exception as exc:
        print(f"生成报告失败: {exc}")
@app.post("/api/generate-report")
async def generate_report(request: GenerateReportRequest):
    """Generate a report on demand for the session named in the request.

    Returns whatever ``enhanced_system.generate_report`` produces as JSON;
    any failure becomes a 500 carrying the error text.
    """
    try:
        payload = await enhanced_system.generate_report(request.sessionId)
        return JSONResponse(content=payload)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@app.post("/api/regenerate-report")
async def regenerate_report(request: RegenerateReportRequest):
    """Regenerate the report for the session named in the request.

    Returns whatever ``enhanced_system.regenerate_report`` produces as JSON;
    any failure becomes a 500 carrying the error text.
    """
    try:
        payload = await enhanced_system.regenerate_report(request.sessionId)
        return JSONResponse(content=payload)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@app.get("/api/sessions-can-regenerate")
async def get_sessions_can_regenerate():
    """List the sessions whose report can be regenerated.

    Wraps ``enhanced_system.get_sessions_can_regenerate()`` in a
    ``{'success': True, 'sessions': ...}`` envelope; any failure becomes a 500.
    """
    try:
        body = {
            'success': True,
            'sessions': enhanced_system.get_sessions_can_regenerate(),
        }
        return JSONResponse(content=body)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@app.get("/api/reports")
async def get_reports(page: int = 1, page_size: int = 10):
    """Return one page of the report list.

    ``page`` and ``page_size`` are forwarded unchanged to
    ``enhanced_system.get_reports_list``; any failure becomes a 500.
    """
    try:
        listing = enhanced_system.get_reports_list(page, page_size)
        return JSONResponse(content=listing)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@app.get("/api/report/{report_id}")
async def get_report(report_id: str):
    """Return a single report by id.

    The stored ``report_data`` JSON is decoded and its ``studentInfo`` and
    ``report`` sections are returned together with a fixed footer.

    Raises:
        HTTPException: 404 if the report does not exist; 500 with the error
            text for any other failure.
    """
    try:
        report = enhanced_system.get_report_by_id(report_id)
        if not report:
            raise HTTPException(status_code=404, detail="报告不存在")

        # Decode the stored report payload.
        report_data = json.loads(report['report_data'])
        response_data = {
            'studentInfo': report_data.get('studentInfo', {}),
            'report': report_data.get('report', {}),
            'footer': {
                'copyright': '© 2024 尚逸基石教育科技有限公司 版权所有'
            }
        }
        return JSONResponse(content=response_data)
    except HTTPException:
        # Let the 404 above through instead of rewrapping it as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/questions/{session_id}")
async def get_questions(session_id: str):
    """Return the questions selected for a session.

    Looks up the session's tag and question configuration, loads the question
    bank from ``public/questions.json``, filters it by tag and draws the
    configured number of questions per type.

    Raises:
        HTTPException: 404 if the session does not exist; 500 with the error
            text for any other failure.
    """
    try:
        # Fetch the session information from the database.
        conn = sqlite3.connect('data/survey.db')
        try:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            cursor.execute('''
                SELECT s.selected_tag, qs.questions_config
                FROM students s
                JOIN quiz_sessions qs ON s.id = qs.student_id
                WHERE qs.id = ?
            ''', (session_id,))
            session_data = cursor.fetchone()
        finally:
            # Release the connection even when the query itself fails.
            conn.close()

        if not session_data:
            raise HTTPException(status_code=404, detail="会话不存在")

        selected_tag = session_data['selected_tag'] or ''
        questions_config = json.loads(session_data['questions_config'])

        # Load the full question bank.
        with open('public/questions.json', 'r', encoding='utf-8') as f:
            all_questions = json.load(f)

        # Filter by tag, then draw questions according to the configuration.
        filtered_questions = filter_questions_by_tag(all_questions, selected_tag)
        selected_questions = select_questions_by_config(filtered_questions, questions_config)

        response_data = {
            'questions': selected_questions,
            'selectedTag': selected_tag,
            'questionsConfig': questions_config
        }
        return JSONResponse(content=response_data)
    except HTTPException:
        # Let the 404 above through instead of rewrapping it as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/questions")
async def get_all_questions():
    """Return the whole question bank stored in ``public/questions.json``.

    A missing file yields a 404; any other failure yields a 500.
    """
    try:
        with open('public/questions.json', 'r', encoding='utf-8') as bank_file:
            return JSONResponse(content=json.load(bank_file))
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="题库文件不存在")
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"加载题库失败: {str(exc)}")
def _collect_question_tags(questions_data):
    """Return the set of distinct tags found across every question list."""
    import re

    found = set()
    for questions in questions_data.values():
        for question in questions:
            raw_tags = question.get('题目标签', '') or question.get('标签', '')
            # Skip non-string tag values, as filter_questions_by_tag does.
            if not isinstance(raw_tags, str):
                continue
            # Tags are separated by whitespace and/or commas; str.split would
            # treat the pattern as a literal separator and never split.
            found.update(tag for tag in re.split(r'[\s,]+', raw_tags.strip()) if tag)
    return found


@app.get("/api/tags")
async def get_tags():
    """Return the tag data.

    Serves ``public/tags.json`` when it exists. Otherwise a fallback payload is
    built from the tags found in ``public/questions.json`` (sorted tag list,
    empty ``tag_counts`` and the number of unique tags).

    Raises:
        HTTPException: 500 if the tag file cannot be loaded or the fallback
            cannot be generated.
    """
    try:
        with open('public/tags.json', 'r', encoding='utf-8') as f:
            tags_data = json.load(f)
        return JSONResponse(content=tags_data)
    except FileNotFoundError:
        # tags.json is missing: derive fallback tag data from the question bank.
        try:
            with open('public/questions.json', 'r', encoding='utf-8') as f:
                questions_data = json.load(f)
            all_tags = _collect_question_tags(questions_data)
            backup_tags = {
                "tags": sorted(all_tags),
                "tag_counts": {},
                "total_unique_tags": len(all_tags)
            }
            return JSONResponse(content=backup_tags)
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"生成标签数据失败: {str(e)}")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"加载标签数据失败: {str(e)}")
# 页面路由
@app.get("/", response_class=HTMLResponse)
async def index():
    """Serve the main page from ``public/index.html`` (404 when it is missing)."""
    try:
        with open('public/index.html', 'r', encoding='utf-8') as page_file:
            return HTMLResponse(content=page_file.read())
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="主页文件不存在")
@app.get("/survey.html", response_class=HTMLResponse)
async def survey_page():
    """Serve the assessment setup page from ``public/survey.html`` (404 when missing)."""
    try:
        with open('public/survey.html', 'r', encoding='utf-8') as page_file:
            return HTMLResponse(content=page_file.read())
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="测评页面文件不存在")
@app.get("/report.html", response_class=HTMLResponse)
async def report_page():
    """Serve the report viewer page from ``public/report.html`` (404 when missing)."""
    try:
        with open('public/report.html', 'r', encoding='utf-8') as page_file:
            return HTMLResponse(content=page_file.read())
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="报告页面文件不存在")
@app.get("/quiz/{session_id}", response_class=HTMLResponse)
async def quiz_page(session_id: str):
    """Render the quiz page for a session.

    Loads the student row joined with the session's configuration and status,
    then hands it to ``generate_quiz_page``.

    Raises:
        HTTPException: 404 if the session does not exist; 500 with the error
            text for any other failure.
    """
    try:
        conn = sqlite3.connect('data/survey.db')
        try:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            cursor.execute('''
                SELECT s.*, qs.questions_config, qs.status
                FROM students s
                JOIN quiz_sessions qs ON s.id = qs.student_id
                WHERE qs.id = ?
            ''', (session_id,))
            session_data = cursor.fetchone()
        finally:
            # Release the connection even when the query itself fails.
            conn.close()

        if not session_data:
            raise HTTPException(status_code=404, detail="Session not found")

        # Build the quiz page HTML.
        html_content = generate_quiz_page(session_data, session_id)
        return HTMLResponse(content=html_content)
    except HTTPException:
        # Let the 404 above through instead of rewrapping it as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# 辅助函数
def filter_questions_by_tag(all_questions, selected_tag):
    """Keep only the questions that carry ``selected_tag``.

    With an empty tag the input mapping is returned untouched. Otherwise the
    result always has the three keys "基础题", "进阶题" and "竞赛题", each holding
    the questions whose tag string (split on whitespace and commas) contains
    the tag. Questions whose tag value is not a string are dropped.
    """
    import re

    if not selected_tag:
        return all_questions

    def _carries_tag(question):
        raw_tags = question.get('题目标签', '') or question.get('标签', '')
        if not isinstance(raw_tags, str):
            return False
        # Split the tag string on whitespace and commas before matching.
        return selected_tag in re.split(r'[\s,]+', raw_tags.strip())

    return {
        category: [q for q in all_questions.get(category, []) if _carries_tag(q)]
        for category in ("基础题", "进阶题", "竞赛题")
    }
def select_questions_by_config(filtered_questions, questions_config):
    """Randomly draw the configured number of questions for each question type.

    Types are processed in the fixed order "基础题" -> "进阶题" -> "竞赛题"; types
    missing from ``questions_config`` are skipped. When fewer questions are
    available than requested, a warning is printed and all available ones are
    used. A zero or negative count selects nothing.

    Args:
        filtered_questions: Mapping of question type to a list of question
            dicts. It is not modified.
        questions_config: Mapping of question type to the number to draw.

    Returns:
        A list of copies of the drawn questions, each extended with
        ``questionType`` and ``questionId`` ("<type>_<序号>").

    Raises:
        KeyError: If a drawn question has no '序号' key.
    """
    import random

    selected = []
    # Question types in presentation order: basic -> advanced -> competition.
    for question_type in ("基础题", "进阶题", "竞赛题"):
        if question_type not in questions_config:
            continue
        count = questions_config[question_type]
        available_questions = filtered_questions.get(question_type, [])
        if len(available_questions) < count:
            print(f"警告:{question_type}可用题目不足 ({len(available_questions)}/{count})")
        # Clamp to [0, available]: a negative count must not turn into a
        # "drop the last k" slice.
        take = max(0, min(count, len(available_questions)))
        # random.sample draws without touching the caller's list, unlike an
        # in-place shuffle.
        for question in random.sample(available_questions, take):
            selected.append({
                **question,
                'questionType': question_type,
                'questionId': f"{question_type}_{question['序号']}"
            })
    return selected
def generate_quiz_page(session_data, session_id):
    """Build the HTML for the quiz page of one session.

    Args:
        session_data: Mapping-like row providing 'questions_config' (a JSON
            string), 'name', 'school' and 'grade'.
        session_id: Id of the session; shown on the page and used in the URL
            the page fetches its questions from.

    Returns:
        The page as an HTML string. All user-supplied values are escaped before
        being embedded, so they cannot inject markup or script.

    Raises:
        ValueError: If 'questions_config' is not valid JSON
            (``json.JSONDecodeError``).
    """
    import html
    from urllib.parse import quote

    # Parsed only to fail fast on a corrupt configuration; the simplified page
    # below does not use the result.
    json.loads(session_data['questions_config'])

    # HTML-escape everything that is rendered as text.
    student_name = html.escape(str(session_data['name']))
    school = html.escape(str(session_data['school']))
    grade = html.escape(str(session_data['grade']))
    session_label = html.escape(str(session_id))
    # Percent-encode the id for the fetch URL so it cannot break out of the
    # quoted JavaScript string; a UUID passes through unchanged.
    session_path = quote(str(session_id), safe='')

    # The original HTML generation logic could be reused here; for simplicity
    # a minimal page is returned.
    return f'''
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{student_name} - 学科能力测评</title>
</head>
<body>
<h1>测评页面</h1>
<p>学生: {student_name}</p>
<p>学校: {school}</p>
<p>年级: {grade}</p>
<p>会话ID: {session_label}</p>
<div id="questions-container">加载中...</div>
<script>
fetch('/api/questions/{session_path}')
.then(response => response.json())
.then(data => {{
console.log('题目数据:', data);
// 这里可以渲染题目
}})
.catch(error => console.error('加载题目失败:', error));
</script>
</body>
</html>
'''
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces, port 8000.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)