Local-Voice/energy_based_recorder.py
2025-09-20 11:51:02 +08:00

416 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
基于能量检测的极简录音系统
专门针对树莓派3B优化完全移除Vosk识别依赖
"""
import sys
import os
import time
import threading
import pyaudio
import numpy as np
import wave
class EnergyBasedRecorder:
"""基于能量检测的录音系统"""
def __init__(self, energy_threshold=500, silence_threshold=1.5, min_recording_time=2.0, max_recording_time=30.0):
# 音频参数 - 极简优化
self.FORMAT = pyaudio.paInt16
self.CHANNELS = 1
self.RATE = 8000 # 8kHz采样率
self.CHUNK_SIZE = 1024 # 适中块大小
# 能量检测参数
self.energy_threshold = energy_threshold # 能量阈值,高于此值认为有声音
self.silence_threshold = silence_threshold # 静音阈值,低于此值持续多久认为结束
self.min_recording_time = min_recording_time # 最小录音时间
self.max_recording_time = max_recording_time # 最大录音时间
self.pre_record_duration = 2.0 # 预录音时长(秒)
# 状态变量
self.audio = None
self.stream = None
self.running = False
self.recording = False
self.recorded_frames = []
self.recording_start_time = None
self.last_sound_time = None
self.energy_history = [] # 能量历史
self.max_energy_history = 50 # 最大能量历史记录
# 预录音缓冲区
self.pre_record_buffer = [] # 预录音缓冲区
self.pre_record_max_frames = int(self.pre_record_duration * self.RATE / self.CHUNK_SIZE) # 最大预录音帧数
# 性能监控
self.frame_count = 0
self.start_time = time.time()
self._setup_audio()
def _setup_audio(self):
"""设置音频设备"""
try:
self.audio = pyaudio.PyAudio()
self.stream = self.audio.open(
format=self.FORMAT,
channels=self.CHANNELS,
rate=self.RATE,
input=True,
frames_per_buffer=self.CHUNK_SIZE
)
print("✅ 音频设备初始化成功")
except Exception as e:
print(f"❌ 音频设备初始化失败: {e}")
def calculate_energy(self, audio_data):
"""计算音频能量"""
if len(audio_data) == 0:
return 0
# 将字节数据转换为numpy数组
audio_array = np.frombuffer(audio_data, dtype=np.int16)
# 计算RMS能量
rms = np.sqrt(np.mean(audio_array ** 2))
# 更新能量历史
self.energy_history.append(rms)
if len(self.energy_history) > self.max_energy_history:
self.energy_history.pop(0)
return rms
def calculate_zero_crossing_rate(self, audio_data):
"""计算零交叉率(辅助判断语音)"""
if len(audio_data) == 0:
return 0
audio_array = np.frombuffer(audio_data, dtype=np.int16)
# 计算零交叉次数
zero_crossings = np.sum(np.diff(np.sign(audio_array)) != 0)
# 归一化到采样率
zcr = zero_crossings / len(audio_array) * self.RATE
return zcr
def is_voice_active_advanced(self, energy, zcr):
"""高级语音活动检测"""
# 动态阈值:基于背景噪音
if len(self.energy_history) >= 10:
# 使用最近10个样本的中位数作为背景噪音
background_energy = np.median(self.energy_history[-10:])
# 动态阈值:背景噪音 + 25%比原来的500更敏感
dynamic_threshold = max(50, background_energy * 1.25)
# 能量条件
energy_condition = energy > dynamic_threshold
# 零交叉率条件语音通常在500-5000 Hz之间
# 对于8kHz采样率ZCR通常在500-2000之间
zcr_condition = 500 < zcr < 3000
# 同时满足能量和ZCR条件才认为是语音
return energy_condition and zcr_condition
else:
# 初始阶段使用固定阈值
return energy > 80 # 更低的初始阈值
def get_average_energy(self):
"""获取平均能量水平"""
if not self.energy_history:
return 0
return np.mean(self.energy_history)
def is_voice_active(self, energy):
"""判断是否有人声"""
return energy > self.energy_threshold
def save_recording(self, audio_data, filename=None):
"""保存录音"""
if filename is None:
timestamp = time.strftime("%Y%m%d_%H%M%S")
filename = f"recording_{timestamp}.wav"
try:
with wave.open(filename, 'wb') as wf:
wf.setnchannels(self.CHANNELS)
wf.setsampwidth(self.audio.get_sample_size(self.FORMAT))
wf.setframerate(self.RATE)
wf.writeframes(audio_data)
print(f"✅ 录音已保存: {filename}")
return True, filename
except Exception as e:
print(f"❌ 保存录音失败: {e}")
return False, None
def play_audio(self, filename):
"""播放音频文件"""
try:
with wave.open(filename, 'rb') as wf:
channels = wf.getnchannels()
width = wf.getsampwidth()
rate = wf.getframerate()
total_frames = wf.getnframes()
# 分块读取音频数据
chunk_size = 1024
frames = []
for _ in range(0, total_frames, chunk_size):
chunk = wf.readframes(chunk_size)
if chunk:
frames.append(chunk)
else:
break
# 创建播放流
playback_stream = self.audio.open(
format=self.audio.get_format_from_width(width),
channels=channels,
rate=rate,
output=True
)
print(f"🔊 开始播放: {filename}")
# 分块播放音频
for chunk in frames:
playback_stream.write(chunk)
playback_stream.stop_stream()
playback_stream.close()
print("✅ 播放完成")
except Exception as e:
print(f"❌ 播放失败: {e}")
self.play_with_system_player(filename)
def play_with_system_player(self, filename):
"""使用系统播放器播放音频"""
try:
import subprocess
cmd = ['aplay', filename] # Linux系统
print(f"🔊 使用系统播放器: {' '.join(cmd)}")
subprocess.run(cmd, check=True)
print("✅ 播放完成")
except Exception as e:
print(f"❌ 系统播放器也失败: {e}")
def update_pre_record_buffer(self, audio_data):
"""更新预录音缓冲区"""
self.pre_record_buffer.append(audio_data)
# 保持缓冲区大小
if len(self.pre_record_buffer) > self.pre_record_max_frames:
self.pre_record_buffer.pop(0)
def start_recording(self):
"""开始录音"""
print("🎙️ 检测到声音,开始录音...")
self.recording = True
self.recorded_frames = []
# 将预录音缓冲区的内容添加到录音中
self.recorded_frames.extend(self.pre_record_buffer)
# 清空预录音缓冲区
self.pre_record_buffer = []
self.recording_start_time = time.time()
self.last_sound_time = time.time()
self.energy_history = [] # 重置能量历史
def stop_recording(self):
"""停止录音"""
if len(self.recorded_frames) > 0:
audio_data = b''.join(self.recorded_frames)
duration = len(audio_data) / (self.RATE * 2) # 16位音频每样本2字节
# 计算实际录音时长和预录音时长
actual_duration = duration
pre_record_duration = min(duration, self.pre_record_duration)
print(f"📝 录音完成,时长: {actual_duration:.2f}秒 (包含预录音 {pre_record_duration:.1f}秒)")
# 保存录音
success, filename = self.save_recording(audio_data)
# 如果保存成功,播放录音
if success and filename:
print("=" * 50)
print("🔊 播放刚才录制的音频...")
self.play_audio(filename)
print("=" * 50)
self.recording = False
self.recorded_frames = []
self.recording_start_time = None
self.last_sound_time = None
self.energy_history = []
def monitor_performance(self):
"""性能监控"""
self.frame_count += 1
if self.frame_count % 1000 == 0: # 每1000帧显示一次
elapsed = time.time() - self.start_time
fps = self.frame_count / elapsed
avg_energy = self.get_average_energy()
print(f"📊 性能: {fps:.1f} FPS | 平均能量: {avg_energy:.1f} | 阈值: {self.energy_threshold}")
def auto_adjust_threshold(self):
"""自动调整能量阈值"""
if len(self.energy_history) >= 20:
# 基于历史能量的中位数和标准差调整阈值
median_energy = np.median(self.energy_history)
std_energy = np.std(self.energy_history)
# 设置阈值为中位数 + 2倍标准差
new_threshold = max(300, median_energy + 2 * std_energy)
# 平滑调整阈值
self.energy_threshold = 0.9 * self.energy_threshold + 0.1 * new_threshold
def run(self):
"""运行录音系统"""
if not self.stream:
print("❌ 音频设备未初始化")
return
self.running = True
print("🎤 开始监听...")
print(f"能量阈值: {self.energy_threshold}")
print(f"静音阈值: {self.silence_threshold}")
print("📖 使用说明:")
print("- 检测到声音自动开始录音")
print("- 持续静音3秒自动结束录音")
print("- 最少录音2秒最多30秒")
print("- 录音完成后自动播放")
print("- 按 Ctrl+C 退出")
print("🎯 新增功能:")
print("- 动态阈值调整(基于背景噪音)")
print("- 零交叉率检测(区分语音和噪音)")
print("- 实时显示ZCR和背景能量")
print("- 预录音功能包含声音开始前2秒")
print("- 环形缓冲区防止丢失开头音频")
print("=" * 50)
try:
while self.running:
# 读取音频数据
data = self.stream.read(self.CHUNK_SIZE, exception_on_overflow=False)
if len(data) == 0:
continue
# 计算能量和零交叉率
energy = self.calculate_energy(data)
zcr = self.calculate_zero_crossing_rate(data)
# 性能监控
self.monitor_performance()
if self.recording:
# 录音模式
self.recorded_frames.append(data)
recording_duration = time.time() - self.recording_start_time
# 更新最后声音时间
if self.is_voice_active_advanced(energy, zcr):
self.last_sound_time = time.time()
# 检查是否应该结束录音
current_time = time.time()
# 检查静音超时
if current_time - self.last_sound_time > self.silence_threshold:
if recording_duration >= self.min_recording_time:
print(f"\n🔇 检测到持续静音 {self.silence_threshold}秒,结束录音")
self.stop_recording()
# 检查最大录音时间
if recording_duration > self.max_recording_time:
print(f"\n⏰ 达到最大录音时间 {self.max_recording_time}")
self.stop_recording()
# 显示录音状态(包含预录音信息)
pre_duration = len(self.pre_record_buffer) * self.CHUNK_SIZE / self.RATE
bg_energy = np.median(self.energy_history[-10:]) if len(self.energy_history) >= 10 else 0
status = f"录音中... {recording_duration:.1f}s | 能量: {energy:.0f} | ZCR: {zcr:.0f} | 背景: {bg_energy:.0f}"
print(f"\r{status}", end='', flush=True)
else:
# 监听模式 - 更新预录音缓冲区
self.update_pre_record_buffer(data)
# 使用高级检测
if self.is_voice_active_advanced(energy, zcr):
# 检测到声音,开始录音
self.start_recording()
else:
# 显示监听状态(包含缓冲区信息)
avg_energy = self.get_average_energy()
bg_energy = np.median(self.energy_history[-10:]) if len(self.energy_history) >= 10 else 0
buffer_usage = len(self.pre_record_buffer) / self.pre_record_max_frames * 100
status = f"监听中... 能量: {energy:.0f} | ZCR: {zcr:.0f} | 背景: {bg_energy:.0f} | 缓冲: {buffer_usage:.0f}%"
print(f"\r{status}", end='', flush=True)
# 减少CPU使用
time.sleep(0.01)
except KeyboardInterrupt:
print("\n👋 退出")
except Exception as e:
print(f"❌ 错误: {e}")
finally:
self.stop()
def stop(self):
"""停止系统"""
self.running = False
if self.recording:
self.stop_recording()
if self.stream:
self.stream.stop_stream()
self.stream.close()
if self.audio:
self.audio.terminate()
def main():
"""主函数"""
print("🚀 基于能量检测的极简录音系统")
print("=" * 50)
# 创建录音系统
recorder = EnergyBasedRecorder(
energy_threshold=200, # 能量阈值(降低以提高灵敏度)
silence_threshold=3.0, # 静音阈值(秒)- 改为3秒
min_recording_time=2.0, # 最小录音时间
max_recording_time=30.0 # 最大录音时间
)
print("✅ 系统初始化成功")
print("🎯 优化特点:")
print(" - 完全移除Vosk识别依赖")
print(" - 基于能量检测极低CPU占用")
print(" - 自动调整能量阈值")
print(" - 实时性能监控")
print(" - 预期延迟: <0.1秒")
print("=" * 50)
# 开始运行
recorder.run()
if __name__ == "__main__":
main()