#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Minimal voice-activated recorder driven by simple signal statistics.

Tuned for the Raspberry Pi 3B; it has no dependency on Vosk recognition.
Audio is captured with PyAudio, speech is detected from the zero-crossing
rate (ZCR) of each chunk, and every finished recording is written to a WAV
file and played back.
"""

import sys
import os
import subprocess
import time
import threading
import wave

import numpy as np

try:
    import pyaudio
except ImportError:
    # Only device access needs PyAudio.  Deferring the failure to
    # EnergyBasedRecorder.__init__ keeps the pure signal-processing helpers
    # importable on machines without PortAudio.
    pyaudio = None


class EnergyBasedRecorder:
    """Voice-activated recorder with a pre-record ring buffer.

    The recorder listens continuously, starts recording when the ZCR of a
    chunk falls inside the speech band, stops after sustained silence (or at
    the maximum duration), saves the audio as WAV and plays it back.
    """

    # ZCR band (zero crossings per second) that is treated as speech.
    VOICE_ZCR_MIN = 2400
    VOICE_ZCR_MAX = 12000

    def __init__(self, energy_threshold=500, silence_threshold=1.5,
                 min_recording_time=2.0, max_recording_time=30.0):
        """Initialise state and open the input stream.

        Args:
            energy_threshold: Energy threshold (kept for compatibility; the
                ZCR detector does not use it, only ``auto_adjust_threshold``
                and the status output do).
            silence_threshold: Seconds without detected voice after which the
                time-based fallback ends a recording.
            min_recording_time: Minimum recording length in seconds before a
                silence-triggered stop is honoured.
            max_recording_time: Hard upper bound on recording length (seconds).

        Raises:
            ImportError: If the ``pyaudio`` package is not installed.
        """
        if pyaudio is None:
            raise ImportError("pyaudio is required to use EnergyBasedRecorder")

        # Audio parameters.
        self.FORMAT = pyaudio.paInt16
        self.CHANNELS = 1
        self.RATE = 16000        # 16 kHz sample rate
        self.CHUNK_SIZE = 1024   # frames per read
        # Bytes per sample for FORMAT; shared by WAV writing and duration maths.
        self.SAMPLE_WIDTH = pyaudio.get_sample_size(self.FORMAT)

        # Detection parameters.
        self.energy_threshold = energy_threshold
        self.silence_threshold = silence_threshold
        self.min_recording_time = min_recording_time
        self.max_recording_time = max_recording_time
        self.pre_record_duration = 2.0  # seconds of audio kept before the trigger

        # Runtime state.
        self.audio = None
        self.stream = None
        self.running = False
        self.recording = False
        self.recorded_frames = []
        self.recording_start_time = None
        self.last_sound_time = None
        self.energy_history = []
        self.zcr_history = []
        self.max_energy_history = 50

        # Pre-record buffer: the most recent chunks seen while listening.
        self.pre_record_buffer = []
        self.pre_record_max_frames = int(
            self.pre_record_duration * self.RATE / self.CHUNK_SIZE
        )

        # Playback state.
        self.is_playing = False

        # Silence detection bookkeeping.
        self.voice_activity_history = []
        self.max_voice_history = 20
        self.consecutive_silence_count = 0
        self.silence_threshold_count = 15

        # ZCR-based silence detection bookkeeping.
        self.max_zcr_history = 30
        self.consecutive_low_zcr_count = 0
        self.low_zcr_threshold_count = 20

        # Performance monitoring.
        self.frame_count = 0
        self.start_time = time.time()

        self._setup_audio()

    # ------------------------------------------------------------------
    # Device handling
    # ------------------------------------------------------------------
    def _setup_audio(self):
        """Open the input stream, creating the PyAudio session only once.

        Re-using the existing session avoids leaking one PortAudio instance
        per playback cycle.  On failure ``self.stream`` is left as ``None`` so
        callers can detect that no input is available.
        """
        try:
            if self.audio is None:
                self.audio = pyaudio.PyAudio()
            self.stream = self.audio.open(
                format=self.FORMAT,
                channels=self.CHANNELS,
                rate=self.RATE,
                input=True,
                frames_per_buffer=self.CHUNK_SIZE,
            )
            print("✅ 音频设备初始化成功")
        except Exception as e:
            self.stream = None
            print(f"❌ 音频设备初始化失败: {e}")

    def _pause_input(self):
        """Abort any active recording, drop buffers and close the input stream."""
        if self.recording:
            self.recording = False
            self.recorded_frames = []
            self.recording_start_time = None
            self.last_sound_time = None

        self.pre_record_buffer = []
        self.energy_history = []
        self.zcr_history = []

        # Detach the stream first so a failing close cannot leave a stale handle.
        stream, self.stream = self.stream, None
        if stream:
            stream.stop_stream()
            stream.close()

        self.is_playing = True
        # Give the audio device a moment to fully stop capturing.
        time.sleep(0.5)

    def _resume_input(self, settle_delay):
        """Re-open the input stream after playback.

        Args:
            settle_delay: Seconds to wait for playback to drain before the
                microphone is re-opened.
        """
        self.is_playing = False
        time.sleep(settle_delay)
        self._setup_audio()
        self.energy_history = []
        self.zcr_history = []
        print("📡 音频输入已重新开启")

    # ------------------------------------------------------------------
    # Signal statistics
    # ------------------------------------------------------------------
    @staticmethod
    def _to_samples(audio_data):
        """Decode raw 16-bit PCM bytes into an ``int16`` array.

        A trailing odd byte (incomplete sample) is ignored instead of making
        ``np.frombuffer`` raise.
        """
        usable = len(audio_data) - (len(audio_data) % 2)
        if usable == 0:
            return np.empty(0, dtype=np.int16)
        return np.frombuffer(audio_data[:usable], dtype=np.int16)

    @staticmethod
    def _append_bounded(history, value, limit):
        """Append ``value`` to ``history`` and keep only the newest ``limit`` items."""
        history.append(value)
        overflow = len(history) - limit
        if overflow > 0:
            del history[:overflow]

    def calculate_energy(self, audio_data):
        """Return the RMS energy of a PCM16 chunk (0 for empty input).

        While not recording, the value is also appended to
        ``energy_history`` so the history reflects background noise only.
        """
        samples = self._to_samples(audio_data)
        if samples.size == 0:
            return 0

        # Square in float64: squaring int16 in place silently wraps around.
        as_float = samples.astype(np.float64)
        rms = float(np.sqrt(np.mean(as_float * as_float)))

        if not self.recording:
            self._append_bounded(self.energy_history, rms, self.max_energy_history)
        return rms

    def get_average_energy(self):
        """Return the mean of ``energy_history`` (0.0 when it is empty)."""
        if not self.energy_history:
            return 0.0
        return float(np.mean(self.energy_history))

    def calculate_peak_energy(self, audio_data):
        """Return the largest absolute sample value of a chunk (0 if empty)."""
        samples = self._to_samples(audio_data)
        if samples.size == 0:
            return 0
        # Widen first: abs(-32768) does not fit into int16.
        return int(np.max(np.abs(samples.astype(np.int32))))

    def calculate_zero_crossing_rate(self, audio_data):
        """Return the zero-crossing rate of a chunk in crossings per second.

        The value is also appended to ``zcr_history``.  Empty input yields 0
        and leaves the history untouched.
        """
        samples = self._to_samples(audio_data)
        if samples.size == 0:
            return 0

        zero_crossings = np.sum(np.diff(np.sign(samples)) != 0)
        # Normalise the per-sample rate to the sample rate.
        zcr = float(zero_crossings / len(samples) * self.RATE)

        self._append_bounded(self.zcr_history, zcr, self.max_zcr_history)
        return zcr

    def is_voice_active_advanced(self, energy, zcr):
        """Return True when ``zcr`` lies strictly inside the speech band.

        ``energy`` is accepted for interface compatibility but not used.
        """
        return self.VOICE_ZCR_MIN < zcr < self.VOICE_ZCR_MAX

    def is_voice_active(self, energy):
        """Deprecated energy-based detector; always returns False."""
        return False

    # ------------------------------------------------------------------
    # Persistence and playback
    # ------------------------------------------------------------------
    def save_recording(self, audio_data, filename=None):
        """Write raw PCM bytes to a WAV file.

        Args:
            audio_data: Raw PCM bytes in the recorder's format.
            filename: Target path; defaults to ``recording_<timestamp>.wav``.

        Returns:
            ``(True, filename)`` on success, ``(False, None)`` on failure.
        """
        if filename is None:
            timestamp = time.strftime("%Y%m%d_%H%M%S")
            filename = f"recording_{timestamp}.wav"

        try:
            with wave.open(filename, 'wb') as wf:
                wf.setnchannels(self.CHANNELS)
                wf.setsampwidth(self.SAMPLE_WIDTH)
                wf.setframerate(self.RATE)
                wf.writeframes(audio_data)

            print(f"✅ 录音已保存: {filename}")
            print(f"📊 音频格式参数:")
            print(f" - 采样率: {self.RATE} Hz")
            print(f" - 声道数: {self.CHANNELS}")
            print(f" - 位深度: {self.SAMPLE_WIDTH * 8} bits")
            print(f" - 格式: PCM int16 小端序")
            return True, filename
        except Exception as e:
            print(f"❌ 保存录音失败: {e}")
            return False, None

    def play_audio(self, filename):
        """Play a WAV file through PyAudio with the microphone closed.

        Falls back to the system player if PyAudio playback fails.  The input
        stream is always re-opened afterwards.
        """
        try:
            print("🔇 准备播放,完全停止音频输入")
            self._pause_input()

            chunk_frames = 1024
            with wave.open(filename, 'rb') as wf:
                channels = wf.getnchannels()
                width = wf.getsampwidth()
                rate = wf.getframerate()
                # Load everything up front so playback never waits on disk I/O.
                frames = list(iter(lambda: wf.readframes(chunk_frames), b''))

            playback_stream = self.audio.open(
                format=self.audio.get_format_from_width(width),
                channels=channels,
                rate=rate,
                output=True,
            )
            try:
                print(f"🔊 开始播放: {filename}")
                print("🚫 音频输入已完全关闭")
                for chunk in frames:
                    playback_stream.write(chunk)
                playback_stream.stop_stream()
            finally:
                # Release the output device even when a write fails.
                playback_stream.close()

            print("✅ 播放完成")
            print("🔄 重新开启音频输入")
        except Exception as e:
            print(f"❌ 播放失败: {e}")
            self.play_with_system_player(filename)
        finally:
            self._resume_input(0.3)

    def play_with_system_player(self, filename):
        """Play a file with ``aplay`` (Linux); failures are reported, not raised."""
        try:
            cmd = ['aplay', filename]
            print(f"🔊 使用系统播放器: {' '.join(cmd)}")
            print("🚫 系统播放器播放中,音频输入保持关闭")
            subprocess.run(cmd, check=True)
            print("✅ 播放完成")
            print("📡 音频输入已保持关闭状态")
        except Exception as e:
            print(f"❌ 系统播放器也失败: {e}")

    def play_audio_safe(self, filename):
        """Play a file via the system player with the microphone closed.

        The input stream is always re-opened afterwards.
        """
        try:
            print("🔇 准备播放,完全停止音频输入")
            self._pause_input()

            print(f"🔊 开始播放: {filename}")
            print("🚫 使用系统播放器,音频输入已完全关闭")
            self.play_with_system_player(filename)
            print("🔄 准备重新开启音频输入")
        except Exception as e:
            print(f"❌ 播放失败: {e}")
        finally:
            self._resume_input(0.5)

    # ------------------------------------------------------------------
    # Recording lifecycle
    # ------------------------------------------------------------------
    def update_pre_record_buffer(self, audio_data):
        """Append a chunk to the pre-record buffer, discarding the oldest ones."""
        self._append_bounded(
            self.pre_record_buffer, audio_data, self.pre_record_max_frames
        )

    def start_recording(self):
        """Enter recording mode, seeding the recording with the pre-record buffer."""
        print("🎙️ 检测到声音,开始录音...")
        self.recording = True
        # The buffered chunks become the start of the recording.
        self.recorded_frames = list(self.pre_record_buffer)
        self.pre_record_buffer = []

        now = time.time()
        self.recording_start_time = now
        self.last_sound_time = now
        self.energy_history = []
        self.zcr_history = []

        self.consecutive_low_zcr_count = 0
        self.consecutive_silence_count = 0
        self.voice_activity_history = []

    def stop_recording(self):
        """Leave recording mode; save and play back whatever was captured."""
        frames = self.recorded_frames

        # Reset state first so it is consistent even if saving/playback fails.
        self.recording = False
        self.recorded_frames = []
        self.recording_start_time = None
        self.last_sound_time = None
        self.energy_history = []
        self.zcr_history = []

        if not frames:
            return

        audio_data = b''.join(frames)
        bytes_per_second = self.RATE * self.SAMPLE_WIDTH * self.CHANNELS
        actual_duration = len(audio_data) / bytes_per_second
        pre_record_duration = min(actual_duration, self.pre_record_duration)
        print(f"📝 录音完成,时长: {actual_duration:.2f}秒 (包含预录音 {pre_record_duration:.1f}秒)")

        success, filename = self.save_recording(audio_data)
        if success and filename:
            print("=" * 50)
            print("🔊 播放刚才录制的音频...")
            # The system player is preferred here to avoid echo.
            self.play_audio_safe(filename)
            print("=" * 50)

    def monitor_performance(self):
        """Count processed chunks and print throughput every 1000 chunks."""
        self.frame_count += 1
        if self.frame_count % 1000 != 0:
            return
        elapsed = time.time() - self.start_time
        fps = self.frame_count / elapsed if elapsed > 0 else 0.0
        avg_energy = self.get_average_energy()
        print(f"📊 性能: {fps:.1f} FPS | 平均能量: {avg_energy:.1f} | 阈值: {self.energy_threshold}")

    def auto_adjust_threshold(self):
        """Nudge ``energy_threshold`` towards median + 2*std of recent energy.

        Does nothing until at least 20 energy samples are available.  The
        target is floored at 300 and blended in with a 0.9/0.1 smoothing.
        """
        if len(self.energy_history) < 20:
            return
        median_energy = np.median(self.energy_history)
        std_energy = np.std(self.energy_history)
        new_threshold = max(300, median_energy + 2 * std_energy)
        self.energy_threshold = 0.9 * self.energy_threshold + 0.1 * new_threshold

    # ------------------------------------------------------------------
    # Main loop
    # ------------------------------------------------------------------
    def _print_banner(self):
        """Print the start-up usage information."""
        print("🎤 开始监听...")
        print(f"能量阈值: {self.energy_threshold} (已弃用)")
        print(f"静音阈值: {self.silence_threshold}秒")
        print("📖 使用说明:")
        print("- 检测到声音自动开始录音")
        print("- 持续静音3秒自动结束录音")
        print("- 最少录音2秒,最多30秒")
        print("- 录音完成后自动播放")
        print("- 按 Ctrl+C 退出")
        print("🎯 新增功能:")
        print("- 纯ZCR语音检测(移除能量检测)")
        print("- 零交叉率检测(区分语音和噪音)")
        print("- 实时显示ZCR状态")
        print("- 预录音功能(包含声音开始前2秒)")
        print("- 环形缓冲区防止丢失开头音频")
        print("🤖 纯ZCR静音检测:")
        print("- 连续低ZCR计数(20次=2秒)")
        print("- ZCR活动历史追踪")
        print("- 基于ZCR模式的静音验证")
        print("- 语音范围: 2400-12000 Hz (适应16kHz采样率)")
        print("=" * 50)

    def _silence_stop_reason(self, now):
        """Return a message describing why recording should stop, or None."""
        low_count = self.consecutive_low_zcr_count
        if low_count >= self.low_zcr_threshold_count:
            if len(self.voice_activity_history) >= 15:
                # Confirm with history: at most 3 voiced chunks in the last 15.
                if sum(self.voice_activity_history[-15:]) <= 3:
                    return f"ZCR静音检测 ({low_count}次连续低ZCR)"
            else:
                # Not enough history yet: trust the plain counter.
                return f"基础ZCR静音检测 ({low_count}次)"

        # Fallback: wall-clock time since voice was last detected.
        if now - self.last_sound_time > self.silence_threshold:
            return f"时间静音检测 ({self.silence_threshold}秒)"
        return None

    def _handle_recording_chunk(self, data, energy, zcr):
        """Process one chunk while recording: store it and check stop conditions."""
        self.recorded_frames.append(data)
        now = time.time()
        recording_duration = now - self.recording_start_time

        is_voice = self.is_voice_active_advanced(energy, zcr)
        if is_voice:
            self.last_sound_time = now
            self.consecutive_low_zcr_count = 0
            self.consecutive_silence_count = 0
        else:
            self.consecutive_low_zcr_count += 1
            self.consecutive_silence_count += 1

        self._append_bounded(
            self.voice_activity_history, is_voice, self.max_voice_history
        )

        stop_reason = self._silence_stop_reason(now)
        if stop_reason and recording_duration >= self.min_recording_time:
            print(f"\n🔇 {stop_reason},结束录音")
            self.stop_recording()
            return  # nothing left to report for a finished recording

        if recording_duration > self.max_recording_time:
            print(f"\n⏰ 达到最大录音时间 {self.max_recording_time}秒")
            self.stop_recording()
            return

        zcr_progress = f"{self.consecutive_low_zcr_count}/{self.low_zcr_threshold_count}"
        if len(self.voice_activity_history) >= 5:
            recent_activity = sum(self.voice_activity_history[-5:])
        else:
            recent_activity = 0
        status = f"录音中... {recording_duration:.1f}s | ZCR: {zcr:.0f} | 语音: {is_voice} | 低ZCR计数: {zcr_progress} | 活动: {recent_activity}"
        print(f"\r{status}", end='', flush=True)

    def _handle_listening_chunk(self, data, energy, zcr):
        """Process one chunk while idle: buffer it and trigger on voice."""
        self.update_pre_record_buffer(data)

        is_voice = self.is_voice_active_advanced(energy, zcr)
        if is_voice:
            self.start_recording()
            return

        buffer_usage = len(self.pre_record_buffer) / self.pre_record_max_frames * 100
        status = f"监听中... ZCR: {zcr:.0f} | 语音: {is_voice} | 缓冲: {buffer_usage:.0f}%"
        print(f"\r{status}", end='', flush=True)

    def run(self):
        """Run the listen/record loop until interrupted or input is lost."""
        if not self.stream:
            print("❌ 音频设备未初始化")
            return

        self.running = True
        self._print_banner()

        try:
            while self.running:
                if self.stream is None:
                    # The input could not be re-opened after playback.
                    print("❌ 音频设备未初始化")
                    break

                data = self.stream.read(self.CHUNK_SIZE, exception_on_overflow=False)
                if len(data) == 0:
                    continue

                if self.is_playing:
                    # Skip all processing while playback is in progress.
                    status = "🔊 播放中... 跳过录音处理"
                    print(f"\r{status}", end='', flush=True)
                    time.sleep(0.1)
                    continue

                energy = self.calculate_energy(data)
                zcr = self.calculate_zero_crossing_rate(data)
                self.monitor_performance()

                if self.recording:
                    self._handle_recording_chunk(data, energy, zcr)
                else:
                    self._handle_listening_chunk(data, energy, zcr)

                # Yield briefly to reduce CPU usage.
                time.sleep(0.01)

        except KeyboardInterrupt:
            print("\n👋 退出")
        except Exception as e:
            print(f"❌ 错误: {e}")
        finally:
            self.stop()

    def stop(self):
        """Stop the loop, flush any active recording and release audio resources.

        Safe to call more than once: handles are detached before being closed.
        """
        self.running = False

        if self.recording:
            self.stop_recording()

        stream, self.stream = self.stream, None
        try:
            if stream:
                stream.stop_stream()
                stream.close()
        finally:
            # Always release PortAudio, even if closing the stream failed.
            audio, self.audio = self.audio, None
            if audio:
                audio.terminate()


def main():
    """Create a recorder with the default demo settings and run it."""
    print("🚀 基于能量检测的极简录音系统")
    print("=" * 50)

    recorder = EnergyBasedRecorder(
        energy_threshold=200,     # lowered for higher sensitivity
        silence_threshold=3.0,    # seconds of silence before stopping
        min_recording_time=2.0,   # minimum recording length (seconds)
        max_recording_time=30.0,  # maximum recording length (seconds)
    )

    print("✅ 系统初始化成功")
    print("🎯 优化特点:")
    print(" - 完全移除Vosk识别依赖")
    print(" - 基于能量检测,极低CPU占用")
    print(" - 自动调整能量阈值")
    print(" - 实时性能监控")
    print(" - 预期延迟: <0.1秒")
    print("=" * 50)

    recorder.run()


if __name__ == "__main__":
    main()