survey/enhanced_survey_system.py
2025-11-30 20:11:22 +08:00

603 lines
23 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sqlite3
import json
import uuid
import os
import time
from datetime import datetime, timezone, timedelta
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
import threading
from zai import ZhipuAiClient
def get_east8_time():
    """Return the current time as a timezone-aware datetime in UTC+8."""
    utc_plus_8 = timezone(timedelta(hours=8))
    current = datetime.now(tz=utc_plus_8)
    return current
def get_east8_time_string():
    """Return the current UTC+8 time as a ``YYYY-MM-DD HH:MM:SS`` string.

    The original docstring states this format is meant for database storage.
    """
    current = get_east8_time()
    return current.strftime('%Y-%m-%d %H:%M:%S')
def load_env_config(env_file="/Users/moshui/Documents/survey/.env"):
    """Load ``KEY=VALUE`` pairs from a .env file into ``os.environ``.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    ignored. Keys and values are stripped of surrounding whitespace, and a
    value may itself contain ``=`` (only the first one splits). Existing
    environment variables with the same key are overwritten.

    Args:
        env_file: Path of the .env file. Defaults to the path that was
            previously hard-coded, so existing callers are unaffected.

    Returns:
        None. A missing file is silently ignored; read or decoding errors
        are printed and otherwise swallowed (best-effort loading).
    """
    if not os.path.exists(env_file):
        return

    try:
        with open(env_file, 'r', encoding='utf-8') as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line or line.startswith('#') or '=' not in line:
                    continue
                key, value = line.split('=', 1)
                key = key.strip()
                if not key:
                    # An empty variable name makes os.environ raise, which
                    # used to abort loading of every remaining line.
                    continue
                os.environ[key] = value.strip()
    except (OSError, ValueError) as e:
        # ValueError also covers UnicodeDecodeError and embedded NUL bytes.
        print(f"加载环境变量失败: {e}")
class ReportGenerator:
    """Generate assessment reports by calling the GLM large-language-model API.

    The class builds a markdown summary of one quiz session from the SQLite
    database, sends it to the model together with a system prompt read from
    ``prompt_file``, and stores the parsed result in the ``reports`` table.
    When generation fails, the analysis data is stored in
    ``temp_analysis_data`` so the report can be regenerated later.
    """

    # SQLite database file used by every persistence method of this class.
    DB_PATH = 'data/survey.db'

    def __init__(self):
        """Read configuration from the environment and create the SDK client."""
        # Populate os.environ from the .env file before reading settings.
        load_env_config()
        # GLM API configuration
        self.api_key = os.getenv("GLM_API_KEY", "")
        try:
            self.timeout = int(os.getenv("LLM_API_TIMEOUT", "300"))
        except ValueError:
            # A malformed value must not prevent the service from starting.
            self.timeout = 300
        # NOTE(review): self.timeout is stored and printed, but nothing in
        # this class passes it to the client or to the request.
        self.model_name = "glm-4.5-air"
        self.prompt_file = "./public/prompt.md"
        # Initialise the Zhipu AI client
        self.client = ZhipuAiClient(api_key=self.api_key)
        print(f"报告生成器配置:")
        print(f" - SDK: ZhipuAiClient")
        print(f" - 模型: {self.model_name}")
        # Never write the full credential to the log.
        print(f" - api_key: {self._mask_secret(self.api_key)}")
        print(f" - 超时时间: {self.timeout}")

    @staticmethod
    def _mask_secret(secret):
        """Return a log-safe form of *secret* that reveals at most its last 4 characters."""
        if not secret:
            return "(未设置)"
        if len(secret) <= 8:
            return "*" * len(secret)
        return "****" + secret[-4:]

    @staticmethod
    def _first_present(*candidates, default):
        """Return the first candidate that is neither None nor '', else *default*."""
        for value in candidates:
            if value is not None and value != '':
                return value
        return default

    @staticmethod
    def _md_cell(value, default):
        """Render *value* as the text of one markdown table cell.

        The value is converted to ``str`` and stripped; an empty result is
        replaced by *default*. Pipes are escaped and line breaks are
        replaced by spaces, because either would break the table row.
        """
        text = '' if value is None else str(value).strip()
        if not text:
            return default
        text = text.replace('\r\n', ' ').replace('\n', ' ').replace('\r', ' ')
        return text.replace('|', '\\|')

    def load_prompt(self):
        """Return the contents of ``self.prompt_file``, or ``""`` if it cannot be read."""
        try:
            with open(self.prompt_file, 'r', encoding='utf-8') as f:
                return f.read()
        except (OSError, UnicodeDecodeError) as e:
            print(f"加载提示词失败: {e}")
            return ""

    def generate_analysis_text(self, session_data):
        """Build the markdown answer analysis for one quiz session.

        Args:
            session_data: Mapping whose ``'id'`` entry is the quiz session id.

        Returns:
            A dict with three keys: ``'analysis_text'`` (markdown containing
            the total score and one table row per answer), ``'student_info'``
            (the student/session row as a dict, ``{}`` when the session is
            not found) and ``'answers'`` (the list of answer dicts decoded
            from ``quiz_answers.answers_data``).

        Raises:
            sqlite3.Error: If the database cannot be queried.
        """
        session_id = session_data['id']

        conn = sqlite3.connect(self.DB_PATH)
        try:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            # Student record joined with the session's timing and score.
            cursor.execute('''
                SELECT s.*, qs.started_at, qs.completed_at, qs.total_score
                FROM students s
                JOIN quiz_sessions qs ON s.id = qs.student_id
                WHERE qs.id = ?
            ''', (session_id,))
            student_row = cursor.fetchone()
            # Answers are stored as one JSON document, next to a copy of the
            # student details captured with the submission.
            cursor.execute('''
                SELECT answers_data, student_name, student_school, student_grade, selected_tag
                FROM quiz_answers
                WHERE session_id = ?
            ''', (session_id,))
            answer_row = cursor.fetchone()
        finally:
            # Close even when a query raises, so the connection never leaks.
            conn.close()

        student_info = dict(student_row) if student_row is not None else {}
        submitted = dict(answer_row) if answer_row is not None else {}

        answers = []
        raw_answers = submitted.get('answers_data')
        if raw_answers:
            try:
                decoded = json.loads(raw_answers)
            except (json.JSONDecodeError, TypeError):
                decoded = None
            if isinstance(decoded, list):
                # Entries that are not objects cannot be rendered as rows.
                answers = [item for item in decoded if isinstance(item, dict)]
                print(f"成功解析JSON答题数据{len(answers)}")
            else:
                print(f"解析JSON答题数据失败: {session_id}")
        else:
            print(f"未找到答题数据: {session_id}")

        total_score = student_info.get('total_score')
        if total_score is None:
            total_score = 0

        # Prefer the details stored with the submission, then the student
        # record, then a placeholder.
        student_name = self._md_cell(
            self._first_present(submitted.get('student_name'), student_info.get('name'), default='未知'), '未知')
        student_school = self._md_cell(
            self._first_present(submitted.get('student_school'), student_info.get('school'), default='未知'), '未知')
        student_grade = self._md_cell(
            self._first_present(submitted.get('student_grade'), student_info.get('grade'), default='未知'), '未知')
        selected_tag = self._md_cell(
            self._first_present(submitted.get('selected_tag'), default='未指定'), '未指定')

        parts = [
            "# 考试答题情况分析\n\n",
            f"## 总分:{total_score}\n\n",
            "| 题目 | 题型 | 用户答案 | 正确答案 | 是否正确 | 得分 |\n",
            "|------|------|----------|----------|----------|------|\n",
            f"| 姓名 | 填空题 | {student_name} | 无标准答案 | 无法判断 | 不适用 |\n",
            f"| 学校 | 填空题 | {student_school} | 无标准答案 | 无法判断 | 不适用 |\n",
            f"| 年级 | 填空题 | {student_grade} | 无标准答案 | 无法判断 | 不适用 |\n",
            f"| 考试标签 | 填空题 | {selected_tag} | 无标准答案 | 无法判断 | 不适用 |\n",
        ]

        # One row per answer; every cell gets a non-empty value.
        for answer in answers:
            # Both branches used to produce an empty string, which left the
            # correctness column blank for every question.
            is_correct = "正确" if answer.get('isCorrect', False) else "错误"
            question_text = self._md_cell(answer.get('questionText'), "未知题目")
            question_type = self._md_cell(answer.get('questionType'), "单选题")
            user_answer = self._md_cell(answer.get('userAnswer'), "未作答")
            correct_answer = self._md_cell(answer.get('correctAnswer'), "无标准答案")
            score = answer.get('score', 0)
            parts.append(
                f"| {question_text} | {question_type} | {user_answer} | {correct_answer} | {is_correct} | {score} |\n"
            )

        analysis_text = "".join(parts)

        # Debug output of the generated text
        print(f"生成的分析文本长度: {len(analysis_text)} 字符")
        print(f"包含题目数量: {len(answers)}")
        print("分析文本预览:")
        print(analysis_text[:500] + "..." if len(analysis_text) > 500 else analysis_text)

        return {
            'analysis_text': analysis_text,
            'student_info': student_info,
            'answers': answers
        }

    async def generate_report(self, session_id):
        """Generate, persist and return the assessment report for a session.

        Args:
            session_id: Id of the quiz session.

        Returns:
            On success a dict with ``success=True``, ``message``,
            ``session_id``, ``report_id``, ``report_data`` and
            ``analysis_data``. On failure a dict with ``success=False``,
            ``error``, ``session_id`` and ``can_regenerate``; the latter is
            True only when the analysis data was actually stored for a later
            retry. Exceptions are reported through the dict, not raised.
        """
        analysis_data = None
        try:
            # Collect the session data
            analysis_data = self.generate_analysis_text({'id': session_id})
            # NOTE(review): call_report_api is a plain synchronous method, so
            # this coroutine does not yield while the model request runs.
            report_result = self.call_report_api(analysis_data, session_id)
            if not (report_result and report_result.get('success')):
                raise Exception("GLM API调用失败")

            student_info = analysis_data.get('student_info', {})
            complete_report_data = {
                'studentInfo': {
                    'name': student_info.get('name', '未知'),
                    'school': student_info.get('school', '未知'),
                    'grade': student_info.get('grade', '未知'),
                    'subject': '科学',
                    # The trailing 日 was missing, giving e.g. "2025年11月30".
                    'testDate': get_east8_time().strftime('%Y年%m月%d日')
                },
                'report': report_result['report_data'],
                'generated_at': get_east8_time().isoformat(),
                'session_id': session_id,
                'analysis_data': analysis_data,
                'is_direct_generation': True
            }
            # Persist the report
            report_id = self.save_report_to_db(session_id, complete_report_data, analysis_data)
            return {
                'success': True,
                'message': '报告生成成功',
                'session_id': session_id,
                'report_id': report_id,
                'report_data': complete_report_data,
                'analysis_data': analysis_data
            }
        except Exception as e:
            print(f"生成报告失败: {e}")
            # Keep the analysis data so the report can be regenerated later.
            saved_for_retry = False
            if analysis_data:
                try:
                    self.save_analysis_data_for_regeneration(session_id, analysis_data)
                    saved_for_retry = True
                except (sqlite3.Error, TypeError, ValueError) as save_error:
                    # A failing fallback must not escape the error path.
                    print(f"保存分析数据失败: {save_error}")
            return {
                'success': False,
                'error': str(e),
                'session_id': session_id,
                'can_regenerate': saved_for_retry
            }

    def call_report_api(self, analysis_data, session_id):
        """Send the analysis text to the GLM model and return the parsed report.

        Args:
            analysis_data: Dict whose ``'analysis_text'`` entry is sent as
                the user message.
            session_id: Session id, used only for logging.

        Returns:
            A dict with ``success=True``, ``message``, ``report_data`` (the
            result of :meth:`parse_json_response`) and ``raw_response`` (the
            model's unparsed text).

        Raises:
            Exception: If the SDK call or response handling fails; the
                original error is chained as ``__cause__``.
        """
        # The system prompt comes from the prompt file ("" if unreadable).
        prompt = self.load_prompt()
        try:
            print(f"调用GLM大语言模型SDK...")
            print(f" Session ID: {session_id}")
            print(f" 模型: {self.model_name}")
            # Ask the SDK for a JSON-object response.
            response = self.client.chat.completions.create(
                model=self.model_name,
                messages=[
                    {"role": "system", "content": prompt},
                    {"role": "user", "content": analysis_data['analysis_text']},
                ],
                response_format={"type": "json_object"},
            )
            model_response = response.choices[0].message.content
            print(f"✅ GLM SDK调用成功")
            print(f" 响应长度: {len(model_response)} 字符")
            parsed_report = self.parse_json_response(model_response)
            return {
                "success": True,
                "message": "报告生成成功",
                "report_data": parsed_report,
                "raw_response": model_response
            }
        except Exception as e:
            print(f"GLM SDK调用出错: {e}")
            raise Exception(f"大语言模型SDK调用失败: {str(e)}") from e

    def parse_json_response(self, json_content):
        """Turn the model's text response into a dict.

        If the text contains a ```` ```json ```` fenced block, only the
        fenced content is parsed.

        Returns:
            * the parsed dict when the JSON is an object;
            * the first element when it is a non-empty array, ``{}`` when
              the array is empty;
            * ``{"data": value}`` for any other JSON value;
            * ``{"raw_content": text}`` when the text is not valid JSON;
            * ``{"error": message, "raw_content": text}`` on any other error.
        """
        try:
            import re  # kept as a function-local import, as in the original
            fenced = re.search(r'```json\s*\n(.*?)\n```', json_content, re.DOTALL)
            if fenced:
                json_content = fenced.group(1)
                print(f"成功提取JSON格式内容长度: {len(json_content)} 字符")

            try:
                parsed_data = json.loads(json_content)
            except json.JSONDecodeError as e:
                print(f"JSON解析失败: {e}")
                # Hand the unparsed text back so the caller can still keep it.
                return {"raw_content": json_content}

            if isinstance(parsed_data, dict):
                return parsed_data
            if isinstance(parsed_data, list):
                print(f"检测到数组响应,长度: {len(parsed_data)}提取下标为0的数据")
                if parsed_data:
                    return parsed_data[0]
                print(f"数组为空,返回空字典")
                return {}
            print(f"响应不是字典或数组格式,将数据包装为字典")
            return {"data": parsed_data}
        except Exception as e:
            print(f"解析JSON响应失败: {e}")
            return {"error": str(e), "raw_content": json_content}

    def save_report_to_db(self, session_id, report_data, analysis_data):
        """Insert a report row and mark the session as ``report_generated``.

        The ``reports`` table is created if it does not exist. Both payloads
        are stored as JSON text.

        Returns:
            The id (UUID4 string) of the new report row.

        Raises:
            sqlite3.Error: If the database operation fails.
            TypeError: If a payload is not JSON-serialisable.
        """
        report_id = str(uuid.uuid4())
        report_json = json.dumps(report_data, ensure_ascii=False)
        analysis_json = json.dumps(analysis_data, ensure_ascii=False)

        conn = sqlite3.connect(self.DB_PATH)
        try:
            cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS reports (
                    id TEXT PRIMARY KEY,
                    session_id TEXT NOT NULL,
                    report_data TEXT NOT NULL,
                    analysis_data TEXT NOT NULL,
                    generated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (session_id) REFERENCES quiz_sessions (id)
                )
            ''')
            cursor.execute('''
                INSERT INTO reports (id, session_id, report_data, analysis_data)
                VALUES (?, ?, ?, ?)
            ''', (report_id, session_id, report_json, analysis_json))
            cursor.execute('''
                UPDATE quiz_sessions
                SET status = 'report_generated'
                WHERE id = ?
            ''', (session_id,))
            conn.commit()
        finally:
            # Closing without commit discards a half-finished transaction.
            conn.close()
        return report_id

    def save_analysis_data_for_regeneration(self, session_id, analysis_data):
        """Store analysis data so the report can be regenerated later.

        The ``temp_analysis_data`` table is created if needed, any earlier
        row for the session is replaced, and the session status is set to
        ``can_regenerate``.

        Returns:
            The id (UUID4 string) of the stored analysis row.

        Raises:
            sqlite3.Error: If the database operation fails.
            TypeError: If *analysis_data* is not JSON-serialisable.
        """
        analysis_id = str(uuid.uuid4())
        analysis_json = json.dumps(analysis_data, ensure_ascii=False)

        conn = sqlite3.connect(self.DB_PATH)
        try:
            cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS temp_analysis_data (
                    id TEXT PRIMARY KEY,
                    session_id TEXT NOT NULL,
                    analysis_data TEXT NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (session_id) REFERENCES quiz_sessions (id)
                )
            ''')
            # Keep at most one pending analysis per session.
            cursor.execute('DELETE FROM temp_analysis_data WHERE session_id = ?', (session_id,))
            cursor.execute('''
                INSERT INTO temp_analysis_data (id, session_id, analysis_data)
                VALUES (?, ?, ?)
            ''', (analysis_id, session_id, analysis_json))
            cursor.execute('''
                UPDATE quiz_sessions
                SET status = 'can_regenerate'
                WHERE id = ?
            ''', (session_id,))
            conn.commit()
        finally:
            conn.close()
        return analysis_id
class EnhancedSurveySystem:
    """Assessment system facade: report listing, lookup, (re)generation and deletion.

    Report generation itself is delegated to a :class:`ReportGenerator`;
    this class adds the read/delete queries on the ``reports`` table and the
    regeneration workflow based on ``temp_analysis_data``.
    """

    # SQLite database file used by the queries of this class.
    DB_PATH = 'data/survey.db'

    def __init__(self):
        """Create the report generator and make sure the required tables exist."""
        self.report_generator = ReportGenerator()
        self.init_database()

    def init_database(self):
        """Create the ``reports`` and ``temp_analysis_data`` tables if missing.

        ``temp_analysis_data`` is created here as well because the
        regeneration queries of this class read it and would otherwise fail
        with "no such table" until the first failed generation created it.
        """
        conn = sqlite3.connect(self.DB_PATH)
        try:
            cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS reports (
                    id TEXT PRIMARY KEY,
                    session_id TEXT NOT NULL,
                    report_data TEXT NOT NULL,
                    analysis_data TEXT NOT NULL,
                    generated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (session_id) REFERENCES quiz_sessions (id)
                )
            ''')
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS temp_analysis_data (
                    id TEXT PRIMARY KEY,
                    session_id TEXT NOT NULL,
                    analysis_data TEXT NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (session_id) REFERENCES quiz_sessions (id)
                )
            ''')
            conn.commit()
        finally:
            conn.close()

    def get_reports_list(self, page=1, page_size=10):
        """Return one page of reports, newest first.

        Args:
            page: 1-based page number; values below 1 are treated as 1.
            page_size: Rows per page; values below 1 are treated as 1.

        Returns:
            A dict with ``reports`` (list of row dicts joined with student
            and session columns), ``total`` (number of rows in ``reports``),
            ``page``, ``page_size`` (both after clamping) and
            ``total_pages``.
        """
        # Clamp so that OFFSET is never negative, LIMIT is never "unlimited"
        # (negative in SQLite) and total_pages never divides by zero.
        page = max(int(page), 1)
        page_size = max(int(page_size), 1)
        offset = (page - 1) * page_size

        conn = sqlite3.connect(self.DB_PATH)
        try:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            cursor.execute('SELECT COUNT(*) as total FROM reports')
            total = cursor.fetchone()['total']
            cursor.execute('''
                SELECT r.*, s.name, s.school, s.grade, qs.total_score, qs.completed_at
                FROM reports r
                JOIN quiz_sessions qs ON r.session_id = qs.id
                JOIN students s ON qs.student_id = s.id
                ORDER BY r.generated_at DESC
                LIMIT ? OFFSET ?
            ''', (page_size, offset))
            reports = [dict(row) for row in cursor.fetchall()]
        finally:
            conn.close()

        return {
            'reports': reports,
            'total': total,
            'page': page,
            'page_size': page_size,
            'total_pages': (total + page_size - 1) // page_size
        }

    def get_report_by_id(self, report_id):
        """Return the report row (with student name/school/grade) as a dict, or None."""
        conn = sqlite3.connect(self.DB_PATH)
        try:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            cursor.execute('''
                SELECT r.*, s.name, s.school, s.grade
                FROM reports r
                JOIN quiz_sessions qs ON r.session_id = qs.id
                JOIN students s ON qs.student_id = s.id
                WHERE r.id = ?
            ''', (report_id,))
            report = cursor.fetchone()
        finally:
            conn.close()
        return dict(report) if report else None

    def get_sessions_can_regenerate(self):
        """Return sessions with status ``can_regenerate`` that have stored analysis data.

        Rows are ordered by the analysis data's creation time, newest first.
        """
        conn = sqlite3.connect(self.DB_PATH)
        try:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            cursor.execute('''
                SELECT qs.*, s.name, s.school, s.grade, tad.created_at as analysis_created_at
                FROM quiz_sessions qs
                JOIN students s ON qs.student_id = s.id
                JOIN temp_analysis_data tad ON qs.id = tad.session_id
                WHERE qs.status = 'can_regenerate'
                ORDER BY tad.created_at DESC
            ''')
            sessions = [dict(row) for row in cursor.fetchall()]
        finally:
            conn.close()
        return sessions

    async def auto_generate_report(self, session_id):
        """Generate the report for a session (called after the quiz is completed)."""
        return await self.report_generator.generate_report(session_id)

    async def regenerate_report(self, session_id):
        """Regenerate a report from the analysis data stored for the session.

        Returns:
            On success a dict with ``success=True``, ``session_id``,
            ``report_id``, ``report_data``, ``analysis_data`` and
            ``is_regenerated=True``. On failure a dict with
            ``success=False`` and ``error``. Exceptions are reported through
            the dict, not raised.
        """
        try:
            # Fetch the most recently stored analysis data
            conn = sqlite3.connect(self.DB_PATH)
            try:
                cursor = conn.cursor()
                cursor.execute('''
                    SELECT analysis_data FROM temp_analysis_data
                    WHERE session_id = ?
                    ORDER BY created_at DESC
                    LIMIT 1
                ''', (session_id,))
                result = cursor.fetchone()
            finally:
                conn.close()
            if not result:
                raise Exception("未找到可重新生成的分析数据")
            analysis_data = json.loads(result[0])

            # Call the GLM API to produce the report
            report_result = self.report_generator.call_report_api(analysis_data, session_id)
            if not (report_result and report_result.get('success')):
                raise Exception("GLM API返回空内容或失败")

            student_info = analysis_data.get('student_info', {})
            report_data = {
                'studentInfo': {
                    'name': student_info.get('name', '未知'),
                    'school': student_info.get('school', '未知'),
                    'grade': student_info.get('grade', '未知'),
                    'subject': '科学',
                    # The trailing 日 was missing, giving e.g. "2025年11月30".
                    'testDate': get_east8_time().strftime('%Y年%m月%d日')
                },
                'report': report_result["report_data"],
                'generated_at': get_east8_time().isoformat(),
                'session_id': session_id,
                'analysis_data': analysis_data,
                'is_regenerated': True
            }
            # Persist the report, then drop the temporary analysis data.
            report_id = self.report_generator.save_report_to_db(session_id, report_data, analysis_data)
            self.cleanup_temp_analysis_data(session_id)
            return {
                'success': True,
                'session_id': session_id,
                'report_id': report_id,
                'report_data': report_data,
                'analysis_data': analysis_data,
                'is_regenerated': True
            }
        except Exception as e:
            print(f"重新生成报告失败: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def cleanup_temp_analysis_data(self, session_id):
        """Delete the temporary analysis data stored for the session."""
        conn = sqlite3.connect(self.DB_PATH)
        try:
            cursor = conn.cursor()
            cursor.execute('DELETE FROM temp_analysis_data WHERE session_id = ?', (session_id,))
            conn.commit()
        finally:
            conn.close()

    def delete_report(self, report_id):
        """Delete a report by id.

        Returns:
            ``{'success': True, 'message': ...}`` when a row was deleted,
            otherwise ``{'success': False, 'message': ...}`` (report not
            found, or the database operation failed).
        """
        try:
            conn = sqlite3.connect(self.DB_PATH)
            try:
                cursor = conn.cursor()
                cursor.execute('DELETE FROM reports WHERE id = ?', (report_id,))
                # rowcount tells whether the report existed, without a
                # separate SELECT beforehand.
                deleted = cursor.rowcount
                conn.commit()
            finally:
                conn.close()
            if not deleted:
                return {
                    'success': False,
                    'message': '报告不存在'
                }
            return {
                'success': True,
                'message': '报告删除成功'
            }
        except Exception as e:
            print(f"删除报告失败: {e}")
            return {
                'success': False,
                'message': f'删除失败: {str(e)}'
            }
# Global system instance, created when this module is imported.
enhanced_system = EnhancedSurveySystem()