survey/enhanced_survey_system.py
2025-10-30 00:19:42 +08:00

535 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sqlite3
import json
import uuid
import requests
import os
import time
from datetime import datetime, timezone, timedelta
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
import threading
def get_east8_time():
    """Return the current time as a timezone-aware datetime in UTC+08:00."""
    utc_plus_eight = timedelta(hours=8)
    return datetime.now(tz=timezone(utc_plus_eight))
def get_east8_time_string():
    """Return the current UTC+08:00 time formatted as 'YYYY-MM-DD HH:MM:SS' for database storage."""
    current = get_east8_time()
    storage_format = '%Y-%m-%d %H:%M:%S'
    return current.strftime(storage_format)
def load_env_config(env_file="/Users/moshui/Documents/survey/.env"):
    """Load ``KEY=VALUE`` pairs from a .env file into ``os.environ``.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    ignored. Keys and values are stripped of surrounding whitespace; the
    value keeps any further ``=`` characters. Existing environment
    variables with the same name are overwritten.

    Args:
        env_file: Path of the .env file. Defaults to the path the module
            has always used, so existing zero-argument calls are unchanged.

    Returns:
        None. A missing file is silently ignored; any other failure is
        printed and swallowed (best-effort loading).
    """
    try:
        if not os.path.exists(env_file):
            return
        with open(env_file, 'r', encoding='utf-8') as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line or line.startswith('#') or '=' not in line:
                    continue
                key, value = line.split('=', 1)
                key = key.strip()
                if not key:
                    # A line such as "=value" has no variable name. Assigning
                    # it can raise, which would abort the whole load and drop
                    # every later line, so skip just this one.
                    continue
                os.environ[key] = value.strip()
    except Exception as e:
        print(f"加载环境变量失败: {e}")
class ReportGenerator:
    """Report generator that submits quiz analysis text to a custom report API.

    The generator reads a quiz session from the SQLite database, renders the
    answers as a markdown table, posts that text to the configured webhook
    and stores the analysis data so a report can be regenerated later.
    """

    # SQLite database file, relative to the process working directory.
    DB_PATH = 'data/survey.db'

    def __init__(self):
        """Read the API configuration from the environment and print it."""
        # Load environment variables from the .env file first.
        load_env_config()
        # Custom API configuration
        self.api_url = os.getenv("REPORT_API_URL", "http://120.26.23.172:5678/webhook/survey_report")
        self.api_key = os.getenv("REPORT_API_KEY", "")
        self.timeout = int(os.getenv("REPORT_API_TIMEOUT", "300"))
        self.prompt_file = "/Users/moshui/Documents/survey/public/prompt.txt"
        # Request headers
        self.headers = {
            "Content-Type": "application/json"
        }
        # Add the API key only when one is configured.
        if self.api_key:
            self.headers["Authorization"] = f"Bearer {self.api_key}"
        print(f"报告生成器配置:")
        print(f" - API端点: {self.api_url}")
        print(f" - 超时时间: {self.timeout}")

    def load_prompt(self):
        """Return the prompt text stored in ``self.prompt_file``.

        Returns an empty string (after printing the error) when the file
        cannot be read.
        """
        try:
            with open(self.prompt_file, 'r', encoding='utf-8') as f:
                return f.read()
        except Exception as e:
            print(f"加载提示词失败: {e}")
            return ""

    @staticmethod
    def _table_cell(value, empty_text=''):
        """Render *value* as text that is safe inside one markdown table cell.

        ``None`` and non-string values (numbers, lists, ...) are accepted.
        Blank results are replaced by *empty_text*.
        """
        text = '' if value is None else str(value).strip()
        if not text:
            return empty_text
        # A raw pipe starts a new column and a line break ends the row, so
        # both would corrupt the table.
        text = text.replace('\r\n', ' ').replace('\n', ' ').replace('\r', ' ')
        return text.replace('|', '\\|')

    def generate_analysis_text(self, session_data):
        """Build the markdown answer-analysis text for one quiz session.

        Args:
            session_data: Mapping whose ``'id'`` entry is the quiz session id.

        Returns:
            dict with ``'analysis_text'`` (markdown string),
            ``'student_info'`` (dict of the joined student/session row, or an
            empty dict when the session is unknown) and ``'answers'`` (the
            parsed answer list, empty when missing or unparsable).
        """
        session_id = session_data['id']
        conn = sqlite3.connect(self.DB_PATH)
        try:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            # Student information joined with the session row.
            cursor.execute('''
                SELECT s.*, qs.started_at, qs.completed_at, qs.total_score
                FROM students s
                JOIN quiz_sessions qs ON s.id = qs.student_id
                WHERE qs.id = ?
            ''', (session_id,))
            student_info = cursor.fetchone()
            # Answer details are stored as JSON next to the user information.
            cursor.execute('''
                SELECT answers_data, student_name, student_school, student_grade, selected_tag
                FROM quiz_answers
                WHERE session_id = ?
            ''', (session_id,))
            result = cursor.fetchone()
        finally:
            # Close the connection even when a query fails.
            conn.close()

        answers = []
        if result and result['answers_data']:
            try:
                parsed = json.loads(result['answers_data'])
                if not isinstance(parsed, list):
                    # The table rendering below expects a list of answers.
                    raise json.JSONDecodeError("answers_data is not a list", str(result['answers_data']), 0)
                answers = parsed
                print(f"成功解析JSON答题数据{len(answers)}")
            except json.JSONDecodeError:
                print(f"解析JSON答题数据失败: {session_data['id']}")
                answers = []
        else:
            print(f"未找到答题数据: {session_data['id']}")

        def known(answer_column, student_column, default):
            """Prefer the value stored with the answers, then the student row."""
            if result is not None and result[answer_column] is not None:
                return result[answer_column]
            if student_column and student_info is not None and student_info[student_column] is not None:
                return student_info[student_column]
            return default

        total_score = student_info['total_score'] if student_info and student_info['total_score'] is not None else 0
        student_name = self._table_cell(known('student_name', 'name', '未知'), '未知')
        student_school = self._table_cell(known('student_school', 'school', '未知'), '未知')
        student_grade = self._table_cell(known('student_grade', 'grade', '未知'), '未知')
        selected_tag = self._table_cell(known('selected_tag', None, '未指定'), '未指定')

        # Collect the pieces and join once instead of repeated concatenation.
        parts = [
            "# 考试答题情况分析\n\n",
            f"## 总分:{total_score}\n\n",
            "| 题目 | 题型 | 用户答案 | 正确答案 | 是否正确 | 得分 |\n",
            "|------|------|----------|----------|----------|------|\n",
            # Basic information rows
            f"| 姓名 | 填空题 | {student_name} | 无标准答案 | 无法判断 | 不适用 |\n",
            f"| 学校 | 填空题 | {student_school} | 无标准答案 | 无法判断 | 不适用 |\n",
            f"| 年级 | 填空题 | {student_grade} | 无标准答案 | 无法判断 | 不适用 |\n",
            f"| 考试标签 | 填空题 | {selected_tag} | 无标准答案 | 无法判断 | 不适用 |\n",
        ]
        # Answer rows: every cell gets a non-empty, table-safe value.
        for answer in answers:
            if not isinstance(answer, dict):
                continue
            # Both branches of the original expression were empty strings,
            # which left the correctness column blank for every row.
            is_correct = "正确" if answer.get('isCorrect', False) else "错误"
            question_text = self._table_cell(answer.get('questionText', ''), "未知题目")
            question_type = self._table_cell(answer.get('questionType', ''), "单选题")
            user_answer = self._table_cell(answer.get('userAnswer', ''), "未作答")
            correct_answer = self._table_cell(answer.get('correctAnswer', ''), "无标准答案")
            score = answer.get('score', 0)
            parts.append(
                f"| {question_text} | {question_type} | {user_answer} | {correct_answer} | {is_correct} | {score} |\n"
            )
        analysis_text = "".join(parts)

        # Print the generated text for debugging.
        print(f"生成的分析文本长度: {len(analysis_text)} 字符")
        print(f"包含题目数量: {len(answers)}")
        print("分析文本预览:")
        print(analysis_text[:500] + "..." if len(analysis_text) > 500 else analysis_text)
        return {
            'analysis_text': analysis_text,
            # fetchone() yields None for an unknown session; dict(None) raises.
            'student_info': dict(student_info) if student_info is not None else {},
            'answers': answers
        }

    async def generate_report(self, session_id):
        """Submit an asynchronous report-generation request for a session.

        Returns:
            dict with ``'success': True`` plus the analysis data when the API
            accepted the request, otherwise ``'success': False`` with the
            error text and ``'can_regenerate': True``. Exceptions are caught
            and reported through the returned dict.
        """
        analysis_data = None
        try:
            # Collect the session data.
            analysis_data = self.generate_analysis_text({'id': session_id})
            # Call the custom API to generate the report.
            report_result = self.call_report_api(analysis_data, session_id)
            if report_result and report_result.get('success'):
                # The API accepted the request; the report is generated
                # asynchronously. Keep the analysis data for the callback.
                self.save_analysis_data_for_regeneration(session_id, analysis_data)
                return {
                    'success': True,
                    'message': '报告异步生成中',
                    'session_id': session_id,
                    'analysis_data': analysis_data
                }
            else:
                raise Exception("API调用失败")
        except Exception as e:
            print(f"生成报告失败: {e}")
            # Keep the analysis data so the report can be regenerated later.
            if analysis_data:
                try:
                    self.save_analysis_data_for_regeneration(session_id, analysis_data)
                except Exception as save_error:
                    # A storage failure must not hide the original error dict.
                    print(f"生成报告失败: {save_error}")
            return {
                'success': False,
                'error': str(e),
                'session_id': session_id,
                'can_regenerate': True
            }

    def call_report_api(self, analysis_data, session_id):
        """POST the analysis text to the report API.

        Args:
            analysis_data: dict containing ``'analysis_text'``.
            session_id: Quiz session id forwarded to the API.

        Returns:
            ``{"success": True, "message": ...}`` when the API answers with
            HTTP 200.

        Raises:
            Exception: for any transport error, non-200 status or unparsable
                response body; the original error is chained as the cause.
        """
        # Build the request payload.
        request_data = {
            "analysis_text": analysis_data['analysis_text'],
            "session_id": session_id
        }
        try:
            print(f"调用报告生成API...")
            print(f" API端点: {self.api_url}")
            print(f" Session ID: {session_id}")
            print(f" 数据长度: {len(request_data['analysis_text'])} 字符")
            response = requests.post(
                self.api_url,
                headers=self.headers,
                json=request_data,
                # NOTE(review): fixed 10 s timeout; the configured
                # self.timeout is not applied here - confirm this is intended.
                timeout=10
            )
            if response.status_code == 200:
                result = response.json()
                print(f"✅ API调用成功已提交异步报告生成请求")
                print(f" 响应内容: {str(result)[:200]}...")
                return {"success": True, "message": "异步报告生成请求已提交"}
            else:
                raise Exception(f"API调用失败: HTTP {response.status_code} - {response.text}")
        except Exception as e:
            print(f"API调用出错: {e}")
            raise Exception(f"报告生成API调用失败: {str(e)}") from e

    def save_report_to_db(self, session_id, report_data, analysis_data):
        """Store a generated report and mark the session as 'report_generated'.

        Returns:
            The id (UUID string) of the newly inserted report row.
        """
        conn = sqlite3.connect(self.DB_PATH)
        try:
            cursor = conn.cursor()
            # Create the reports table if it does not exist yet.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS reports (
                    id TEXT PRIMARY KEY,
                    session_id TEXT NOT NULL,
                    report_data TEXT NOT NULL,
                    analysis_data TEXT NOT NULL,
                    generated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (session_id) REFERENCES quiz_sessions (id)
                )
            ''')
            # Save the report.
            report_id = str(uuid.uuid4())
            cursor.execute('''
                INSERT INTO reports (id, session_id, report_data, analysis_data)
                VALUES (?, ?, ?, ?)
            ''', (report_id, session_id, json.dumps(report_data, ensure_ascii=False),
                  json.dumps(analysis_data, ensure_ascii=False)))
            # Update the session status.
            cursor.execute('''
                UPDATE quiz_sessions
                SET status = 'report_generated'
                WHERE id = ?
            ''', (session_id,))
            conn.commit()
        finally:
            conn.close()
        return report_id

    def save_analysis_data_for_regeneration(self, session_id, analysis_data):
        """Store analysis data so the report can be regenerated later.

        Any previous entry for the session is replaced and the session status
        is set to 'can_regenerate'.

        Returns:
            The id (UUID string) of the stored analysis row.
        """
        conn = sqlite3.connect(self.DB_PATH)
        try:
            cursor = conn.cursor()
            # Create the temporary analysis table if it does not exist yet.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS temp_analysis_data (
                    id TEXT PRIMARY KEY,
                    session_id TEXT NOT NULL,
                    analysis_data TEXT NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (session_id) REFERENCES quiz_sessions (id)
                )
            ''')
            # Remove older data of this session, if any.
            cursor.execute('DELETE FROM temp_analysis_data WHERE session_id = ?', (session_id,))
            # Save the new analysis data.
            analysis_id = str(uuid.uuid4())
            cursor.execute('''
                INSERT INTO temp_analysis_data (id, session_id, analysis_data)
                VALUES (?, ?, ?)
            ''', (analysis_id, session_id, json.dumps(analysis_data, ensure_ascii=False)))
            # Mark the session as regenerable.
            cursor.execute('''
                UPDATE quiz_sessions
                SET status = 'can_regenerate'
                WHERE id = ?
            ''', (session_id,))
            conn.commit()
        finally:
            conn.close()
        return analysis_id
class EnhancedSurveySystem:
    """Enhanced survey system: report listing, lookup, regeneration and deletion."""

    # SQLite database file, relative to the process working directory.
    DB_PATH = 'data/survey.db'

    def __init__(self):
        """Create the report generator and make sure the database tables exist."""
        self.report_generator = ReportGenerator()
        self.init_database()

    def _connect(self, rows=False):
        """Open a connection to the database; *rows* enables name-based row access."""
        conn = sqlite3.connect(self.DB_PATH)
        if rows:
            conn.row_factory = sqlite3.Row
        return conn

    def init_database(self):
        """Create the tables this class queries, if they do not exist yet."""
        # sqlite3 cannot create missing parent directories itself.
        directory = os.path.dirname(self.DB_PATH)
        if directory:
            os.makedirs(directory, exist_ok=True)
        conn = self._connect()
        try:
            cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS reports (
                    id TEXT PRIMARY KEY,
                    session_id TEXT NOT NULL,
                    report_data TEXT NOT NULL,
                    analysis_data TEXT NOT NULL,
                    generated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (session_id) REFERENCES quiz_sessions (id)
                )
            ''')
            # get_sessions_can_regenerate() and regenerate_report() read this
            # table, so it must exist before any analysis data was saved.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS temp_analysis_data (
                    id TEXT PRIMARY KEY,
                    session_id TEXT NOT NULL,
                    analysis_data TEXT NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (session_id) REFERENCES quiz_sessions (id)
                )
            ''')
            conn.commit()
        finally:
            conn.close()

    def get_reports_list(self, page=1, page_size=10):
        """Return one page of reports, newest first.

        Args:
            page: 1-based page number; values below 1 are treated as 1.
            page_size: Rows per page; values below 1 are treated as 1.

        Returns:
            dict with ``'reports'`` (list of row dicts), ``'total'``,
            ``'page'``, ``'page_size'`` and ``'total_pages'``.
        """
        # Clamp the paging arguments: page_size 0 would divide by zero below
        # and page 0 would produce a negative offset.
        page = max(1, int(page))
        page_size = max(1, int(page_size))
        conn = self._connect(rows=True)
        try:
            cursor = conn.cursor()
            # Total number of reports
            cursor.execute('SELECT COUNT(*) as total FROM reports')
            total = cursor.fetchone()['total']
            # Requested page
            offset = (page - 1) * page_size
            cursor.execute('''
                SELECT r.*, s.name, s.school, s.grade, qs.total_score, qs.completed_at
                FROM reports r
                JOIN quiz_sessions qs ON r.session_id = qs.id
                JOIN students s ON qs.student_id = s.id
                ORDER BY r.generated_at DESC
                LIMIT ? OFFSET ?
            ''', (page_size, offset))
            reports = [dict(row) for row in cursor.fetchall()]
        finally:
            conn.close()
        return {
            'reports': reports,
            'total': total,
            'page': page,
            'page_size': page_size,
            'total_pages': (total + page_size - 1) // page_size
        }

    def get_report_by_id(self, report_id):
        """Return the report row (with student name/school/grade) as a dict, or None."""
        conn = self._connect(rows=True)
        try:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT r.*, s.name, s.school, s.grade
                FROM reports r
                JOIN quiz_sessions qs ON r.session_id = qs.id
                JOIN students s ON qs.student_id = s.id
                WHERE r.id = ?
            ''', (report_id,))
            report = cursor.fetchone()
        finally:
            conn.close()
        return dict(report) if report else None

    def get_sessions_can_regenerate(self):
        """Return the sessions whose status is 'can_regenerate', newest analysis first."""
        conn = self._connect(rows=True)
        try:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT qs.*, s.name, s.school, s.grade, tad.created_at as analysis_created_at
                FROM quiz_sessions qs
                JOIN students s ON qs.student_id = s.id
                JOIN temp_analysis_data tad ON qs.id = tad.session_id
                WHERE qs.status = 'can_regenerate'
                ORDER BY tad.created_at DESC
            ''')
            sessions = [dict(row) for row in cursor.fetchall()]
        finally:
            conn.close()
        return sessions

    async def auto_generate_report(self, session_id):
        """Generate the report automatically (called after the quiz is completed)."""
        return await self.report_generator.generate_report(session_id)

    async def regenerate_report(self, session_id):
        """Regenerate a report from the analysis data saved for the session.

        Returns:
            On failure ``{'success': False, 'error': ...}``. When the API only
            acknowledges an asynchronous request, ``'success': True`` with
            ``'message'``, ``'session_id'``, ``'analysis_data'`` and
            ``'is_regenerated'``. When the API response itself carries a
            ``'report'``, the report is stored and returned as
            ``'report_data'``.
        """
        try:
            # Fetch the saved analysis data.
            conn = self._connect()
            try:
                cursor = conn.cursor()
                cursor.execute('''
                    SELECT analysis_data FROM temp_analysis_data
                    WHERE session_id = ?
                    ORDER BY created_at DESC
                    LIMIT 1
                ''', (session_id,))
                result = cursor.fetchone()
            finally:
                conn.close()
            if not result:
                raise Exception("未找到可重新生成的分析数据")
            analysis_data = json.loads(result[0])
            # Call the API to generate the report.
            report_result = self.report_generator.call_report_api(analysis_data, session_id)
            if not report_result:
                raise Exception("API返回空内容")
            if 'report' not in report_result:
                # call_report_api only acknowledges that the asynchronous
                # request was accepted; there is no report to store yet.
                # Indexing 'studentInfo' here used to raise KeyError and made
                # every regeneration fail. The saved analysis data is kept.
                return {
                    'success': True,
                    'message': '报告异步生成中',
                    'session_id': session_id,
                    'analysis_data': analysis_data,
                    'is_regenerated': True
                }
            # Synchronous response: enrich the student information.
            student_info = analysis_data.get('student_info') or {}
            report_student = report_result.setdefault('studentInfo', {})
            report_student['name'] = student_info.get('name', '未知')
            report_student['school'] = student_info.get('school', '未知')
            report_student['grade'] = student_info.get('grade', '未知')
            report_student['subject'] = '科学'
            report_student['testDate'] = get_east8_time().strftime('%Y年%m月%d')
            # Build the report data.
            report_data = {
                'studentInfo': report_student,
                'report': report_result['report'],
                'generated_at': get_east8_time().isoformat(),
                'session_id': session_id,
                'analysis_data': analysis_data,
                'is_regenerated': True
            }
            # Store the report, then drop the temporary analysis data.
            self.report_generator.save_report_to_db(session_id, report_data, analysis_data)
            self.cleanup_temp_analysis_data(session_id)
            return {
                'success': True,
                'report_data': report_data,
                'analysis_data': analysis_data,
                'is_regenerated': True
            }
        except Exception as e:
            print(f"重新生成报告失败: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def cleanup_temp_analysis_data(self, session_id):
        """Delete the temporary analysis data stored for the session."""
        conn = self._connect()
        try:
            cursor = conn.cursor()
            cursor.execute('DELETE FROM temp_analysis_data WHERE session_id = ?', (session_id,))
            conn.commit()
        finally:
            conn.close()

    def delete_report(self, report_id):
        """Delete a report.

        Returns:
            dict with ``'success'`` and ``'message'``; a missing report and a
            database error are both reported with ``'success': False``.
        """
        try:
            conn = self._connect()
            try:
                cursor = conn.cursor()
                cursor.execute('DELETE FROM reports WHERE id = ?', (report_id,))
                # rowcount is 0 when no report with this id existed.
                deleted = cursor.rowcount
                conn.commit()
            finally:
                conn.close()
            if not deleted:
                return {
                    'success': False,
                    'message': '报告不存在'
                }
            return {
                'success': True,
                'message': '报告删除成功'
            }
        except Exception as e:
            print(f"删除报告失败: {e}")
            return {
                'success': False,
                'message': f'删除失败: {str(e)}'
            }
# Global system instance, created as a side effect of importing this module.
# Constructing it runs EnhancedSurveySystem.__init__, which builds a
# ReportGenerator and calls init_database().
enhanced_system = EnhancedSurveySystem()