#!/usr/bin/env python3 # -*- coding: utf-8 -*- import sqlite3 import json import uuid import os from datetime import datetime from http.server import HTTPServer, BaseHTTPRequestHandler from urllib.parse import urlparse, parse_qs import cgi import asyncio import threading from enhanced_survey_system import enhanced_system, get_east8_time_string class EnhancedSurveyHandler(BaseHTTPRequestHandler): """增强的测评系统HTTP处理器""" def do_OPTIONS(self): """处理预检请求""" self.send_response(200) self.send_header('Access-Control-Allow-Origin', '*') self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS') self.send_header('Access-Control-Allow-Headers', 'Content-Type') self.end_headers() def send_cors_headers(self): """发送CORS头""" self.send_header('Access-Control-Allow-Origin', '*') self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS') self.send_header('Access-Control-Allow-Headers', 'Content-Type') def do_POST(self): """处理POST请求""" parsed_path = urlparse(self.path) path = parsed_path.path try: if path == '/api/create-session': self.handle_create_session() elif path == '/api/save-answers': self.handle_save_answers() elif path == '/api/generate-report': self.handle_generate_report() elif path == '/api/regenerate-report': self.handle_regenerate_report() elif path == '/api/sessions-can-regenerate': self.handle_get_sessions_can_regenerate() elif path.startswith('/api/report/'): self.handle_get_report(path) else: self.send_error(404, "API endpoint not found") except Exception as e: self.send_error(500, str(e)) def handle_create_session(self): """创建答题会话""" content_length = int(self.headers['Content-Length']) post_data = self.rfile.read(content_length) data = json.loads(post_data.decode('utf-8')) name = data.get('name', '').strip() school = data.get('school', '').strip() grade = data.get('grade', '').strip() selected_tag = data.get('selectedTag', '').strip() questions_config = data.get('questionsConfig', {}) if not name or not school or not grade: self.send_error(400, "姓名、学校和年级不能为空") return # 创建学员记录 student_id = str(uuid.uuid4()) conn = sqlite3.connect('data/survey.db') cursor = conn.cursor() cursor.execute(''' INSERT INTO students (id, name, school, grade, selected_tag) VALUES (?, ?, ?, ?, ?) ''', (student_id, name, school, grade, selected_tag)) # 创建答题会话 session_id = str(uuid.uuid4()) cursor.execute(''' INSERT INTO quiz_sessions (id, student_id, questions_config, status) VALUES (?, ?, ?, 'created') ''', (session_id, student_id, json.dumps(questions_config))) conn.commit() conn.close() response = { 'success': True, 'sessionId': session_id, 'studentId': student_id } self.send_response(200) self.send_header('Content-Type', 'application/json') self.send_cors_headers() self.end_headers() self.wfile.write(json.dumps(response, ensure_ascii=False).encode('utf-8')) def handle_save_answers(self): """保存答题结果""" content_length = int(self.headers['Content-Length']) post_data = self.rfile.read(content_length) data = json.loads(post_data.decode('utf-8')) session_id = data.get('sessionId') answers = data.get('answers', []) if not session_id or not answers: self.send_error(400, "会话ID和答题结果不能为空") return # 保存答题结果 - JSON格式存储,包含用户信息 conn = sqlite3.connect('data/survey.db') conn.row_factory = sqlite3.Row # 设置行工厂以支持字典访问 cursor = conn.cursor() # 获取用户信息和会话信息 cursor.execute(''' SELECT s.name, s.school, s.grade, s.selected_tag, qs.questions_config FROM students s JOIN quiz_sessions qs ON s.id = qs.student_id WHERE qs.id = ? ''', (session_id,)) session_info = cursor.fetchone() if not session_info: self.send_error(404, "会话不存在") return student_name = session_info['name'] student_school = session_info['school'] student_grade = session_info['grade'] selected_tag = session_info['selected_tag'] total_score = 0 # 处理答题数据 processed_answers = [] for answer in answers: # 兼容驼峰和下划线命名 user_answer = answer.get('userAnswer') or answer.get('user_answer', '') correct_answer = answer.get('correctAnswer') or answer.get('correct_answer', '') question_id = answer.get('questionId') or answer.get('question_id', '') question_text = answer.get('questionText') or answer.get('question_text', '') question_type = answer.get('questionType') or answer.get('question_type', '') is_correct = user_answer == correct_answer score = answer.get('score', 0) if is_correct else 0 total_score += score processed_answers.append({ 'questionId': question_id, 'questionText': question_text, 'questionType': question_type, 'userAnswer': user_answer, 'correctAnswer': correct_answer, 'isCorrect': is_correct, 'score': score }) # 将整个答题数据保存为JSON answers_json = json.dumps(processed_answers, ensure_ascii=False) answer_id = str(uuid.uuid4()) # 使用INSERT OR REPLACE确保每个session只有一条记录,包含用户信息 cursor.execute(''' INSERT OR REPLACE INTO quiz_answers (id, session_id, student_name, student_school, student_grade, selected_tag, answers_data, total_score) VALUES (?, ?, ?, ?, ?, ?, ?, ?) ''', (answer_id, session_id, student_name, student_school, student_grade, selected_tag, answers_json, total_score)) # 更新会话状态 cursor.execute(''' UPDATE quiz_sessions SET status = 'completed', total_score = ?, completed_at = ? WHERE id = ? ''', (total_score, get_east8_time_string(), session_id)) conn.commit() conn.close() # 异步生成报告 threading.Thread(target=self.async_generate_report, args=(session_id,), daemon=True).start() response = { 'success': True, 'totalScore': total_score, 'sessionId': session_id } self.send_response(200) self.send_header('Content-Type', 'application/json') self.send_cors_headers() self.end_headers() self.wfile.write(json.dumps(response, ensure_ascii=False).encode('utf-8')) def async_generate_report(self, session_id): """异步生成报告""" try: asyncio.run(enhanced_system.auto_generate_report(session_id)) print(f"报告生成成功: {session_id}") except Exception as e: print(f"报告生成失败: {e}") def handle_generate_report(self): """手动生成报告""" content_length = int(self.headers['Content-Length']) post_data = self.rfile.read(content_length) data = json.loads(post_data.decode('utf-8')) session_id = data.get('sessionId') if not session_id: self.send_error(400, "会话ID不能为空") return # 同步生成报告 try: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) result = loop.run_until_complete(enhanced_system.auto_generate_report(session_id)) loop.close() response = { 'success': result['success'], 'message': '报告生成成功' if result['success'] else result['error'] } self.send_response(200) self.send_header('Content-Type', 'application/json') self.send_cors_headers() self.end_headers() self.wfile.write(json.dumps(response, ensure_ascii=False).encode('utf-8')) except Exception as e: self.send_error(500, str(e)) def do_GET(self): """处理GET请求""" parsed_path = urlparse(self.path) path = parsed_path.path query_params = parse_qs(parsed_path.query) try: if path == '/api/reports': self.handle_get_reports(query_params) elif path.startswith('/api/report/'): self.handle_get_report(path) elif path == '/api/questions': self.handle_get_questions() elif path == '/api/tags': self.handle_get_tags() elif path.startswith('/api/questions/'): session_id = path.split('/')[-1] self.handle_get_filtered_questions(session_id) elif path.startswith('/quiz/'): self.handle_quiz_page(path) elif path == '/': self.serve_index_page() elif path.startswith('/public/'): self.serve_static_file(path) elif path == '/survey.html': self.serve_survey_page() elif path == '/report.html': self.serve_report_page() else: self.send_error(404, "Page not found") except Exception as e: self.send_error(500, str(e)) def handle_get_reports(self, query_params): """获取报告列表""" page = int(query_params.get('page', [1])[0]) page_size = int(query_params.get('pageSize', [10])[0]) data = enhanced_system.get_reports_list(page, page_size) self.send_response(200) self.send_header('Content-Type', 'application/json') self.send_cors_headers() self.end_headers() self.wfile.write(json.dumps(data, ensure_ascii=False).encode('utf-8')) def handle_get_report(self, path): """获取单个报告""" # 从路径中提取报告ID: /api/report/{report_id} report_id = path.split('/')[-1] report = enhanced_system.get_report_by_id(report_id) if not report: self.send_error(404, "报告不存在") return # 解析报告数据 try: report_data = json.loads(report['report_data']) analysis_data = json.loads(report['analysis_data']) # 返回与外部API相同格式的数据 response_data = { 'studentInfo': report_data.get('studentInfo', {}), 'report': report_data.get('report', {}), 'footer': { 'copyright': '© 2024 尚逸基石教育科技有限公司 版权所有' } } self.send_response(200) self.send_header('Content-Type', 'application/json') self.send_cors_headers() self.end_headers() self.wfile.write(json.dumps(response_data, ensure_ascii=False).encode('utf-8')) except Exception as e: self.send_error(500, f"解析报告数据失败: {str(e)}") def handle_get_questions(self): """获取题库数据""" try: with open('public/questions.json', 'r', encoding='utf-8') as f: questions_data = json.load(f) self.send_response(200) self.send_header('Content-Type', 'application/json') self.send_cors_headers() self.end_headers() self.wfile.write(json.dumps(questions_data, ensure_ascii=False).encode('utf-8')) except FileNotFoundError: self.send_error(404, "题库文件不存在") except Exception as e: self.send_error(500, f"加载题库失败: {str(e)}") def handle_get_tags(self): """获取标签数据""" try: with open('public/tags.json', 'r', encoding='utf-8') as f: tags_data = json.load(f) self.send_response(200) self.send_header('Content-Type', 'application/json') self.send_cors_headers() self.end_headers() self.wfile.write(json.dumps(tags_data, ensure_ascii=False).encode('utf-8')) except FileNotFoundError: # 如果tags.json不存在,生成备用标签数据 try: with open('public/questions.json', 'r', encoding='utf-8') as f: questions_data = json.load(f) all_tags = set() for questions in questions_data.values(): for question in questions: tags = question.get('题目标签', '') or question.get('标签', '') if tags: for tag in tags.split(r'[\s,,]+'): if tag.strip(): all_tags.add(tag.strip()) backup_tags = { "tags": sorted(list(all_tags)), "tag_counts": {}, "total_unique_tags": len(all_tags) } self.send_response(200) self.send_header('Content-Type', 'application/json') self.send_cors_headers() self.end_headers() self.wfile.write(json.dumps(backup_tags, ensure_ascii=False).encode('utf-8')) except Exception as e: self.send_error(500, f"生成标签数据失败: {str(e)}") except Exception as e: self.send_error(500, f"加载标签数据失败: {str(e)}") def handle_get_filtered_questions(self, session_id): """根据会话ID获取筛选后的题目""" try: # 从数据库获取会话信息,包括选择的标签和题目配置 conn = sqlite3.connect('data/survey.db') conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute(''' SELECT s.selected_tag, qs.questions_config FROM students s JOIN quiz_sessions qs ON s.id = qs.student_id WHERE qs.id = ? ''', (session_id,)) session_data = cursor.fetchone() conn.close() if not session_data: self.send_error(404, "会话不存在") return selected_tag = session_data['selected_tag'] or '' questions_config = json.loads(session_data['questions_config']) # 加载所有题目 with open('public/questions.json', 'r', encoding='utf-8') as f: all_questions = json.load(f) # 根据标签筛选题目 filtered_questions = self.filter_questions_by_tag(all_questions, selected_tag) # 根据配置选择题目 selected_questions = self.select_questions_by_config(filtered_questions, questions_config) response_data = { 'questions': selected_questions, 'selectedTag': selected_tag, 'questionsConfig': questions_config } self.send_response(200) self.send_header('Content-Type', 'application/json') self.send_cors_headers() self.end_headers() self.wfile.write(json.dumps(response_data, ensure_ascii=False).encode('utf-8')) except Exception as e: error_msg = f"获取筛选题目失败: {str(e)}" self.send_response(500) self.send_header('Content-Type', 'text/html; charset=utf-8') self.send_cors_headers() self.end_headers() self.wfile.write(error_msg.encode('utf-8')) def filter_questions_by_tag(self, all_questions, selected_tag): """根据标签筛选题目""" if not selected_tag: return all_questions filtered = { "基础题": [], "进阶题": [], "竞赛题": [] } for question_type in ["基础题", "进阶题", "竞赛题"]: for question in all_questions.get(question_type, []): question_tags = question.get('题目标签', '') or question.get('标签', '') # 正确分割标签(按空格、逗号等分隔符) if isinstance(question_tags, str): import re tag_list = re.split(r'[\s,,]+', question_tags.strip()) if selected_tag in tag_list: filtered[question_type].append(question) return filtered def select_questions_by_config(self, filtered_questions, questions_config): """根据配置从筛选后的题目中选择题目""" selected = [] # 按照题目类型顺序:基础题 -> 进阶题 -> 竞赛题 question_order = ["基础题", "进阶题", "竞赛题"] for question_type in question_order: if question_type not in questions_config: continue count = questions_config[question_type] available_questions = filtered_questions.get(question_type, []) if len(available_questions) < count: print(f"警告:{question_type}可用题目不足 ({len(available_questions)}/{count})") # 随机选择指定数量的题目 import random random.shuffle(available_questions) selected_for_type = available_questions[:min(count, len(available_questions))] for question in selected_for_type: selected.append({ **question, 'questionType': question_type, 'questionId': f"{question_type}_{question['序号']}" }) return selected def serve_index_page(self): """提供主页面""" try: with open('public/index.html', 'r', encoding='utf-8') as f: content = f.read() self.send_response(200) self.send_header('Content-Type', 'text/html; charset=utf-8') self.send_cors_headers() self.end_headers() self.wfile.write(content.encode('utf-8')) except FileNotFoundError: self.send_error(404, "主页文件不存在") def serve_survey_page(self): """提供测评配置页面""" try: with open('public/survey.html', 'r', encoding='utf-8') as f: content = f.read() self.send_response(200) self.send_header('Content-Type', 'text/html; charset=utf-8') self.send_cors_headers() self.end_headers() self.wfile.write(content.encode('utf-8')) except FileNotFoundError: self.send_error(404, "测评页面文件不存在") def serve_report_page(self): """提供报告查看页面""" try: with open('public/report.html', 'r', encoding='utf-8') as f: content = f.read() self.send_response(200) self.send_header('Content-Type', 'text/html; charset=utf-8') self.send_cors_headers() self.end_headers() self.wfile.write(content.encode('utf-8')) except FileNotFoundError: self.send_error(404, "报告页面文件不存在") def serve_static_file(self, path): """提供静态文件服务""" try: file_path = '.' + path # 检查文件是否存在 if not os.path.exists(file_path): self.send_error(404, "文件不存在") return # 根据文件扩展名确定内容类型 if path.endswith('.html'): content_type = 'text/html; charset=utf-8' elif path.endswith('.css'): content_type = 'text/css; charset=utf-8' elif path.endswith('.js'): content_type = 'application/javascript; charset=utf-8' elif path.endswith('.json'): content_type = 'application/json; charset=utf-8' elif path.endswith('.png'): content_type = 'image/png' elif path.endswith('.jpg') or path.endswith('.jpeg'): content_type = 'image/jpeg' else: content_type = 'text/plain' with open(file_path, 'rb') as f: content = f.read() self.send_response(200) self.send_header('Content-Type', content_type) self.send_cors_headers() self.end_headers() if isinstance(content, str): self.wfile.write(content.encode('utf-8')) else: self.wfile.write(content) except FileNotFoundError: self.send_error(404, "文件不存在") except Exception as e: self.send_error(500, str(e)) def handle_quiz_page(self, path): """处理答题页面""" session_id = path.split('/')[-1] conn = sqlite3.connect('data/survey.db') conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute(''' SELECT s.*, qs.questions_config, qs.status FROM students s JOIN quiz_sessions qs ON s.id = qs.student_id WHERE qs.id = ? ''', (session_id,)) session_data = cursor.fetchone() conn.close() if not session_data: self.send_error(404, "Session not found") return # 生成答题页面HTML html_content = self.generate_quiz_page(session_data, session_id) self.send_response(200) self.send_header('Content-Type', 'text/html; charset=utf-8') self.send_cors_headers() self.end_headers() self.wfile.write(html_content.encode('utf-8')) def generate_quiz_page(self, session_data, session_id): """生成答题页面HTML""" questions_config = json.loads(session_data['questions_config']) student_name = session_data['name'] school = session_data['school'] grade = session_data['grade'] return f''' {student_name} - 学科能力测评

🎯 学科能力测评

请认真回答以下问题

姓名
{student_name}
学校
{school}
年级
{grade}
选题范围
正在加载...
正在加载题目...
0分
测评完成!正在生成报告...
请稍候,系统正在为您生成详细的测评报告...
''' def handle_regenerate_report(self): """处理重新生成报告请求""" content_length = int(self.headers['Content-Length']) post_data = self.rfile.read(content_length) data = json.loads(post_data.decode('utf-8')) session_id = data.get('sessionId') if not session_id: self.send_error(400, "会话ID不能为空") return # 同步重新生成报告 try: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) result = loop.run_until_complete(enhanced_system.regenerate_report(session_id)) loop.close() response = { 'success': result['success'], 'message': '报告重新生成成功' if result['success'] else result['error'] } if result['success']: response['reportId'] = result.get('report_id') self.send_response(200) self.send_header('Content-Type', 'application/json') self.send_cors_headers() self.end_headers() self.wfile.write(json.dumps(response, ensure_ascii=False).encode('utf-8')) except Exception as e: self.send_error(500, str(e)) def handle_get_sessions_can_regenerate(self): """获取可以重新生成的会话列表""" try: sessions = enhanced_system.get_sessions_can_regenerate() response = { 'success': True, 'sessions': sessions } self.send_response(200) self.send_header('Content-Type', 'application/json') self.send_cors_headers() self.end_headers() self.wfile.write(json.dumps(response, ensure_ascii=False).encode('utf-8')) except Exception as e: self.send_error(500, str(e)) def run_enhanced_server(port=5678): """运行增强的HTTP服务器""" server_address = ('', port) httpd = HTTPServer(server_address, EnhancedSurveyHandler) print(f"增强版Survey系统已启动: http://localhost:{port}") print("功能包括:") print("- 测评配置和答题") print("- 自动生成测评报告") print("- 报告列表管理") print("- 学员数据管理") httpd.serve_forever() if __name__ == "__main__": run_enhanced_server()