501 lines
19 KiB
Python
501 lines
19 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
import sqlite3
|
||
import json
|
||
import uuid
|
||
import requests
|
||
import os
|
||
import time
|
||
from datetime import datetime, timezone, timedelta
|
||
from http.server import HTTPServer, BaseHTTPRequestHandler
|
||
from urllib.parse import urlparse, parse_qs
|
||
import threading
|
||
|
||
def get_east8_time():
|
||
"""获取东八区时间"""
|
||
east8_tz = timezone(timedelta(hours=8))
|
||
return datetime.now(east8_tz)
|
||
|
||
def get_east8_time_string():
|
||
"""获取东八区时间字符串格式,用于数据库存储"""
|
||
return get_east8_time().strftime('%Y-%m-%d %H:%M:%S')
|
||
|
||
def load_env_config():
|
||
"""加载环境变量配置"""
|
||
try:
|
||
# 尝试加载 .env 文件
|
||
env_file = "/Users/moshui/Documents/survey/.env"
|
||
if os.path.exists(env_file):
|
||
with open(env_file, 'r', encoding='utf-8') as f:
|
||
for line in f:
|
||
line = line.strip()
|
||
if line and not line.startswith('#') and '=' in line:
|
||
key, value = line.split('=', 1)
|
||
os.environ[key.strip()] = value.strip()
|
||
except Exception as e:
|
||
print(f"加载环境变量失败: {e}")
|
||
|
||
class ReportGenerator:
|
||
"""报告生成器 - 调用自定义API生成测评报告"""
|
||
|
||
def __init__(self):
|
||
# 加载环境变量
|
||
load_env_config()
|
||
|
||
# 自定义API配置
|
||
self.api_url = os.getenv("REPORT_API_URL", "http://120.26.23.172:5678/webhook/survey_report")
|
||
self.api_key = os.getenv("REPORT_API_KEY", "")
|
||
self.timeout = int(os.getenv("REPORT_API_TIMEOUT", "300"))
|
||
self.prompt_file = "/Users/moshui/Documents/survey/public/prompt.txt"
|
||
|
||
# 请求头
|
||
self.headers = {
|
||
"Content-Type": "application/json"
|
||
}
|
||
|
||
# 添加API密钥(如果设置了)
|
||
if self.api_key:
|
||
self.headers["Authorization"] = f"Bearer {self.api_key}"
|
||
|
||
print(f"报告生成器配置:")
|
||
print(f" - API端点: {self.api_url}")
|
||
print(f" - 超时时间: {self.timeout}秒")
|
||
|
||
def load_prompt(self):
|
||
"""加载提示词"""
|
||
try:
|
||
with open(self.prompt_file, 'r', encoding='utf-8') as f:
|
||
return f.read()
|
||
except Exception as e:
|
||
print(f"加载提示词失败: {e}")
|
||
return ""
|
||
|
||
def generate_analysis_text(self, session_data):
|
||
"""生成答题情况分析文本"""
|
||
conn = sqlite3.connect('data/survey.db')
|
||
conn.row_factory = sqlite3.Row
|
||
cursor = conn.cursor()
|
||
|
||
# 获取学员信息
|
||
cursor.execute('''
|
||
SELECT s.*, qs.started_at, qs.completed_at, qs.total_score
|
||
FROM students s
|
||
JOIN quiz_sessions qs ON s.id = qs.student_id
|
||
WHERE qs.id = ?
|
||
''', (session_data['id'],))
|
||
|
||
student_info = cursor.fetchone()
|
||
|
||
# 获取答题详情 - 从JSON格式读取,同时获取用户信息
|
||
cursor.execute('''
|
||
SELECT answers_data, student_name, student_school, student_grade, selected_tag
|
||
FROM quiz_answers
|
||
WHERE session_id = ?
|
||
''', (session_data['id'],))
|
||
|
||
result = cursor.fetchone()
|
||
answers = []
|
||
|
||
if result and result['answers_data']:
|
||
try:
|
||
answers = json.loads(result['answers_data'])
|
||
print(f"成功解析JSON答题数据,共{len(answers)}题")
|
||
except json.JSONDecodeError:
|
||
print(f"解析JSON答题数据失败: {session_data['id']}")
|
||
answers = []
|
||
else:
|
||
print(f"未找到答题数据: {session_data['id']}")
|
||
|
||
conn.close()
|
||
|
||
# 生成分析文本 - 确保markdown表格格式正确
|
||
analysis_text = "# 考试答题情况分析\n\n"
|
||
|
||
# 添加总分信息
|
||
total_score = student_info.get('total_score', 0) if student_info else 0
|
||
analysis_text += f"## 总分:{total_score} 分\n\n"
|
||
|
||
analysis_text += "| 题目 | 题型 | 用户答案 | 正确答案 | 是否正确 | 得分 |\n"
|
||
analysis_text += "|------|------|----------|----------|----------|------|\n"
|
||
|
||
# 使用独立字段中的用户信息
|
||
student_name = result['student_name'] if result else student_info.get('name', '未知')
|
||
student_school = result['student_school'] if result else student_info.get('school', '未知')
|
||
student_grade = result['student_grade'] if result else student_info.get('grade', '未知')
|
||
selected_tag = result['selected_tag'] if result else '未指定'
|
||
|
||
# 添加基本信息
|
||
analysis_text += f"| 姓名 | 填空题 | {student_name} | 无标准答案 | 无法判断 | 不适用 |\n"
|
||
analysis_text += f"| 学校 | 填空题 | {student_school} | 无标准答案 | 无法判断 | 不适用 |\n"
|
||
analysis_text += f"| 年级 | 填空题 | {student_grade} | 无标准答案 | 无法判断 | 不适用 |\n"
|
||
analysis_text += f"| 考试标签 | 填空题 | {selected_tag} | 无标准答案 | 无法判断 | 不适用 |\n"
|
||
|
||
# 添加答题详情 - 确保每个字段都不为空
|
||
for answer in answers:
|
||
is_correct = "✓" if answer.get('isCorrect', False) else "✗"
|
||
user_answer = answer.get('userAnswer', '').strip()
|
||
correct_answer = answer.get('correctAnswer', '').strip()
|
||
question_text = answer.get('questionText', '').strip()
|
||
question_type = answer.get('questionType', '').strip()
|
||
score = answer.get('score', 0)
|
||
|
||
# 确保题目文本不为空
|
||
if not question_text:
|
||
question_text = "未知题目"
|
||
|
||
# 确保题型不为空
|
||
if not question_type:
|
||
question_type = "单选题"
|
||
|
||
# 确保答案文本处理正确
|
||
if not user_answer:
|
||
user_answer = "未作答"
|
||
|
||
if not correct_answer:
|
||
correct_answer = "无标准答案"
|
||
|
||
# 转义markdown中的特殊字符
|
||
question_text = question_text.replace('|', '\\|')
|
||
user_answer = user_answer.replace('|', '\\|')
|
||
correct_answer = correct_answer.replace('|', '\\|')
|
||
|
||
analysis_text += f"| {question_text} | {question_type} | {user_answer} | {correct_answer} | {is_correct} | {score} |\n"
|
||
|
||
# 打印生成的分析文本用于调试
|
||
print(f"生成的分析文本长度: {len(analysis_text)} 字符")
|
||
print(f"包含题目数量: {len(answers)}")
|
||
print("分析文本预览:")
|
||
print(analysis_text[:500] + "..." if len(analysis_text) > 500 else analysis_text)
|
||
|
||
return {
|
||
'analysis_text': analysis_text,
|
||
'student_info': dict(student_info),
|
||
'answers': answers
|
||
}
|
||
|
||
async def generate_report(self, session_id):
|
||
"""生成测评报告"""
|
||
try:
|
||
# 获取会话数据
|
||
analysis_data = self.generate_analysis_text({'id': session_id})
|
||
|
||
# 调用自定义API生成报告
|
||
# 构造回调URL
|
||
report_result = self.call_report_api(analysis_data, session_id)
|
||
|
||
if report_result and report_result.get('success'):
|
||
# API调用成功,异步生成中
|
||
# 保存分析数据用于后续回调处理
|
||
self.save_analysis_data_for_regeneration(session_id, analysis_data)
|
||
|
||
return {
|
||
'success': True,
|
||
'message': '报告异步生成中',
|
||
'session_id': session_id,
|
||
'analysis_data': analysis_data
|
||
}
|
||
else:
|
||
raise Exception("API调用失败")
|
||
|
||
except Exception as e:
|
||
print(f"生成报告失败: {e}")
|
||
# 保存分析数据到数据库,允许后续重新生成
|
||
self.save_analysis_data_for_regeneration(session_id, analysis_data)
|
||
return {
|
||
'success': False,
|
||
'error': str(e),
|
||
'session_id': session_id,
|
||
'can_regenerate': True
|
||
}
|
||
|
||
def call_report_api(self, analysis_data, session_id):
|
||
"""调用自定义报告API生成报告"""
|
||
# 构建请求数据
|
||
request_data = {
|
||
"analysis_text": analysis_data['analysis_text'],
|
||
"session_id": session_id
|
||
}
|
||
|
||
# 实现重试机制
|
||
max_retries = 3
|
||
retry_delay = 2 # 秒
|
||
|
||
try:
|
||
print(f"调用报告生成API...")
|
||
print(f" API端点: {self.api_url}")
|
||
print(f" Session ID: {session_id}")
|
||
print(f" 数据长度: {len(request_data['analysis_text'])} 字符")
|
||
|
||
response = requests.post(
|
||
self.api_url,
|
||
headers=self.headers,
|
||
json=request_data,
|
||
timeout=10 # 简化超时时间
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
result = response.json()
|
||
print(f"✅ API调用成功,已提交异步报告生成请求")
|
||
print(f" 响应内容: {str(result)[:200]}...")
|
||
return {"success": True, "message": "异步报告生成请求已提交"}
|
||
else:
|
||
raise Exception(f"API调用失败: HTTP {response.status_code} - {response.text}")
|
||
|
||
except Exception as e:
|
||
print(f"API调用出错: {e}")
|
||
raise Exception(f"报告生成API调用失败: {str(e)}")
|
||
|
||
def save_report_to_db(self, session_id, report_data, analysis_data):
|
||
"""保存报告到数据库"""
|
||
conn = sqlite3.connect('data/survey.db')
|
||
cursor = conn.cursor()
|
||
|
||
# 创建报告表(如果不存在)
|
||
cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS reports (
|
||
id TEXT PRIMARY KEY,
|
||
session_id TEXT NOT NULL,
|
||
report_data TEXT NOT NULL,
|
||
analysis_data TEXT NOT NULL,
|
||
generated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
FOREIGN KEY (session_id) REFERENCES quiz_sessions (id)
|
||
)
|
||
''')
|
||
|
||
# 保存报告
|
||
report_id = str(uuid.uuid4())
|
||
cursor.execute('''
|
||
INSERT INTO reports (id, session_id, report_data, analysis_data)
|
||
VALUES (?, ?, ?, ?)
|
||
''', (report_id, session_id, json.dumps(report_data, ensure_ascii=False),
|
||
json.dumps(analysis_data, ensure_ascii=False)))
|
||
|
||
# 更新会话状态
|
||
cursor.execute('''
|
||
UPDATE quiz_sessions
|
||
SET status = 'report_generated'
|
||
WHERE id = ?
|
||
''', (session_id,))
|
||
|
||
conn.commit()
|
||
conn.close()
|
||
return report_id
|
||
|
||
def save_analysis_data_for_regeneration(self, session_id, analysis_data):
|
||
"""保存分析数据以便重新生成报告"""
|
||
conn = sqlite3.connect('data/survey.db')
|
||
cursor = conn.cursor()
|
||
|
||
# 创建临时分析数据表(如果不存在)
|
||
cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS temp_analysis_data (
|
||
id TEXT PRIMARY KEY,
|
||
session_id TEXT NOT NULL,
|
||
analysis_data TEXT NOT NULL,
|
||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
FOREIGN KEY (session_id) REFERENCES quiz_sessions (id)
|
||
)
|
||
''')
|
||
|
||
# 删除该session的旧数据(如果存在)
|
||
cursor.execute('DELETE FROM temp_analysis_data WHERE session_id = ?', (session_id,))
|
||
|
||
# 保存新的分析数据
|
||
analysis_id = str(uuid.uuid4())
|
||
cursor.execute('''
|
||
INSERT INTO temp_analysis_data (id, session_id, analysis_data)
|
||
VALUES (?, ?, ?)
|
||
''', (analysis_id, session_id, json.dumps(analysis_data, ensure_ascii=False)))
|
||
|
||
# 更新会话状态为可以重新生成
|
||
cursor.execute('''
|
||
UPDATE quiz_sessions
|
||
SET status = 'can_regenerate'
|
||
WHERE id = ?
|
||
''', (session_id,))
|
||
|
||
conn.commit()
|
||
conn.close()
|
||
return analysis_id
|
||
|
||
class EnhancedSurveySystem:
|
||
"""增强的测评系统"""
|
||
|
||
def __init__(self):
|
||
self.report_generator = ReportGenerator()
|
||
self.init_database()
|
||
|
||
def init_database(self):
|
||
"""初始化数据库"""
|
||
conn = sqlite3.connect('data/survey.db')
|
||
cursor = conn.cursor()
|
||
|
||
# 确保必要的表存在
|
||
cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS reports (
|
||
id TEXT PRIMARY KEY,
|
||
session_id TEXT NOT NULL,
|
||
report_data TEXT NOT NULL,
|
||
analysis_data TEXT NOT NULL,
|
||
generated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
FOREIGN KEY (session_id) REFERENCES quiz_sessions (id)
|
||
)
|
||
''')
|
||
|
||
conn.commit()
|
||
conn.close()
|
||
|
||
def get_reports_list(self, page=1, page_size=10):
|
||
"""获取报告列表"""
|
||
conn = sqlite3.connect('data/survey.db')
|
||
conn.row_factory = sqlite3.Row
|
||
cursor = conn.cursor()
|
||
|
||
# 获取总数
|
||
cursor.execute('SELECT COUNT(*) as total FROM reports')
|
||
total = cursor.fetchone()['total']
|
||
|
||
# 获取分页数据
|
||
offset = (page - 1) * page_size
|
||
cursor.execute('''
|
||
SELECT r.*, s.name, s.school, s.grade, qs.total_score, qs.completed_at
|
||
FROM reports r
|
||
JOIN quiz_sessions qs ON r.session_id = qs.id
|
||
JOIN students s ON qs.student_id = s.id
|
||
ORDER BY r.generated_at DESC
|
||
LIMIT ? OFFSET ?
|
||
''', (page_size, offset))
|
||
|
||
reports = [dict(row) for row in cursor.fetchall()]
|
||
|
||
conn.close()
|
||
|
||
return {
|
||
'reports': reports,
|
||
'total': total,
|
||
'page': page,
|
||
'page_size': page_size,
|
||
'total_pages': (total + page_size - 1) // page_size
|
||
}
|
||
|
||
def get_report_by_id(self, report_id):
|
||
"""根据ID获取报告"""
|
||
conn = sqlite3.connect('data/survey.db')
|
||
conn.row_factory = sqlite3.Row
|
||
cursor = conn.cursor()
|
||
|
||
cursor.execute('''
|
||
SELECT r.*, s.name, s.school, s.grade
|
||
FROM reports r
|
||
JOIN quiz_sessions qs ON r.session_id = qs.id
|
||
JOIN students s ON qs.student_id = s.id
|
||
WHERE r.id = ?
|
||
''', (report_id,))
|
||
|
||
report = cursor.fetchone()
|
||
conn.close()
|
||
|
||
return dict(report) if report else None
|
||
|
||
def get_sessions_can_regenerate(self):
|
||
"""获取可以重新生成的会话列表"""
|
||
conn = sqlite3.connect('data/survey.db')
|
||
conn.row_factory = sqlite3.Row
|
||
cursor = conn.cursor()
|
||
|
||
cursor.execute('''
|
||
SELECT qs.*, s.name, s.school, s.grade, tad.created_at as analysis_created_at
|
||
FROM quiz_sessions qs
|
||
JOIN students s ON qs.student_id = s.id
|
||
JOIN temp_analysis_data tad ON qs.id = tad.session_id
|
||
WHERE qs.status = 'can_regenerate'
|
||
ORDER BY tad.created_at DESC
|
||
''')
|
||
|
||
sessions = [dict(row) for row in cursor.fetchall()]
|
||
conn.close()
|
||
|
||
return sessions
|
||
|
||
async def auto_generate_report(self, session_id):
|
||
"""自动生成报告(答题完成后调用)"""
|
||
return await self.report_generator.generate_report(session_id)
|
||
|
||
async def regenerate_report(self, session_id):
|
||
"""重新生成报告(从保存的分析数据)"""
|
||
try:
|
||
# 获取保存的分析数据
|
||
conn = sqlite3.connect('data/survey.db')
|
||
cursor = conn.cursor()
|
||
|
||
cursor.execute('''
|
||
SELECT analysis_data FROM temp_analysis_data
|
||
WHERE session_id = ?
|
||
ORDER BY created_at DESC
|
||
LIMIT 1
|
||
''', (session_id,))
|
||
|
||
result = cursor.fetchone()
|
||
conn.close()
|
||
|
||
if not result:
|
||
raise Exception("未找到可重新生成的分析数据")
|
||
|
||
analysis_data = json.loads(result[0])
|
||
|
||
# 调用API生成报告
|
||
report_result = self.report_generator.call_report_api(analysis_data, session_id)
|
||
|
||
if report_result:
|
||
# 提取学员信息用于更新报告数据
|
||
student_info = analysis_data['student_info']
|
||
report_result['studentInfo']['name'] = student_info.get('name', '未知')
|
||
report_result['studentInfo']['school'] = student_info.get('school', '未知')
|
||
report_result['studentInfo']['grade'] = student_info.get('grade', '未知')
|
||
report_result['studentInfo']['subject'] = '科学'
|
||
report_result['studentInfo']['testDate'] = get_east8_time().strftime('%Y年%m月%d日')
|
||
|
||
# 构造报告数据
|
||
report_data = {
|
||
'studentInfo': report_result['studentInfo'],
|
||
'report': report_result['report'],
|
||
'generated_at': get_east8_time().isoformat(),
|
||
'session_id': session_id,
|
||
'analysis_data': analysis_data,
|
||
'is_regenerated': True
|
||
}
|
||
|
||
# 保存报告到数据库
|
||
self.report_generator.save_report_to_db(session_id, report_data, analysis_data)
|
||
|
||
# 清理临时数据
|
||
self.cleanup_temp_analysis_data(session_id)
|
||
|
||
return {
|
||
'success': True,
|
||
'report_data': report_data,
|
||
'analysis_data': analysis_data,
|
||
'is_regenerated': True
|
||
}
|
||
else:
|
||
raise Exception("API返回空内容")
|
||
|
||
except Exception as e:
|
||
print(f"重新生成报告失败: {e}")
|
||
return {
|
||
'success': False,
|
||
'error': str(e)
|
||
}
|
||
|
||
def cleanup_temp_analysis_data(self, session_id):
|
||
"""清理临时分析数据"""
|
||
conn = sqlite3.connect('data/survey.db')
|
||
cursor = conn.cursor()
|
||
cursor.execute('DELETE FROM temp_analysis_data WHERE session_id = ?', (session_id,))
|
||
conn.commit()
|
||
conn.close()
|
||
|
||
# 全局系统实例
|
||
enhanced_system = EnhancedSurveySystem()
|