#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Real-time ZCR (zero-crossing rate) monitoring tool.

Captures microphone audio, prints the ZCR of every chunk as it arrives,
and reports summary statistics when monitoring stops. Intended for
observing real ZCR values and for testing voice detection thresholds.
"""

import threading
import time

import numpy as np
import pyaudio


class ZCRMonitor:
    """Real-time zero-crossing-rate monitor for a mono 16-bit microphone stream."""

    def __init__(self):
        # Capture format: 16-bit signed samples, mono, 16 kHz, 1024 frames/chunk.
        self.FORMAT = pyaudio.paInt16
        self.CHANNELS = 1
        self.RATE = 16000
        self.CHUNK_SIZE = 1024

        # Monitoring state.
        self.running = False
        self.zcr_history = []   # most recent ZCR values, oldest first
        self.max_history = 100  # maximum number of values kept in zcr_history

        # Audio device handles; populated by setup_audio()/start_monitoring().
        self.audio = None
        self.stream = None

        # Detection thresholds (per the original note: matches recorder.py).
        self.zcr_min = 2400
        self.zcr_max = 12000

    def setup_audio(self) -> bool:
        """Initialise PyAudio and open a blocking input stream.

        Returns:
            True on success, False if the device could not be initialised.
            On failure, any partially acquired audio resources are released.
        """
        try:
            self.audio = pyaudio.PyAudio()
            self.stream = self.audio.open(
                format=self.FORMAT,
                channels=self.CHANNELS,
                rate=self.RATE,
                input=True,
                frames_per_buffer=self.CHUNK_SIZE,
            )
            return True
        except Exception as e:
            print(f"❌ 音频设备初始化失败: {e}")
            # Do not leak a half-initialised PyAudio instance.
            self._release_audio()
            return False

    def calculate_zcr(self, audio_data) -> float:
        """Compute the zero-crossing rate of a raw int16 audio buffer.

        Args:
            audio_data: Bytes-like buffer of int16 samples (may be empty/None).

        Returns:
            Sign changes per sample scaled by ``self.RATE`` (i.e. an
            approximate crossings-per-second figure); 0.0 for empty input.
        """
        if not audio_data:
            return 0.0

        samples = np.frombuffer(audio_data, dtype=np.int16)
        if samples.size == 0:
            return 0.0

        # NOTE(review): a sample that is exactly 0 between a positive and a
        # negative sample counts as two sign changes. Kept as-is because the
        # thresholds are presumably tuned against this formula — confirm.
        zero_crossings = np.sum(np.diff(np.sign(samples)) != 0)
        return float(zero_crossings / len(samples) * self.RATE)

    def is_voice(self, zcr) -> bool:
        """Return True when ``zcr`` lies strictly between zcr_min and zcr_max."""
        return self.zcr_min < zcr < self.zcr_max

    def monitor_callback(self, in_data, frame_count, time_info, status):
        """PyAudio stream callback: record the chunk's ZCR and print a status line.

        Args:
            in_data: Raw captured audio for this chunk.
            frame_count: Number of frames in ``in_data`` (unused).
            time_info: Timing information from PyAudio (unused).
            status: PortAudio status flags (unused).

        Returns:
            ``(in_data, pyaudio.paContinue)`` so the stream keeps running.
        """
        zcr = self.calculate_zcr(in_data)

        # Update the bounded history.
        self.zcr_history.append(zcr)
        if len(self.zcr_history) > self.max_history:
            self.zcr_history.pop(0)

        # Short-term statistics over the 10 most recent values, once more
        # than 10 values have been collected.
        if len(self.zcr_history) > 10:
            recent = self.zcr_history[-10:]
            avg_zcr = np.mean(recent)
            std_zcr = np.std(recent)
        else:
            avg_zcr = zcr
            std_zcr = 0

        voice_detected = self.is_voice(zcr)

        # Live display: green microphone for voice, grey mute icon otherwise.
        # (Local is named `icon` so the `status` parameter is not shadowed.)
        icon = "🎤" if voice_detected else "🔇"
        color = "\033[92m" if voice_detected else "\033[90m"
        reset = "\033[0m"

        info = (f"{color}{icon} ZCR: {zcr:.0f} | "
                f"阈值: {self.zcr_min}-{self.zcr_max} | "
                f"平均: {avg_zcr:.0f}±{std_zcr:.0f}{reset}")

        # Carriage return keeps the display on a single, continuously
        # rewritten terminal line.
        print(f"\r{info}", end='', flush=True)

        return (in_data, pyaudio.paContinue)

    def start_monitoring(self):
        """Run the monitor in callback mode until interrupted with Ctrl+C.

        Requires a prior successful ``setup_audio()``. Always calls
        ``cleanup()`` on exit, which also prints the final statistics.
        """
        print("🎙️ ZCR实时监控工具")
        print("=" * 50)
        print("📊 当前检测阈值:")
        print(f" ZCR范围: {self.zcr_min} - {self.zcr_max}")
        print("💡 请说话测试语音检测...")
        print("🛑 按 Ctrl+C 停止监控")
        print("=" * 50)

        try:
            # setup_audio() already opened a blocking stream; close it before
            # replacing it so the device handle is not leaked.
            self._close_stream()

            # Re-open in callback mode.
            self.stream = self.audio.open(
                format=self.FORMAT,
                channels=self.CHANNELS,
                rate=self.RATE,
                input=True,
                frames_per_buffer=self.CHUNK_SIZE,
                stream_callback=self.monitor_callback,
            )

            self.stream.start_stream()
            self.running = True

            # Main loop: the callback does the work; just stay alive.
            while self.running:
                time.sleep(0.1)

        except KeyboardInterrupt:
            print("\n🛑 监控停止")
        finally:
            self.cleanup()

    def show_statistics(self):
        """Print summary statistics and suggested thresholds for the history.

        Does nothing when no ZCR values have been recorded.
        """
        if not self.zcr_history:
            return

        total = len(self.zcr_history)
        avg_zcr = np.mean(self.zcr_history)
        std_zcr = np.std(self.zcr_history)

        print("\n📊 ZCR统计信息:")
        print(f" 样本数量: {total}")
        print(f" 最小值: {min(self.zcr_history):.0f}")
        print(f" 最大值: {max(self.zcr_history):.0f}")
        print(f" 平均值: {avg_zcr:.0f}")
        print(f" 标准差: {std_zcr:.0f}")

        # Share of recorded chunks classified as voice.
        voice_count = sum(1 for zcr in self.zcr_history if self.is_voice(zcr))
        voice_percentage = voice_count / total * 100
        print(f" 语音检测: {voice_count}/{total} ({voice_percentage:.1f}%)")

        # Suggested new thresholds.
        # NOTE(review): with a high mean, suggested_min can exceed
        # suggested_max (capped at 8000) — confirm the intended heuristic.
        suggested_min = max(800, avg_zcr + std_zcr)
        suggested_max = min(8000, avg_zcr + 4 * std_zcr)

        print("\n🎯 建议的检测阈值:")
        print(f" 最小值: {suggested_min:.0f}")
        print(f" 最大值: {suggested_max:.0f}")

    def _close_stream(self):
        """Stop and close the current stream, if any (best effort)."""
        stream, self.stream = self.stream, None
        if stream is None:
            return
        try:
            try:
                stream.stop_stream()
            finally:
                # Close even if stopping failed.
                stream.close()
        except Exception as e:
            print(f"\n⚠️ 关闭音频流时出错: {e}")

    def _release_audio(self):
        """Close the stream and terminate PyAudio, if present (best effort)."""
        self._close_stream()
        audio, self.audio = self.audio, None
        if audio is None:
            return
        try:
            audio.terminate()
        except Exception as e:
            print(f"\n⚠️ 释放音频设备时出错: {e}")

    def cleanup(self):
        """Release audio resources and print the final statistics.

        Idempotent: once the resources have been released, further calls
        only clear ``running`` and return, so the statistics are not
        printed a second time.
        """
        self.running = False

        already_released = self.stream is None and self.audio is None
        self._release_audio()
        if already_released:
            return

        # Show the final statistics.
        self.show_statistics()


def main():
    """Entry point: set up the audio device and run the monitor."""
    monitor = ZCRMonitor()

    if not monitor.setup_audio():
        print("❌ 无法初始化音频设备")
        return

    try:
        monitor.start_monitoring()
    except Exception as e:
        print(f"❌ 监控过程中出错: {e}")
    finally:
        # Safe to call again: cleanup() is idempotent.
        monitor.cleanup()


if __name__ == "__main__":
    main()