#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Advanced voice detector.

Adaptive voice detection algorithm that combines two checks: energy and
zero-crossing rate (ZCR).
Tuned for a 16000 Hz sample rate.
"""
|
||
import numpy as np
|
||
import time
|
||
from collections import deque
|
||
from typing import Dict, Any, Optional
|
||
import pyaudio
|
||
|
||
class EnhancedVoiceDetector:
    """Frame-based voice detector combining RMS energy and zero-crossing rate.

    Every frame handed to ``is_voice_advanced`` is reduced to two features
    (RMS energy and ZCR) that are pushed into rolling windows.  The first
    ``required_calibration`` frames only feed those windows; afterwards a
    frame counts as voiced when its energy exceeds an adaptive threshold and
    its ZCR lies inside an adaptive band.  The final decision additionally
    requires ``consecutive_voice_threshold`` voiced frames in a row.
    """

    def __init__(self, sample_rate=16000, chunk_size=1024):
        """Create a detector in calibration mode.

        Args:
            sample_rate: Sample rate used to scale the zero-crossing rate.
            chunk_size: Stored on the instance; not read by this class itself.
        """
        self.sample_rate = sample_rate
        self.chunk_size = chunk_size

        # Rolling history of the most recent per-frame features.
        self.energy_window = deque(maxlen=100)
        self.zcr_window = deque(maxlen=100)

        # Summary statistics over the rolling windows.
        self.energy_stats = self._empty_stats()
        self.zcr_stats = self._empty_stats()

        # Calibration: the first `required_calibration` frames never report voice.
        self.calibration_mode = True
        self.calibration_samples = 0
        self.required_calibration = 100

        # Adaptive-threshold tuning knobs.
        self.energy_multiplier = 1.0  # std-dev multiplier for the energy threshold
        self.zcr_std_multiplier = 1.0  # std-dev multiplier for the ZCR band
        self.min_energy_threshold = 80  # floor for the adaptive energy threshold
        self.consecutive_voice_threshold = 2  # voiced frames needed to report voice
        self.consecutive_silence_threshold = 15  # silent frames that clear the voice run

        # State tracking.
        self.consecutive_voice_count = 0
        self.consecutive_silence_count = 0
        self.last_voice_time = 0

        # Debug / bookkeeping.
        self.debug_mode = True
        self.voice_count = 0  # number of silence -> voice transitions seen
        self.total_samples = 0  # number of frames processed (calibration included)
        self._last_voice_state = False

    @staticmethod
    def _empty_stats() -> Dict[str, float]:
        """Return the initial (pre-data) statistics dictionary."""
        return {
            'mean': 0, 'std': 0, 'min': float('inf'), 'max': 0,
            'median': 0, 'q75': 0, 'q25': 0,
        }

    @staticmethod
    def _summarize(values) -> Dict[str, float]:
        """Compute mean/std/min/max/median/quartiles of a sequence of numbers."""
        arr = np.array(values)
        return {
            'mean': np.mean(arr),
            'std': np.std(arr),
            'min': np.min(arr),
            'max': np.max(arr),
            'median': np.median(arr),
            'q25': np.percentile(arr, 25),
            'q75': np.percentile(arr, 75),
        }

    def calculate_energy(self, audio_data: bytes) -> float:
        """Return the RMS energy of a buffer interpreted as int16 samples.

        Returns 0.0 for an empty buffer.  ``np.frombuffer`` raises
        ``ValueError`` when the buffer length is not a multiple of 2 bytes.
        """
        if len(audio_data) == 0:
            return 0.0

        samples = np.frombuffer(audio_data, dtype=np.int16)
        # Square in floating point so int16 values cannot overflow.
        return float(np.sqrt(np.mean(samples.astype(float) ** 2)))

    def calculate_zcr(self, audio_data: bytes) -> float:
        """Return the zero-crossing rate of a buffer, in sign changes per second.

        The number of sign changes between neighbouring int16 samples is
        divided by the sample count and scaled by ``sample_rate``.  Returns
        0.0 for an empty buffer.
        """
        if len(audio_data) == 0:
            return 0.0

        samples = np.frombuffer(audio_data, dtype=np.int16)
        sign_changes = np.sum(np.diff(np.sign(samples)) != 0)
        return float(sign_changes / len(samples) * self.sample_rate)

    def update_statistics(self, energy: float, zcr: float):
        """Append one frame's features and refresh the rolling statistics.

        Statistics are only recomputed once at least 20 frames are available.
        The stats dictionaries are updated in place.
        """
        self.energy_window.append(energy)
        self.zcr_window.append(zcr)

        if len(self.energy_window) >= 20:
            self.energy_stats.update(self._summarize(self.energy_window))
            self.zcr_stats.update(self._summarize(self.zcr_window))

    def get_adaptive_thresholds(self) -> Dict[str, float]:
        """Return the current ``energy_threshold``, ``zcr_min`` and ``zcr_max``.

        With fewer than 30 frames of history, fixed defaults are returned;
        otherwise the thresholds are derived from the rolling statistics.
        """
        if len(self.energy_window) < 30:
            return {
                'energy_threshold': 120,
                'zcr_min': 2000,
                'zcr_max': 13000,
            }

        # Candidate 1: median plus a multiple of the standard deviation.
        base_energy_threshold = (self.energy_stats['median'] +
                                 self.energy_multiplier * self.energy_stats['std'])

        # Candidate 2: quartile-based threshold, less sensitive to outliers.
        q75 = self.energy_stats['q75']
        q25 = self.energy_stats['q25']
        iqr_threshold = q75 + 0.5 * (q75 - q25)

        # Take the lower of the two candidates, but never below the floor.
        energy_threshold = max(self.min_energy_threshold,
                               min(base_energy_threshold * 0.7, iqr_threshold))

        zcr_center = self.zcr_stats['median']
        zcr_spread = self.zcr_std_multiplier * self.zcr_stats['std']

        # Clamp the band edges: lower edge within [1500, 3000], upper within [6000, 14000].
        zcr_min = max(1500, min(3000, zcr_center - zcr_spread))
        zcr_max = min(14000, max(6000, zcr_center + zcr_spread * 2.0))

        # Keep the band at least 2000 wide.
        if zcr_max - zcr_min < 2000:
            zcr_max = zcr_min + 2000

        return {
            'energy_threshold': energy_threshold,
            'zcr_min': zcr_min,
            'zcr_max': zcr_max,
        }

    def is_voice_basic(self, energy: float, zcr: float) -> bool:
        """Single-frame decision: energy above threshold AND ZCR inside the band."""
        thresholds = self.get_adaptive_thresholds()
        energy_ok = energy > thresholds['energy_threshold']
        zcr_ok = thresholds['zcr_min'] < zcr < thresholds['zcr_max']
        return energy_ok and zcr_ok

    def is_voice_advanced(self, audio_data: bytes) -> Dict[str, Any]:
        """Process one frame and return the stateful detection result.

        Args:
            audio_data: Raw frame bytes, interpreted as int16 samples.

        Returns:
            During calibration: a dict with ``is_voice`` False, ``energy``,
            ``zcr``, ``calibrating`` True, ``calibration_progress`` and
            ``confidence`` 0.0.  Afterwards: a dict with ``is_voice``,
            ``energy``, ``zcr``, ``confidence``, the three thresholds, the two
            consecutive counters, ``calibrating`` False and
            ``voice_detection_rate`` (``voice_count / total_samples``).
        """
        energy = self.calculate_energy(audio_data)
        zcr = self.calculate_zcr(audio_data)

        self.update_statistics(energy, zcr)
        self.total_samples += 1

        if self.calibration_mode:
            self.calibration_samples += 1
            if self.calibration_samples >= self.required_calibration:
                self.calibration_mode = False
                if self.debug_mode:
                    print(f"\n🎯 校准完成!")
                    print(f" 能量统计: {self.energy_stats['median']:.0f}±{self.energy_stats['std']:.0f}")
                    print(f" ZCR统计: {self.zcr_stats['median']:.0f}±{self.zcr_stats['std']:.0f}")

            # The frame that completes calibration is still reported as calibrating.
            return {
                'is_voice': False,
                'energy': energy,
                'zcr': zcr,
                'calibrating': True,
                'calibration_progress': self.calibration_samples / self.required_calibration,
                'confidence': 0.0,
            }

        is_voice_frame = self.is_voice_basic(energy, zcr)

        if is_voice_frame:
            self.consecutive_voice_count += 1
            self.consecutive_silence_count = 0
            self.last_voice_time = time.time()
        else:
            self.consecutive_silence_count += 1
            # The voice run is only cleared after enough silent frames in a row.
            if self.consecutive_silence_count >= self.consecutive_silence_threshold:
                self.consecutive_voice_count = 0

        final_voice_detected = self.consecutive_voice_count >= self.consecutive_voice_threshold

        # Count only silence -> voice transitions.  The original condition
        # `A and not B or not C` parsed as `(A and not B) or (not C)` and so
        # incremented the counter on every frame that followed a non-voice frame.
        if final_voice_detected and not self._last_voice_state:
            self.voice_count += 1

        self._last_voice_state = final_voice_detected

        thresholds = self.get_adaptive_thresholds()
        energy_confidence = min(1.0, energy / thresholds['energy_threshold'])
        zcr_confidence = 1.0 if thresholds['zcr_min'] < zcr < thresholds['zcr_max'] else 0.0
        confidence = (energy_confidence + zcr_confidence) / 2

        return {
            'is_voice': final_voice_detected,
            'energy': energy,
            'zcr': zcr,
            'confidence': confidence,
            'energy_threshold': thresholds['energy_threshold'],
            'zcr_min': thresholds['zcr_min'],
            'zcr_max': thresholds['zcr_max'],
            'consecutive_voice_count': self.consecutive_voice_count,
            'consecutive_silence_count': self.consecutive_silence_count,
            'calibrating': False,
            'voice_detection_rate': self.voice_count / self.total_samples if self.total_samples > 0 else 0,
        }

    def get_debug_info(self) -> str:
        """Return a one-line, human-readable summary of the detector state."""
        if self.calibration_mode:
            return f"校准中: {self.calibration_samples}/{self.required_calibration}"

        thresholds = self.get_adaptive_thresholds()
        # Guard against division by zero when no frame has been processed yet.
        rate_percent = self.voice_count / self.total_samples * 100 if self.total_samples > 0 else 0.0
        return (f"能量阈值: {thresholds['energy_threshold']:.0f} | "
                f"ZCR范围: {thresholds['zcr_min']:.0f}-{thresholds['zcr_max']:.0f} | "
                f"检测率: {self.voice_count}/{self.total_samples} ({rate_percent:.1f}%)")

    def reset(self):
        """Return the detector to its initial state (tuning parameters are kept)."""
        self.energy_window.clear()
        self.zcr_window.clear()
        # Refresh the stats in place so no stale values survive the reset.
        self.energy_stats.update(self._empty_stats())
        self.zcr_stats.update(self._empty_stats())
        self.calibration_mode = True
        self.calibration_samples = 0
        self.consecutive_voice_count = 0
        self.consecutive_silence_count = 0
        self.last_voice_time = 0
        self.voice_count = 0
        self.total_samples = 0
        # Without this the first post-reset voice onset could be missed.
        self._last_voice_state = False
|
||
|
||
|
||
class VoiceDetectorTester:
    """Console harness that runs an EnhancedVoiceDetector on live microphone input."""

    def __init__(self):
        """Create the tester with a default-configured detector."""
        self.detector = EnhancedVoiceDetector()

    @staticmethod
    def _close_segment(segment, end_sample):
        """Stamp end time, end sample and duration onto an open voice segment."""
        segment['end_time'] = time.time()
        segment['end_sample'] = end_sample
        segment['duration'] = segment['end_time'] - segment['start_time']
        return segment

    def run_test(self, duration=30):
        """Record from the default input device and print live detection status.

        Args:
            duration: How long to record, in seconds.

        Frames are read with the detector's own ``sample_rate`` and
        ``chunk_size``.  A summary is printed when the time is up.
        ``KeyboardInterrupt`` and other exceptions are reported on the console
        rather than propagated, and the audio device is always released.
        """
        detector = self.detector
        sample_rate = detector.sample_rate
        chunk_size = detector.chunk_size

        print("🎙️ 增强版语音检测器测试")
        print("=" * 50)
        print("📊 检测算法: 能量+ZCR双重检测")
        # Report the values actually in use instead of hard-coded ones.
        print(f"📈 采样率: {sample_rate}Hz")
        print("🔄 自适应阈值: 启用")
        print(f"⏱️ 测试时长: {duration}秒")
        print("💡 请说话测试检测效果...")
        print("🛑 按 Ctrl+C 提前结束")
        print("=" * 50)

        # Bound before the try block so the cleanup code can test them directly.
        audio = None
        stream = None

        try:
            audio = pyaudio.PyAudio()
            stream = audio.open(
                format=pyaudio.paInt16,
                channels=1,
                rate=sample_rate,
                input=True,
                frames_per_buffer=chunk_size,
            )

            start_time = time.time()
            voice_segments = []
            current_segment = None
            confidences = []  # confidence of every post-calibration frame

            # NOTE(review): the blocking read below is expected to pace the loop,
            # so the original extra 10 ms sleep per frame was dropped.
            while time.time() - start_time < duration:
                data = stream.read(chunk_size, exception_on_overflow=False)
                result = detector.is_voice_advanced(data)

                # Track contiguous voiced stretches as segments.
                if result['is_voice']:
                    if current_segment is None:
                        current_segment = {
                            'start_time': time.time(),
                            'start_sample': detector.total_samples,
                        }
                elif current_segment is not None:
                    voice_segments.append(
                        self._close_segment(current_segment, detector.total_samples))
                    current_segment = None

                if result['calibrating']:
                    progress = result['calibration_progress'] * 100
                    status = f"🔧 校准中: {progress:.0f}% | 能量: {result['energy']:.0f} | ZCR: {result['zcr']:.0f}"
                else:
                    confidences.append(result['confidence'])
                    status_icon = "🎤" if result['is_voice'] else "🔇"
                    status_color = "\033[92m" if result['is_voice'] else "\033[90m"
                    reset_color = "\033[0m"

                    status = (f"{status_color}{status_icon} "
                              f"能量: {result['energy']:.0f}/{result['energy_threshold']:.0f} | "
                              f"ZCR: {result['zcr']:.0f} ({result['zcr_min']:.0f}-{result['zcr_max']:.0f}) | "
                              f"置信度: {result['confidence']:.2f} | "
                              f"连续: {result['consecutive_voice_count']}/{result['consecutive_silence_count']}{reset_color}")

                print(f"\r{status}", end='', flush=True)

            # Close a segment that was still open when the time ran out.
            if current_segment is not None:
                voice_segments.append(
                    self._close_segment(current_segment, detector.total_samples))

            total_voice = sum(s['duration'] for s in voice_segments)
            voice_ratio = total_voice / duration * 100 if duration > 0 else 0.0
            # Average the confidences observed during the run; the original fed
            # dummy frames to the live detector to obtain this number.
            mean_confidence = float(np.mean(confidences)) if confidences else 0.0

            print(f"\n\n📊 测试结果统计:")
            print(f" 总检测时长: {duration}秒")
            print(f" 检测到语音段: {len(voice_segments)}")
            print(f" 总语音时长: {total_voice:.1f}秒")
            print(f" 语音占比: {voice_ratio:.1f}%")
            print(f" 平均置信度: {mean_confidence:.2f}")

            if voice_segments:
                print(f" 平均语音段时长: {np.mean([s['duration'] for s in voice_segments]):.1f}秒")
                print(f" 最长语音段: {max(s['duration'] for s in voice_segments):.1f}秒")

            print(f"\n🎯 检测器状态:")
            print(f" {detector.get_debug_info()}")

        except KeyboardInterrupt:
            print(f"\n\n🛑 测试被用户中断")
        except Exception as e:
            print(f"\n\n❌ 测试出错: {e}")
        finally:
            # Best-effort release of the audio device; failures are reported, not raised.
            if stream is not None:
                try:
                    stream.stop_stream()
                    stream.close()
                except Exception as cleanup_error:
                    print(f"\n⚠️ 关闭音频流时出错: {cleanup_error}")
            if audio is not None:
                try:
                    audio.terminate()
                except Exception as cleanup_error:
                    print(f"\n⚠️ 释放音频设备时出错: {cleanup_error}")
|
||
|
||
|
||
def main():
    """Entry point: build a VoiceDetectorTester and run it with default settings."""
    VoiceDetectorTester().run_test()


# Only start the interactive test when executed as a script, not on import.
if __name__ == "__main__":
    main()