Local-Voice/enhanced_voice_detector.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Enhanced voice detector.

Adaptive voice activity detection that combines energy and zero-crossing
rate (ZCR) checks, tuned for a 16000 Hz sample rate.
"""
import numpy as np
import time
from collections import deque
from typing import Dict, Any, Optional
import pyaudio
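
# External dependencies: numpy and PyAudio (a Python binding for PortAudio).
# Assuming the standard PyPI package names, `pip install numpy pyaudio`
# installs both; PyAudio additionally needs the PortAudio system library.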


class EnhancedVoiceDetector:
    """Enhanced voice activity detector."""

    def __init__(self, sample_rate=16000, chunk_size=1024):
        self.sample_rate = sample_rate
        self.chunk_size = chunk_size

        # Rolling windows of recent feature values
        self.energy_window = deque(maxlen=100)
        self.zcr_window = deque(maxlen=100)

        # Running statistics
        self.energy_stats = {
            'mean': 0, 'std': 0, 'min': float('inf'), 'max': 0,
            'median': 0, 'q75': 0, 'q25': 0
        }
        self.zcr_stats = {
            'mean': 0, 'std': 0, 'min': float('inf'), 'max': 0,
            'median': 0, 'q75': 0, 'q25': 0
        }

        # Detection parameters
        self.calibration_mode = True
        self.calibration_samples = 0
        self.required_calibration = 100  # number of frames used for calibration

        # Adaptive parameters, tuned for higher sensitivity
        self.energy_multiplier = 1.0             # multiplier on the energy std (lowered)
        self.zcr_std_multiplier = 1.0            # multiplier on the ZCR std (lowered)
        self.min_energy_threshold = 80           # minimum energy threshold (lowered)
        self.consecutive_voice_threshold = 2     # consecutive voice frames required (lowered)
        self.consecutive_silence_threshold = 15  # consecutive silence frames required (raised)

        # State tracking
        self.consecutive_voice_count = 0
        self.consecutive_silence_count = 0
        self.last_voice_time = 0

        # Debug information
        self.debug_mode = True
        self.voice_count = 0
        self.total_samples = 0
        self._last_voice_state = False

    def calculate_energy(self, audio_data: bytes) -> float:
        """Compute the RMS energy of an audio frame."""
        if len(audio_data) == 0:
            return 0
        audio_array = np.frombuffer(audio_data, dtype=np.int16)
        # Root-mean-square energy
        rms = np.sqrt(np.mean(audio_array.astype(float) ** 2))
        return rms

    def calculate_zcr(self, audio_data: bytes) -> float:
        """Compute the zero-crossing rate in crossings per second."""
        if len(audio_data) == 0:
            return 0
        audio_array = np.frombuffer(audio_data, dtype=np.int16)
        zero_crossings = np.sum(np.diff(np.sign(audio_array)) != 0)
        # Scale from crossings per frame to crossings per second
        zcr = zero_crossings / len(audio_array) * self.sample_rate
        return zcr
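
    # Interpretation note: with this scaling a pure tone at f Hz yields
    # roughly 2*f crossings per second, so the default ZCR band of
    # 2000-13000 corresponds to dominant content around 1-6.5 kHz.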

    def update_statistics(self, energy: float, zcr: float):
        """Update the rolling feature statistics."""
        self.energy_window.append(energy)
        self.zcr_window.append(zcr)
        if len(self.energy_window) >= 20:
            # Recompute detailed statistics over the rolling windows
            energy_array = np.array(self.energy_window)
            zcr_array = np.array(self.zcr_window)
            # Basic statistics
            self.energy_stats['mean'] = np.mean(energy_array)
            self.energy_stats['std'] = np.std(energy_array)
            self.energy_stats['min'] = np.min(energy_array)
            self.energy_stats['max'] = np.max(energy_array)
            self.energy_stats['median'] = np.median(energy_array)
            self.energy_stats['q25'] = np.percentile(energy_array, 25)
            self.energy_stats['q75'] = np.percentile(energy_array, 75)
            self.zcr_stats['mean'] = np.mean(zcr_array)
            self.zcr_stats['std'] = np.std(zcr_array)
            self.zcr_stats['min'] = np.min(zcr_array)
            self.zcr_stats['max'] = np.max(zcr_array)
            self.zcr_stats['median'] = np.median(zcr_array)
            self.zcr_stats['q25'] = np.percentile(zcr_array, 25)
            self.zcr_stats['q75'] = np.percentile(zcr_array, 75)

    def get_adaptive_thresholds(self) -> Dict[str, float]:
        """Return the current adaptive thresholds."""
        if len(self.energy_window) < 30:
            # Not enough history yet: fall back to fixed, fairly sensitive thresholds
            return {
                'energy_threshold': 120,
                'zcr_min': 2000,
                'zcr_max': 13000
            }
        # Dynamic energy threshold: median plus a conservative multiple of the std
        base_energy_threshold = (self.energy_stats['median'] +
                                 self.energy_multiplier * self.energy_stats['std'])
        # Use quartiles to reduce the influence of outliers
        q75 = self.energy_stats['q75']
        q25 = self.energy_stats['q25']
        iqr = q75 - q25  # interquartile range
        # IQR-based robust threshold (more sensitive)
        iqr_threshold = q75 + 0.5 * iqr
        # Combine both estimates and keep the lower, more sensitive one
        energy_threshold = max(self.min_energy_threshold,
                               min(base_energy_threshold * 0.7, iqr_threshold))
        # Dynamic ZCR thresholds
        zcr_center = self.zcr_stats['median']
        zcr_spread = self.zcr_std_multiplier * self.zcr_stats['std']
        # Keep the ZCR band within a reasonable, fairly permissive range
        zcr_min = max(1500, min(3000, zcr_center - zcr_spread))
        zcr_max = min(14000, max(6000, zcr_center + zcr_spread * 2.0))
        # Enforce a minimum band width
        if zcr_max - zcr_min < 2000:
            zcr_max = zcr_min + 2000
        return {
            'energy_threshold': energy_threshold,
            'zcr_min': zcr_min,
            'zcr_max': zcr_max
        }
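
    # Worked example of the adaptive energy threshold (illustrative numbers,
    # not measured values): with median 200, std 150, q25 = 150 and q75 = 300,
    # base_energy_threshold = 200 + 1.0 * 150 = 350 and
    # iqr_threshold = 300 + 0.5 * (300 - 150) = 375, so the method returns
    # energy_threshold = max(80, min(350 * 0.7, 375)) = 245.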

    def is_voice_basic(self, energy: float, zcr: float) -> bool:
        """Single-frame voice decision."""
        thresholds = self.get_adaptive_thresholds()
        # Energy check
        energy_ok = energy > thresholds['energy_threshold']
        # ZCR check
        zcr_ok = thresholds['zcr_min'] < zcr < thresholds['zcr_max']
        # Both conditions must hold
        return energy_ok and zcr_ok

    def is_voice_advanced(self, audio_data: bytes) -> Dict[str, Any]:
        """Advanced voice detection with state tracking."""
        # Compute features
        energy = self.calculate_energy(audio_data)
        zcr = self.calculate_zcr(audio_data)
        # Update statistics
        self.update_statistics(energy, zcr)
        # Count every processed frame
        self.total_samples += 1
        # Calibration mode
        if self.calibration_mode:
            self.calibration_samples += 1
            if self.calibration_samples >= self.required_calibration:
                self.calibration_mode = False
                if self.debug_mode:
                    print("\n🎯 Calibration complete!")
                    print(f"   Energy stats: {self.energy_stats['median']:.0f}±{self.energy_stats['std']:.0f}")
                    print(f"   ZCR stats: {self.zcr_stats['median']:.0f}±{self.zcr_stats['std']:.0f}")
            return {
                'is_voice': False,
                'energy': energy,
                'zcr': zcr,
                'calibrating': True,
                'calibration_progress': self.calibration_samples / self.required_calibration,
                'confidence': 0.0
            }
        # Single-frame decision
        is_voice_frame = self.is_voice_basic(energy, zcr)
        # State machine
        if is_voice_frame:
            self.consecutive_voice_count += 1
            self.consecutive_silence_count = 0
            self.last_voice_time = time.time()
        else:
            self.consecutive_silence_count += 1
            if self.consecutive_silence_count >= self.consecutive_silence_threshold:
                self.consecutive_voice_count = 0
        # Final decision (requires several consecutive voice frames)
        final_voice_detected = self.consecutive_voice_count >= self.consecutive_voice_threshold
        # Count only transitions from silence to voice
        if final_voice_detected and not self._last_voice_state:
            self.voice_count += 1
        # Remember the last state
        self._last_voice_state = final_voice_detected
        # Confidence score
        thresholds = self.get_adaptive_thresholds()
        energy_confidence = min(1.0, energy / thresholds['energy_threshold'])
        zcr_confidence = 1.0 if thresholds['zcr_min'] < zcr < thresholds['zcr_max'] else 0.0
        confidence = (energy_confidence + zcr_confidence) / 2
        return {
            'is_voice': final_voice_detected,
            'energy': energy,
            'zcr': zcr,
            'confidence': confidence,
            'energy_threshold': thresholds['energy_threshold'],
            'zcr_min': thresholds['zcr_min'],
            'zcr_max': thresholds['zcr_max'],
            'consecutive_voice_count': self.consecutive_voice_count,
            'consecutive_silence_count': self.consecutive_silence_count,
            'calibrating': False,
            'voice_detection_rate': self.voice_count / self.total_samples if self.total_samples > 0 else 0
        }

    def get_debug_info(self) -> str:
        """Return a one-line debug summary."""
        if self.calibration_mode:
            return f"Calibrating: {self.calibration_samples}/{self.required_calibration}"
        thresholds = self.get_adaptive_thresholds()
        return (f"Energy threshold: {thresholds['energy_threshold']:.0f} | "
                f"ZCR range: {thresholds['zcr_min']:.0f}-{thresholds['zcr_max']:.0f} | "
                f"Detection rate: {self.voice_count}/{self.total_samples} ({self.voice_count/self.total_samples*100:.1f}%)")

    def reset(self):
        """Reset the detector state."""
        self.energy_window.clear()
        self.zcr_window.clear()
        self.calibration_mode = True
        self.calibration_samples = 0
        self.consecutive_voice_count = 0
        self.consecutive_silence_count = 0
        self.voice_count = 0
        self.total_samples = 0
        self._last_voice_state = False
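

# A minimal offline sketch of driving EnhancedVoiceDetector with synthetic
# int16 frames instead of a live PyAudio stream. The noise level, the 2 kHz
# tone (ZCR ≈ 4000 crossings/s) and the burst pattern below are illustrative
# assumptions, not tuned values.
def _synthetic_demo(num_frames: int = 200) -> None:
    detector = EnhancedVoiceDetector(sample_rate=16000, chunk_size=1024)
    t = np.arange(detector.chunk_size) / detector.sample_rate
    for i in range(num_frames):
        if i < detector.required_calibration or i % 40 < 30:
            # Quiet background: low-amplitude noise for calibration/silence
            frame = (np.random.randn(detector.chunk_size) * 30).astype(np.int16)
        else:
            # "Speech": a 2 kHz tone burst with a little noise on top
            frame = (3000 * np.sin(2 * np.pi * 2000 * t)
                     + np.random.randn(detector.chunk_size) * 30).astype(np.int16)
        result = detector.is_voice_advanced(frame.tobytes())
        if not result['calibrating'] and result['is_voice']:
            print(f"frame {i}: voice detected (confidence {result['confidence']:.2f})")
# Example: call _synthetic_demo() from a REPL to watch the detector flag the
# tone bursts once calibration has finished.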


class VoiceDetectorTester:
    """Interactive tester for the voice detector."""

    def __init__(self):
        self.detector = EnhancedVoiceDetector()

    def run_test(self, duration=30):
        """Run a live microphone test."""
        print("🎙️ Enhanced voice detector test")
        print("=" * 50)
        print("📊 Detection algorithm: energy + ZCR dual check")
        print("📈 Sample rate: 16000 Hz")
        print("🔄 Adaptive thresholds: enabled")
        print(f"⏱️ Test duration: {duration} seconds")
        print("💡 Speak to test the detection...")
        print("🛑 Press Ctrl+C to stop early")
        print("=" * 50)
        try:
            # Initialize audio input
            audio = pyaudio.PyAudio()
            stream = audio.open(
                format=pyaudio.paInt16,
                channels=1,
                rate=16000,
                input=True,
                frames_per_buffer=1024
            )
            start_time = time.time()
            voice_segments = []
            confidences = []
            current_segment = None
            while time.time() - start_time < duration:
                # Read one chunk of audio
                data = stream.read(1024, exception_on_overflow=False)
                # Run detection
                result = self.detector.is_voice_advanced(data)
                # Track per-frame confidence once calibration has finished
                if not result['calibrating']:
                    confidences.append(result['confidence'])
                # Track voice segments
                if result['is_voice']:
                    if current_segment is None:
                        current_segment = {
                            'start_time': time.time(),
                            'start_sample': self.detector.total_samples
                        }
                else:
                    if current_segment is not None:
                        current_segment['end_time'] = time.time()
                        current_segment['end_sample'] = self.detector.total_samples
                        current_segment['duration'] = current_segment['end_time'] - current_segment['start_time']
                        voice_segments.append(current_segment)
                        current_segment = None
                # Display status
                if result['calibrating']:
                    progress = result['calibration_progress'] * 100
                    status = f"🔧 Calibrating: {progress:.0f}% | Energy: {result['energy']:.0f} | ZCR: {result['zcr']:.0f}"
                else:
                    status_icon = "🎤" if result['is_voice'] else "🔇"
                    status_color = "\033[92m" if result['is_voice'] else "\033[90m"
                    reset_color = "\033[0m"
                    status = (f"{status_color}{status_icon} "
                              f"Energy: {result['energy']:.0f}/{result['energy_threshold']:.0f} | "
                              f"ZCR: {result['zcr']:.0f} ({result['zcr_min']:.0f}-{result['zcr_max']:.0f}) | "
                              f"Confidence: {result['confidence']:.2f} | "
                              f"Streak: {result['consecutive_voice_count']}/{result['consecutive_silence_count']}{reset_color}")
                print(f"\r{status}", end='', flush=True)
                time.sleep(0.01)
            # Close any segment still open at the end of the test
            if current_segment is not None:
                current_segment['end_time'] = time.time()
                current_segment['duration'] = current_segment['end_time'] - current_segment['start_time']
                voice_segments.append(current_segment)
            # Print summary statistics
            print("\n\n📊 Test results:")
            print(f"   Total test duration: {duration} s")
            print(f"   Voice segments detected: {len(voice_segments)}")
            print(f"   Total voice time: {sum(s['duration'] for s in voice_segments):.1f} s")
            print(f"   Voice ratio: {sum(s['duration'] for s in voice_segments)/duration*100:.1f}%")
            if confidences:
                print(f"   Average confidence: {np.mean(confidences):.2f}")
            if voice_segments:
                print(f"   Average segment length: {np.mean([s['duration'] for s in voice_segments]):.1f} s")
                print(f"   Longest segment: {max(s['duration'] for s in voice_segments):.1f} s")
            print("\n🎯 Detector state:")
            print(f"   {self.detector.get_debug_info()}")
        except KeyboardInterrupt:
            print("\n\n🛑 Test interrupted by user")
        except Exception as e:
            print(f"\n\n❌ Test failed: {e}")
        finally:
            # Release audio resources if they were created
            try:
                if 'stream' in locals():
                    stream.stop_stream()
                    stream.close()
                if 'audio' in locals():
                    audio.terminate()
            except Exception:
                pass


def main():
    """Entry point."""
    tester = VoiceDetectorTester()
    tester.run_test()


if __name__ == "__main__":
    main()