#!/usr/bin/env python3 # -*- coding: utf-8 -*- import sqlite3 import json import uuid import os import time from datetime import datetime, timezone, timedelta from http.server import HTTPServer, BaseHTTPRequestHandler from urllib.parse import urlparse, parse_qs import threading from zai import ZhipuAiClient def get_east8_time(): """获取东八区时间""" east8_tz = timezone(timedelta(hours=8)) return datetime.now(east8_tz) def get_east8_time_string(): """获取东八区时间字符串格式,用于数据库存储""" return get_east8_time().strftime('%Y-%m-%d %H:%M:%S') def load_env_config(): """加载环境变量配置""" try: # 尝试加载 .env 文件 env_file = "/Users/moshui/Documents/survey/.env" if os.path.exists(env_file): with open(env_file, 'r', encoding='utf-8') as f: for line in f: line = line.strip() if line and not line.startswith('#') and '=' in line: key, value = line.split('=', 1) os.environ[key.strip()] = value.strip() except Exception as e: print(f"加载环境变量失败: {e}") class ReportGenerator: """报告生成器 - 调用GLM大语言模型API生成测评报告""" def __init__(self): # 加载环境变量 load_env_config() # GLM API配置 self.api_key = os.getenv("GLM_API_KEY", "") self.timeout = int(os.getenv("LLM_API_TIMEOUT", "300")) self.model_name = "glm-4.5-air" self.prompt_file = "./public/prompt.md" # 初始化 Zhipu AI 客户端 self.client = ZhipuAiClient(api_key=self.api_key) print(f"报告生成器配置:") print(f" - SDK: ZhipuAiClient") print(f" - 模型: {self.model_name}") print(f" - api_key: {self.api_key}") print(f" - 超时时间: {self.timeout}秒") def load_prompt(self): """加载提示词""" try: with open(self.prompt_file, 'r', encoding='utf-8') as f: return f.read() except Exception as e: print(f"加载提示词失败: {e}") return "" def generate_analysis_text(self, session_data): """生成答题情况分析文本""" conn = sqlite3.connect('data/survey.db') conn.row_factory = sqlite3.Row cursor = conn.cursor() # 获取学员信息 cursor.execute(''' SELECT s.*, qs.started_at, qs.completed_at, qs.total_score FROM students s JOIN quiz_sessions qs ON s.id = qs.student_id WHERE qs.id = ? ''', (session_data['id'],)) student_info = cursor.fetchone() # 获取答题详情 - 从JSON格式读取,同时获取用户信息 cursor.execute(''' SELECT answers_data, student_name, student_school, student_grade, selected_tag FROM quiz_answers WHERE session_id = ? ''', (session_data['id'],)) result = cursor.fetchone() answers = [] if result and result['answers_data']: try: answers = json.loads(result['answers_data']) print(f"成功解析JSON答题数据,共{len(answers)}题") except json.JSONDecodeError: print(f"解析JSON答题数据失败: {session_data['id']}") answers = [] else: print(f"未找到答题数据: {session_data['id']}") conn.close() # 生成分析文本 - 确保markdown表格格式正确 analysis_text = "# 考试答题情况分析\n\n" # 添加总分信息 total_score = student_info['total_score'] if student_info and student_info['total_score'] is not None else 0 analysis_text += f"## 总分:{total_score} 分\n\n" analysis_text += "| 题目 | 题型 | 用户答案 | 正确答案 | 是否正确 | 得分 |\n" analysis_text += "|------|------|----------|----------|----------|------|\n" # 使用独立字段中的用户信息 student_name = result['student_name'] if result else (student_info['name'] if student_info and student_info['name'] is not None else '未知') student_school = result['student_school'] if result else (student_info['school'] if student_info and student_info['school'] is not None else '未知') student_grade = result['student_grade'] if result else (student_info['grade'] if student_info and student_info['grade'] is not None else '未知') selected_tag = result['selected_tag'] if result else '未指定' # 添加基本信息 analysis_text += f"| 姓名 | 填空题 | {student_name} | 无标准答案 | 无法判断 | 不适用 |\n" analysis_text += f"| 学校 | 填空题 | {student_school} | 无标准答案 | 无法判断 | 不适用 |\n" analysis_text += f"| 年级 | 填空题 | {student_grade} | 无标准答案 | 无法判断 | 不适用 |\n" analysis_text += f"| 考试标签 | 填空题 | {selected_tag} | 无标准答案 | 无法判断 | 不适用 |\n" # 添加答题详情 - 确保每个字段都不为空 for answer in answers: is_correct = "✓" if answer.get('isCorrect', False) else "✗" user_answer = answer.get('userAnswer', '').strip() correct_answer = answer.get('correctAnswer', '').strip() question_text = answer.get('questionText', '').strip() question_type = answer.get('questionType', '').strip() score = answer.get('score', 0) # 确保题目文本不为空 if not question_text: question_text = "未知题目" # 确保题型不为空 if not question_type: question_type = "单选题" # 确保答案文本处理正确 if not user_answer: user_answer = "未作答" if not correct_answer: correct_answer = "无标准答案" # 转义markdown中的特殊字符 question_text = question_text.replace('|', '\\|') user_answer = user_answer.replace('|', '\\|') correct_answer = correct_answer.replace('|', '\\|') analysis_text += f"| {question_text} | {question_type} | {user_answer} | {correct_answer} | {is_correct} | {score} |\n" # 打印生成的分析文本用于调试 print(f"生成的分析文本长度: {len(analysis_text)} 字符") print(f"包含题目数量: {len(answers)}") print("分析文本预览:") print(analysis_text[:500] + "..." if len(analysis_text) > 500 else analysis_text) return { 'analysis_text': analysis_text, 'student_info': dict(student_info), 'answers': answers } async def generate_report(self, session_id): """生成测评报告""" analysis_data = None try: # 获取会话数据 analysis_data = self.generate_analysis_text({'id': session_id}) # 调用GLM API生成报告 report_result = self.call_report_api(analysis_data, session_id) if report_result and report_result.get('success'): # API调用成功,直接生成报告 # 构造完整的报告数据 student_info = analysis_data.get('student_info', {}) # 构造报告数据 complete_report_data = { 'studentInfo': { 'name': student_info.get('name', '未知'), 'school': student_info.get('school', '未知'), 'grade': student_info.get('grade', '未知'), 'subject': '科学', 'testDate': get_east8_time().strftime('%Y年%m月%d日') }, 'report': report_result['report_data'], 'generated_at': get_east8_time().isoformat(), 'session_id': session_id, 'analysis_data': analysis_data, 'is_direct_generation': True } # 保存报告到数据库 report_id = self.save_report_to_db(session_id, complete_report_data, analysis_data) return { 'success': True, 'message': '报告生成成功', 'session_id': session_id, 'report_id': report_id, 'report_data': complete_report_data, 'analysis_data': analysis_data } else: raise Exception("GLM API调用失败") except Exception as e: print(f"生成报告失败: {e}") # 保存分析数据到数据库,允许后续重新生成 if analysis_data: self.save_analysis_data_for_regeneration(session_id, analysis_data) return { 'success': False, 'error': str(e), 'session_id': session_id, 'can_regenerate': True } def call_report_api(self, analysis_data, session_id): """调用GLM大语言模型API生成报告""" # 加载提示词 prompt = self.load_prompt() try: print(f"调用GLM大语言模型SDK...") print(f" Session ID: {session_id}") print(f" 模型: {self.model_name}") # 使用ZhipuAiClient调用API,设置JSON格式响应 response = self.client.chat.completions.create( model=self.model_name, messages=[ { "role": "system", "content": prompt }, { "role": "user", "content": analysis_data['analysis_text'] } ], response_format={ "type": "json_object" } ) print(f"✅ GLM SDK调用成功") print(f" 响应长度: {len(response.choices[0].message.content)} 字符") # 解析JSON响应 model_response = response.choices[0].message.content parsed_report = self.parse_json_response(model_response) return { "success": True, "message": "报告生成成功", "report_data": parsed_report, "raw_response": model_response } except Exception as e: print(f"GLM SDK调用出错: {e}") raise Exception(f"大语言模型SDK调用失败: {str(e)}") def parse_json_response(self, json_content): """解析JSON格式的响应字符串""" try: # 查找 ```json 和 ``` 之间的内容 import re # 匹配 ```json ... ``` 格式 match = re.search(r'```json\s*\n(.*?)\n```', json_content, re.DOTALL) if match: json_content = match.group(1) print(f"成功提取JSON格式内容,长度: {len(json_content)} 字符") try: parsed_data = json.loads(json_content) # 如果返回的是数组,提取第一个元素 if isinstance(parsed_data, list): print(f"检测到数组响应,长度: {len(parsed_data)},提取下标为0的数据") if len(parsed_data) > 0: return parsed_data[0] else: print(f"数组为空,返回空字典") return {} # 如果已经是字典格式,直接返回 elif isinstance(parsed_data, dict): return parsed_data # 其他情况,将数据包装成字典 else: print(f"响应不是字典或数组格式,将数据包装为字典") return {"data": parsed_data} except json.JSONDecodeError as e: print(f"JSON解析失败: {e}") # 如果解析失败,返回原始内容包装的字典 return {"raw_content": json_content} except Exception as e: print(f"解析JSON响应失败: {e}") return {"error": str(e), "raw_content": json_content} def save_report_to_db(self, session_id, report_data, analysis_data): """保存报告到数据库""" conn = sqlite3.connect('data/survey.db') cursor = conn.cursor() # 创建报告表(如果不存在) cursor.execute(''' CREATE TABLE IF NOT EXISTS reports ( id TEXT PRIMARY KEY, session_id TEXT NOT NULL, report_data TEXT NOT NULL, analysis_data TEXT NOT NULL, generated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (session_id) REFERENCES quiz_sessions (id) ) ''') # 保存报告 report_id = str(uuid.uuid4()) cursor.execute(''' INSERT INTO reports (id, session_id, report_data, analysis_data) VALUES (?, ?, ?, ?) ''', (report_id, session_id, json.dumps(report_data, ensure_ascii=False), json.dumps(analysis_data, ensure_ascii=False))) # 更新会话状态 cursor.execute(''' UPDATE quiz_sessions SET status = 'report_generated' WHERE id = ? ''', (session_id,)) conn.commit() conn.close() return report_id def save_analysis_data_for_regeneration(self, session_id, analysis_data): """保存分析数据以便重新生成报告""" conn = sqlite3.connect('data/survey.db') cursor = conn.cursor() # 创建临时分析数据表(如果不存在) cursor.execute(''' CREATE TABLE IF NOT EXISTS temp_analysis_data ( id TEXT PRIMARY KEY, session_id TEXT NOT NULL, analysis_data TEXT NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (session_id) REFERENCES quiz_sessions (id) ) ''') # 删除该session的旧数据(如果存在) cursor.execute('DELETE FROM temp_analysis_data WHERE session_id = ?', (session_id,)) # 保存新的分析数据 analysis_id = str(uuid.uuid4()) cursor.execute(''' INSERT INTO temp_analysis_data (id, session_id, analysis_data) VALUES (?, ?, ?) ''', (analysis_id, session_id, json.dumps(analysis_data, ensure_ascii=False))) # 更新会话状态为可以重新生成 cursor.execute(''' UPDATE quiz_sessions SET status = 'can_regenerate' WHERE id = ? ''', (session_id,)) conn.commit() conn.close() return analysis_id class EnhancedSurveySystem: """增强的测评系统""" def __init__(self): self.report_generator = ReportGenerator() self.init_database() def init_database(self): """初始化数据库""" conn = sqlite3.connect('data/survey.db') cursor = conn.cursor() # 确保必要的表存在 cursor.execute(''' CREATE TABLE IF NOT EXISTS reports ( id TEXT PRIMARY KEY, session_id TEXT NOT NULL, report_data TEXT NOT NULL, analysis_data TEXT NOT NULL, generated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (session_id) REFERENCES quiz_sessions (id) ) ''') conn.commit() conn.close() def get_reports_list(self, page=1, page_size=10): """获取报告列表""" conn = sqlite3.connect('data/survey.db') conn.row_factory = sqlite3.Row cursor = conn.cursor() # 获取总数 cursor.execute('SELECT COUNT(*) as total FROM reports') total = cursor.fetchone()['total'] # 获取分页数据 offset = (page - 1) * page_size cursor.execute(''' SELECT r.*, s.name, s.school, s.grade, qs.total_score, qs.completed_at FROM reports r JOIN quiz_sessions qs ON r.session_id = qs.id JOIN students s ON qs.student_id = s.id ORDER BY r.generated_at DESC LIMIT ? OFFSET ? ''', (page_size, offset)) reports = [dict(row) for row in cursor.fetchall()] conn.close() return { 'reports': reports, 'total': total, 'page': page, 'page_size': page_size, 'total_pages': (total + page_size - 1) // page_size } def get_report_by_id(self, report_id): """根据ID获取报告""" conn = sqlite3.connect('data/survey.db') conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute(''' SELECT r.*, s.name, s.school, s.grade FROM reports r JOIN quiz_sessions qs ON r.session_id = qs.id JOIN students s ON qs.student_id = s.id WHERE r.id = ? ''', (report_id,)) report = cursor.fetchone() conn.close() return dict(report) if report else None def get_sessions_can_regenerate(self): """获取可以重新生成的会话列表""" conn = sqlite3.connect('data/survey.db') conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute(''' SELECT qs.*, s.name, s.school, s.grade, tad.created_at as analysis_created_at FROM quiz_sessions qs JOIN students s ON qs.student_id = s.id JOIN temp_analysis_data tad ON qs.id = tad.session_id WHERE qs.status = 'can_regenerate' ORDER BY tad.created_at DESC ''') sessions = [dict(row) for row in cursor.fetchall()] conn.close() return sessions async def auto_generate_report(self, session_id): """自动生成报告(答题完成后调用)""" return await self.report_generator.generate_report(session_id) async def regenerate_report(self, session_id): """重新生成报告(从保存的分析数据)""" try: # 获取保存的分析数据 conn = sqlite3.connect('data/survey.db') cursor = conn.cursor() cursor.execute(''' SELECT analysis_data FROM temp_analysis_data WHERE session_id = ? ORDER BY created_at DESC LIMIT 1 ''', (session_id,)) result = cursor.fetchone() conn.close() if not result: raise Exception("未找到可重新生成的分析数据") analysis_data = json.loads(result[0]) # 调用GLM API生成报告 report_result = self.report_generator.call_report_api(analysis_data, session_id) if report_result and report_result.get('success'): # 提取学员信息用于更新报告数据 student_info = analysis_data.get('student_info', {}) # 构造报告数据 report_data = { 'studentInfo': { 'name': student_info.get('name', '未知'), 'school': student_info.get('school', '未知'), 'grade': student_info.get('grade', '未知'), 'subject': '科学', 'testDate': get_east8_time().strftime('%Y年%m月%d日') }, 'report': report_result["report_data"], 'generated_at': get_east8_time().isoformat(), 'session_id': session_id, 'analysis_data': analysis_data, 'is_regenerated': True } # 保存报告到数据库 self.report_generator.save_report_to_db(session_id, report_data, analysis_data) # 清理临时数据 self.cleanup_temp_analysis_data(session_id) return { 'success': True, 'report_data': report_data, 'analysis_data': analysis_data, 'is_regenerated': True } else: raise Exception("GLM API返回空内容或失败") except Exception as e: print(f"重新生成报告失败: {e}") return { 'success': False, 'error': str(e) } def cleanup_temp_analysis_data(self, session_id): """清理临时分析数据""" conn = sqlite3.connect('data/survey.db') cursor = conn.cursor() cursor.execute('DELETE FROM temp_analysis_data WHERE session_id = ?', (session_id,)) conn.commit() conn.close() def delete_report(self, report_id): """删除报告""" try: conn = sqlite3.connect('data/survey.db') cursor = conn.cursor() # 检查报告是否存在 cursor.execute('SELECT id FROM reports WHERE id = ?', (report_id,)) if not cursor.fetchone(): conn.close() return { 'success': False, 'message': '报告不存在' } # 删除报告 cursor.execute('DELETE FROM reports WHERE id = ?', (report_id,)) conn.commit() conn.close() return { 'success': True, 'message': '报告删除成功' } except Exception as e: print(f"删除报告失败: {e}") return { 'success': False, 'message': f'删除失败: {str(e)}' } # 全局系统实例 enhanced_system = EnhancedSurveySystem()