194 lines
6.0 KiB
Python
194 lines
6.0 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
"""
|
||
测试新的自动校准和监听启动流程
|
||
"""
|
||
|
||
import sys
|
||
import os
|
||
import time
|
||
import signal
|
||
from control_system import ControlSystem
|
||
|
||
def signal_handler(sig, frame):
|
||
"""信号处理函数"""
|
||
print('\n👋 收到中断信号,正在关闭系统...')
|
||
if 'control_system' in globals():
|
||
control_system.shutdown()
|
||
sys.exit(0)
|
||
|
||
def main():
|
||
"""主函数"""
|
||
print("🧪 测试自动校准和监听启动流程")
|
||
print("=" * 60)
|
||
|
||
# 注册信号处理
|
||
signal.signal(signal.SIGINT, signal_handler)
|
||
|
||
try:
|
||
# 1. 创建控制系统
|
||
print("📦 创建控制系统...")
|
||
config = {
|
||
'system': {
|
||
'max_queue_size': 1000,
|
||
'process_timeout': 30,
|
||
'heartbeat_interval': 1.0,
|
||
'log_level': "INFO"
|
||
},
|
||
'audio': {
|
||
'sample_rate': 16000,
|
||
'channels': 1,
|
||
'chunk_size': 1024
|
||
},
|
||
'recording': {
|
||
'min_duration': 2.0,
|
||
'max_duration': 30.0,
|
||
'silence_threshold': 3.0
|
||
},
|
||
'processing': {
|
||
'enable_asr': True,
|
||
'enable_llm': True,
|
||
'enable_tts': True,
|
||
'character': 'libai'
|
||
}
|
||
}
|
||
|
||
global control_system
|
||
control_system = ControlSystem(config)
|
||
|
||
# 设置角色
|
||
control_system.config['processing']['character'] = 'libai'
|
||
|
||
print("✅ 控制系统创建完成")
|
||
|
||
# 2. 测试手动启动流程
|
||
print("\n🎯 测试步骤1:手动启动流程")
|
||
print("-" * 40)
|
||
|
||
# 2.1 启动进程但不自动校准和监听
|
||
print("🔧 启动进程...")
|
||
control_system._start_processes()
|
||
time.sleep(2) # 等待进程启动
|
||
|
||
# 2.2 检查初始状态
|
||
print("\n🔍 检查初始状态...")
|
||
monitoring_status = control_system.get_monitoring_status()
|
||
calibration_status = control_system.get_calibration_status()
|
||
|
||
print(f" 监听状态: {monitoring_status}")
|
||
print(f" 校准状态: {calibration_status}")
|
||
|
||
# 2.3 手动启动校准
|
||
print("\n🎯 手动启动校准...")
|
||
success = control_system.start_calibration()
|
||
print(f" 校准启动结果: {success}")
|
||
|
||
if success:
|
||
print("\n⏱️ 等待校准完成...")
|
||
if control_system.wait_for_calibration_complete(timeout=30):
|
||
print("✅ 校准完成")
|
||
else:
|
||
print("⚠️ 校准超时")
|
||
|
||
# 2.4 手动启动监听
|
||
print("\n🎯 手动启动监听...")
|
||
success = control_system.start_monitoring()
|
||
print(f" 监听启动结果: {success}")
|
||
|
||
time.sleep(1) # 等待监听启动
|
||
|
||
# 2.5 验证监听状态
|
||
monitoring_status = control_system.get_monitoring_status()
|
||
print(f" 监听状态: {monitoring_status}")
|
||
|
||
# 3. 运行一段时间测试
|
||
print("\n🎙️ 系统运行测试")
|
||
print("-" * 40)
|
||
print("💡 请说话测试语音检测功能")
|
||
print("⏱️ 运行10秒...")
|
||
|
||
start_time = time.time()
|
||
while time.time() - start_time < 10:
|
||
try:
|
||
control_system.check_events()
|
||
time.sleep(0.1)
|
||
except KeyboardInterrupt:
|
||
break
|
||
|
||
# 4. 停止监听
|
||
print("\n🛑 停止监听...")
|
||
success = control_system.stop_monitoring()
|
||
print(f" 停止结果: {success}")
|
||
|
||
time.sleep(1)
|
||
|
||
# 5. 关闭系统
|
||
print("\n🔄 关闭当前系统...")
|
||
control_system.shutdown()
|
||
time.sleep(2)
|
||
|
||
# 6. 测试自动启动流程
|
||
print("\n🎯 测试步骤2:自动启动流程")
|
||
print("-" * 40)
|
||
|
||
# 创建新的控制系统实例
|
||
control_system = ControlSystem(config)
|
||
control_system.config['processing']['character'] = 'libai'
|
||
|
||
print("\n🚀 启动系统(自动校准和监听)...")
|
||
# 这里我们只启动进程,不进入主循环
|
||
control_system._start_processes()
|
||
|
||
# 手动执行自动校准和监听流程
|
||
print("\n🎯 自动启动校准...")
|
||
success = control_system.start_calibration()
|
||
if success and control_system.wait_for_calibration_complete(timeout=30):
|
||
print("✅ 自动校准完成")
|
||
|
||
print("\n🎯 自动启动监听...")
|
||
success = control_system.start_monitoring()
|
||
if success:
|
||
print("✅ 自动监听启动完成")
|
||
|
||
time.sleep(2)
|
||
|
||
# 检查最终状态
|
||
monitoring_status = control_system.get_monitoring_status()
|
||
calibration_status = control_system.get_calibration_status()
|
||
|
||
print(f"\n📊 最终状态:")
|
||
print(f" 监听状态: {monitoring_status}")
|
||
print(f" 校准状态: {calibration_status}")
|
||
|
||
# 7. 运行一段时间
|
||
print("\n🎙️ 最终运行测试")
|
||
print("-" * 40)
|
||
print("💡 请说话测试最终功能")
|
||
print("⏱️ 运行10秒...")
|
||
|
||
start_time = time.time()
|
||
while time.time() - start_time < 10:
|
||
try:
|
||
control_system.check_events()
|
||
control_system.display_status()
|
||
time.sleep(0.1)
|
||
except KeyboardInterrupt:
|
||
break
|
||
|
||
print("\n🎉 测试完成!")
|
||
|
||
except Exception as e:
|
||
print(f"❌ 测试过程中出错: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
finally:
|
||
# 确保系统关闭
|
||
if 'control_system' in globals():
|
||
print("\n🛑 确保系统关闭...")
|
||
control_system.shutdown()
|
||
|
||
print("✅ 测试结束")
|
||
|
||
if __name__ == "__main__":
|
||
main() |