603 lines
23 KiB
Python
603 lines
23 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
import sqlite3
|
||
import json
|
||
import uuid
|
||
import os
|
||
import time
|
||
from datetime import datetime, timezone, timedelta
|
||
from http.server import HTTPServer, BaseHTTPRequestHandler
|
||
from urllib.parse import urlparse, parse_qs
|
||
import threading
|
||
from zai import ZhipuAiClient
|
||
|
||
def get_east8_time():
|
||
"""获取东八区时间"""
|
||
east8_tz = timezone(timedelta(hours=8))
|
||
return datetime.now(east8_tz)
|
||
|
||
def get_east8_time_string():
|
||
"""获取东八区时间字符串格式,用于数据库存储"""
|
||
return get_east8_time().strftime('%Y-%m-%d %H:%M:%S')
|
||
|
||
def load_env_config():
|
||
"""加载环境变量配置"""
|
||
try:
|
||
# 尝试加载 .env 文件
|
||
env_file = "/Users/moshui/Documents/survey/.env"
|
||
if os.path.exists(env_file):
|
||
with open(env_file, 'r', encoding='utf-8') as f:
|
||
for line in f:
|
||
line = line.strip()
|
||
if line and not line.startswith('#') and '=' in line:
|
||
key, value = line.split('=', 1)
|
||
os.environ[key.strip()] = value.strip()
|
||
except Exception as e:
|
||
print(f"加载环境变量失败: {e}")
|
||
|
||
class ReportGenerator:
|
||
"""报告生成器 - 调用GLM大语言模型API生成测评报告"""
|
||
|
||
def __init__(self):
|
||
# 加载环境变量
|
||
load_env_config()
|
||
|
||
# GLM API配置
|
||
self.api_key = os.getenv("GLM_API_KEY", "")
|
||
self.timeout = int(os.getenv("LLM_API_TIMEOUT", "300"))
|
||
self.model_name = "glm-4.5-air"
|
||
self.prompt_file = "./public/prompt.md"
|
||
|
||
# 初始化 Zhipu AI 客户端
|
||
self.client = ZhipuAiClient(api_key=self.api_key)
|
||
|
||
print(f"报告生成器配置:")
|
||
print(f" - SDK: ZhipuAiClient")
|
||
print(f" - 模型: {self.model_name}")
|
||
print(f" - api_key: {self.api_key}")
|
||
print(f" - 超时时间: {self.timeout}秒")
|
||
|
||
def load_prompt(self):
|
||
"""加载提示词"""
|
||
try:
|
||
with open(self.prompt_file, 'r', encoding='utf-8') as f:
|
||
return f.read()
|
||
except Exception as e:
|
||
print(f"加载提示词失败: {e}")
|
||
return ""
|
||
|
||
def generate_analysis_text(self, session_data):
|
||
"""生成答题情况分析文本"""
|
||
conn = sqlite3.connect('data/survey.db')
|
||
conn.row_factory = sqlite3.Row
|
||
cursor = conn.cursor()
|
||
|
||
# 获取学员信息
|
||
cursor.execute('''
|
||
SELECT s.*, qs.started_at, qs.completed_at, qs.total_score
|
||
FROM students s
|
||
JOIN quiz_sessions qs ON s.id = qs.student_id
|
||
WHERE qs.id = ?
|
||
''', (session_data['id'],))
|
||
|
||
student_info = cursor.fetchone()
|
||
|
||
# 获取答题详情 - 从JSON格式读取,同时获取用户信息
|
||
cursor.execute('''
|
||
SELECT answers_data, student_name, student_school, student_grade, selected_tag
|
||
FROM quiz_answers
|
||
WHERE session_id = ?
|
||
''', (session_data['id'],))
|
||
|
||
result = cursor.fetchone()
|
||
answers = []
|
||
|
||
if result and result['answers_data']:
|
||
try:
|
||
answers = json.loads(result['answers_data'])
|
||
print(f"成功解析JSON答题数据,共{len(answers)}题")
|
||
except json.JSONDecodeError:
|
||
print(f"解析JSON答题数据失败: {session_data['id']}")
|
||
answers = []
|
||
else:
|
||
print(f"未找到答题数据: {session_data['id']}")
|
||
|
||
conn.close()
|
||
|
||
# 生成分析文本 - 确保markdown表格格式正确
|
||
analysis_text = "# 考试答题情况分析\n\n"
|
||
|
||
# 添加总分信息
|
||
total_score = student_info['total_score'] if student_info and student_info['total_score'] is not None else 0
|
||
analysis_text += f"## 总分:{total_score} 分\n\n"
|
||
|
||
analysis_text += "| 题目 | 题型 | 用户答案 | 正确答案 | 是否正确 | 得分 |\n"
|
||
analysis_text += "|------|------|----------|----------|----------|------|\n"
|
||
|
||
# 使用独立字段中的用户信息
|
||
student_name = result['student_name'] if result else (student_info['name'] if student_info and student_info['name'] is not None else '未知')
|
||
student_school = result['student_school'] if result else (student_info['school'] if student_info and student_info['school'] is not None else '未知')
|
||
student_grade = result['student_grade'] if result else (student_info['grade'] if student_info and student_info['grade'] is not None else '未知')
|
||
selected_tag = result['selected_tag'] if result else '未指定'
|
||
|
||
# 添加基本信息
|
||
analysis_text += f"| 姓名 | 填空题 | {student_name} | 无标准答案 | 无法判断 | 不适用 |\n"
|
||
analysis_text += f"| 学校 | 填空题 | {student_school} | 无标准答案 | 无法判断 | 不适用 |\n"
|
||
analysis_text += f"| 年级 | 填空题 | {student_grade} | 无标准答案 | 无法判断 | 不适用 |\n"
|
||
analysis_text += f"| 考试标签 | 填空题 | {selected_tag} | 无标准答案 | 无法判断 | 不适用 |\n"
|
||
|
||
# 添加答题详情 - 确保每个字段都不为空
|
||
for answer in answers:
|
||
is_correct = "✓" if answer.get('isCorrect', False) else "✗"
|
||
user_answer = answer.get('userAnswer', '').strip()
|
||
correct_answer = answer.get('correctAnswer', '').strip()
|
||
question_text = answer.get('questionText', '').strip()
|
||
question_type = answer.get('questionType', '').strip()
|
||
score = answer.get('score', 0)
|
||
|
||
# 确保题目文本不为空
|
||
if not question_text:
|
||
question_text = "未知题目"
|
||
|
||
# 确保题型不为空
|
||
if not question_type:
|
||
question_type = "单选题"
|
||
|
||
# 确保答案文本处理正确
|
||
if not user_answer:
|
||
user_answer = "未作答"
|
||
|
||
if not correct_answer:
|
||
correct_answer = "无标准答案"
|
||
|
||
# 转义markdown中的特殊字符
|
||
question_text = question_text.replace('|', '\\|')
|
||
user_answer = user_answer.replace('|', '\\|')
|
||
correct_answer = correct_answer.replace('|', '\\|')
|
||
|
||
analysis_text += f"| {question_text} | {question_type} | {user_answer} | {correct_answer} | {is_correct} | {score} |\n"
|
||
|
||
# 打印生成的分析文本用于调试
|
||
print(f"生成的分析文本长度: {len(analysis_text)} 字符")
|
||
print(f"包含题目数量: {len(answers)}")
|
||
print("分析文本预览:")
|
||
print(analysis_text[:500] + "..." if len(analysis_text) > 500 else analysis_text)
|
||
|
||
return {
|
||
'analysis_text': analysis_text,
|
||
'student_info': dict(student_info),
|
||
'answers': answers
|
||
}
|
||
|
||
async def generate_report(self, session_id):
|
||
"""生成测评报告"""
|
||
analysis_data = None
|
||
try:
|
||
# 获取会话数据
|
||
analysis_data = self.generate_analysis_text({'id': session_id})
|
||
|
||
# 调用GLM API生成报告
|
||
report_result = self.call_report_api(analysis_data, session_id)
|
||
|
||
if report_result and report_result.get('success'):
|
||
# API调用成功,直接生成报告
|
||
# 构造完整的报告数据
|
||
student_info = analysis_data.get('student_info', {})
|
||
|
||
# 构造报告数据
|
||
complete_report_data = {
|
||
'studentInfo': {
|
||
'name': student_info.get('name', '未知'),
|
||
'school': student_info.get('school', '未知'),
|
||
'grade': student_info.get('grade', '未知'),
|
||
'subject': '科学',
|
||
'testDate': get_east8_time().strftime('%Y年%m月%d日')
|
||
},
|
||
'report': report_result['report_data'],
|
||
'generated_at': get_east8_time().isoformat(),
|
||
'session_id': session_id,
|
||
'analysis_data': analysis_data,
|
||
'is_direct_generation': True
|
||
}
|
||
|
||
# 保存报告到数据库
|
||
report_id = self.save_report_to_db(session_id, complete_report_data, analysis_data)
|
||
|
||
return {
|
||
'success': True,
|
||
'message': '报告生成成功',
|
||
'session_id': session_id,
|
||
'report_id': report_id,
|
||
'report_data': complete_report_data,
|
||
'analysis_data': analysis_data
|
||
}
|
||
else:
|
||
raise Exception("GLM API调用失败")
|
||
|
||
except Exception as e:
|
||
print(f"生成报告失败: {e}")
|
||
# 保存分析数据到数据库,允许后续重新生成
|
||
if analysis_data:
|
||
self.save_analysis_data_for_regeneration(session_id, analysis_data)
|
||
return {
|
||
'success': False,
|
||
'error': str(e),
|
||
'session_id': session_id,
|
||
'can_regenerate': True
|
||
}
|
||
|
||
def call_report_api(self, analysis_data, session_id):
|
||
"""调用GLM大语言模型API生成报告"""
|
||
# 加载提示词
|
||
prompt = self.load_prompt()
|
||
|
||
try:
|
||
print(f"调用GLM大语言模型SDK...")
|
||
print(f" Session ID: {session_id}")
|
||
print(f" 模型: {self.model_name}")
|
||
|
||
# 使用ZhipuAiClient调用API,设置JSON格式响应
|
||
response = self.client.chat.completions.create(
|
||
model=self.model_name,
|
||
messages=[
|
||
{
|
||
"role": "system",
|
||
"content": prompt
|
||
},
|
||
{
|
||
"role": "user",
|
||
"content": analysis_data['analysis_text']
|
||
}
|
||
],
|
||
response_format={
|
||
"type": "json_object"
|
||
}
|
||
)
|
||
|
||
print(f"✅ GLM SDK调用成功")
|
||
print(f" 响应长度: {len(response.choices[0].message.content)} 字符")
|
||
|
||
# 解析JSON响应
|
||
model_response = response.choices[0].message.content
|
||
parsed_report = self.parse_json_response(model_response)
|
||
|
||
return {
|
||
"success": True,
|
||
"message": "报告生成成功",
|
||
"report_data": parsed_report,
|
||
"raw_response": model_response
|
||
}
|
||
|
||
except Exception as e:
|
||
print(f"GLM SDK调用出错: {e}")
|
||
raise Exception(f"大语言模型SDK调用失败: {str(e)}")
|
||
|
||
def parse_json_response(self, json_content):
|
||
"""解析JSON格式的响应字符串"""
|
||
try:
|
||
# 查找 ```json 和 ``` 之间的内容
|
||
import re
|
||
|
||
# 匹配 ```json ... ``` 格式
|
||
match = re.search(r'```json\s*\n(.*?)\n```', json_content, re.DOTALL)
|
||
if match:
|
||
json_content = match.group(1)
|
||
print(f"成功提取JSON格式内容,长度: {len(json_content)} 字符")
|
||
|
||
try:
|
||
parsed_data = json.loads(json_content)
|
||
|
||
# 如果返回的是数组,提取第一个元素
|
||
if isinstance(parsed_data, list):
|
||
print(f"检测到数组响应,长度: {len(parsed_data)},提取下标为0的数据")
|
||
if len(parsed_data) > 0:
|
||
return parsed_data[0]
|
||
else:
|
||
print(f"数组为空,返回空字典")
|
||
return {}
|
||
|
||
# 如果已经是字典格式,直接返回
|
||
elif isinstance(parsed_data, dict):
|
||
return parsed_data
|
||
|
||
# 其他情况,将数据包装成字典
|
||
else:
|
||
print(f"响应不是字典或数组格式,将数据包装为字典")
|
||
return {"data": parsed_data}
|
||
|
||
except json.JSONDecodeError as e:
|
||
print(f"JSON解析失败: {e}")
|
||
# 如果解析失败,返回原始内容包装的字典
|
||
return {"raw_content": json_content}
|
||
|
||
except Exception as e:
|
||
print(f"解析JSON响应失败: {e}")
|
||
return {"error": str(e), "raw_content": json_content}
|
||
|
||
def save_report_to_db(self, session_id, report_data, analysis_data):
|
||
"""保存报告到数据库"""
|
||
conn = sqlite3.connect('data/survey.db')
|
||
cursor = conn.cursor()
|
||
|
||
# 创建报告表(如果不存在)
|
||
cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS reports (
|
||
id TEXT PRIMARY KEY,
|
||
session_id TEXT NOT NULL,
|
||
report_data TEXT NOT NULL,
|
||
analysis_data TEXT NOT NULL,
|
||
generated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
FOREIGN KEY (session_id) REFERENCES quiz_sessions (id)
|
||
)
|
||
''')
|
||
|
||
# 保存报告
|
||
report_id = str(uuid.uuid4())
|
||
cursor.execute('''
|
||
INSERT INTO reports (id, session_id, report_data, analysis_data)
|
||
VALUES (?, ?, ?, ?)
|
||
''', (report_id, session_id, json.dumps(report_data, ensure_ascii=False),
|
||
json.dumps(analysis_data, ensure_ascii=False)))
|
||
|
||
# 更新会话状态
|
||
cursor.execute('''
|
||
UPDATE quiz_sessions
|
||
SET status = 'report_generated'
|
||
WHERE id = ?
|
||
''', (session_id,))
|
||
|
||
conn.commit()
|
||
conn.close()
|
||
return report_id
|
||
|
||
def save_analysis_data_for_regeneration(self, session_id, analysis_data):
|
||
"""保存分析数据以便重新生成报告"""
|
||
conn = sqlite3.connect('data/survey.db')
|
||
cursor = conn.cursor()
|
||
|
||
# 创建临时分析数据表(如果不存在)
|
||
cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS temp_analysis_data (
|
||
id TEXT PRIMARY KEY,
|
||
session_id TEXT NOT NULL,
|
||
analysis_data TEXT NOT NULL,
|
||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
FOREIGN KEY (session_id) REFERENCES quiz_sessions (id)
|
||
)
|
||
''')
|
||
|
||
# 删除该session的旧数据(如果存在)
|
||
cursor.execute('DELETE FROM temp_analysis_data WHERE session_id = ?', (session_id,))
|
||
|
||
# 保存新的分析数据
|
||
analysis_id = str(uuid.uuid4())
|
||
cursor.execute('''
|
||
INSERT INTO temp_analysis_data (id, session_id, analysis_data)
|
||
VALUES (?, ?, ?)
|
||
''', (analysis_id, session_id, json.dumps(analysis_data, ensure_ascii=False)))
|
||
|
||
# 更新会话状态为可以重新生成
|
||
cursor.execute('''
|
||
UPDATE quiz_sessions
|
||
SET status = 'can_regenerate'
|
||
WHERE id = ?
|
||
''', (session_id,))
|
||
|
||
conn.commit()
|
||
conn.close()
|
||
return analysis_id
|
||
|
||
class EnhancedSurveySystem:
|
||
"""增强的测评系统"""
|
||
|
||
def __init__(self):
|
||
self.report_generator = ReportGenerator()
|
||
self.init_database()
|
||
|
||
def init_database(self):
|
||
"""初始化数据库"""
|
||
conn = sqlite3.connect('data/survey.db')
|
||
cursor = conn.cursor()
|
||
|
||
# 确保必要的表存在
|
||
cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS reports (
|
||
id TEXT PRIMARY KEY,
|
||
session_id TEXT NOT NULL,
|
||
report_data TEXT NOT NULL,
|
||
analysis_data TEXT NOT NULL,
|
||
generated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
FOREIGN KEY (session_id) REFERENCES quiz_sessions (id)
|
||
)
|
||
''')
|
||
|
||
conn.commit()
|
||
conn.close()
|
||
|
||
def get_reports_list(self, page=1, page_size=10):
|
||
"""获取报告列表"""
|
||
conn = sqlite3.connect('data/survey.db')
|
||
conn.row_factory = sqlite3.Row
|
||
cursor = conn.cursor()
|
||
|
||
# 获取总数
|
||
cursor.execute('SELECT COUNT(*) as total FROM reports')
|
||
total = cursor.fetchone()['total']
|
||
|
||
# 获取分页数据
|
||
offset = (page - 1) * page_size
|
||
cursor.execute('''
|
||
SELECT r.*, s.name, s.school, s.grade, qs.total_score, qs.completed_at
|
||
FROM reports r
|
||
JOIN quiz_sessions qs ON r.session_id = qs.id
|
||
JOIN students s ON qs.student_id = s.id
|
||
ORDER BY r.generated_at DESC
|
||
LIMIT ? OFFSET ?
|
||
''', (page_size, offset))
|
||
|
||
reports = [dict(row) for row in cursor.fetchall()]
|
||
|
||
conn.close()
|
||
|
||
return {
|
||
'reports': reports,
|
||
'total': total,
|
||
'page': page,
|
||
'page_size': page_size,
|
||
'total_pages': (total + page_size - 1) // page_size
|
||
}
|
||
|
||
def get_report_by_id(self, report_id):
|
||
"""根据ID获取报告"""
|
||
conn = sqlite3.connect('data/survey.db')
|
||
conn.row_factory = sqlite3.Row
|
||
cursor = conn.cursor()
|
||
|
||
cursor.execute('''
|
||
SELECT r.*, s.name, s.school, s.grade
|
||
FROM reports r
|
||
JOIN quiz_sessions qs ON r.session_id = qs.id
|
||
JOIN students s ON qs.student_id = s.id
|
||
WHERE r.id = ?
|
||
''', (report_id,))
|
||
|
||
report = cursor.fetchone()
|
||
conn.close()
|
||
|
||
return dict(report) if report else None
|
||
|
||
def get_sessions_can_regenerate(self):
|
||
"""获取可以重新生成的会话列表"""
|
||
conn = sqlite3.connect('data/survey.db')
|
||
conn.row_factory = sqlite3.Row
|
||
cursor = conn.cursor()
|
||
|
||
cursor.execute('''
|
||
SELECT qs.*, s.name, s.school, s.grade, tad.created_at as analysis_created_at
|
||
FROM quiz_sessions qs
|
||
JOIN students s ON qs.student_id = s.id
|
||
JOIN temp_analysis_data tad ON qs.id = tad.session_id
|
||
WHERE qs.status = 'can_regenerate'
|
||
ORDER BY tad.created_at DESC
|
||
''')
|
||
|
||
sessions = [dict(row) for row in cursor.fetchall()]
|
||
conn.close()
|
||
|
||
return sessions
|
||
|
||
async def auto_generate_report(self, session_id):
|
||
"""自动生成报告(答题完成后调用)"""
|
||
return await self.report_generator.generate_report(session_id)
|
||
|
||
async def regenerate_report(self, session_id):
|
||
"""重新生成报告(从保存的分析数据)"""
|
||
try:
|
||
# 获取保存的分析数据
|
||
conn = sqlite3.connect('data/survey.db')
|
||
cursor = conn.cursor()
|
||
|
||
cursor.execute('''
|
||
SELECT analysis_data FROM temp_analysis_data
|
||
WHERE session_id = ?
|
||
ORDER BY created_at DESC
|
||
LIMIT 1
|
||
''', (session_id,))
|
||
|
||
result = cursor.fetchone()
|
||
conn.close()
|
||
|
||
if not result:
|
||
raise Exception("未找到可重新生成的分析数据")
|
||
|
||
analysis_data = json.loads(result[0])
|
||
|
||
# 调用GLM API生成报告
|
||
report_result = self.report_generator.call_report_api(analysis_data, session_id)
|
||
|
||
if report_result and report_result.get('success'):
|
||
# 提取学员信息用于更新报告数据
|
||
student_info = analysis_data.get('student_info', {})
|
||
|
||
# 构造报告数据
|
||
report_data = {
|
||
'studentInfo': {
|
||
'name': student_info.get('name', '未知'),
|
||
'school': student_info.get('school', '未知'),
|
||
'grade': student_info.get('grade', '未知'),
|
||
'subject': '科学',
|
||
'testDate': get_east8_time().strftime('%Y年%m月%d日')
|
||
},
|
||
'report': report_result["report_data"],
|
||
'generated_at': get_east8_time().isoformat(),
|
||
'session_id': session_id,
|
||
'analysis_data': analysis_data,
|
||
'is_regenerated': True
|
||
}
|
||
|
||
# 保存报告到数据库
|
||
self.report_generator.save_report_to_db(session_id, report_data, analysis_data)
|
||
|
||
# 清理临时数据
|
||
self.cleanup_temp_analysis_data(session_id)
|
||
|
||
return {
|
||
'success': True,
|
||
'report_data': report_data,
|
||
'analysis_data': analysis_data,
|
||
'is_regenerated': True
|
||
}
|
||
else:
|
||
raise Exception("GLM API返回空内容或失败")
|
||
|
||
except Exception as e:
|
||
print(f"重新生成报告失败: {e}")
|
||
return {
|
||
'success': False,
|
||
'error': str(e)
|
||
}
|
||
|
||
def cleanup_temp_analysis_data(self, session_id):
|
||
"""清理临时分析数据"""
|
||
conn = sqlite3.connect('data/survey.db')
|
||
cursor = conn.cursor()
|
||
cursor.execute('DELETE FROM temp_analysis_data WHERE session_id = ?', (session_id,))
|
||
conn.commit()
|
||
conn.close()
|
||
|
||
def delete_report(self, report_id):
|
||
"""删除报告"""
|
||
try:
|
||
conn = sqlite3.connect('data/survey.db')
|
||
cursor = conn.cursor()
|
||
|
||
# 检查报告是否存在
|
||
cursor.execute('SELECT id FROM reports WHERE id = ?', (report_id,))
|
||
if not cursor.fetchone():
|
||
conn.close()
|
||
return {
|
||
'success': False,
|
||
'message': '报告不存在'
|
||
}
|
||
|
||
# 删除报告
|
||
cursor.execute('DELETE FROM reports WHERE id = ?', (report_id,))
|
||
conn.commit()
|
||
conn.close()
|
||
|
||
return {
|
||
'success': True,
|
||
'message': '报告删除成功'
|
||
}
|
||
|
||
except Exception as e:
|
||
print(f"删除报告失败: {e}")
|
||
return {
|
||
'success': False,
|
||
'message': f'删除失败: {str(e)}'
|
||
}
|
||
|
||
# 全局系统实例
|
||
enhanced_system = EnhancedSurveySystem()
|