survey/enhanced_survey_system.py
朱潮 99796408cf Initial commit: Add survey system with enhanced features
- Complete survey management system with web interface
- Question generation tools and prompts
- Report generation and analysis capabilities
- Docker configuration for deployment
- Database initialization scripts

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-28 20:28:57 +08:00

554 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sqlite3
import json
import uuid
import requests
import os
import time
from datetime import datetime, timezone, timedelta
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
import threading
def get_east8_time():
    """Return the current time as a timezone-aware datetime in UTC+8.

    Returns:
        datetime: ``datetime.now`` evaluated in a fixed +08:00 offset zone.
    """
    east8_tz = timezone(timedelta(hours=8))
    return datetime.now(east8_tz)
def get_east8_time_string():
    """Return the current UTC+8 time formatted for database storage.

    Returns:
        str: The value of ``get_east8_time()`` rendered as
        ``YYYY-MM-DD HH:MM:SS`` (no timezone suffix).
    """
    return get_east8_time().strftime('%Y-%m-%d %H:%M:%S')
def load_env_config(env_file=None):
    """Load ``KEY=VALUE`` pairs from a .env file into ``os.environ``.

    Args:
        env_file: Path of the file to read. When omitted, the original
            hard-coded project location is used, so existing callers that
            pass no argument behave as before.

    Blank lines, lines starting with ``#``, lines without ``=`` and lines
    whose key is empty are skipped. Keys and values are stripped of
    surrounding whitespace; only the first ``=`` separates key from value.
    Variables already present in the environment are overwritten.

    A missing file is silently ignored. Any other failure is printed and
    swallowed so that configuration loading never aborts the caller.
    """
    if env_file is None:
        env_file = "/Users/moshui/Documents/survey/.env"

    try:
        if not os.path.exists(env_file):
            return
        with open(env_file, 'r', encoding='utf-8') as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line or line.startswith('#'):
                    continue
                key, separator, value = line.partition('=')
                key = key.strip()
                # An empty name is rejected by os.environ; skipping it keeps
                # one malformed line from discarding the rest of the file.
                if not separator or not key:
                    continue
                os.environ[key] = value.strip()
    except Exception as e:
        print(f"加载环境变量失败: {e}")
class ReportGenerator:
    """Report generator: turns a quiz session into an assessment report.

    The generator reads a session's answers from SQLite, renders them as a
    markdown table, posts that text to a configurable HTTP endpoint and
    stores the returned report. When report generation fails, the analysis
    data is stored instead so the report can be regenerated later.
    """

    # SQLite database file used by every method. The default equals the
    # previously hard-coded name; it can be overridden per instance.
    db_path = 'survey.db'

    def __init__(self):
        """Read API settings from the environment and build request headers."""
        # Populate os.environ from the .env file before reading settings.
        load_env_config()

        # Custom API configuration.
        self.api_url = os.getenv("REPORT_API_URL", "http://120.26.23.172:5678/webhook/survey_report")
        self.api_key = os.getenv("REPORT_API_KEY", "")
        try:
            self.timeout = int(os.getenv("REPORT_API_TIMEOUT", "300"))
        except ValueError:
            # A malformed value must not abort start-up; fall back to the default.
            self.timeout = 300
        # NOTE(review): absolute path on a developer machine; load_prompt
        # returns "" when the file is not readable.
        self.prompt_file = "/Users/moshui/Documents/survey/public/prompt.txt"

        # Request headers; the Authorization header is only sent when a key is set.
        self.headers = {
            "Content-Type": "application/json"
        }
        if self.api_key:
            self.headers["Authorization"] = f"Bearer {self.api_key}"

        print("报告生成器配置:")
        print(f" - API端点: {self.api_url}")
        print(f" - 超时时间: {self.timeout}")

    def load_prompt(self):
        """Return the prompt file's content, or "" if it cannot be read."""
        try:
            with open(self.prompt_file, 'r', encoding='utf-8') as f:
                return f.read()
        except Exception as e:
            print(f"加载提示词失败: {e}")
            return ""

    @staticmethod
    def _cell_text(value, fallback):
        """Return ``value`` as stripped text, or ``fallback`` when it is empty.

        Non-string values (numbers, lists, ``None``) are accepted so that a
        single unusual answer cannot abort the whole analysis.
        """
        text = '' if value is None else str(value).strip()
        return text or fallback

    def generate_analysis_text(self, session_data):
        """Build the markdown analysis table for one quiz session.

        Args:
            session_data: Mapping with key ``'id'`` holding the session id.

        Returns:
            dict: ``analysis_text`` (markdown string), ``student_info``
            (dict built from the students/quiz_sessions row) and ``answers``
            (list of answer dicts parsed from the stored JSON).

        Raises:
            ValueError: If no student/session row exists for the id.
        """
        session_id = session_data['id']

        conn = sqlite3.connect(self.db_path)
        try:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()

            # Student information joined with the session's timing and score.
            cursor.execute('''
                SELECT s.*, qs.started_at, qs.completed_at, qs.total_score
                FROM students s
                JOIN quiz_sessions qs ON s.id = qs.student_id
                WHERE qs.id = ?
            ''', (session_id,))
            student_row = cursor.fetchone()

            # Answer details stored as JSON, together with the user fields
            # that were saved alongside the answers.
            cursor.execute('''
                SELECT answers_data, student_name, student_school, student_grade, selected_tag
                FROM quiz_answers
                WHERE session_id = ?
            ''', (session_id,))
            result = cursor.fetchone()
        finally:
            # Close the connection even when a query fails.
            conn.close()

        if student_row is None:
            raise ValueError(f"未找到会话: {session_id}")
        # sqlite3.Row has no .get(); work with a plain dict from here on.
        student_info = dict(student_row)

        answers = []
        if result and result['answers_data']:
            try:
                parsed = json.loads(result['answers_data'])
            except json.JSONDecodeError:
                parsed = None
            if isinstance(parsed, list):
                # Entries that are not objects cannot be rendered as rows.
                answers = [item for item in parsed if isinstance(item, dict)]
                print(f"成功解析JSON答题数据{len(answers)}")
            else:
                print(f"解析JSON答题数据失败: {session_id}")
        else:
            print(f"未找到答题数据: {session_id}")

        # Prefer the user fields stored with the answers; fall back to the
        # students table when the session has no quiz_answers row.
        if result:
            student_name = result['student_name']
            student_school = result['student_school']
            student_grade = result['student_grade']
            selected_tag = result['selected_tag']
        else:
            student_name = student_info.get('name', '未知')
            student_school = student_info.get('school', '未知')
            student_grade = student_info.get('grade', '未知')
            selected_tag = '未指定'

        # Header plus the basic-information rows of the markdown table.
        rows = [
            "# 考试答题情况分析\n\n",
            "| 题目 | 题型 | 用户答案 | 正确答案 | 是否正确 | 得分 |\n",
            "|------|------|----------|----------|----------|------|\n",
            f"| 姓名 | 填空题 | {student_name} | 无标准答案 | 无法判断 | 不适用 |\n",
            f"| 学校 | 填空题 | {student_school} | 无标准答案 | 无法判断 | 不适用 |\n",
            f"| 年级 | 填空题 | {student_grade} | 无标准答案 | 无法判断 | 不适用 |\n",
            f"| 考试标签 | 填空题 | {selected_tag} | 无标准答案 | 无法判断 | 不适用 |\n",
        ]

        # One row per answer; every cell is guaranteed to be non-empty.
        for answer in answers:
            # Both branches used to be "", leaving this column always blank.
            is_correct = "是" if answer.get('isCorrect', False) else "否"
            question_text = self._cell_text(answer.get('questionText', ''), "未知题目")
            question_type = self._cell_text(answer.get('questionType', ''), "单选题")
            user_answer = self._cell_text(answer.get('userAnswer', ''), "未作答")
            correct_answer = self._cell_text(answer.get('correctAnswer', ''), "无标准答案")
            score = answer.get('score', 0)

            # Escape the column separator so cell content cannot split a row.
            question_text = question_text.replace('|', '\\|')
            user_answer = user_answer.replace('|', '\\|')
            correct_answer = correct_answer.replace('|', '\\|')

            rows.append(
                f"| {question_text} | {question_type} | {user_answer} | {correct_answer} | {is_correct} | {score} |\n"
            )

        analysis_text = "".join(rows)

        # Debug output of the generated text.
        print(f"生成的分析文本长度: {len(analysis_text)} 字符")
        print(f"包含题目数量: {len(answers)}")
        print("分析文本预览:")
        print(analysis_text[:500] + "..." if len(analysis_text) > 500 else analysis_text)

        return {
            'analysis_text': analysis_text,
            'student_info': student_info,
            'answers': answers
        }

    async def generate_report(self, session_id):
        """Generate, store and return the assessment report for a session.

        The coroutine contains no ``await``; the database and HTTP calls it
        makes are synchronous.

        Returns:
            dict: On success ``{'success': True, 'report_data', 'analysis_data'}``.
            On failure ``{'success': False, 'error', 'session_id',
            'can_regenerate'}``, where ``can_regenerate`` is True only when
            the analysis data could be saved for a later retry.
        """
        # Defined before the try so the except block can tell whether the
        # analysis step completed.
        analysis_data = None
        try:
            analysis_data = self.generate_analysis_text({'id': session_id})

            report_result = self.call_report_api(analysis_data)
            if not report_result:
                raise Exception("API返回空内容")
            if not isinstance(report_result, dict) or 'report' not in report_result:
                raise Exception("API响应缺少report字段")

            # Overwrite the student section with values from the database.
            student_section = report_result.get('studentInfo')
            if not isinstance(student_section, dict):
                student_section = {}
            student_info = analysis_data['student_info']
            student_section['name'] = student_info.get('name', '未知')
            student_section['school'] = student_info.get('school', '未知')
            student_section['grade'] = student_info.get('grade', '未知')
            student_section['subject'] = '科学'
            # NOTE(review): the format has no trailing "日"; confirm that is intended.
            student_section['testDate'] = get_east8_time().strftime('%Y年%m月%d')

            report_data = {
                'studentInfo': student_section,
                'report': report_result['report'],
                'generated_at': get_east8_time().isoformat(),
                'session_id': session_id,
                'analysis_data': analysis_data
            }

            self.save_report_to_db(session_id, report_data, analysis_data)

            return {
                'success': True,
                'report_data': report_data,
                'analysis_data': analysis_data
            }
        except Exception as e:
            print(f"生成报告失败: {e}")

            # Keep the analysis data so the report can be regenerated, but
            # only if the analysis step actually produced something.
            can_regenerate = False
            if analysis_data is not None:
                try:
                    self.save_analysis_data_for_regeneration(session_id, analysis_data)
                    can_regenerate = True
                except Exception as save_error:
                    print(f"保存分析数据失败: {save_error}")

            return {
                'success': False,
                'error': str(e),
                'session_id': session_id,
                'can_regenerate': can_regenerate
            }

    def call_report_api(self, analysis_data):
        """POST the analysis text to the report API, retrying transient errors.

        Timeouts, connection errors, other request failures, HTTP 429 and
        HTTP 5xx are retried up to three times with exponential back-off
        (2, 4, 8 seconds). HTTP 401 and any other non-200 status fail
        immediately, because retrying cannot change the outcome.

        Args:
            analysis_data: Mapping whose ``'analysis_text'`` is sent as JSON.

        Returns:
            The decoded JSON response, or ``None`` when every attempt ended
            in a 429/5xx response.

        Raises:
            Exception: On a non-retryable status, or when the last attempt
            times out, cannot connect or fails otherwise.
        """
        request_data = {
            "analysis_text": analysis_data['analysis_text']
        }

        max_retries = 3
        retry_delay = 2  # seconds

        for attempt in range(max_retries):
            is_last_attempt = attempt == max_retries - 1
            wait_time = retry_delay * (2 ** attempt)

            print(f"调用报告生成API (尝试 {attempt + 1}/{max_retries})...")
            print(f" API端点: {self.api_url}")
            print(f" 数据长度: {len(request_data['analysis_text'])} 字符")

            # Only the network call and JSON decoding are inside the try, so
            # the deliberate failures raised below are never retried.
            try:
                response = requests.post(
                    self.api_url,
                    headers=self.headers,
                    json=request_data,
                    timeout=self.timeout
                )
                result = response.json() if response.status_code == 200 else None
            except requests.exceptions.Timeout:
                print(f"API请求超时{wait_time} 秒后重试...")
                if is_last_attempt:
                    raise Exception(f"API请求超时已达到最大重试次数 ({self.timeout}秒)")
                time.sleep(wait_time)
                continue
            except requests.exceptions.ConnectionError:
                print(f"API连接错误{wait_time} 秒后重试...")
                if is_last_attempt:
                    raise Exception("API连接失败已达到最大重试次数")
                time.sleep(wait_time)
                continue
            except Exception as e:
                if is_last_attempt:
                    raise Exception(f"API调用失败: {str(e)}") from e
                print(f"API调用出错: {e}{wait_time} 秒后重试...")
                time.sleep(wait_time)
                continue

            status = response.status_code
            if status == 200:
                if isinstance(result, dict) and 'studentInfo' in result and 'report' in result:
                    print("✅ API调用成功生成报告数据")
                else:
                    # Unexpected shape: hand it to the caller unchanged.
                    print("⚠️ API响应格式异常但继续处理")
                    print(f" 响应内容: {str(result)[:200]}...")
                return result

            if status == 401:
                raise Exception("API密钥无效或已过期")

            if status == 429 or status >= 500:
                if status == 429:
                    print(f"API速率限制等待 {wait_time} 秒后重试...")
                else:
                    print(f"API服务器错误 ({response.status_code}){wait_time} 秒后重试...")
                # Sleeping after the final attempt would only delay the failure.
                if not is_last_attempt:
                    time.sleep(wait_time)
                continue

            raise Exception(f"API调用失败: HTTP {response.status_code} - {response.text}")

        return None

    def save_report_to_db(self, session_id, report_data, analysis_data):
        """Insert a report row and mark the session as ``report_generated``.

        Creates the ``reports`` table when it does not exist yet.

        Returns:
            str: The UUID assigned to the new report row.
        """
        report_id = str(uuid.uuid4())

        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()

            cursor.execute('''
                CREATE TABLE IF NOT EXISTS reports (
                    id TEXT PRIMARY KEY,
                    session_id TEXT NOT NULL,
                    report_data TEXT NOT NULL,
                    analysis_data TEXT NOT NULL,
                    generated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (session_id) REFERENCES quiz_sessions (id)
                )
            ''')

            cursor.execute('''
                INSERT INTO reports (id, session_id, report_data, analysis_data)
                VALUES (?, ?, ?, ?)
            ''', (report_id, session_id, json.dumps(report_data, ensure_ascii=False),
                  json.dumps(analysis_data, ensure_ascii=False)))

            cursor.execute('''
                UPDATE quiz_sessions
                SET status = 'report_generated'
                WHERE id = ?
            ''', (session_id,))

            conn.commit()
        finally:
            # Without a commit the pending changes are discarded on close.
            conn.close()

        return report_id

    def save_analysis_data_for_regeneration(self, session_id, analysis_data):
        """Store analysis data so the report can be regenerated later.

        Replaces any data previously stored for the session, creates the
        ``temp_analysis_data`` table when needed and marks the session as
        ``can_regenerate``.

        Returns:
            str: The UUID assigned to the stored analysis row.
        """
        analysis_id = str(uuid.uuid4())

        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()

            cursor.execute('''
                CREATE TABLE IF NOT EXISTS temp_analysis_data (
                    id TEXT PRIMARY KEY,
                    session_id TEXT NOT NULL,
                    analysis_data TEXT NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (session_id) REFERENCES quiz_sessions (id)
                )
            ''')

            # Keep at most one pending analysis per session.
            cursor.execute('DELETE FROM temp_analysis_data WHERE session_id = ?', (session_id,))

            cursor.execute('''
                INSERT INTO temp_analysis_data (id, session_id, analysis_data)
                VALUES (?, ?, ?)
            ''', (analysis_id, session_id, json.dumps(analysis_data, ensure_ascii=False)))

            cursor.execute('''
                UPDATE quiz_sessions
                SET status = 'can_regenerate'
                WHERE id = ?
            ''', (session_id,))

            conn.commit()
        finally:
            conn.close()

        return analysis_id
class EnhancedSurveySystem:
    """Enhanced assessment system: report listing, lookup and regeneration.

    Wraps a ``ReportGenerator`` and offers read access to stored reports as
    well as regeneration of reports whose first generation attempt failed.
    """

    # SQLite database file used by every method. The default equals the
    # previously hard-coded name; it can be overridden per instance.
    db_path = 'survey.db'

    def __init__(self):
        """Create the report generator and make sure the tables exist."""
        self.report_generator = ReportGenerator()
        self.init_database()

    def init_database(self):
        """Create the tables this class queries if they do not exist yet.

        Both ``reports`` and ``temp_analysis_data`` are created here, so the
        read methods work on a database in which no report has been saved
        and no generation attempt has failed yet.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()

            cursor.execute('''
                CREATE TABLE IF NOT EXISTS reports (
                    id TEXT PRIMARY KEY,
                    session_id TEXT NOT NULL,
                    report_data TEXT NOT NULL,
                    analysis_data TEXT NOT NULL,
                    generated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (session_id) REFERENCES quiz_sessions (id)
                )
            ''')

            cursor.execute('''
                CREATE TABLE IF NOT EXISTS temp_analysis_data (
                    id TEXT PRIMARY KEY,
                    session_id TEXT NOT NULL,
                    analysis_data TEXT NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (session_id) REFERENCES quiz_sessions (id)
                )
            ''')

            conn.commit()
        finally:
            conn.close()

    def get_reports_list(self, page=1, page_size=10):
        """Return one page of reports, newest first, with paging metadata.

        Args:
            page: 1-based page number; values below 1 are treated as 1.
            page_size: Rows per page; values below 1 are treated as 1.

        Returns:
            dict: ``reports`` (list of row dicts joined with student and
            session columns), ``total``, ``page``, ``page_size`` and
            ``total_pages``.
        """
        # Clamp so the offset is never negative and the page count never
        # divides by zero.
        page = max(1, page)
        page_size = max(1, page_size)
        offset = (page - 1) * page_size

        conn = sqlite3.connect(self.db_path)
        try:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()

            cursor.execute('SELECT COUNT(*) as total FROM reports')
            total = cursor.fetchone()['total']

            cursor.execute('''
                SELECT r.*, s.name, s.school, s.grade, qs.total_score, qs.completed_at
                FROM reports r
                JOIN quiz_sessions qs ON r.session_id = qs.id
                JOIN students s ON qs.student_id = s.id
                ORDER BY r.generated_at DESC
                LIMIT ? OFFSET ?
            ''', (page_size, offset))
            reports = [dict(row) for row in cursor.fetchall()]
        finally:
            conn.close()

        return {
            'reports': reports,
            'total': total,
            'page': page,
            'page_size': page_size,
            'total_pages': (total + page_size - 1) // page_size
        }

    def get_report_by_id(self, report_id):
        """Return the report row (with student columns) as a dict, or None."""
        conn = sqlite3.connect(self.db_path)
        try:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            cursor.execute('''
                SELECT r.*, s.name, s.school, s.grade
                FROM reports r
                JOIN quiz_sessions qs ON r.session_id = qs.id
                JOIN students s ON qs.student_id = s.id
                WHERE r.id = ?
            ''', (report_id,))
            report = cursor.fetchone()
        finally:
            conn.close()

        return dict(report) if report else None

    def get_sessions_can_regenerate(self):
        """Return sessions whose status is ``can_regenerate``, newest first.

        Each entry is a dict of the session row plus the student's name,
        school and grade and the time the analysis data was stored.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            cursor.execute('''
                SELECT qs.*, s.name, s.school, s.grade, tad.created_at as analysis_created_at
                FROM quiz_sessions qs
                JOIN students s ON qs.student_id = s.id
                JOIN temp_analysis_data tad ON qs.id = tad.session_id
                WHERE qs.status = 'can_regenerate'
                ORDER BY tad.created_at DESC
            ''')
            sessions = [dict(row) for row in cursor.fetchall()]
        finally:
            conn.close()

        return sessions

    async def auto_generate_report(self, session_id):
        """Generate the report for a finished session via the report generator."""
        return await self.report_generator.generate_report(session_id)

    async def regenerate_report(self, session_id):
        """Regenerate a report from previously stored analysis data.

        The coroutine contains no ``await``; the database and HTTP calls it
        makes are synchronous.

        Returns:
            dict: On success ``{'success': True, 'report_data',
            'analysis_data', 'is_regenerated': True}``; on any failure
            ``{'success': False, 'error': <message>}``.
        """
        try:
            # Most recent analysis data stored for the session.
            conn = sqlite3.connect(self.db_path)
            try:
                cursor = conn.cursor()
                cursor.execute('''
                    SELECT analysis_data FROM temp_analysis_data
                    WHERE session_id = ?
                    ORDER BY created_at DESC
                    LIMIT 1
                ''', (session_id,))
                stored = cursor.fetchone()
            finally:
                conn.close()

            if not stored:
                raise Exception("未找到可重新生成的分析数据")

            analysis_data = json.loads(stored[0])

            report_result = self.report_generator.call_report_api(analysis_data)
            if not report_result:
                raise Exception("API返回空内容")
            if not isinstance(report_result, dict) or 'report' not in report_result:
                raise Exception("API响应缺少report字段")

            # Overwrite the student section with the stored student values.
            student_section = report_result.get('studentInfo')
            if not isinstance(student_section, dict):
                student_section = {}
            student_info = analysis_data['student_info']
            student_section['name'] = student_info.get('name', '未知')
            student_section['school'] = student_info.get('school', '未知')
            student_section['grade'] = student_info.get('grade', '未知')
            student_section['subject'] = '科学'
            # NOTE(review): the format has no trailing "日"; confirm that is intended.
            student_section['testDate'] = get_east8_time().strftime('%Y年%m月%d')

            report_data = {
                'studentInfo': student_section,
                'report': report_result['report'],
                'generated_at': get_east8_time().isoformat(),
                'session_id': session_id,
                'analysis_data': analysis_data,
                'is_regenerated': True
            }

            self.report_generator.save_report_to_db(session_id, report_data, analysis_data)

            # The stored analysis is no longer needed once the report exists.
            self.cleanup_temp_analysis_data(session_id)

            return {
                'success': True,
                'report_data': report_data,
                'analysis_data': analysis_data,
                'is_regenerated': True
            }
        except Exception as e:
            print(f"重新生成报告失败: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def cleanup_temp_analysis_data(self, session_id):
        """Delete the stored analysis data of a session."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            cursor.execute('DELETE FROM temp_analysis_data WHERE session_id = ?', (session_id,))
            conn.commit()
        finally:
            conn.close()
# Global system instance, created when this module is imported. Constructing
# it runs EnhancedSurveySystem.__init__, which builds a ReportGenerator and
# calls init_database().
enhanced_system = EnhancedSurveySystem()