基于能量检测的极简录音系统:彻底解决树莓派3B延迟问题

- 完全移除Vosk识别依赖,改用能量检测
- 基于RMS能量值判断声音开始/结束
- 自动调整能量阈值适应环境噪音
- 实时性能监控,极低CPU占用
- 预期延迟:<0.1秒(原10秒)
- 支持自动播放录制的音频

优化特点:
- 8kHz采样率,1024块大小
- 自动阈值调整算法
- 静音检测1.5秒结束录音
- 最小录音2秒,最大30秒

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
朱潮 2025-09-20 11:19:08 +08:00
parent 70c42eca15
commit b87be1494d

342
energy_based_recorder.py Normal file
View File

@ -0,0 +1,342 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
基于能量检测的极简录音系统
专门针对树莓派3B优化,完全移除Vosk识别依赖
"""
import sys
import os
import time
import threading
import pyaudio
import numpy as np
import wave
class EnergyBasedRecorder:
    """Voice-activated recorder driven by RMS energy of the microphone signal.

    The recorder reads fixed-size chunks from a PyAudio input stream, computes
    the RMS energy of each chunk and compares it with ``energy_threshold``:

    * while idle, a chunk above the threshold starts a recording;
    * while recording, the recording ends after ``silence_threshold`` seconds
      without a chunk above the threshold (but not before
      ``min_recording_time``), or once ``max_recording_time`` is exceeded.

    A finished recording is written to a WAV file and played back.
    """

    def __init__(self, energy_threshold=500, silence_threshold=1.5, min_recording_time=2.0, max_recording_time=30.0):
        """Store the configuration and open the audio input stream.

        Args:
            energy_threshold: RMS value above which a chunk counts as sound.
            silence_threshold: Seconds of continuous silence that end a recording.
            min_recording_time: Minimum recording length in seconds before a
                silence timeout may end the recording.
            max_recording_time: Maximum recording length in seconds.
        """
        # Audio parameters
        self.FORMAT = pyaudio.paInt16
        self.CHANNELS = 1
        self.RATE = 8000  # 8 kHz sample rate
        self.CHUNK_SIZE = 1024  # frames read per iteration

        # Energy detection parameters
        self.energy_threshold = energy_threshold
        self.silence_threshold = silence_threshold
        self.min_recording_time = min_recording_time
        self.max_recording_time = max_recording_time

        # State
        self.audio = None
        self.stream = None
        self.running = False
        self.recording = False
        self.recorded_frames = []
        self.recording_start_time = None
        self.last_sound_time = None
        self.energy_history = []  # most recent RMS values
        self.max_energy_history = 50  # cap on len(energy_history)

        # Performance monitoring
        self.frame_count = 0
        self.start_time = time.time()

        self._setup_audio()

    def _setup_audio(self):
        """Create the PyAudio instance and open the input stream.

        Failures are reported on stdout and leave ``self.stream`` as ``None``;
        ``run()`` checks for that and refuses to start.
        """
        try:
            self.audio = pyaudio.PyAudio()
            self.stream = self.audio.open(
                format=self.FORMAT,
                channels=self.CHANNELS,
                rate=self.RATE,
                input=True,
                frames_per_buffer=self.CHUNK_SIZE,
            )
            print("✅ 音频设备初始化成功")
        except Exception as e:
            print(f"❌ 音频设备初始化失败: {e}")

    def calculate_energy(self, audio_data):
        """Return the RMS energy of a chunk of 16-bit PCM bytes.

        The value is also appended to ``energy_history`` (bounded by
        ``max_energy_history``).

        Args:
            audio_data: Raw little-endian int16 sample bytes.

        Returns:
            The RMS as a float, or 0 when there is no complete sample.
        """
        # np.frombuffer requires a whole number of int16 samples; drop a
        # trailing odd byte instead of raising.
        usable = len(audio_data) - (len(audio_data) % 2)
        if usable == 0:
            return 0

        samples = np.frombuffer(audio_data[:usable], dtype=np.int16)
        # Square in float64: squaring int16 wraps around for |sample| > 181,
        # which yields a wrong (possibly negative-mean -> NaN) result.
        rms = float(np.sqrt(np.mean(samples.astype(np.float64) ** 2)))

        self.energy_history.append(rms)
        if len(self.energy_history) > self.max_energy_history:
            self.energy_history.pop(0)
        return rms

    def get_average_energy(self):
        """Return the mean of ``energy_history``, or 0 when it is empty."""
        if not self.energy_history:
            return 0
        return np.mean(self.energy_history)

    def is_voice_active(self, energy):
        """Return True when ``energy`` is strictly above ``energy_threshold``."""
        return energy > self.energy_threshold

    def save_recording(self, audio_data, filename=None):
        """Write PCM bytes to a WAV file.

        Args:
            audio_data: Raw sample bytes in the recorder's format.
            filename: Target path; defaults to ``recording_<timestamp>.wav``.

        Returns:
            ``(True, filename)`` on success, ``(False, None)`` on failure
            (the error is printed, not raised).
        """
        if filename is None:
            timestamp = time.strftime("%Y%m%d_%H%M%S")
            filename = f"recording_{timestamp}.wav"
        try:
            with wave.open(filename, 'wb') as wf:
                wf.setnchannels(self.CHANNELS)
                wf.setsampwidth(self.audio.get_sample_size(self.FORMAT))
                wf.setframerate(self.RATE)
                wf.writeframes(audio_data)
            print(f"✅ 录音已保存: {filename}")
            return True, filename
        except Exception as e:
            print(f"❌ 保存录音失败: {e}")
            return False, None

    def play_audio(self, filename):
        """Play a WAV file through PyAudio, falling back to the system player.

        Any error during PyAudio playback is printed and
        ``play_with_system_player`` is tried instead.
        """
        try:
            with wave.open(filename, 'rb') as wf:
                playback_stream = self.audio.open(
                    format=self.audio.get_format_from_width(wf.getsampwidth()),
                    channels=wf.getnchannels(),
                    rate=wf.getframerate(),
                    output=True,
                )
                try:
                    print(f"🔊 开始播放: {filename}")
                    # Stream the file in 1024-frame chunks.
                    while True:
                        chunk = wf.readframes(1024)
                        if not chunk:
                            break
                        playback_stream.write(chunk)
                    playback_stream.stop_stream()
                finally:
                    # Always release the output stream, even if write() failed.
                    playback_stream.close()
            print("✅ 播放完成")
        except Exception as e:
            print(f"❌ 播放失败: {e}")
            self.play_with_system_player(filename)

    def play_with_system_player(self, filename):
        """Play ``filename`` with the ``aplay`` command; errors are printed."""
        try:
            import subprocess
            cmd = ['aplay', filename]  # Linux (ALSA) player
            print(f"🔊 使用系统播放器: {' '.join(cmd)}")
            subprocess.run(cmd, check=True)
            print("✅ 播放完成")
        except Exception as e:
            print(f"❌ 系统播放器也失败: {e}")

    def start_recording(self):
        """Enter recording mode and reset the per-recording state."""
        print("🎙️ 检测到声音,开始录音...")
        now = time.time()
        self.recording = True
        self.recorded_frames = []
        self.recording_start_time = now
        self.last_sound_time = now
        self.energy_history = []  # restart the energy statistics

    def stop_recording(self):
        """Leave recording mode; save and play back any captured audio."""
        if self.recorded_frames:
            audio_data = b''.join(self.recorded_frames)
            # 16-bit samples: 2 bytes per sample per channel.
            duration = len(audio_data) / (self.RATE * self.CHANNELS * 2)
            print(f"📝 录音完成,时长: {duration:.2f}")

            success, filename = self.save_recording(audio_data)
            if success and filename:
                print("=" * 50)
                print("🔊 播放刚才录制的音频...")
                self.play_audio(filename)
                print("=" * 50)

        self.recording = False
        self.recorded_frames = []
        self.recording_start_time = None
        self.last_sound_time = None
        self.energy_history = []

    def monitor_performance(self):
        """Count a processed chunk and print throughput every 1000 chunks."""
        self.frame_count += 1
        if self.frame_count % 1000 == 0:
            elapsed = time.time() - self.start_time
            fps = self.frame_count / elapsed if elapsed > 0 else 0.0
            avg_energy = self.get_average_energy()
            print(f"📊 性能: {fps:.1f} FPS | 平均能量: {avg_energy:.1f} | 阈值: {self.energy_threshold}")

    def auto_adjust_threshold(self):
        """Nudge ``energy_threshold`` towards the recent energy statistics.

        Needs at least 20 history entries. The target is
        ``median + 2 * std`` of ``energy_history`` with a floor of 300, and
        the threshold moves 10% of the way towards it per call.
        """
        if len(self.energy_history) >= 20:
            median_energy = np.median(self.energy_history)
            std_energy = np.std(self.energy_history)
            new_threshold = max(300, median_energy + 2 * std_energy)
            # Exponential smoothing so a single outlier cannot move it far.
            self.energy_threshold = 0.9 * self.energy_threshold + 0.1 * new_threshold

    def _handle_recording_chunk(self, data, energy):
        """Store one chunk while recording and end the recording when due."""
        self.recorded_frames.append(data)

        current_time = time.time()
        if self.is_voice_active(energy):
            self.last_sound_time = current_time

        recording_duration = current_time - self.recording_start_time
        silence_duration = current_time - self.last_sound_time

        # A silence timeout only counts once the minimum length is reached.
        if (silence_duration > self.silence_threshold
                and recording_duration >= self.min_recording_time):
            print(f"\n🔇 检测到持续静音 {self.silence_threshold}秒,结束录音")
            self.stop_recording()
            # stop_recording() cleared the timestamps; nothing more to report.
            return
        if recording_duration > self.max_recording_time:
            print(f"\n⏰ 达到最大录音时间 {self.max_recording_time}")
            self.stop_recording()
            return

        status = f"录音中... {recording_duration:.1f}s | 能量: {energy:.0f} | 静音: {silence_duration:.1f}s"
        print(f"\r{status}", end='', flush=True)

    def _handle_listening_chunk(self, data, energy):
        """Start a recording on sound, otherwise report and adapt the threshold."""
        if self.is_voice_active(energy):
            self.start_recording()
            # Keep the chunk that triggered the recording so the onset of the
            # sound is not cut off.
            self.recorded_frames.append(data)
            return

        avg_energy = self.get_average_energy()
        status = f"监听中... 能量: {energy:.0f} | 平均: {avg_energy:.0f} | 阈值: {self.energy_threshold}"
        print(f"\r{status}", end='', flush=True)
        # Only quiet chunks feed the adaptive threshold.
        self.auto_adjust_threshold()

    def _process_chunk(self, data):
        """Run energy detection and the state machine for one audio chunk."""
        energy = self.calculate_energy(data)
        self.monitor_performance()
        if self.recording:
            self._handle_recording_chunk(data, energy)
        else:
            self._handle_listening_chunk(data, energy)

    def run(self):
        """Listen on the input stream until interrupted.

        Returns immediately if the input stream was never opened. Ctrl+C
        and unexpected errors both end the loop; ``stop()`` is always called
        on the way out.
        """
        if not self.stream:
            print("❌ 音频设备未初始化")
            return

        self.running = True
        print("🎤 开始监听...")
        print(f"能量阈值: {self.energy_threshold}")
        print(f"静音阈值: {self.silence_threshold}")
        print("📖 使用说明:")
        print("- 检测到声音自动开始录音")
        # Report the configured limits rather than the default values.
        print(f"- 持续静音{self.silence_threshold:g}秒自动结束录音")
        print(f"- 最少录音{self.min_recording_time:g}秒最多{self.max_recording_time:g}秒")
        print("- 录音完成后自动播放")
        print("- 按 Ctrl+C 退出")
        print("=" * 50)

        try:
            while self.running:
                data = self.stream.read(self.CHUNK_SIZE, exception_on_overflow=False)
                if not data:
                    continue
                self._process_chunk(data)
                # Brief pause to reduce CPU usage.
                time.sleep(0.01)
        except KeyboardInterrupt:
            print("\n👋 退出")
        except Exception as e:
            print(f"❌ 错误: {e}")
        finally:
            self.stop()

    def stop(self):
        """Stop the loop, finish any recording and release audio resources.

        Safe to call more than once: handles are cleared before being
        released, so a repeated call finds nothing left to close.
        """
        self.running = False
        if self.recording:
            self.stop_recording()

        stream, self.stream = self.stream, None
        audio, self.audio = self.audio, None
        try:
            if stream is not None:
                try:
                    stream.stop_stream()
                finally:
                    stream.close()
        finally:
            if audio is not None:
                audio.terminate()
def main():
    """Build a recorder with the default settings and run it until interrupted.

    If the audio input could not be opened, the recorder's resources are
    released and the function returns without printing the success banner.
    """
    print("🚀 基于能量检测的极简录音系统")
    print("=" * 50)

    recorder = EnergyBasedRecorder(
        energy_threshold=500,      # RMS level treated as sound
        silence_threshold=1.5,     # seconds of silence that end a recording
        min_recording_time=2.0,    # minimum recording length in seconds
        max_recording_time=30.0,   # maximum recording length in seconds
    )

    # The constructor reports audio failures itself and leaves stream unset;
    # do not claim success in that case.
    if recorder.stream is None:
        recorder.stop()
        return

    print("✅ 系统初始化成功")
    print("🎯 优化特点:")
    print(" - 完全移除Vosk识别依赖")
    print(" - 基于能量检测极低CPU占用")
    print(" - 自动调整能量阈值")
    print(" - 实时性能监控")
    print(" - 预期延迟: <0.1秒")
    print("=" * 50)

    recorder.run()


if __name__ == "__main__":
    main()