#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Advanced voice activity detector.

Adaptive voice detection that combines two per-frame features, RMS energy and
zero-crossing rate (ZCR), with thresholds derived from a sliding window of
recent frames. Default parameters target 16000 Hz, 16-bit mono PCM.
"""

import time
from collections import deque
from typing import Any, Dict, Optional

import numpy as np

try:
    import pyaudio
except ImportError:
    # The detector itself only needs numpy. Keep the module importable on
    # machines without audio libraries; run_test() reports the missing
    # dependency when it is actually needed.
    pyaudio = None


class EnhancedVoiceDetector:
    """Energy + ZCR voice detector with self-calibrating thresholds."""

    def __init__(self, sample_rate=16000, chunk_size=1024):
        """Create a detector.

        Args:
            sample_rate: Sampling rate in Hz, used to scale the ZCR.
            chunk_size: Nominal number of samples per frame. Stored for
                callers (e.g. to size the capture buffer).
        """
        self.sample_rate = sample_rate
        self.chunk_size = chunk_size

        # Sliding windows of the most recent per-frame features.
        self.energy_window = deque(maxlen=100)
        self.zcr_window = deque(maxlen=100)

        # Summary statistics over the windows (see update_statistics).
        self.energy_stats = self._empty_stats()
        self.zcr_stats = self._empty_stats()

        # Calibration: the first `required_calibration` frames only feed the
        # statistics and are always reported as non-voice.
        self.calibration_mode = True
        self.calibration_samples = 0
        self.required_calibration = 100

        # Adaptive-threshold tuning (deliberately on the sensitive side).
        self.energy_multiplier = 1.0  # std multiplier for the energy threshold
        self.zcr_std_multiplier = 1.0  # std multiplier for the ZCR band
        self.min_energy_threshold = 80  # floor for the energy threshold
        self.consecutive_voice_threshold = 2  # voiced frames needed to trigger
        self.consecutive_silence_threshold = 15  # silent frames needed to release

        # State tracking.
        self.consecutive_voice_count = 0
        self.consecutive_silence_count = 0
        self.last_voice_time = 0

        # Debug / bookkeeping.
        self.debug_mode = True
        self.voice_count = 0  # number of silence -> voice transitions
        self.total_samples = 0  # number of frames processed
        self._last_voice_state = False

    @staticmethod
    def _empty_stats() -> Dict[str, float]:
        """Return the initial (pre-data) statistics dictionary."""
        return {
            'mean': 0,
            'std': 0,
            'min': float('inf'),
            'max': 0,
            'median': 0,
            'q75': 0,
            'q25': 0,
        }

    @staticmethod
    def _to_samples(audio_data: bytes) -> np.ndarray:
        """Decode raw bytes as int16 samples, dropping a trailing odd byte.

        np.frombuffer raises ValueError when the buffer length is not a
        multiple of the item size, so an incomplete last sample is discarded.
        """
        usable = len(audio_data) - (len(audio_data) % 2)
        if usable == 0:
            return np.empty(0, dtype=np.int16)
        return np.frombuffer(audio_data[:usable], dtype=np.int16)

    @staticmethod
    def _summarize(window) -> Dict[str, float]:
        """Compute the summary statistics stored in the *_stats dicts."""
        values = np.fromiter(window, dtype=float, count=len(window))
        return {
            'mean': np.mean(values),
            'std': np.std(values),
            'min': np.min(values),
            'max': np.max(values),
            'median': np.median(values),
            'q25': np.percentile(values, 25),
            'q75': np.percentile(values, 75),
        }

    def calculate_energy(self, audio_data: bytes) -> float:
        """Return the RMS energy of a frame of int16 PCM bytes.

        Returns 0.0 when the buffer holds no complete sample.
        """
        samples = self._to_samples(audio_data)
        if samples.size == 0:
            return 0.0
        # Promote to float before squaring so int16 values cannot overflow.
        return float(np.sqrt(np.mean(samples.astype(float) ** 2)))

    def calculate_zcr(self, audio_data: bytes) -> float:
        """Return the zero-crossing rate of a frame, scaled by sample_rate.

        The value is the number of sign changes between adjacent samples,
        divided by the sample count and multiplied by `sample_rate`.
        Returns 0.0 when the buffer holds no complete sample.
        """
        samples = self._to_samples(audio_data)
        if samples.size == 0:
            return 0.0
        zero_crossings = np.sum(np.diff(np.sign(samples)) != 0)
        return float(zero_crossings / len(samples) * self.sample_rate)

    def update_statistics(self, energy: float, zcr: float):
        """Append one frame's features and refresh the window statistics.

        Statistics are only recomputed once at least 20 frames are buffered.
        """
        self.energy_window.append(energy)
        self.zcr_window.append(zcr)

        if len(self.energy_window) >= 20:
            # Update in place so external references to the dicts stay valid.
            self.energy_stats.update(self._summarize(self.energy_window))
            self.zcr_stats.update(self._summarize(self.zcr_window))

    def get_adaptive_thresholds(self) -> Dict[str, float]:
        """Return the current energy threshold and accepted ZCR band.

        Returns:
            Dict with keys 'energy_threshold', 'zcr_min' and 'zcr_max'.
            Fixed defaults are returned until 30 frames have been seen.
        """
        if len(self.energy_window) < 30:
            # Not enough history yet: use fixed, sensitive defaults.
            return {
                'energy_threshold': 120,
                'zcr_min': 2000,
                'zcr_max': 13000,
            }

        # Candidate 1: median plus a multiple of the standard deviation.
        base_energy_threshold = (
            self.energy_stats['median']
            + self.energy_multiplier * self.energy_stats['std']
        )

        # Candidate 2: quartile based, which is less affected by outliers.
        q75 = self.energy_stats['q75']
        q25 = self.energy_stats['q25']
        iqr = q75 - q25
        iqr_threshold = q75 + 0.5 * iqr

        # Take the lower of the two candidates, but never below the floor.
        energy_threshold = max(
            self.min_energy_threshold,
            min(base_energy_threshold * 0.7, iqr_threshold),
        )

        # ZCR band centred on the median, clamped to fixed limits.
        zcr_center = self.zcr_stats['median']
        zcr_spread = self.zcr_std_multiplier * self.zcr_stats['std']
        zcr_min = max(1500, min(3000, zcr_center - zcr_spread))
        zcr_max = min(14000, max(6000, zcr_center + zcr_spread * 2.0))

        # Guarantee a minimum band width.
        if zcr_max - zcr_min < 2000:
            zcr_max = zcr_min + 2000

        return {
            'energy_threshold': energy_threshold,
            'zcr_min': zcr_min,
            'zcr_max': zcr_max,
        }

    def is_voice_basic(self, energy: float, zcr: float) -> bool:
        """Single-frame decision: energy above threshold AND ZCR inside band."""
        thresholds = self.get_adaptive_thresholds()
        energy_ok = energy > thresholds['energy_threshold']
        zcr_ok = thresholds['zcr_min'] < zcr < thresholds['zcr_max']
        return energy_ok and zcr_ok

    def is_voice_advanced(self, audio_data: bytes) -> Dict[str, Any]:
        """Process one frame and return the stateful detection result.

        Args:
            audio_data: Raw int16 PCM bytes for one frame.

        Returns:
            While calibrating: a dict with 'is_voice' (always False),
            'energy', 'zcr', 'calibrating' (True), 'calibration_progress'
            and 'confidence' (0.0).
            Afterwards: a dict with 'is_voice', 'energy', 'zcr', 'confidence',
            the thresholds in use, the consecutive voice/silence counters,
            'calibrating' (False) and 'voice_detection_rate'.
        """
        energy = self.calculate_energy(audio_data)
        zcr = self.calculate_zcr(audio_data)
        self.update_statistics(energy, zcr)
        self.total_samples += 1

        if self.calibration_mode:
            self.calibration_samples += 1
            if self.calibration_samples >= self.required_calibration:
                self.calibration_mode = False
                if self.debug_mode:
                    print("\n🎯 校准完成!")
                    print(f" 能量统计: {self.energy_stats['median']:.0f}±{self.energy_stats['std']:.0f}")
                    print(f" ZCR统计: {self.zcr_stats['median']:.0f}±{self.zcr_stats['std']:.0f}")

            return {
                'is_voice': False,
                'energy': energy,
                'zcr': zcr,
                'calibrating': True,
                'calibration_progress': self.calibration_samples / self.required_calibration,
                'confidence': 0.0,
            }

        is_voice_frame = self.is_voice_basic(energy, zcr)

        # State machine: voice triggers after N consecutive voiced frames and
        # is only released after M consecutive silent frames.
        if is_voice_frame:
            self.consecutive_voice_count += 1
            self.consecutive_silence_count = 0
            self.last_voice_time = time.time()
        else:
            self.consecutive_silence_count += 1
            if self.consecutive_silence_count >= self.consecutive_silence_threshold:
                self.consecutive_voice_count = 0

        final_voice_detected = (
            self.consecutive_voice_count >= self.consecutive_voice_threshold
        )

        # Count only silence -> voice transitions. The previous expression
        # (`a and not b or not c`) parsed as `(a and not b) or (not c)` and
        # therefore incremented on every frame that followed silence.
        if final_voice_detected and not self._last_voice_state:
            self.voice_count += 1
        self._last_voice_state = final_voice_detected

        # Confidence: mean of the energy ratio (capped at 1) and a 0/1 flag
        # for the ZCR being inside the accepted band.
        thresholds = self.get_adaptive_thresholds()
        energy_confidence = min(1.0, energy / thresholds['energy_threshold'])
        zcr_in_band = thresholds['zcr_min'] < zcr < thresholds['zcr_max']
        zcr_confidence = 1.0 if zcr_in_band else 0.0
        confidence = (energy_confidence + zcr_confidence) / 2

        return {
            'is_voice': final_voice_detected,
            'energy': energy,
            'zcr': zcr,
            'confidence': confidence,
            'energy_threshold': thresholds['energy_threshold'],
            'zcr_min': thresholds['zcr_min'],
            'zcr_max': thresholds['zcr_max'],
            'consecutive_voice_count': self.consecutive_voice_count,
            'consecutive_silence_count': self.consecutive_silence_count,
            'calibrating': False,
            'voice_detection_rate': self.voice_count / self.total_samples if self.total_samples > 0 else 0,
        }

    def get_debug_info(self) -> str:
        """Return a one-line, human-readable summary of the detector state."""
        if self.calibration_mode:
            return f"校准中: {self.calibration_samples}/{self.required_calibration}"

        thresholds = self.get_adaptive_thresholds()
        # Guard against division by zero when no frame has been processed.
        if self.total_samples > 0:
            percent = self.voice_count / self.total_samples * 100
        else:
            percent = 0.0
        return (f"能量阈值: {thresholds['energy_threshold']:.0f} | "
                f"ZCR范围: {thresholds['zcr_min']:.0f}-{thresholds['zcr_max']:.0f} | "
                f"检测率: {self.voice_count}/{self.total_samples} ({percent:.1f}%)")

    def reset(self):
        """Return the detector to its freshly constructed, uncalibrated state.

        Tuning parameters (multipliers, thresholds, debug_mode) are kept.
        """
        self.energy_window.clear()
        self.zcr_window.clear()
        self.energy_stats.update(self._empty_stats())
        self.zcr_stats.update(self._empty_stats())
        self.calibration_mode = True
        self.calibration_samples = 0
        self.consecutive_voice_count = 0
        self.consecutive_silence_count = 0
        self.last_voice_time = 0
        self.voice_count = 0
        self.total_samples = 0
        # Without this, a reset during speech would hide the next onset.
        self._last_voice_state = False


class VoiceDetectorTester:
    """Interactive microphone test harness for EnhancedVoiceDetector."""

    def __init__(self):
        self.detector = EnhancedVoiceDetector()

    @staticmethod
    def _format_status(result: Dict[str, Any]) -> str:
        """Build the single-line live status text for one detection result."""
        if result['calibrating']:
            progress = result['calibration_progress'] * 100
            return f"\r🔧 校准中: {progress:.0f}% | 能量: {result['energy']:.0f} | ZCR: {result['zcr']:.0f}"

        status_icon = "🎤" if result['is_voice'] else "🔇"
        status_color = "\033[92m" if result['is_voice'] else "\033[90m"
        reset_color = "\033[0m"
        return (f"{status_color}{status_icon} "
                f"能量: {result['energy']:.0f}/{result['energy_threshold']:.0f} | "
                f"ZCR: {result['zcr']:.0f} ({result['zcr_min']:.0f}-{result['zcr_max']:.0f}) | "
                f"置信度: {result['confidence']:.2f} | "
                f"连续: {result['consecutive_voice_count']}/{result['consecutive_silence_count']}{reset_color}")

    def _close_segment(self, segment: Dict[str, Any]) -> Dict[str, Any]:
        """Stamp end time, end sample index and duration onto a segment."""
        segment['end_time'] = time.time()
        segment['end_sample'] = self.detector.total_samples
        segment['duration'] = segment['end_time'] - segment['start_time']
        return segment

    @staticmethod
    def _release_audio(stream, audio) -> None:
        """Best-effort release of the stream and the PyAudio instance.

        Each resource is released independently so a failure closing the
        stream does not prevent terminating the PyAudio instance.
        """
        if stream is not None:
            try:
                stream.stop_stream()
                stream.close()
            except Exception as exc:
                print(f"\n⚠️ 关闭音频流失败: {exc}")
        if audio is not None:
            try:
                audio.terminate()
            except Exception as exc:
                print(f"\n⚠️ 释放音频设备失败: {exc}")

    def run_test(self, duration=30):
        """Capture microphone audio for `duration` seconds and print results.

        Args:
            duration: Test length in seconds.

        Errors (including a missing pyaudio installation) are reported on
        stdout rather than raised; Ctrl+C ends the test early.
        """
        detector = self.detector

        print("🎙️ 增强版语音检测器测试")
        print("=" * 50)
        print("📊 检测算法: 能量+ZCR双重检测")
        print(f"📈 采样率: {detector.sample_rate}Hz")
        print("🔄 自适应阈值: 启用")
        print(f"⏱️ 测试时长: {duration}秒")
        print("💡 请说话测试检测效果...")
        print("🛑 按 Ctrl+C 提前结束")
        print("=" * 50)

        audio = None
        stream = None
        try:
            if pyaudio is None:
                raise RuntimeError("pyaudio is not installed")

            audio = pyaudio.PyAudio()
            stream = audio.open(
                format=pyaudio.paInt16,
                channels=1,
                rate=detector.sample_rate,
                input=True,
                frames_per_buffer=detector.chunk_size,
            )

            start_time = time.time()
            voice_segments = []
            confidences = []  # confidence of every post-calibration frame
            current_segment: Optional[Dict[str, Any]] = None

            while time.time() - start_time < duration:
                data = stream.read(detector.chunk_size, exception_on_overflow=False)
                result = detector.is_voice_advanced(data)

                # Track contiguous voiced segments.
                if result['is_voice']:
                    if current_segment is None:
                        current_segment = {
                            'start_time': time.time(),
                            'start_sample': detector.total_samples,
                        }
                elif current_segment is not None:
                    voice_segments.append(self._close_segment(current_segment))
                    current_segment = None

                if not result['calibrating']:
                    confidences.append(result['confidence'])

                status = self._format_status(result)
                print(f"\r{status}", end='', flush=True)
                time.sleep(0.01)

            # Close a segment that is still open when the time runs out.
            if current_segment is not None:
                voice_segments.append(self._close_segment(current_segment))

            total_voice = sum(s['duration'] for s in voice_segments)
            voice_ratio = total_voice / duration * 100 if duration > 0 else 0.0
            # Average over real frames; the original fed dummy bytes into the
            # detector here, which also corrupted its statistics.
            mean_confidence = float(np.mean(confidences)) if confidences else 0.0

            print("\n\n📊 测试结果统计:")
            print(f" 总检测时长: {duration}秒")
            print(f" 检测到语音段: {len(voice_segments)}")
            print(f" 总语音时长: {total_voice:.1f}秒")
            print(f" 语音占比: {voice_ratio:.1f}%")
            print(f" 平均置信度: {mean_confidence:.2f}")

            if voice_segments:
                print(f" 平均语音段时长: {np.mean([s['duration'] for s in voice_segments]):.1f}秒")
                print(f" 最长语音段: {max(s['duration'] for s in voice_segments):.1f}秒")

            print("\n🎯 检测器状态:")
            print(f" {detector.get_debug_info()}")

        except KeyboardInterrupt:
            print("\n\n🛑 测试被用户中断")
        except Exception as e:
            print(f"\n\n❌ 测试出错: {e}")
        finally:
            self._release_audio(stream, audio)


def main():
    """Run the interactive microphone test with default settings."""
    tester = VoiceDetectorTester()
    tester.run_test()


if __name__ == "__main__":
    main()