Local-Voice/energy_based_recorder.py

342 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
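Minimal audio recorder based on energy (loudness) detection.
Optimized specifically for the Raspberry Pi 3B; the Vosk speech-recognition dependency has been removed entirely.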
"""
import sys
import os
import time
import threading
import pyaudio
import numpy as np
import wave
class EnergyBasedRecorder:
    """Voice-activated recorder driven by an RMS energy threshold.

    The recorder reads 16-bit mono PCM chunks from a PyAudio input stream.
    While idle it waits for a chunk whose RMS energy exceeds
    ``energy_threshold``; it then records until the signal has stayed below
    the threshold for ``silence_threshold`` seconds (and at least
    ``min_recording_time`` seconds were captured) or until
    ``max_recording_time`` seconds have elapsed.  Each finished recording is
    written to a WAV file and played back.

    Args:
        energy_threshold: RMS level above which a chunk counts as sound.
        silence_threshold: Seconds of continuous quiet that end a recording.
        min_recording_time: Minimum recording length in seconds before a
            silence timeout is honoured.
        max_recording_time: Hard cap on recording length in seconds.
    """

    # Bytes per sample of the 16-bit PCM format configured in __init__.
    SAMPLE_WIDTH_BYTES = 2

    def __init__(self, energy_threshold=500, silence_threshold=1.5,
                 min_recording_time=2.0, max_recording_time=30.0):
        """Store the configuration and open the audio input stream."""
        # Audio parameters
        self.FORMAT = pyaudio.paInt16
        self.CHANNELS = 1
        self.RATE = 8000        # 8 kHz sample rate
        self.CHUNK_SIZE = 1024  # frames per read (128 ms at 8 kHz)

        # Energy-detection parameters
        self.energy_threshold = energy_threshold
        self.silence_threshold = silence_threshold
        self.min_recording_time = min_recording_time
        self.max_recording_time = max_recording_time

        # Runtime state
        self.audio = None
        self.stream = None
        self.running = False
        self.recording = False
        self.recorded_frames = []
        self.recording_start_time = None
        self.last_sound_time = None
        self.energy_history = []       # most recent RMS values
        self.max_energy_history = 50   # cap on len(energy_history)

        # Performance monitoring
        self.frame_count = 0
        self.start_time = time.time()

        self._setup_audio()

    def _setup_audio(self):
        """Create the PyAudio instance and open the input stream.

        Failures are reported on stdout rather than raised; ``self.stream``
        stays ``None`` in that case, which ``run`` checks for.
        """
        try:
            self.audio = pyaudio.PyAudio()
            self.stream = self.audio.open(
                format=self.FORMAT,
                channels=self.CHANNELS,
                rate=self.RATE,
                input=True,
                frames_per_buffer=self.CHUNK_SIZE,
            )
        except Exception as e:
            print(f"❌ 音频设备初始化失败: {e}")
        else:
            print("✅ 音频设备初始化成功")

    def calculate_energy(self, audio_data) -> float:
        """Return the RMS energy of a chunk of 16-bit PCM bytes.

        The value is also appended to ``energy_history`` (bounded by
        ``max_energy_history``).  An empty chunk yields ``0.0`` and is not
        recorded in the history.
        """
        # Ignore a trailing partial sample instead of letting frombuffer raise.
        sample_count = len(audio_data) // self.SAMPLE_WIDTH_BYTES
        if sample_count == 0:
            return 0.0

        samples = np.frombuffer(audio_data, dtype=np.int16, count=sample_count)
        # Widen before squaring: int16 ** 2 wraps around for |x| > 181 and
        # would produce a meaningless (possibly NaN) RMS.
        rms = float(np.sqrt(np.mean(np.square(samples.astype(np.float64)))))

        self.energy_history.append(rms)
        excess = len(self.energy_history) - self.max_energy_history
        if excess > 0:
            del self.energy_history[:excess]
        return rms

    def get_average_energy(self):
        """Return the mean of ``energy_history``, or 0 when it is empty."""
        if not self.energy_history:
            return 0
        return float(np.mean(self.energy_history))

    def is_voice_active(self, energy) -> bool:
        """Return True when ``energy`` is strictly above the threshold."""
        return energy > self.energy_threshold

    def save_recording(self, audio_data, filename=None):
        """Write raw PCM bytes to a WAV file.

        Args:
            audio_data: Raw PCM bytes in the recorder's format.
            filename: Target path; defaults to ``recording_<timestamp>.wav``
                in the current working directory.

        Returns:
            ``(True, filename)`` on success, ``(False, None)`` on failure
            (the error is printed, not raised).
        """
        if filename is None:
            timestamp = time.strftime("%Y%m%d_%H%M%S")
            filename = f"recording_{timestamp}.wav"
        try:
            with wave.open(filename, 'wb') as wf:
                wf.setnchannels(self.CHANNELS)
                wf.setsampwidth(self.audio.get_sample_size(self.FORMAT))
                wf.setframerate(self.RATE)
                wf.writeframes(audio_data)
            print(f"✅ 录音已保存: {filename}")
            return True, filename
        except Exception as e:
            print(f"❌ 保存录音失败: {e}")
            return False, None

    def play_audio(self, filename):
        """Play a WAV file through PyAudio.

        Falls back to ``play_with_system_player`` if anything goes wrong.
        """
        frames_per_write = 1024
        try:
            with wave.open(filename, 'rb') as wf:
                playback_stream = self.audio.open(
                    format=self.audio.get_format_from_width(wf.getsampwidth()),
                    channels=wf.getnchannels(),
                    rate=wf.getframerate(),
                    output=True,
                )
                try:
                    print(f"🔊 开始播放: {filename}")
                    # Stream chunk by chunk rather than buffering the file.
                    while True:
                        chunk = wf.readframes(frames_per_write)
                        if not chunk:
                            break
                        playback_stream.write(chunk)
                    playback_stream.stop_stream()
                finally:
                    # Always release the output stream, even if write() failed.
                    playback_stream.close()
            print("✅ 播放完成")
        except Exception as e:
            print(f"❌ 播放失败: {e}")
            self.play_with_system_player(filename)

    def play_with_system_player(self, filename):
        """Play a file with the external ``aplay`` command (Linux)."""
        try:
            import subprocess
            cmd = ['aplay', filename]
            print(f"🔊 使用系统播放器: {' '.join(cmd)}")
            subprocess.run(cmd, check=True)
            print("✅ 播放完成")
        except Exception as e:
            print(f"❌ 系统播放器也失败: {e}")

    def start_recording(self):
        """Switch to recording mode and reset the per-recording state."""
        print("🎙️ 检测到声音,开始录音...")
        now = time.time()
        self.recording = True
        self.recorded_frames = []
        self.recording_start_time = now
        self.last_sound_time = now
        self.energy_history = []  # start the energy history afresh

    def stop_recording(self):
        """Finish the current recording: save it, play it back, reset state."""
        if self.recorded_frames:
            audio_data = b''.join(self.recorded_frames)
            bytes_per_second = (
                self.RATE * self.CHANNELS * self.SAMPLE_WIDTH_BYTES
            )
            duration = len(audio_data) / bytes_per_second
            print(f"📝 录音完成,时长: {duration:.2f}")

            success, filename = self.save_recording(audio_data)
            if success and filename:
                print("=" * 50)
                print("🔊 播放刚才录制的音频...")
                self.play_audio(filename)
                print("=" * 50)

        self.recording = False
        self.recorded_frames = []
        self.recording_start_time = None
        self.last_sound_time = None
        self.energy_history = []

    def monitor_performance(self):
        """Count processed chunks and print throughput every 1000 chunks."""
        self.frame_count += 1
        if self.frame_count % 1000 != 0:
            return
        elapsed = time.time() - self.start_time
        fps = self.frame_count / elapsed if elapsed > 0 else 0.0
        avg_energy = self.get_average_energy()
        print(f"📊 性能: {fps:.1f} FPS | 平均能量: {avg_energy:.1f} | 阈值: {self.energy_threshold}")

    def auto_adjust_threshold(self):
        """Nudge ``energy_threshold`` toward a statistic of recent energy.

        With at least 20 history entries, the target is
        ``max(300, median + 2 * std)`` and the threshold moves 10% of the
        way toward it per call (exponential smoothing).
        """
        if len(self.energy_history) < 20:
            return
        median_energy = np.median(self.energy_history)
        std_energy = np.std(self.energy_history)
        new_threshold = max(300, median_energy + 2 * std_energy)
        self.energy_threshold = float(
            0.9 * self.energy_threshold + 0.1 * new_threshold
        )

    def _handle_recording_chunk(self, data, energy):
        """Append a chunk to the active recording and check stop conditions."""
        self.recorded_frames.append(data)
        now = time.time()
        recording_duration = now - self.recording_start_time
        if self.is_voice_active(energy):
            self.last_sound_time = now
        silence_duration = now - self.last_sound_time

        # stop_recording() clears last_sound_time, so return right after it:
        # computing the status line afterwards would subtract None.
        if (silence_duration > self.silence_threshold
                and recording_duration >= self.min_recording_time):
            print(f"\n🔇 检测到持续静音 {self.silence_threshold}秒,结束录音")
            self.stop_recording()
            return
        if recording_duration > self.max_recording_time:
            print(f"\n⏰ 达到最大录音时间 {self.max_recording_time}")
            self.stop_recording()
            return

        status = f"录音中... {recording_duration:.1f}s | 能量: {energy:.0f} | 静音: {silence_duration:.1f}s"
        print(f"\r{status}", end='', flush=True)

    def _handle_listening_chunk(self, energy):
        """React to a chunk while idle: start recording or show status."""
        if self.is_voice_active(energy):
            self.start_recording()
            return
        avg_energy = self.get_average_energy()
        status = f"监听中... 能量: {energy:.0f} | 平均: {avg_energy:.0f} | 阈值: {self.energy_threshold}"
        print(f"\r{status}", end='', flush=True)
        # NOTE(review): the threshold is adapted only while idle so that
        # speech does not inflate the estimate — confirm this matches the
        # intended scope of the original call.
        self.auto_adjust_threshold()

    def run(self):
        """Run the capture loop until interrupted or an error occurs.

        Returns immediately if the input stream was never opened.  On exit
        (Ctrl+C or any exception) ``stop`` is called to release resources.
        """
        if not self.stream:
            print("❌ 音频设备未初始化")
            return

        self.running = True
        print("🎤 开始监听...")
        print(f"能量阈值: {self.energy_threshold}")
        print(f"静音阈值: {self.silence_threshold}")
        print("📖 使用说明:")
        print("- 检测到声音自动开始录音")
        # Show the configured values instead of hard-coded defaults.
        print(f"- 持续静音{self.silence_threshold:g}秒自动结束录音")
        print(f"- 最少录音{self.min_recording_time:g}秒最多{self.max_recording_time:g}秒")
        print("- 录音完成后自动播放")
        print("- 按 Ctrl+C 退出")
        print("=" * 50)

        try:
            while self.running:
                data = self.stream.read(
                    self.CHUNK_SIZE, exception_on_overflow=False
                )
                if not data:
                    continue

                energy = self.calculate_energy(data)
                self.monitor_performance()

                if self.recording:
                    self._handle_recording_chunk(data, energy)
                else:
                    self._handle_listening_chunk(energy)

                # Brief pause to reduce CPU usage.
                time.sleep(0.01)
        except KeyboardInterrupt:
            print("\n👋 退出")
        except Exception as e:
            print(f"❌ 错误: {e}")
        finally:
            self.stop()

    def stop(self):
        """Stop the loop, flush any active recording and release audio.

        Safe to call more than once: handles are cleared before release.
        """
        self.running = False
        # Must happen before the PyAudio instance is released, because
        # saving and playback both use it.
        if self.recording:
            self.stop_recording()

        stream, self.stream = self.stream, None
        audio, self.audio = self.audio, None
        try:
            if stream:
                stream.stop_stream()
                stream.close()
        finally:
            if audio:
                audio.terminate()
def main():
    """Build a recorder with the demo settings and run it until interrupted.

    If the audio input stream could not be opened, the success banner is
    skipped, any partially initialised audio resources are released, and the
    function returns without entering the capture loop.
    """
    print("🚀 基于能量检测的极简录音系统")
    print("=" * 50)

    recorder = EnergyBasedRecorder(
        energy_threshold=200,     # lowered for higher sensitivity
        silence_threshold=1.5,    # seconds of quiet that end a recording
        min_recording_time=2.0,   # seconds
        max_recording_time=30.0,  # seconds
    )

    # The constructor reports setup failures itself and leaves `stream`
    # unset; do not claim success in that case.
    if not recorder.stream:
        recorder.stop()
        return

    print("✅ 系统初始化成功")
    print("🎯 优化特点:")
    print(" - 完全移除Vosk识别依赖")
    print(" - 基于能量检测极低CPU占用")
    print(" - 自动调整能量阈值")
    print(" - 实时性能监控")
    print(" - 预期延迟: <0.1秒")
    print("=" * 50)

    recorder.run()
# Run the recorder only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()