#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Provenance
# ----------
# This file was reconstructed from a `git format-patch` e-mail whose line
# breaks had been collapsed.  The patch created `energy_based_recorder.py`.
#
#   Commit:  b87be1494d7e9cae534e2f8138a121b3bb67ead3
#   Author:  朱潮
#   Date:    Sat, 20 Sep 2025 11:19:08 +0800
#   Subject: Minimal energy-detection recorder: eliminate the latency
#            problem on the Raspberry Pi 3B
#
# Commit message (translated):
#   - Remove the Vosk recognition dependency entirely; use energy detection.
#   - Decide where sound starts/ends from the RMS energy of each chunk.
#   - Automatically adjust the energy threshold to the ambient noise.
#   - Real-time performance monitoring, very low CPU usage.
#   - Expected latency: < 0.1 s (previously 10 s).
#   - Automatically play back the recorded audio.
#   Tuning: 8 kHz sample rate, 1024-frame chunks; automatic threshold
#   adjustment; recording ends after 1.5 s of silence; recordings last at
#   least 2 s and at most 30 s.
#   Generated with Claude Code (https://claude.ai/code); Co-Authored-By: Claude

"""Minimal voice-activated recorder based on energy detection.

Tuned for the Raspberry Pi 3B; it does not depend on Vosk speech
recognition.  The microphone is read in fixed-size chunks, the RMS energy of
every chunk is compared with a threshold, and a recording is started when the
energy rises above it and finished after a period of silence.
"""

import os
import subprocess
import sys
import threading
import time
import wave

import numpy as np

try:
    import pyaudio
except ImportError as exc:
    # Keep the module importable without PyAudio; _setup_audio() reports the
    # problem through the normal "device initialisation failed" path.
    pyaudio = None
    _PYAUDIO_IMPORT_ERROR = exc
else:
    _PYAUDIO_IMPORT_ERROR = None


class EnergyBasedRecorder:
    """Record from the default input device whenever sound is detected.

    Args:
        energy_threshold: RMS level above which a chunk counts as sound.
        silence_threshold: Seconds without sound after which a recording
            is finished.
        min_recording_time: A recording is never finished by silence before
            it is at least this many seconds long.
        max_recording_time: A recording is always finished once it is longer
            than this many seconds.
    """

    # Width in bytes of one 16-bit (paInt16) sample.
    _BYTES_PER_SAMPLE = 2

    def __init__(self, energy_threshold=500, silence_threshold=1.5,
                 min_recording_time=2.0, max_recording_time=30.0):
        # Capture format: 16-bit mono, 8 kHz, 1024 frames per read
        # (1024 / 8000 = 0.128 s of audio per chunk).
        self.FORMAT = pyaudio.paInt16 if pyaudio is not None else None
        self.CHANNELS = 1
        self.RATE = 8000
        self.CHUNK_SIZE = 1024

        # Detection parameters.
        self.energy_threshold = energy_threshold
        self.silence_threshold = silence_threshold
        self.min_recording_time = min_recording_time
        self.max_recording_time = max_recording_time

        # Runtime state.
        self.audio = None
        self.stream = None
        self.running = False
        self.recording = False
        self.recorded_frames = []
        self.recording_start_time = None
        self.last_sound_time = None
        self.energy_history = []      # most recent RMS values, oldest first
        self.max_energy_history = 50  # cap on len(energy_history)

        # Performance monitoring.
        self.frame_count = 0
        self.start_time = time.time()

        self._setup_audio()

    def _setup_audio(self):
        """Open the input stream.

        Failures are printed, not raised; ``self.stream`` then stays
        ``None`` and ``run()`` refuses to start.
        """
        if pyaudio is None:
            print(f"❌ 音频设备初始化失败: {_PYAUDIO_IMPORT_ERROR}")
            return
        try:
            self.audio = pyaudio.PyAudio()
            self.stream = self.audio.open(
                format=self.FORMAT,
                channels=self.CHANNELS,
                rate=self.RATE,
                input=True,
                frames_per_buffer=self.CHUNK_SIZE,
            )
            print("✅ 音频设备初始化成功")
        except Exception as e:
            print(f"❌ 音频设备初始化失败: {e}")

    def calculate_energy(self, audio_data):
        """Return the RMS energy of a chunk and append it to the history.

        Args:
            audio_data: Raw bytes holding native-endian signed 16-bit samples.

        Returns:
            The RMS value as a float; ``0.0`` when the buffer does not hold
            a single complete sample (the history is left untouched then).
        """
        sample_count = len(audio_data) // self._BYTES_PER_SAMPLE
        if sample_count == 0:
            return 0.0

        # ``count`` makes frombuffer ignore a truncated trailing byte instead
        # of raising ValueError.
        samples = np.frombuffer(audio_data, dtype=np.int16,
                                count=sample_count)

        # Square in float64: int16 array arithmetic wraps silently, so any
        # sample with |value| > 181 would otherwise give a wrong square.
        rms = float(np.sqrt(np.mean(np.square(samples.astype(np.float64)))))

        self.energy_history.append(rms)
        excess = len(self.energy_history) - self.max_energy_history
        if excess > 0:
            del self.energy_history[:excess]

        return rms

    def get_average_energy(self):
        """Return the mean of the energy history, or ``0.0`` when empty."""
        if not self.energy_history:
            return 0.0
        return float(np.mean(self.energy_history))

    def is_voice_active(self, energy):
        """Return True when ``energy`` is above the current threshold."""
        return energy > self.energy_threshold

    def save_recording(self, audio_data, filename=None):
        """Write ``audio_data`` to a WAV file.

        Args:
            audio_data: Raw sample bytes in the capture format.
            filename: Target path; defaults to ``recording_<timestamp>.wav``
                in the current directory.

        Returns:
            ``(True, filename)`` on success, ``(False, None)`` on failure
            (the error is printed, not raised).
        """
        if filename is None:
            timestamp = time.strftime("%Y%m%d_%H%M%S")
            filename = f"recording_{timestamp}.wav"

        try:
            with wave.open(filename, 'wb') as wf:
                wf.setnchannels(self.CHANNELS)
                wf.setsampwidth(self.audio.get_sample_size(self.FORMAT))
                wf.setframerate(self.RATE)
                wf.writeframes(audio_data)

            print(f"✅ 录音已保存: {filename}")
            return True, filename
        except Exception as e:
            print(f"❌ 保存录音失败: {e}")
            return False, None

    def play_audio(self, filename):
        """Play a WAV file through PyAudio.

        On any failure the error is printed and playback is retried with
        ``play_with_system_player``.
        """
        chunk_frames = 1024
        try:
            with wave.open(filename, 'rb') as wf:
                playback_stream = self.audio.open(
                    format=self.audio.get_format_from_width(
                        wf.getsampwidth()),
                    channels=wf.getnchannels(),
                    rate=wf.getframerate(),
                    output=True,
                )
                try:
                    print(f"🔊 开始播放: {filename}")
                    # Stream chunk by chunk instead of loading the whole
                    # file; readframes() returns b'' at end of file.
                    for chunk in iter(lambda: wf.readframes(chunk_frames),
                                      b''):
                        playback_stream.write(chunk)
                    playback_stream.stop_stream()
                finally:
                    # Release the output device even if write() failed.
                    playback_stream.close()

            print("✅ 播放完成")

        except Exception as e:
            print(f"❌ 播放失败: {e}")
            self.play_with_system_player(filename)

    def play_with_system_player(self, filename):
        """Play a file with the external ``aplay`` command (Linux)."""
        cmd = ['aplay', filename]
        try:
            print(f"🔊 使用系统播放器: {' '.join(cmd)}")
            subprocess.run(cmd, check=True)
            print("✅ 播放完成")
        except (OSError, subprocess.SubprocessError) as e:
            print(f"❌ 系统播放器也失败: {e}")

    def start_recording(self):
        """Switch to recording mode and reset the per-recording state."""
        print("🎙️ 检测到声音,开始录音...")
        now = time.time()
        self.recording = True
        self.recorded_frames = []
        self.recording_start_time = now
        self.last_sound_time = now
        self.energy_history = []  # start a fresh energy history

    def stop_recording(self):
        """Finish the current recording, then save it and play it back.

        NOTE(review): the input stream stays open during playback, so the
        microphone may capture the speaker output - confirm on hardware.
        """
        frames = self.recorded_frames

        # Reset the state before the slow save/playback step.  If that step
        # is interrupted (Ctrl+C), stop() must not find the same take still
        # pending and save/play it a second time.
        self.recording = False
        self.recorded_frames = []
        self.recording_start_time = None
        self.last_sound_time = None
        self.energy_history = []

        audio_data = b''.join(frames)
        if not audio_data:
            return

        bytes_per_second = (self.RATE * self.CHANNELS
                            * self._BYTES_PER_SAMPLE)
        duration = len(audio_data) / bytes_per_second
        print(f"📝 录音完成,时长: {duration:.2f}秒")

        success, filename = self.save_recording(audio_data)

        if success and filename:
            print("=" * 50)
            print("🔊 播放刚才录制的音频...")
            self.play_audio(filename)
            print("=" * 50)

    def monitor_performance(self):
        """Count one processed chunk; print statistics every 1000 chunks."""
        self.frame_count += 1
        if self.frame_count % 1000 != 0:
            return
        elapsed = time.time() - self.start_time
        fps = self.frame_count / elapsed if elapsed > 0 else 0.0
        avg_energy = self.get_average_energy()
        print(f"📊 性能: {fps:.1f} FPS | 平均能量: {avg_energy:.1f} | "
              f"阈值: {self.energy_threshold}")

    def auto_adjust_threshold(self):
        """Move the threshold towards ``median + 2 * std`` of the history.

        Does nothing until 20 energy values are available.  The target is
        never below 300, and only 10% of the distance is covered per call so
        the threshold changes smoothly.
        """
        if len(self.energy_history) < 20:
            return
        median_energy = np.median(self.energy_history)
        std_energy = np.std(self.energy_history)
        new_threshold = max(300, median_energy + 2 * std_energy)
        self.energy_threshold = (0.9 * self.energy_threshold
                                 + 0.1 * new_threshold)

    def _handle_recording_chunk(self, data, energy):
        """Store one chunk while recording and finish the take when due."""
        self.recorded_frames.append(data)

        now = time.time()
        if self.is_voice_active(energy):
            self.last_sound_time = now
        recording_duration = now - self.recording_start_time
        silence_duration = now - self.last_sound_time

        # stop_recording() clears the timestamps, so return right after it;
        # the status line below must not run on a finished take.
        if (silence_duration > self.silence_threshold
                and recording_duration >= self.min_recording_time):
            print(f"\n🔇 检测到持续静音 {self.silence_threshold}秒,结束录音")
            self.stop_recording()
            return

        if recording_duration > self.max_recording_time:
            print(f"\n⏰ 达到最大录音时间 {self.max_recording_time}秒")
            self.stop_recording()
            return

        status = (f"录音中... {recording_duration:.1f}s | "
                  f"能量: {energy:.0f} | 静音: {silence_duration:.1f}s")
        print(f"\r{status}", end='', flush=True)

    def _handle_listening_chunk(self, data, energy):
        """Start a recording on sound; otherwise track the ambient level."""
        if self.is_voice_active(energy):
            self.start_recording()
            # Keep the chunk that triggered the recording so the onset of
            # the sound is not lost.
            self.recorded_frames.append(data)
            return

        avg_energy = self.get_average_energy()
        status = (f"监听中... 能量: {energy:.0f} | 平均: {avg_energy:.0f} | "
                  f"阈值: {self.energy_threshold}")
        print(f"\r{status}", end='', flush=True)

        # Adapt only while listening: the threshold should follow ambient
        # noise, not the level of the sound being recorded.
        self.auto_adjust_threshold()

    def run(self):
        """Listen and record in a loop until interrupted.

        Returns immediately when no input stream is available.  Ctrl+C and
        unexpected errors end the loop; ``stop()`` is always called.
        """
        if not self.stream:
            print("❌ 音频设备未初始化")
            return

        self.running = True
        print("🎤 开始监听...")
        print(f"能量阈值: {self.energy_threshold}")
        print(f"静音阈值: {self.silence_threshold}秒")
        print("📖 使用说明:")
        print("- 检测到声音自动开始录音")
        # Derived from the configured values ("%g" prints 2.0 as "2").
        print(f"- 持续静音{self.silence_threshold:g}秒自动结束录音")
        print(f"- 最少录音{self.min_recording_time:g}秒,"
              f"最多{self.max_recording_time:g}秒")
        print("- 录音完成后自动播放")
        print("- 按 Ctrl+C 退出")
        print("=" * 50)

        try:
            while self.running:
                data = self.stream.read(self.CHUNK_SIZE,
                                        exception_on_overflow=False)
                if len(data) == 0:
                    continue

                energy = self.calculate_energy(data)
                self.monitor_performance()

                if self.recording:
                    self._handle_recording_chunk(data, energy)
                else:
                    self._handle_listening_chunk(data, energy)

                # Short pause to reduce CPU usage.
                time.sleep(0.01)

        except KeyboardInterrupt:
            print("\n👋 退出")
        except Exception as e:
            print(f"❌ 错误: {e}")
        finally:
            self.stop()

    def stop(self):
        """Finish any pending recording and release the audio resources.

        Safe to call more than once.
        """
        self.running = False
        try:
            if self.recording:
                # Needs self.audio, so it runs before the handles are dropped.
                self.stop_recording()
        finally:
            stream, self.stream = self.stream, None
            audio, self.audio = self.audio, None
            try:
                if stream is not None:
                    stream.stop_stream()
                    stream.close()
            finally:
                if audio is not None:
                    audio.terminate()


def main():
    """Create a recorder with the default settings and run it."""
    print("🚀 基于能量检测的极简录音系统")
    print("=" * 50)

    recorder = EnergyBasedRecorder(
        energy_threshold=500,     # RMS threshold
        silence_threshold=1.5,    # seconds of silence that end a recording
        min_recording_time=2.0,   # seconds
        max_recording_time=30.0,  # seconds
    )

    if recorder.stream is None:
        # _setup_audio() has already printed the underlying error.
        print("❌ 音频设备未初始化")
        sys.exit(1)

    print("✅ 系统初始化成功")
    print("🎯 优化特点:")
    print(" - 完全移除Vosk识别依赖")
    print(" - 基于能量检测,极低CPU占用")
    print(" - 自动调整能量阈值")
    print(" - 实时性能监控")
    print(" - 预期延迟: <0.1秒")
    print("=" * 50)

    recorder.run()


if __name__ == "__main__":
    main()