#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 语音检测测试脚本 用于测试和调试ZCR语音检测功能 """ import numpy as np import time import pyaudio from audio_processes import InputProcess import multiprocessing as mp import queue class VoiceDetectionTester: """语音检测测试器""" def __init__(self): self.FORMAT = pyaudio.paInt16 self.CHANNELS = 1 self.RATE = 16000 self.CHUNK_SIZE = 1024 # 测试参数 self.test_duration = 30 # 测试30秒 self.zcr_history = [] self.voice_count = 0 # 音频设备 self.audio = None self.stream = None def setup_audio(self): """设置音频设备""" try: self.audio = pyaudio.PyAudio() self.stream = self.audio.open( format=self.FORMAT, channels=self.CHANNELS, rate=self.RATE, input=True, frames_per_buffer=self.CHUNK_SIZE ) print("✅ 音频设备初始化成功") return True except Exception as e: print(f"❌ 音频设备初始化失败: {e}") return False def calculate_zcr(self, audio_data): """计算零交叉率""" if len(audio_data) == 0: return 0 audio_array = np.frombuffer(audio_data, dtype=np.int16) zero_crossings = np.sum(np.diff(np.sign(audio_array)) != 0) zcr = zero_crossings / len(audio_array) * self.RATE return zcr def test_detection(self): """测试语音检测""" print("🎙️ 开始语音检测测试") print("=" * 50) # 环境校准阶段 print("🔍 第一阶段:环境噪音校准 (10秒)") print("请保持安静,不要说话...") calibration_samples = [] start_time = time.time() try: while time.time() - start_time < 10: data = self.stream.read(self.CHUNK_SIZE, exception_on_overflow=False) if len(data) > 0: zcr = self.calculate_zcr(data) calibration_samples.append(zcr) # 显示进度 progress = (time.time() - start_time) / 10 * 100 print(f"\r校准进度: {progress:.1f}%", end='', flush=True) time.sleep(0.01) print("\n✅ 环境校准完成") # 计算统计数据 if calibration_samples: avg_zcr = np.mean(calibration_samples) std_zcr = np.std(calibration_samples) min_zcr = min(calibration_samples) max_zcr = max(calibration_samples) print(f"📊 环境噪音统计:") print(f" 平均ZCR: {avg_zcr:.0f}") print(f" 标准差: {std_zcr:.0f}") print(f" 最小值: {min_zcr:.0f}") print(f" 最大值: {max_zcr:.0f}") # 建议的检测阈值 suggested_min = max(2400, avg_zcr + 2 * std_zcr) suggested_max = min(12000, avg_zcr + 6 * std_zcr) print(f"\n🎯 建议的语音检测阈值:") print(f" 最小阈值: {suggested_min:.0f}") print(f" 最大阈值: {suggested_max:.0f}") # 测试检测 print(f"\n🎙️ 第二阶段:语音检测测试 (20秒)") print("现在请说话,测试语音检测...") voice_threshold = suggested_min silence_threshold = suggested_max consecutive_voice = 0 voice_detected = False test_start = time.time() while time.time() - test_start < 20: data = self.stream.read(self.CHUNK_SIZE, exception_on_overflow=False) if len(data) > 0: zcr = self.calculate_zcr(data) # 简单的语音检测 is_voice = voice_threshold < zcr < silence_threshold if is_voice: consecutive_voice += 1 if consecutive_voice >= 5 and not voice_detected: voice_detected = True self.voice_count += 1 print(f"\n🎤 检测到语音 #{self.voice_count}! ZCR: {zcr:.0f}") else: consecutive_voice = 0 if voice_detected: voice_detected = False print(f" 语音结束,持续时间: {time.time() - last_voice_time:.1f}秒") if voice_detected: last_voice_time = time.time() # 实时显示ZCR值 status = "🎤" if voice_detected else "🔇" print(f"\r{status} ZCR: {zcr:.0f} | 阈值: {voice_threshold:.0f}-{silence_threshold:.0f} | " f"连续语音: {consecutive_voice}/5", end='', flush=True) time.sleep(0.01) print(f"\n\n✅ 测试完成!共检测到 {self.voice_count} 次语音") except KeyboardInterrupt: print("\n🛑 测试被用户中断") except Exception as e: print(f"\n❌ 测试过程中出错: {e}") def cleanup(self): """清理资源""" if self.stream: try: self.stream.stop_stream() self.stream.close() except: pass if self.audio: try: self.audio.terminate() except: pass def run_test(self): """运行完整测试""" print("🚀 语音检测测试工具") print("=" * 60) if not self.setup_audio(): print("❌ 无法初始化音频设备,测试终止") return try: self.test_detection() finally: self.cleanup() print("\n👋 测试结束") def main(): """主函数""" tester = VoiceDetectionTester() tester.run_test() if __name__ == "__main__": main()