#!/usr/bin/env python3 # -*- coding: utf-8 -*- import sqlite3 import json import uuid import requests import os import time from datetime import datetime, timezone, timedelta from http.server import HTTPServer, BaseHTTPRequestHandler from urllib.parse import urlparse, parse_qs import threading def get_east8_time(): """获取东八区时间""" east8_tz = timezone(timedelta(hours=8)) return datetime.now(east8_tz) def get_east8_time_string(): """获取东八区时间字符串格式,用于数据库存储""" return get_east8_time().strftime('%Y-%m-%d %H:%M:%S') def load_env_config(): """加载环境变量配置""" try: # 尝试加载 .env 文件 env_file = "/Users/moshui/Documents/survey/.env" if os.path.exists(env_file): with open(env_file, 'r', encoding='utf-8') as f: for line in f: line = line.strip() if line and not line.startswith('#') and '=' in line: key, value = line.split('=', 1) os.environ[key.strip()] = value.strip() except Exception as e: print(f"加载环境变量失败: {e}") class ReportGenerator: """报告生成器 - 调用自定义API生成测评报告""" def __init__(self): # 加载环境变量 load_env_config() # 自定义API配置 self.api_url = os.getenv("REPORT_API_URL", "http://120.26.23.172:5678/webhook/survey_report") self.api_key = os.getenv("REPORT_API_KEY", "") self.timeout = int(os.getenv("REPORT_API_TIMEOUT", "300")) self.prompt_file = "/Users/moshui/Documents/survey/public/prompt.txt" # 请求头 self.headers = { "Content-Type": "application/json" } # 添加API密钥(如果设置了) if self.api_key: self.headers["Authorization"] = f"Bearer {self.api_key}" print(f"报告生成器配置:") print(f" - API端点: {self.api_url}") print(f" - 超时时间: {self.timeout}秒") def load_prompt(self): """加载提示词""" try: with open(self.prompt_file, 'r', encoding='utf-8') as f: return f.read() except Exception as e: print(f"加载提示词失败: {e}") return "" def generate_analysis_text(self, session_data): """生成答题情况分析文本""" conn = sqlite3.connect('data/survey.db') conn.row_factory = sqlite3.Row cursor = conn.cursor() # 获取学员信息 cursor.execute(''' SELECT s.*, qs.started_at, qs.completed_at, qs.total_score FROM students s JOIN quiz_sessions qs ON s.id = qs.student_id WHERE qs.id = ? ''', (session_data['id'],)) student_info = cursor.fetchone() # 获取答题详情 - 从JSON格式读取,同时获取用户信息 cursor.execute(''' SELECT answers_data, student_name, student_school, student_grade, selected_tag FROM quiz_answers WHERE session_id = ? ''', (session_data['id'],)) result = cursor.fetchone() answers = [] if result and result['answers_data']: try: answers = json.loads(result['answers_data']) print(f"成功解析JSON答题数据,共{len(answers)}题") except json.JSONDecodeError: print(f"解析JSON答题数据失败: {session_data['id']}") answers = [] else: print(f"未找到答题数据: {session_data['id']}") conn.close() # 生成分析文本 - 确保markdown表格格式正确 analysis_text = "# 考试答题情况分析\n\n" analysis_text += "| 题目 | 题型 | 用户答案 | 正确答案 | 是否正确 | 得分 |\n" analysis_text += "|------|------|----------|----------|----------|------|\n" # 使用独立字段中的用户信息 student_name = result['student_name'] if result else student_info.get('name', '未知') student_school = result['student_school'] if result else student_info.get('school', '未知') student_grade = result['student_grade'] if result else student_info.get('grade', '未知') selected_tag = result['selected_tag'] if result else '未指定' # 添加基本信息 analysis_text += f"| 姓名 | 填空题 | {student_name} | 无标准答案 | 无法判断 | 不适用 |\n" analysis_text += f"| 学校 | 填空题 | {student_school} | 无标准答案 | 无法判断 | 不适用 |\n" analysis_text += f"| 年级 | 填空题 | {student_grade} | 无标准答案 | 无法判断 | 不适用 |\n" analysis_text += f"| 考试标签 | 填空题 | {selected_tag} | 无标准答案 | 无法判断 | 不适用 |\n" # 添加答题详情 - 确保每个字段都不为空 for answer in answers: is_correct = "✓" if answer.get('isCorrect', False) else "✗" user_answer = answer.get('userAnswer', '').strip() correct_answer = answer.get('correctAnswer', '').strip() question_text = answer.get('questionText', '').strip() question_type = answer.get('questionType', '').strip() score = answer.get('score', 0) # 确保题目文本不为空 if not question_text: question_text = "未知题目" # 确保题型不为空 if not question_type: question_type = "单选题" # 确保答案文本处理正确 if not user_answer: user_answer = "未作答" if not correct_answer: correct_answer = "无标准答案" # 转义markdown中的特殊字符 question_text = question_text.replace('|', '\\|') user_answer = user_answer.replace('|', '\\|') correct_answer = correct_answer.replace('|', '\\|') analysis_text += f"| {question_text} | {question_type} | {user_answer} | {correct_answer} | {is_correct} | {score} |\n" # 打印生成的分析文本用于调试 print(f"生成的分析文本长度: {len(analysis_text)} 字符") print(f"包含题目数量: {len(answers)}") print("分析文本预览:") print(analysis_text[:500] + "..." if len(analysis_text) > 500 else analysis_text) return { 'analysis_text': analysis_text, 'student_info': dict(student_info), 'answers': answers } async def generate_report(self, session_id): """生成测评报告""" try: # 获取会话数据 analysis_data = self.generate_analysis_text({'id': session_id}) # 调用自定义API生成报告 # 构造回调URL report_result = self.call_report_api(analysis_data, session_id) if report_result and report_result.get('success'): # API调用成功,异步生成中 # 保存分析数据用于后续回调处理 self.save_analysis_data_for_regeneration(session_id, analysis_data) return { 'success': True, 'message': '报告异步生成中', 'session_id': session_id, 'analysis_data': analysis_data } else: raise Exception("API调用失败") except Exception as e: print(f"生成报告失败: {e}") # 保存分析数据到数据库,允许后续重新生成 self.save_analysis_data_for_regeneration(session_id, analysis_data) return { 'success': False, 'error': str(e), 'session_id': session_id, 'can_regenerate': True } def call_report_api(self, analysis_data, session_id): """调用自定义报告API生成报告""" # 构建请求数据 request_data = { "analysis_text": analysis_data['analysis_text'], "session_id": session_id } # 实现重试机制 max_retries = 3 retry_delay = 2 # 秒 try: print(f"调用报告生成API...") print(f" API端点: {self.api_url}") print(f" Session ID: {session_id}") print(f" 数据长度: {len(request_data['analysis_text'])} 字符") response = requests.post( self.api_url, headers=self.headers, json=request_data, timeout=10 # 简化超时时间 ) if response.status_code == 200: result = response.json() print(f"✅ API调用成功,已提交异步报告生成请求") print(f" 响应内容: {str(result)[:200]}...") return {"success": True, "message": "异步报告生成请求已提交"} else: raise Exception(f"API调用失败: HTTP {response.status_code} - {response.text}") except Exception as e: print(f"API调用出错: {e}") raise Exception(f"报告生成API调用失败: {str(e)}") def save_report_to_db(self, session_id, report_data, analysis_data): """保存报告到数据库""" conn = sqlite3.connect('data/survey.db') cursor = conn.cursor() # 创建报告表(如果不存在) cursor.execute(''' CREATE TABLE IF NOT EXISTS reports ( id TEXT PRIMARY KEY, session_id TEXT NOT NULL, report_data TEXT NOT NULL, analysis_data TEXT NOT NULL, generated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (session_id) REFERENCES quiz_sessions (id) ) ''') # 保存报告 report_id = str(uuid.uuid4()) cursor.execute(''' INSERT INTO reports (id, session_id, report_data, analysis_data) VALUES (?, ?, ?, ?) ''', (report_id, session_id, json.dumps(report_data, ensure_ascii=False), json.dumps(analysis_data, ensure_ascii=False))) # 更新会话状态 cursor.execute(''' UPDATE quiz_sessions SET status = 'report_generated' WHERE id = ? ''', (session_id,)) conn.commit() conn.close() return report_id def save_analysis_data_for_regeneration(self, session_id, analysis_data): """保存分析数据以便重新生成报告""" conn = sqlite3.connect('data/survey.db') cursor = conn.cursor() # 创建临时分析数据表(如果不存在) cursor.execute(''' CREATE TABLE IF NOT EXISTS temp_analysis_data ( id TEXT PRIMARY KEY, session_id TEXT NOT NULL, analysis_data TEXT NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (session_id) REFERENCES quiz_sessions (id) ) ''') # 删除该session的旧数据(如果存在) cursor.execute('DELETE FROM temp_analysis_data WHERE session_id = ?', (session_id,)) # 保存新的分析数据 analysis_id = str(uuid.uuid4()) cursor.execute(''' INSERT INTO temp_analysis_data (id, session_id, analysis_data) VALUES (?, ?, ?) ''', (analysis_id, session_id, json.dumps(analysis_data, ensure_ascii=False))) # 更新会话状态为可以重新生成 cursor.execute(''' UPDATE quiz_sessions SET status = 'can_regenerate' WHERE id = ? ''', (session_id,)) conn.commit() conn.close() return analysis_id class EnhancedSurveySystem: """增强的测评系统""" def __init__(self): self.report_generator = ReportGenerator() self.init_database() def init_database(self): """初始化数据库""" conn = sqlite3.connect('data/survey.db') cursor = conn.cursor() # 确保必要的表存在 cursor.execute(''' CREATE TABLE IF NOT EXISTS reports ( id TEXT PRIMARY KEY, session_id TEXT NOT NULL, report_data TEXT NOT NULL, analysis_data TEXT NOT NULL, generated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (session_id) REFERENCES quiz_sessions (id) ) ''') conn.commit() conn.close() def get_reports_list(self, page=1, page_size=10): """获取报告列表""" conn = sqlite3.connect('data/survey.db') conn.row_factory = sqlite3.Row cursor = conn.cursor() # 获取总数 cursor.execute('SELECT COUNT(*) as total FROM reports') total = cursor.fetchone()['total'] # 获取分页数据 offset = (page - 1) * page_size cursor.execute(''' SELECT r.*, s.name, s.school, s.grade, qs.total_score, qs.completed_at FROM reports r JOIN quiz_sessions qs ON r.session_id = qs.id JOIN students s ON qs.student_id = s.id ORDER BY r.generated_at DESC LIMIT ? OFFSET ? ''', (page_size, offset)) reports = [dict(row) for row in cursor.fetchall()] conn.close() return { 'reports': reports, 'total': total, 'page': page, 'page_size': page_size, 'total_pages': (total + page_size - 1) // page_size } def get_report_by_id(self, report_id): """根据ID获取报告""" conn = sqlite3.connect('data/survey.db') conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute(''' SELECT r.*, s.name, s.school, s.grade FROM reports r JOIN quiz_sessions qs ON r.session_id = qs.id JOIN students s ON qs.student_id = s.id WHERE r.id = ? ''', (report_id,)) report = cursor.fetchone() conn.close() return dict(report) if report else None def get_sessions_can_regenerate(self): """获取可以重新生成的会话列表""" conn = sqlite3.connect('data/survey.db') conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute(''' SELECT qs.*, s.name, s.school, s.grade, tad.created_at as analysis_created_at FROM quiz_sessions qs JOIN students s ON qs.student_id = s.id JOIN temp_analysis_data tad ON qs.id = tad.session_id WHERE qs.status = 'can_regenerate' ORDER BY tad.created_at DESC ''') sessions = [dict(row) for row in cursor.fetchall()] conn.close() return sessions async def auto_generate_report(self, session_id): """自动生成报告(答题完成后调用)""" return await self.report_generator.generate_report(session_id) async def regenerate_report(self, session_id): """重新生成报告(从保存的分析数据)""" try: # 获取保存的分析数据 conn = sqlite3.connect('data/survey.db') cursor = conn.cursor() cursor.execute(''' SELECT analysis_data FROM temp_analysis_data WHERE session_id = ? ORDER BY created_at DESC LIMIT 1 ''', (session_id,)) result = cursor.fetchone() conn.close() if not result: raise Exception("未找到可重新生成的分析数据") analysis_data = json.loads(result[0]) # 调用API生成报告 report_result = self.report_generator.call_report_api(analysis_data, session_id) if report_result: # 提取学员信息用于更新报告数据 student_info = analysis_data['student_info'] report_result['studentInfo']['name'] = student_info.get('name', '未知') report_result['studentInfo']['school'] = student_info.get('school', '未知') report_result['studentInfo']['grade'] = student_info.get('grade', '未知') report_result['studentInfo']['subject'] = '科学' report_result['studentInfo']['testDate'] = get_east8_time().strftime('%Y年%m月%d日') # 构造报告数据 report_data = { 'studentInfo': report_result['studentInfo'], 'report': report_result['report'], 'generated_at': get_east8_time().isoformat(), 'session_id': session_id, 'analysis_data': analysis_data, 'is_regenerated': True } # 保存报告到数据库 self.report_generator.save_report_to_db(session_id, report_data, analysis_data) # 清理临时数据 self.cleanup_temp_analysis_data(session_id) return { 'success': True, 'report_data': report_data, 'analysis_data': analysis_data, 'is_regenerated': True } else: raise Exception("API返回空内容") except Exception as e: print(f"重新生成报告失败: {e}") return { 'success': False, 'error': str(e) } def cleanup_temp_analysis_data(self, session_id): """清理临时分析数据""" conn = sqlite3.connect('data/survey.db') cursor = conn.cursor() cursor.execute('DELETE FROM temp_analysis_data WHERE session_id = ?', (session_id,)) conn.commit() conn.close() # 全局系统实例 enhanced_system = EnhancedSurveySystem()