修复结束问题

This commit is contained in:
朱潮 2025-09-21 18:33:36 +08:00
parent ccbca440a7
commit ce1f101ec7
9 changed files with 924 additions and 13 deletions

101
QUICKSTART.md Normal file
View File

@ -0,0 +1,101 @@
# 快速启动指南
## 一键启动(推荐)
```bash
# 直接运行,系统会自动校准和启动监听
python multiprocess_recorder.py
# 指定角色
python multiprocess_recorder.py -c libai
# 详细模式
python multiprocess_recorder.py -v
```
## 编程方式启动
### 最简单的方式
```python
from control_system import ControlSystem
# 创建控制系统
control_system = ControlSystem()
# 一键启动(自动校准 + 自动监听)
control_system.start()
```
### 自定义配置
```python
from control_system import ControlSystem
config = {
'system': {'log_level': "INFO"},
'audio': {'sample_rate': 16000, 'channels': 1, 'chunk_size': 1024},
'recording': {'min_duration': 2.0, 'max_duration': 30.0, 'silence_threshold': 3.0},
'processing': {'enable_asr': True, 'enable_llm': True, 'enable_tts': True, 'character': 'libai'}
}
control_system = ControlSystem(config)
# 启动选项:
# auto_calibration=True - 自动校准语音检测器
# auto_monitoring=True - 自动启动音频监听
control_system.start(auto_calibration=True, auto_monitoring=True)
```
## 手动控制
```python
from control_system import ControlSystem
control_system = ControlSystem()
# 只启动进程,不自动校准和监听
control_system._start_processes()
# 手动步骤:
control_system.start_calibration() # 1. 启动校准
control_system.wait_for_calibration_complete() # 2. 等待校准完成
control_system.start_monitoring() # 3. 启动监听
# 运行中可以随时控制:
control_system.stop_monitoring() # 停止监听
control_system.start_monitoring() # 重新启动监听
# 查询状态:
status = control_system.get_calibration_status() # 获取校准状态
status = control_system.get_monitoring_status() # 获取监听状态
# 关闭系统:
control_system.shutdown()
```
## 启动流程
系统启动时会按以下顺序执行:
1. **启动进程** - 创建输入进程和输出进程
2. **自动校准** - 校准语音检测器约3-5秒
3. **启动监听** - 启用音频监听功能
4. **开始运行** - 进入主控制循环,开始检测语音
## 注意事项
- **校准时间**首次启动需要3-5秒进行语音检测器校准
- **音频权限**:确保麦克风权限已授予
- **环境安静**:校准时请保持环境安静
- **API密钥**如需LLM功能请设置 `ARK_API_KEY` 环境变量
## 故障排除
如果校准失败:
- 检查麦克风是否正常工作
- 确保环境安静,无背景噪音
- 尝试重新启动系统
如果监听失败:
- 检查音频设备是否被其他程序占用
- 尝试重启程序
- 查看日志文件排查问题

210
README_MANUAL_CONTROL.md Normal file
View File

@ -0,0 +1,210 @@
# 多进程音频控制系统 - 主进程控制功能
## 概述
本系统已经重构,支持主进程对输入进程的校准和监听功能进行精确控制。通过这些新功能,你可以:
1. **手动控制校准过程**:在适当的时间启动语音检测器校准
2. **精确控制监听状态**:按需启用或禁用音频监听
3. **获取实时状态**:查询校准进度和监听状态
## 主要功能
### 1. 校准功能
#### 启动校准
```python
# 启动语音检测器校准
success = control_system.start_calibration()
if success:
print("校准已启动")
```
#### 获取校准状态
```python
# 获取当前校准状态
status = control_system.get_calibration_status()
if status:
print(f"校准进度: {status['progress']*100:.1f}%")
print(f"是否在校准中: {status['calibrating']}")
```
#### 等待校准完成
```python
# 等待校准完成30秒超时
if control_system.wait_for_calibration_complete(timeout=30):
print("校准完成")
else:
print("校准超时")
```
### 2. 监听功能
#### 启动监听
```python
# 启动音频监听
success = control_system.start_monitoring()
if success:
print("监听已启动")
```
#### 停止监听
```python
# 停止音频监听
success = control_system.stop_monitoring()
if success:
print("监听已停止")
```
#### 获取监听状态
```python
# 获取当前监听状态
status = control_system.get_monitoring_status()
if status:
print(f"监听启用: {status['enabled']}")
print(f"正在录音: {status['recording']}")
print(f"音频流活跃: {status['audio_stream_active']}")
```
## 使用示例
### 方法1自动启动推荐
```python
from control_system import ControlSystem
# 1. 创建控制系统
config = {
'system': {'log_level': "INFO"},
'audio': {'sample_rate': 16000, 'channels': 1, 'chunk_size': 1024},
'recording': {'min_duration': 2.0, 'max_duration': 30.0, 'silence_threshold': 3.0},
'processing': {'enable_asr': True, 'enable_llm': True, 'enable_tts': True, 'character': 'libai'}
}
control_system = ControlSystem(config)
# 2. 一键启动(自动校准和监听)
control_system.start(auto_calibration=True, auto_monitoring=True)
# 系统现在正在运行,会自动处理语音检测和录音
```
### 方法2手动控制
```python
from control_system import ControlSystem
import time
# 1. 创建控制系统
config = {
'system': {'log_level': "INFO"},
'audio': {'sample_rate': 16000, 'channels': 1, 'chunk_size': 1024},
'recording': {'min_duration': 2.0, 'max_duration': 30.0, 'silence_threshold': 3.0},
'processing': {'enable_asr': True, 'enable_llm': True, 'enable_tts': True, 'character': 'libai'}
}
control_system = ControlSystem(config)
# 2. 启动进程(但不自动启用监听)
control_system._start_processes()
# 3. 步骤1校准
print("开始校准...")
control_system.start_calibration()
# 等待校准完成
if control_system.wait_for_calibration_complete(timeout=30):
print("校准完成")
else:
print("校准失败")
exit(1)
# 4. 步骤2启动监听
print("开始监听...")
control_system.start_monitoring()
# 5. 运行一段时间
print("系统运行中...")
try:
while True:
# 检查事件和显示状态
control_system.check_events()
control_system.display_status()
time.sleep(0.1)
except KeyboardInterrupt:
print("用户中断")
# 6. 停止监听
print("停止监听...")
control_system.stop_monitoring()
# 7. 关闭系统
control_system.shutdown()
```
### 方法3混合控制
```python
from control_system import ControlSystem
# 1. 创建控制系统
control_system = ControlSystem(config)
# 2. 自动启动,但只校准,不自动监听
control_system.start(auto_calibration=True, auto_monitoring=False)
# 3. 手动控制监听
control_system.start_monitoring() # 启动监听
# ... 运行一段时间 ...
control_system.stop_monitoring() # 停止监听
control_system.start_monitoring() # 重新启动监听
# 4. 关闭系统
control_system.shutdown()
```
### 自动化示例
查看 `example_manual_control.py` 文件获取完整的自动化控制示例。
## 关键变化
### 1. 默认行为变化
- **之前**:输入进程启动后自动开始校准和监听
- **现在**:输入进程启动后处于静默状态,等待主进程命令
### 2. 新增控制接口
`ControlSystem` 类中新增了以下方法:
- `start_calibration()` - 启动校准
- `start_monitoring()` - 启动监听
- `stop_monitoring()` - 停止监听
- `get_calibration_status()` - 获取校准状态
- `get_monitoring_status()` - 获取监听状态
- `wait_for_calibration_complete(timeout)` - 等待校准完成
### 3. 新增命令支持
`InputProcess` 中支持以下新命令:
- `start_calibration` - 开始校准
- `start_monitoring` - 开始监听
- `stop_monitoring` - 停止监听
- `get_calibration_status` - 获取校准状态
- `get_monitoring_status` - 获取监听状态
## 使用建议
1. **初始化顺序**:建议按照"启动进程 → 校准 → 启动监听"的顺序进行
2. **错误处理**:建议对每个操作进行错误检查和重试
3. **状态监控**:定期检查状态以确保系统正常运行
4. **资源清理**:使用完毕后正确关闭系统
## 注意事项
1. **进程间通信**:所有控制都是通过进程间队列实现的,可能会有轻微延迟
2. **超时处理**:建议为所有状态查询操作设置合理的超时时间
3. **并发安全**:确保在多线程环境中正确使用这些方法
4. **音频设备**:启动和停止监听会重新初始化音频设备,可能有短暂延迟

View File

@ -75,7 +75,7 @@ class InputProcess:
self.CHUNK_SIZE = 1024
# 状态控制
self.recording_enabled = True # 是否允许录音
self.recording_enabled = False # 是否允许录音(默认不启用,等待外部命令)
self.is_recording = False # 是否正在录音
self.recording_buffer = [] # 录音缓冲区
self.pre_record_buffer = [] # 预录音缓冲区
@ -140,8 +140,14 @@ class InputProcess:
# 1. 检查主进程命令
self._check_commands()
# 2. 如果允许录音,处理音频
if self.recording_enabled:
# 2. 处理音频的条件:
# - 录音已启用,或者
# - 正在校准中(需要音频数据进行校准)
should_process_audio = (self.recording_enabled or
(hasattr(self.voice_detector, 'calibration_mode') and
self.voice_detector.calibration_mode))
if should_process_audio:
self._process_audio()
# 3. 短暂休眠减少CPU占用
@ -155,6 +161,67 @@ class InputProcess:
self._cleanup()
self.logger.info("输入进程退出")
def start_calibration(self):
"""开始校准语音检测器"""
if hasattr(self, 'voice_detector') and self.voice_detector:
self.voice_detector.reset()
self.voice_detector.calibration_mode = True
self.voice_detector.calibration_samples = 0
# 确保音频流已设置(校准需要音频数据)
if not self.input_stream:
self._setup_audio()
print("🎙️ 输入进程:开始语音检测器校准")
return True
return False
def start_monitoring(self):
"""开始监听音频"""
if not self.recording_enabled:
self.recording_enabled = True
# 重新初始化音频流
self._cleanup_audio_stream()
self.recording_buffer = []
self.pre_record_buffer = []
self.is_recording = False
self.silence_start_time = None
self.consecutive_silence_count = 0
self._setup_audio()
print("🎙️ 输入进程:开始音频监听")
return True
return False
def stop_monitoring(self):
"""停止监听音频"""
if self.recording_enabled:
self.recording_enabled = False
if self.is_recording:
self._stop_recording()
self._cleanup_audio_stream()
print("🎙️ 输入进程:停止音频监听")
return True
return False
def get_calibration_status(self):
"""获取校准状态"""
if hasattr(self, 'voice_detector') and self.voice_detector:
return {
'calibrating': self.voice_detector.calibration_mode,
'progress': self.voice_detector.calibration_samples / self.voice_detector.required_calibration,
'samples': self.voice_detector.calibration_samples,
'required': self.voice_detector.required_calibration
}
return {'calibrating': False, 'progress': 0, 'samples': 0, 'required': 0}
def get_monitoring_status(self):
"""获取监听状态"""
return {
'enabled': self.recording_enabled,
'recording': self.is_recording,
'audio_stream_active': self.input_stream is not None
}
def _setup_audio(self):
"""设置音频输入设备"""
try:
@ -228,6 +295,41 @@ class InputProcess:
self._cleanup_audio_stream()
self.logger.info("录音功能已禁用")
elif command.command == 'start_calibration':
success = self.start_calibration()
if success:
self.logger.info("校准已启动")
else:
self.logger.error("校准启动失败")
elif command.command == 'start_monitoring':
success = self.start_monitoring()
if success:
self.logger.info("监听已启动")
else:
self.logger.error("监听启动失败")
elif command.command == 'stop_monitoring':
success = self.stop_monitoring()
if success:
self.logger.info("监听已停止")
else:
self.logger.error("监听停止失败")
elif command.command == 'get_calibration_status':
status = self.get_calibration_status()
self.event_queue.put(ProcessEvent(
event_type='calibration_status',
metadata=status
))
elif command.command == 'get_monitoring_status':
status = self.get_monitoring_status()
self.event_queue.put(ProcessEvent(
event_type='monitoring_status',
metadata=status
))
elif command.command == 'shutdown':
self.logger.info("收到关闭命令")
self.running = False
@ -254,6 +356,15 @@ class InputProcess:
detection_result = self.voice_detector.is_voice_advanced(data)
is_voice = detection_result['is_voice']
# 如果正在校准,确保语音检测器能处理数据
if hasattr(self.voice_detector, 'calibration_mode') and self.voice_detector.calibration_mode:
# 校准模式下显示进度
if hasattr(self.voice_detector, 'calibration_samples'):
progress = (self.voice_detector.calibration_samples /
self.voice_detector.required_calibration * 100) if hasattr(self.voice_detector, 'required_calibration') else 0
status = f"\r🎙️ 校准中... {progress:.1f}%"
print(status, end='', flush=True)
if self.is_recording:
# 录音模式
self.recording_buffer.append(data)

View File

@ -195,10 +195,9 @@ class ControlSystem:
print("⚠️ 未设置 ARK_API_KEY 环境变量,大语言模型功能将被禁用")
self.config['processing']['enable_llm'] = False
def start(self):
"""启动系统"""
print("🚀 启动多进程音频控制系统")
print("=" * 60)
def _start_processes(self):
"""内部方法:启动进程但不启动控制循环"""
print("🚀 启动进程...")
# 创建并启动输入进程
input_config = {
@ -239,11 +238,118 @@ class ControlSystem:
print("🎙️ 输入进程:负责录音和语音检测")
print("🔊 输出进程:负责音频播放")
print("🎯 主控制负责协调和AI处理")
def start(self, auto_calibration=True, auto_monitoring=True):
"""启动系统
Args:
auto_calibration: 是否自动启动校准
auto_monitoring: 是否自动启动监听
"""
print("🚀 启动多进程音频控制系统")
print("=" * 60)
# 启动进程
self._start_processes()
print("=" * 60)
if auto_calibration:
# 自动启动校准
print("🎯 自动启动语音检测器校准...")
success = self.start_calibration()
if not success:
print("⚠️ 校准启动失败,继续运行...")
else:
# 等待校准完成
if self.wait_for_calibration_complete(timeout=30):
print("✅ 校准完成")
else:
print("⚠️ 校准超时,继续运行...")
if auto_monitoring:
# 自动启动监听
print("🎯 自动启动音频监听...")
success = self.start_monitoring()
if success:
print("✅ 监听已启动")
else:
print("⚠️ 监听启动失败")
print("=" * 60)
print("🎙️ 系统就绪,开始检测语音...")
print("=" * 60)
# 启动主控制循环
self._control_loop()
def start_calibration(self):
"""启动语音检测器校准"""
print("🎯 启动语音检测器校准...")
self.input_command_queue.put(ControlCommand('start_calibration'))
return True
def start_monitoring(self):
"""启动音频监听"""
print("🎯 启动音频监听...")
self.input_command_queue.put(ControlCommand('start_monitoring'))
return True
def stop_monitoring(self):
"""停止音频监听"""
print("🎯 停止音频监听...")
self.input_command_queue.put(ControlCommand('stop_monitoring'))
return True
def get_calibration_status(self):
"""获取校准状态"""
self.input_command_queue.put(ControlCommand('get_calibration_status'))
# 等待响应
start_time = time.time()
while time.time() - start_time < 5.0: # 5秒超时
try:
event = self.input_event_queue.get(timeout=0.1)
if event.event_type == 'calibration_status':
return event.metadata
except queue.Empty:
continue
return None
def get_monitoring_status(self):
"""获取监听状态"""
self.input_command_queue.put(ControlCommand('get_monitoring_status'))
# 等待响应
start_time = time.time()
while time.time() - start_time < 5.0: # 5秒超时
try:
event = self.input_event_queue.get(timeout=0.1)
if event.event_type == 'monitoring_status':
return event.metadata
except queue.Empty:
continue
return None
def wait_for_calibration_complete(self, timeout=30):
"""等待校准完成"""
print(f"⏱️ 等待校准完成,超时时间: {timeout}秒...")
start_time = time.time()
while time.time() - start_time < timeout:
status = self.get_calibration_status()
if status and not status['calibrating']:
print("✅ 校准完成")
return True
# 显示进度
if status:
progress = status['progress'] * 100
print(f"\r🔧 校准进度: {progress:.1f}%", end='', flush=True)
time.sleep(0.5)
print(f"\n⚠️ 校准超时")
return False
def _control_loop(self):
"""主控制循环"""
print("🎯 主控制循环启动")
@ -288,10 +394,23 @@ class ControlSystem:
print("🎯 状态IDLE播放刚完成等待延迟启用录音")
self._just_finished_playing = False # 重置标志
else:
# 启用输入进程录音功能
self.input_command_queue.put(ControlCommand('enable_recording'))
self.state = RecordingState.RECORDING
print("🎯 状态IDLE → RECORDING")
# 检查监听状态
monitoring_status = self.get_monitoring_status()
if monitoring_status and monitoring_status['enabled']:
# 监听已启用,切换到录音状态
self.state = RecordingState.RECORDING
print("🎯 状态IDLE → RECORDING监听已启用")
else:
# 监听未启用,尝试启用
print("🎯 状态IDLE监听未启用尝试启用")
success = self.start_monitoring()
if success:
# 监听启用成功,等待状态更新后进入录音状态
time.sleep(0.5) # 等待状态更新
self.state = RecordingState.RECORDING
print("🎯 状态IDLE → RECORDING监听已启用")
else:
print("⚠️ 监听启用失败保持IDLE状态")
def _handle_recording_state(self):
"""处理录音状态"""
@ -334,6 +453,10 @@ class ControlSystem:
except queue.Empty:
pass
def check_events(self):
"""公开方法:检查进程事件"""
return self._check_events()
def _handle_recording_complete(self, event: ProcessEvent):
"""处理录音完成事件"""
# 禁用输入进程录音功能
@ -1314,6 +1437,10 @@ class ControlSystem:
status_str = " | ".join(status_lines)
print(f"\r{status_str}", end='', flush=True)
def display_status(self):
"""公开方法:显示系统状态"""
return self._display_status()
def shutdown(self):
"""关闭系统"""
print("\n🛑 正在关闭系统...")

161
example_manual_control.py Normal file
View File

@ -0,0 +1,161 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
主进程控制示例
演示如何从主进程控制输入进程的校准和监听功能
"""
import sys
import os
import time
import json
from control_system import ControlSystem
def main():
"""主函数 - 演示主进程控制"""
print("🚀 主进程控制示例")
print("=" * 60)
# 1. 创建控制系统
print("📦 创建控制系统...")
config = {
'system': {
'max_queue_size': 1000,
'process_timeout': 30,
'heartbeat_interval': 1.0,
'log_level': "INFO"
},
'audio': {
'sample_rate': 16000,
'channels': 1,
'chunk_size': 1024
},
'recording': {
'min_duration': 2.0,
'max_duration': 30.0,
'silence_threshold': 3.0
},
'processing': {
'enable_asr': True,
'enable_llm': True,
'enable_tts': True,
'character': 'libai'
}
}
control_system = ControlSystem(config)
# 2. 启动系统
print("🔧 启动系统(包含自动校准和监听)...")
# 使用新的自动启动功能
control_system.start(auto_calibration=True, auto_monitoring=True)
print("✅ 系统已启动")
print("=" * 60)
try:
# 3. 验证自动启动状态
print("\n🔍 验证自动启动状态...")
calibration_status = control_system.get_calibration_status()
if calibration_status:
print(f" 校准状态: {calibration_status}")
if calibration_status['calibrating']:
print(" ⚠️ 校准仍在进行中...")
else:
print(" ✅ 校准已完成")
else:
print(" 无法获取校准状态")
monitoring_status = control_system.get_monitoring_status()
if monitoring_status:
print(f" 监听状态: {monitoring_status}")
if monitoring_status['enabled']:
print(" ✅ 监听已启用")
print(f" 音频流状态: {'活跃' if monitoring_status['audio_stream_active'] else '非活跃'}")
else:
print(" ❌ 监听未启用")
else:
print(" 无法获取监听状态")
# 如果校准未完成,等待一下
if calibration_status and calibration_status['calibrating']:
print("\n⏱️ 等待校准完成...")
if control_system.wait_for_calibration_complete(timeout=15):
print("✅ 校准完成")
else:
print("⚠️ 校准超时")
# 如果监听未启用,尝试启用
if monitoring_status and not monitoring_status['enabled']:
print("\n🎯 尝试启用监听...")
success = control_system.start_monitoring()
if success:
print("✅ 监听已启用")
else:
print("❌ 监听启用失败")
# 8. 运行一段时间进行演示
print("\n🎙️ 系统正在运行...")
print("💡 请说话测试语音检测和录音功能")
print("📊 可以看到实时状态更新")
print("⏱️ 运行30秒后自动停止...")
# 启动控制循环(简化版)
start_time = time.time()
while time.time() - start_time < 30:
# 检查状态
try:
control_system.check_events()
control_system.display_status()
time.sleep(0.1)
except KeyboardInterrupt:
print("\n👋 用户中断")
break
# 9. 测试停止监听功能
print("\n🎯 测试停止监听功能...")
success = control_system.stop_monitoring()
if success:
print("✅ 停止监听命令发送成功")
else:
print("❌ 停止监听命令发送失败")
# 10. 验证停止状态
print("\n🔍 验证停止状态...")
time.sleep(1) # 等待状态更新
monitoring_status = control_system.get_monitoring_status()
if monitoring_status and not monitoring_status['enabled']:
print("✅ 监听已停止")
else:
print("❌ 监听未正确停止")
# 11. 测试重新启动监听
print("\n🎯 测试重新启动监听...")
success = control_system.start_monitoring()
if success:
print("✅ 监听重新启动成功")
time.sleep(1)
monitoring_status = control_system.get_monitoring_status()
if monitoring_status and monitoring_status['enabled']:
print("✅ 监听状态正常")
else:
print("❌ 监听重新启动失败")
print("\n🎉 演示完成!")
except KeyboardInterrupt:
print("\n👋 用户中断")
except Exception as e:
print(f"❌ 运行错误: {e}")
import traceback
traceback.print_exc()
finally:
# 清理
print("\n🛑 清理系统...")
control_system.shutdown()
print("✅ 系统已关闭")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,3 @@
2025-09-21 18:27:29 - InputProcess_logger - INFO - 日志系统初始化完成 - 进程: InputProcess
2025-09-21 18:27:29 - InputProcess_logger - INFO - 日志文件: logs/InputProcess_20250921_182729.log
2025-09-21 18:27:29 - InputProcess_logger - INFO - [InputProcess] TTS工作线程已启动

View File

@ -0,0 +1,3 @@
2025-09-21 18:32:04 - InputProcess_logger - INFO - 日志系统初始化完成 - 进程: InputProcess
2025-09-21 18:32:04 - InputProcess_logger - INFO - 日志文件: logs/InputProcess_20250921_183204.log
2025-09-21 18:32:04 - InputProcess_logger - INFO - [InputProcess] TTS工作线程已启动

View File

@ -288,8 +288,9 @@ def main():
if args.verbose:
control_system.config['system']['log_level'] = "DEBUG"
# 启动系统
control_system.start()
# 启动系统(自动校准和监听)
print("🚀 启动系统(包含自动校准和监听)...")
control_system.start(auto_calibration=True, auto_monitoring=True)
except KeyboardInterrupt:
print("\n👋 用户中断")

194
test_auto_start.py Normal file
View File

@ -0,0 +1,194 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试新的自动校准和监听启动流程
"""
import sys
import os
import time
import signal
from control_system import ControlSystem
def signal_handler(sig, frame):
"""信号处理函数"""
print('\n👋 收到中断信号,正在关闭系统...')
if 'control_system' in globals():
control_system.shutdown()
sys.exit(0)
def main():
"""主函数"""
print("🧪 测试自动校准和监听启动流程")
print("=" * 60)
# 注册信号处理
signal.signal(signal.SIGINT, signal_handler)
try:
# 1. 创建控制系统
print("📦 创建控制系统...")
config = {
'system': {
'max_queue_size': 1000,
'process_timeout': 30,
'heartbeat_interval': 1.0,
'log_level': "INFO"
},
'audio': {
'sample_rate': 16000,
'channels': 1,
'chunk_size': 1024
},
'recording': {
'min_duration': 2.0,
'max_duration': 30.0,
'silence_threshold': 3.0
},
'processing': {
'enable_asr': True,
'enable_llm': True,
'enable_tts': True,
'character': 'libai'
}
}
global control_system
control_system = ControlSystem(config)
# 设置角色
control_system.config['processing']['character'] = 'libai'
print("✅ 控制系统创建完成")
# 2. 测试手动启动流程
print("\n🎯 测试步骤1手动启动流程")
print("-" * 40)
# 2.1 启动进程但不自动校准和监听
print("🔧 启动进程...")
control_system._start_processes()
time.sleep(2) # 等待进程启动
# 2.2 检查初始状态
print("\n🔍 检查初始状态...")
monitoring_status = control_system.get_monitoring_status()
calibration_status = control_system.get_calibration_status()
print(f" 监听状态: {monitoring_status}")
print(f" 校准状态: {calibration_status}")
# 2.3 手动启动校准
print("\n🎯 手动启动校准...")
success = control_system.start_calibration()
print(f" 校准启动结果: {success}")
if success:
print("\n⏱️ 等待校准完成...")
if control_system.wait_for_calibration_complete(timeout=30):
print("✅ 校准完成")
else:
print("⚠️ 校准超时")
# 2.4 手动启动监听
print("\n🎯 手动启动监听...")
success = control_system.start_monitoring()
print(f" 监听启动结果: {success}")
time.sleep(1) # 等待监听启动
# 2.5 验证监听状态
monitoring_status = control_system.get_monitoring_status()
print(f" 监听状态: {monitoring_status}")
# 3. 运行一段时间测试
print("\n🎙️ 系统运行测试")
print("-" * 40)
print("💡 请说话测试语音检测功能")
print("⏱️ 运行10秒...")
start_time = time.time()
while time.time() - start_time < 10:
try:
control_system.check_events()
time.sleep(0.1)
except KeyboardInterrupt:
break
# 4. 停止监听
print("\n🛑 停止监听...")
success = control_system.stop_monitoring()
print(f" 停止结果: {success}")
time.sleep(1)
# 5. 关闭系统
print("\n🔄 关闭当前系统...")
control_system.shutdown()
time.sleep(2)
# 6. 测试自动启动流程
print("\n🎯 测试步骤2自动启动流程")
print("-" * 40)
# 创建新的控制系统实例
control_system = ControlSystem(config)
control_system.config['processing']['character'] = 'libai'
print("\n🚀 启动系统(自动校准和监听)...")
# 这里我们只启动进程,不进入主循环
control_system._start_processes()
# 手动执行自动校准和监听流程
print("\n🎯 自动启动校准...")
success = control_system.start_calibration()
if success and control_system.wait_for_calibration_complete(timeout=30):
print("✅ 自动校准完成")
print("\n🎯 自动启动监听...")
success = control_system.start_monitoring()
if success:
print("✅ 自动监听启动完成")
time.sleep(2)
# 检查最终状态
monitoring_status = control_system.get_monitoring_status()
calibration_status = control_system.get_calibration_status()
print(f"\n📊 最终状态:")
print(f" 监听状态: {monitoring_status}")
print(f" 校准状态: {calibration_status}")
# 7. 运行一段时间
print("\n🎙️ 最终运行测试")
print("-" * 40)
print("💡 请说话测试最终功能")
print("⏱️ 运行10秒...")
start_time = time.time()
while time.time() - start_time < 10:
try:
control_system.check_events()
control_system.display_status()
time.sleep(0.1)
except KeyboardInterrupt:
break
print("\n🎉 测试完成!")
except Exception as e:
print(f"❌ 测试过程中出错: {e}")
import traceback
traceback.print_exc()
finally:
# 确保系统关闭
if 'control_system' in globals():
print("\n🛑 确保系统关闭...")
control_system.shutdown()
print("✅ 测试结束")
if __name__ == "__main__":
main()