Local-Voice/recorder.py
2025-09-20 12:53:58 +08:00

580 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
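Minimal voice-activated recording system (originally energy-based; voice
activity is now decided by the zero-crossing rate of each audio chunk).
Optimized for the Raspberry Pi 3B; the Vosk recognition dependency has been
removed entirely.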
"""
import sys
import os
import time
import threading
import pyaudio
import numpy as np
import wave
class EnergyBasedRecorder:
    """Voice-activated recorder: listen, capture an utterance, save it, play it back.

    Audio is read from a PyAudio input stream in fixed-size chunks. While
    idle, the most recent chunks are kept in a pre-record buffer; as soon as a
    chunk's zero-crossing rate (ZCR) falls inside the speech window, recording
    starts and the buffered audio is prepended. Recording ends after a
    sustained run of non-speech chunks, after ``silence_threshold`` seconds
    without speech, or when ``max_recording_time`` is exceeded. The clip is
    then written to a WAV file and played back while the microphone stream is
    closed.

    The class name and the energy-related parameters are historical: RMS
    energy is still computed and displayed, but only the ZCR decides whether
    a chunk counts as speech.
    """

    # Exclusive bounds of the zero-crossing-rate window (crossings per second)
    # that is classified as speech.
    ZCR_VOICE_MIN = 1200
    ZCR_VOICE_MAX = 6000

    def __init__(self, energy_threshold=500, silence_threshold=1.5, min_recording_time=2.0, max_recording_time=30.0):
        """Store the detection parameters and open the microphone.

        Args:
            energy_threshold: RMS level formerly used to detect sound. It is
                stored, printed and adjustable via ``auto_adjust_threshold``,
                but the ZCR detector does not consult it.
            silence_threshold: Seconds without detected speech after which a
                recording may be ended (time-based fallback rule).
            min_recording_time: Minimum seconds a recording must last before
                a silence rule is allowed to end it.
            max_recording_time: Hard cap on the recording length in seconds.

        A failure to open the audio device is reported on stdout and leaves
        ``self.stream`` as ``None``; it does not raise.
        """
        # Audio format: 16-bit mono at 8 kHz, read in 1024-sample chunks
        # (one chunk is therefore 0.128 s of audio).
        self.FORMAT = pyaudio.paInt16
        self.CHANNELS = 1
        self.RATE = 8000
        self.CHUNK_SIZE = 1024

        # Detection / timing parameters.
        self.energy_threshold = energy_threshold
        self.silence_threshold = silence_threshold
        self.min_recording_time = min_recording_time
        self.max_recording_time = max_recording_time
        self.pre_record_duration = 2.0  # seconds of audio kept from before the trigger

        # Runtime state.
        self.audio = None
        self.stream = None
        self.running = False
        self.recording = False
        self.recorded_frames = []
        self.recording_start_time = None
        self.last_sound_time = None
        self.energy_history = []
        self.zcr_history = []
        self.max_energy_history = 50

        # Pre-record buffer; its capacity is counted in whole chunks, so the
        # default holds 15 chunks (about 1.9 s).
        self.pre_record_buffer = []
        self.pre_record_max_frames = int(self.pre_record_duration * self.RATE / self.CHUNK_SIZE)

        # True while a clip is being played back.
        self.is_playing = False

        # Voice-activity bookkeeping. The silence counter and its threshold
        # are maintained but no stop rule currently reads them.
        self.voice_activity_history = []
        self.max_voice_history = 20
        self.consecutive_silence_count = 0
        self.silence_threshold_count = 15

        # ZCR-based silence detection: 20 consecutive non-speech chunks
        # (about 2.6 s at the default chunk size) trigger a stop check.
        self.max_zcr_history = 30
        self.consecutive_low_zcr_count = 0
        self.low_zcr_threshold_count = 20

        # Performance counters.
        self.frame_count = 0
        self.start_time = time.time()

        self._setup_audio()

    def _setup_audio(self):
        """Open the microphone input stream, reporting (not raising) failures.

        The PyAudio session is created once and reused: this method is called
        again after every playback, and creating a fresh session each time
        without terminating the old one would leak PortAudio resources.
        On failure ``self.stream`` is left as ``None``.
        """
        try:
            if self.audio is None:
                self.audio = pyaudio.PyAudio()
            self.stream = self.audio.open(
                format=self.FORMAT,
                channels=self.CHANNELS,
                rate=self.RATE,
                input=True,
                frames_per_buffer=self.CHUNK_SIZE
            )
            print("✅ 音频设备初始化成功")
        except Exception as e:
            self.stream = None
            print(f"❌ 音频设备初始化失败: {e}")

    def calculate_energy(self, audio_data):
        """Return the RMS amplitude of a chunk of 16-bit PCM bytes.

        Returns 0 for empty input. While not recording, the value is also
        appended to ``energy_history`` (capped at ``max_energy_history``) so
        that the history reflects background noise rather than speech.
        """
        if len(audio_data) == 0:
            return 0
        # Promote to float before squaring: squaring int16 samples in place
        # wraps around for any |sample| > 181 and corrupts the result.
        samples = np.frombuffer(audio_data, dtype=np.int16).astype(np.float64)
        rms = np.sqrt(np.mean(np.square(samples)))
        if not self.recording:
            self.energy_history.append(rms)
            if len(self.energy_history) > self.max_energy_history:
                self.energy_history.pop(0)
        return rms

    def calculate_peak_energy(self, audio_data):
        """Return the largest absolute sample value in a chunk (0 if empty)."""
        if len(audio_data) == 0:
            return 0
        # Widen first: abs(-32768) does not fit in int16.
        samples = np.frombuffer(audio_data, dtype=np.int16).astype(np.int32)
        return np.max(np.abs(samples))

    def calculate_zero_crossing_rate(self, audio_data):
        """Return the chunk's zero-crossing rate scaled to crossings per second.

        A crossing is any change of ``np.sign`` between neighbouring samples.
        The value is also appended to ``zcr_history`` (capped at
        ``max_zcr_history``). Returns 0 for empty input.
        """
        if len(audio_data) == 0:
            return 0
        samples = np.frombuffer(audio_data, dtype=np.int16)
        sign_changes = np.sum(np.diff(np.sign(samples)) != 0)
        zcr = sign_changes / len(samples) * self.RATE
        self.zcr_history.append(zcr)
        if len(self.zcr_history) > self.max_zcr_history:
            self.zcr_history.pop(0)
        return zcr

    def is_voice_active_advanced(self, energy, zcr):
        """Return whether ``zcr`` lies strictly inside the speech window.

        ``energy`` is accepted for interface compatibility but ignored.
        """
        return self.ZCR_VOICE_MIN < zcr < self.ZCR_VOICE_MAX

    def is_voice_active(self, energy):
        """Deprecated energy-based detector kept for compatibility; always False."""
        return False

    def save_recording(self, audio_data, filename=None):
        """Write raw PCM bytes to a WAV file.

        Args:
            audio_data: PCM bytes in the recorder's format.
            filename: Target path; defaults to ``recording_<timestamp>.wav``.

        Returns:
            ``(True, filename)`` on success, ``(False, None)`` on any error
            (the error is printed, not raised).
        """
        if filename is None:
            timestamp = time.strftime("%Y%m%d_%H%M%S")
            filename = f"recording_{timestamp}.wav"
        try:
            with wave.open(filename, 'wb') as wf:
                wf.setnchannels(self.CHANNELS)
                # Module-level lookup, so saving does not depend on a live
                # PyAudio session (e.g. after stop() has released it).
                wf.setsampwidth(pyaudio.get_sample_size(self.FORMAT))
                wf.setframerate(self.RATE)
                wf.writeframes(audio_data)
            print(f"✅ 录音已保存: {filename}")
            return True, filename
        except Exception as e:
            print(f"❌ 保存录音失败: {e}")
            return False, None

    def _pause_input(self):
        """Abort any recording, drop buffered audio and close the input stream."""
        print("🔇 准备播放,完全停止音频输入")
        if self.recording:
            self.recording = False
            self.recorded_frames = []
            self.recording_start_time = None
            self.last_sound_time = None
        self.pre_record_buffer = []
        self.energy_history = []
        self.zcr_history = []
        if self.stream:
            self.stream.stop_stream()
            self.stream.close()
            self.stream = None
        self.is_playing = True
        # Give the audio device a moment to fully stop capturing.
        time.sleep(0.5)

    def _resume_input(self, settle_seconds):
        """Clear the playing flag, wait ``settle_seconds`` and reopen the microphone."""
        self.is_playing = False
        time.sleep(settle_seconds)
        self._setup_audio()
        self.energy_history = []
        self.zcr_history = []
        print("📡 音频输入已重新开启")

    def play_audio(self, filename):
        """Play a WAV file through PyAudio with the microphone closed.

        If anything fails, the error is printed and playback is retried with
        the system player. The input stream is reopened afterwards in every
        case.
        """
        try:
            self._pause_input()
            with wave.open(filename, 'rb') as wf:
                playback_stream = self.audio.open(
                    format=self.audio.get_format_from_width(wf.getsampwidth()),
                    channels=wf.getnchannels(),
                    rate=wf.getframerate(),
                    output=True
                )
                try:
                    print(f"🔊 开始播放: {filename}")
                    print("🚫 音频输入已完全关闭")
                    # Stream the file chunk by chunk; readframes returns
                    # empty bytes at end of file.
                    for chunk in iter(lambda: wf.readframes(1024), b''):
                        playback_stream.write(chunk)
                    playback_stream.stop_stream()
                finally:
                    # Release the output stream even if writing failed.
                    playback_stream.close()
            print("✅ 播放完成")
            print("🔄 重新开启音频输入")
        except Exception as e:
            print(f"❌ 播放失败: {e}")
            self.play_with_system_player(filename)
        finally:
            self._resume_input(0.3)

    def play_with_system_player(self, filename):
        """Play a file with the ``aplay`` command; errors are printed, not raised.

        This method does not touch the input stream itself.
        """
        try:
            import subprocess
            cmd = ['aplay', filename]
            print(f"🔊 使用系统播放器: {' '.join(cmd)}")
            print("🚫 系统播放器播放中,音频输入保持关闭")
            subprocess.run(cmd, check=True)
            print("✅ 播放完成")
            print("📡 音频输入已保持关闭状态")
        except Exception as e:
            print(f"❌ 系统播放器也失败: {e}")

    def play_audio_safe(self, filename):
        """Play a file via the system player with the microphone closed.

        The input stream is closed first and reopened afterwards in every
        case.
        """
        try:
            self._pause_input()
            print(f"🔊 开始播放: {filename}")
            print("🚫 使用系统播放器,音频输入已完全关闭")
            self.play_with_system_player(filename)
            print("🔄 准备重新开启音频输入")
        except Exception as e:
            print(f"❌ 播放失败: {e}")
        finally:
            self._resume_input(0.5)

    def update_pre_record_buffer(self, audio_data):
        """Append a chunk to the pre-record buffer, dropping the oldest when full."""
        self.pre_record_buffer.append(audio_data)
        if len(self.pre_record_buffer) > self.pre_record_max_frames:
            self.pre_record_buffer.pop(0)

    def start_recording(self):
        """Switch to recording mode, seeding the clip with the pre-record buffer."""
        print("🎙️ 检测到声音,开始录音...")
        self.recording = True
        # The buffered chunks (including the one that triggered detection)
        # become the start of the recording.
        self.recorded_frames = list(self.pre_record_buffer)
        self.pre_record_buffer = []
        self.recording_start_time = time.time()
        self.last_sound_time = time.time()
        self.energy_history = []
        self.zcr_history = []
        self.consecutive_low_zcr_count = 0
        self.consecutive_silence_count = 0
        self.voice_activity_history = []

    def stop_recording(self):
        """Finish the current recording: save it, play it back, reset state.

        Nothing is saved when no frames were captured. The recording state is
        reset even if saving or playback is interrupted.
        """
        try:
            if self.recorded_frames:
                audio_data = b''.join(self.recorded_frames)
                duration = len(audio_data) / (self.RATE * 2)  # 16-bit samples: 2 bytes each
                pre_record_duration = min(duration, self.pre_record_duration)
                print(f"📝 录音完成,时长: {duration:.2f}秒 (包含预录音 {pre_record_duration:.1f}秒)")
                success, filename = self.save_recording(audio_data)
                if success and filename:
                    print("=" * 50)
                    print("🔊 播放刚才录制的音频...")
                    # The system player is used here to avoid echo.
                    self.play_audio_safe(filename)
                    print("=" * 50)
        finally:
            self.recording = False
            self.recorded_frames = []
            self.recording_start_time = None
            self.last_sound_time = None
            self.energy_history = []
            self.zcr_history = []

    def get_average_energy(self):
        """Return the mean of ``energy_history`` as a float (0.0 when empty)."""
        if not self.energy_history:
            return 0.0
        return float(np.mean(self.energy_history))

    def monitor_performance(self):
        """Count one processed chunk and print throughput every 1000 chunks."""
        self.frame_count += 1
        if self.frame_count % 1000 == 0:
            elapsed = time.time() - self.start_time
            fps = self.frame_count / elapsed if elapsed > 0 else 0.0
            avg_energy = self.get_average_energy()
            print(f"📊 性能: {fps:.1f} FPS | 平均能量: {avg_energy:.1f} | 阈值: {self.energy_threshold}")

    def auto_adjust_threshold(self):
        """Nudge ``energy_threshold`` toward the recent background level.

        Requires at least 20 history entries. The target is the median plus
        two standard deviations (never below 300); the threshold moves 10 %
        of the way toward it per call.
        """
        if len(self.energy_history) >= 20:
            median_energy = np.median(self.energy_history)
            std_energy = np.std(self.energy_history)
            new_threshold = max(300, median_energy + 2 * std_energy)
            self.energy_threshold = 0.9 * self.energy_threshold + 0.1 * new_threshold

    def _print_usage(self):
        """Print the start-up banner with current settings and usage notes."""
        banner = (
            "🎤 开始监听...",
            f"能量阈值: {self.energy_threshold} (已弃用)",
            f"静音阈值: {self.silence_threshold}",
            "📖 使用说明:",
            "- 检测到声音自动开始录音",
            "- 持续静音3秒自动结束录音",
            "- 最少录音2秒最多30秒",
            "- 录音完成后自动播放",
            "- 按 Ctrl+C 退出",
            "🎯 新增功能:",
            "- 纯ZCR语音检测移除能量检测",
            "- 零交叉率检测(区分语音和噪音)",
            "- 实时显示ZCR状态",
            "- 预录音功能包含声音开始前2秒",
            "- 环形缓冲区防止丢失开头音频",
            "🤖 纯ZCR静音检测:",
            "- 连续低ZCR计数20次=2秒",
            "- ZCR活动历史追踪",
            "- 基于ZCR模式的静音验证",
            "- 语音范围: 1200-6000 Hz (提高阈值)",
            "=" * 50,
        )
        for line in banner:
            print(line)

    def _silence_stop_reason(self, now):
        """Return a message naming the silence rule that fired, or "" if none did."""
        # Primary rule: a long enough run of consecutive non-speech chunks,
        # confirmed against the recent activity history when it is available.
        if self.consecutive_low_zcr_count >= self.low_zcr_threshold_count:
            if len(self.voice_activity_history) >= 15:
                recent_voice_activity = sum(self.voice_activity_history[-15:])
                if recent_voice_activity <= 3:
                    return f"ZCR静音检测 ({self.consecutive_low_zcr_count}次连续低ZCR)"
            else:
                # Not enough history to cross-check; trust the counter alone.
                return f"基础ZCR静音检测 ({self.consecutive_low_zcr_count}次)"
        # Fallback rule: too long since the last speech chunk.
        if now - self.last_sound_time > self.silence_threshold:
            return f"时间静音检测 ({self.silence_threshold}秒)"
        return ""

    def _handle_recording_chunk(self, data, energy, zcr):
        """Add one chunk to the active recording and stop it when a rule fires."""
        self.recorded_frames.append(data)
        now = time.time()
        recording_duration = now - self.recording_start_time

        is_voice = self.is_voice_active_advanced(energy, zcr)
        if is_voice:
            self.last_sound_time = now
            self.consecutive_low_zcr_count = 0
            self.consecutive_silence_count = 0
        else:
            self.consecutive_low_zcr_count += 1
            self.consecutive_silence_count += 1

        self.voice_activity_history.append(is_voice)
        if len(self.voice_activity_history) > self.max_voice_history:
            self.voice_activity_history.pop(0)

        # Silence rules may only end the clip once the minimum length is
        # reached; the maximum length always ends it. Return right after
        # stopping so the clip is not finalised twice and no stale
        # "recording" status is printed.
        stop_reason = self._silence_stop_reason(now)
        if stop_reason and recording_duration >= self.min_recording_time:
            print(f"\n🔇 {stop_reason},结束录音")
            self.stop_recording()
            return
        if recording_duration > self.max_recording_time:
            print(f"\n⏰ 达到最大录音时间 {self.max_recording_time}")
            self.stop_recording()
            return

        zcr_progress = f"{self.consecutive_low_zcr_count}/{self.low_zcr_threshold_count}"
        recent_activity = sum(self.voice_activity_history[-5:]) if len(self.voice_activity_history) >= 5 else 0
        status = f"录音中... {recording_duration:.1f}s | ZCR: {zcr:.0f} | 语音: {is_voice} | 低ZCR计数: {zcr_progress} | 活动: {recent_activity}"
        print(f"\r{status}", end='', flush=True)

    def _handle_listening_chunk(self, data, energy, zcr):
        """Buffer one idle chunk and start recording if it looks like speech."""
        self.update_pre_record_buffer(data)
        is_voice = self.is_voice_active_advanced(energy, zcr)
        if is_voice:
            self.start_recording()
            return
        buffer_usage = len(self.pre_record_buffer) / self.pre_record_max_frames * 100
        status = f"监听中... ZCR: {zcr:.0f} | 语音: {is_voice} | 缓冲: {buffer_usage:.0f}%"
        print(f"\r{status}", end='', flush=True)

    def run(self):
        """Run the listen/record loop until interrupted or an error occurs.

        Returns immediately if the input stream is not open. On exit (Ctrl+C,
        error, or ``running`` cleared) ``stop()`` is called to release the
        audio resources.
        """
        if not self.stream:
            print("❌ 音频设备未初始化")
            return
        self.running = True
        self._print_usage()
        try:
            while self.running:
                if self.stream is None:
                    # The stream is reopened after each playback; if that
                    # failed there is nothing left to read from.
                    print("❌ 音频设备未初始化")
                    break
                data = self.stream.read(self.CHUNK_SIZE, exception_on_overflow=False)
                if len(data) == 0:
                    continue
                if self.is_playing:
                    # Ignore captured audio entirely while a clip is playing.
                    status = "🔊 播放中... 跳过录音处理"
                    print(f"\r{status}", end='', flush=True)
                    time.sleep(0.1)
                    continue
                energy = self.calculate_energy(data)
                zcr = self.calculate_zero_crossing_rate(data)
                self.monitor_performance()
                if self.recording:
                    self._handle_recording_chunk(data, energy, zcr)
                else:
                    self._handle_listening_chunk(data, energy, zcr)
                # Brief pause to reduce CPU usage.
                time.sleep(0.01)
        except KeyboardInterrupt:
            print("\n👋 退出")
        except Exception as e:
            print(f"❌ 错误: {e}")
        finally:
            self.stop()

    def stop(self):
        """Stop the loop, finalise any active recording and release audio resources.

        Safe to call more than once: released handles are cleared so a second
        call does not touch an already closed stream or terminated session.
        """
        self.running = False
        if self.recording:
            self.stop_recording()
        if self.stream:
            self.stream.stop_stream()
            self.stream.close()
            self.stream = None
        if self.audio:
            self.audio.terminate()
            self.audio = None
def main():
    """Script entry point: build a recorder with this script's settings and run it.

    The success banner and feature list are printed only when the input
    stream was actually opened; otherwise the recorder's own failure messages
    are the only output, and ``run()`` returns immediately.
    """
    print("🚀 基于能量检测的极简录音系统")
    print("=" * 50)
    recorder = EnergyBasedRecorder(
        energy_threshold=200,      # lowered for higher sensitivity
        silence_threshold=3.0,     # seconds without speech before stopping
        min_recording_time=2.0,    # seconds
        max_recording_time=30.0,   # seconds
    )
    # The constructor swallows device errors and leaves ``stream`` as None,
    # so check it before claiming that initialisation succeeded.
    if recorder.stream is not None:
        print("✅ 系统初始化成功")
        print("🎯 优化特点:")
        print(" - 完全移除Vosk识别依赖")
        print(" - 基于能量检测极低CPU占用")
        print(" - 自动调整能量阈值")
        print(" - 实时性能监控")
        print(" - 预期延迟: <0.1秒")
        print("=" * 50)
    recorder.run()
# Start the recorder only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()