#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Voice recording module.

Built on PyAudio. A simple energy-based voice activity detector (VAD)
decides automatically when a recording starts and when it ends.
"""

import os
import threading
import time
import wave
from collections import deque

import numpy as np
import pyaudio


class VoiceRecorder:
    """Microphone recorder that starts on speech and stops on silence.

    A background thread reads fixed-size chunks from the default input
    device. A chunk whose RMS energy exceeds ``energy_threshold`` counts as
    speech. Recording begins at the first speech chunk (prefixed with up to
    about two seconds of buffered pre-roll) and ends after
    ``silence_threshold`` seconds of continuous silence, or once
    ``max_recording_time`` is exceeded.
    """

    def __init__(self,
                 energy_threshold=500,
                 silence_threshold=1.0,
                 min_recording_time=0.5,
                 max_recording_time=10.0,
                 sample_rate=16000,
                 chunk_size=1024,
                 defer_audio_init=False):
        """Initialise the recorder.

        Args:
            energy_threshold: RMS energy above which a chunk counts as speech.
            silence_threshold: Seconds of continuous silence that end a
                recording.
            min_recording_time: Minimum recording length in seconds; a
                silence timeout before this point does not end the recording.
            max_recording_time: Maximum recording length in seconds.
            sample_rate: Sampling rate in Hz.
            chunk_size: Number of frames read from the stream per chunk.
            defer_audio_init: If true, PyAudio is not initialised here but
                lazily, the first time a stream is opened.

        Raises:
            Exception: Whatever PyAudio raises when the default input device
                cannot be initialised (only when ``defer_audio_init`` is
                false).
        """
        self.energy_threshold = energy_threshold
        self.silence_threshold = silence_threshold
        self.min_recording_time = min_recording_time
        self.max_recording_time = max_recording_time
        self.sample_rate = sample_rate
        self.chunk_size = chunk_size
        self.defer_audio_init = defer_audio_init

        # Audio format: 16-bit signed samples, mono.
        self.FORMAT = pyaudio.paInt16
        self.CHANNELS = 1

        # Runtime state.
        self.audio = None
        self.stream = None
        self.recording = False
        self.recorded_frames = []
        self._record_thread = None

        # Voice-activity-detection state.
        self.silence_start_time = None
        self.recording_start_time = None
        # Roughly two seconds of pre-roll. Keep at least one slot so that
        # get_audio_level() still works with very large chunk sizes.
        self.audio_buffer = deque(
            maxlen=max(1, int(sample_rate / chunk_size * 2))
        )

        # User callbacks.
        self.on_recording_complete = None
        self.on_speech_detected = None

        if not defer_audio_init:
            self._setup_audio()

    def _setup_audio(self):
        """Create the PyAudio instance and report the default input device.

        Does nothing if PyAudio has already been initialised. On failure the
        partially created PyAudio instance is terminated, the error is
        printed, and the exception is re-raised.
        """
        if self.audio is not None:
            return

        audio = None
        try:
            audio = pyaudio.PyAudio()
            # Querying the default input device also verifies that one exists.
            device_info = audio.get_default_input_device_info()
            print(f"使用音频设备: {device_info['name']}")
        except Exception as e:
            print(f"音频设备初始化失败: {e}")
            if audio is not None:
                # Do not leak the PortAudio handle when the query fails.
                audio.terminate()
            raise
        self.audio = audio

    def _calculate_energy(self, audio_data) -> float:
        """Return the RMS energy of a chunk of 16-bit PCM audio.

        Args:
            audio_data: Raw bytes of int16 samples.

        Returns:
            The root-mean-square amplitude, or 0.0 for an empty chunk.
        """
        if len(audio_data) == 0:
            return 0.0

        # Widen before squaring: int16 arithmetic wraps around for any
        # |sample| > 181, which corrupts the mean (it can even go negative).
        samples = np.frombuffer(audio_data, dtype=np.int16).astype(np.float64)
        return float(np.sqrt(np.mean(samples ** 2)))

    def _is_speech(self, audio_data) -> bool:
        """Return True if the chunk's energy exceeds ``energy_threshold``."""
        return self._calculate_energy(audio_data) > self.energy_threshold

    def _open_stream(self):
        """Open the input stream, initialising PyAudio first if necessary."""
        if self.stream is not None:
            return

        if self.audio is None:
            # Supports defer_audio_init=True (and reuse after cleanup()).
            self._setup_audio()

        self.stream = self.audio.open(
            format=self.FORMAT,
            channels=self.CHANNELS,
            rate=self.sample_rate,
            input=True,
            frames_per_buffer=self.chunk_size
        )

    def _close_stream(self):
        """Stop and close the input stream, if one is open."""
        stream = self.stream
        if stream is None:
            return

        self.stream = None
        try:
            stream.stop_stream()
        finally:
            # Release the stream even if stopping it failed.
            stream.close()

    def start_listening(self):
        """Start listening for speech on a background daemon thread.

        If a listening session is already active, a notice is printed and
        the call returns without doing anything.
        """
        if self.recording:
            print("正在录音中...")
            return

        self._open_stream()

        self.recording = True
        self.recorded_frames = []
        self.silence_start_time = None
        self.recording_start_time = None
        # Drop pre-roll left over from an earlier session so it cannot leak
        # into the next recording.
        self.audio_buffer.clear()

        print("开始监听语音...")

        # Capture runs on its own thread so this call returns immediately.
        self._record_thread = threading.Thread(
            target=self._record_loop, daemon=True
        )
        self._record_thread.start()

    def _process_chunk(self, data) -> bool:
        """Advance the voice-activity state machine by one chunk.

        Args:
            data: Raw bytes of one chunk read from the stream.

        Returns:
            True when the recording should end now (silence timeout after
            the minimum length, or maximum length exceeded), else False.
        """
        speech = self._is_speech(data)

        if speech and self.recording_start_time is None:
            # Speech onset. Take the pre-roll snapshot BEFORE buffering the
            # current chunk, so the triggering chunk is recorded only once.
            self.recording_start_time = time.time()
            self.silence_start_time = None
            self.recorded_frames = list(self.audio_buffer)
            print("🎤 检测到语音,开始录音...")
            if self.on_speech_detected:
                self.on_speech_detected()

        self.audio_buffer.append(data)

        if self.recording_start_time is None:
            # No speech heard yet; keep filling the pre-roll buffer.
            return False

        self.recorded_frames.append(data)
        now = time.time()

        if speech:
            # Any speech chunk restarts the silence timer.
            self.silence_start_time = None
        else:
            if self.silence_start_time is None:
                self.silence_start_time = now

            silence_duration = now - self.silence_start_time
            if silence_duration > self.silence_threshold:
                recording_duration = now - self.recording_start_time
                if recording_duration >= self.min_recording_time:
                    print(f"静音 {silence_duration:.1f}s,结束录音")
                    return True
                # Too short to be useful: restart the silence timer and
                # keep waiting.
                print(f"录音时间太短 ({recording_duration:.1f}s),继续等待...")
                self.silence_start_time = now

        # Hard cap on the recording length.
        if now - self.recording_start_time > self.max_recording_time:
            print(f"达到最大录音时间 {self.max_recording_time}s,结束录音")
            return True

        return False

    def _record_loop(self):
        """Worker-thread body: read chunks until the recording ends.

        Any exception is printed and ends the session through
        ``stop_recording``.
        """
        try:
            while self.recording:
                data = self.stream.read(
                    self.chunk_size, exception_on_overflow=False
                )
                if len(data) == 0:
                    continue

                if self._process_chunk(data):
                    self.stop_recording()
                    # Leave explicitly: the completion callback may already
                    # have started a new session, flipping self.recording
                    # back to True.
                    break

                # Brief pause between reads.
                time.sleep(0.01)

        except Exception as e:
            print(f"录音过程中发生错误: {e}")
            self.stop_recording()

    def stop_recording(self):
        """Stop the session and deliver any recorded audio to the callback.

        Safe to call from the worker thread or from another thread. When
        called from another thread, the worker is given a bounded time to
        finish its current read before the stream is closed. Does nothing
        if no session is active.
        """
        if not self.recording:
            return

        self.recording = False

        # Never close the stream under a worker that may still be blocked
        # in read(); wait for it to notice the flag first.
        worker = self._record_thread
        if (worker is not None
                and worker is not threading.current_thread()
                and worker.is_alive()):
            worker.join(timeout=1.0 + self.chunk_size / self.sample_rate)

        self._close_stream()

        # Reset state BEFORE invoking the callback, so a callback that calls
        # start_listening() does not have its new session wiped afterwards.
        frames = self.recorded_frames
        self.recorded_frames = []
        self.silence_start_time = None
        self.recording_start_time = None

        if frames:
            audio_data = b''.join(frames)
            print(f"录音完成,共 {len(frames)} 帧")
            # 2 bytes per sample: 16-bit mono.
            print(f"录音时长: {len(audio_data) / (self.sample_rate * 2):.2f} 秒")

            if self.on_recording_complete:
                self.on_recording_complete(audio_data)

    def save_audio(self, audio_data, filename) -> bool:
        """Write raw PCM audio to a WAV file.

        Args:
            audio_data: Raw bytes of int16 samples.
            filename: Path of the WAV file to write.

        Returns:
            True on success; False if writing failed (the error is printed).
        """
        try:
            with wave.open(filename, 'wb') as wf:
                wf.setnchannels(self.CHANNELS)
                # The module-level helper needs no live PyAudio instance, so
                # saving also works after cleanup() or with deferred init.
                wf.setsampwidth(pyaudio.get_sample_size(self.FORMAT))
                wf.setframerate(self.sample_rate)
                wf.writeframes(audio_data)

            print(f"音频已保存到: {filename}")
            return True

        except Exception as e:
            print(f"保存音频失败: {e}")
            return False

    def set_recording_complete_callback(self, callback):
        """Set the callback invoked with the recorded bytes when a recording ends."""
        self.on_recording_complete = callback

    def set_speech_detected_callback(self, callback):
        """Set the no-argument callback invoked when speech is first detected."""
        self.on_speech_detected = callback

    def adjust_sensitivity(self, energy_threshold=None, silence_threshold=None):
        """Update the detection thresholds; arguments left as None are unchanged.

        Args:
            energy_threshold: New RMS energy threshold, or None.
            silence_threshold: New silence duration in seconds, or None.
        """
        if energy_threshold is not None:
            self.energy_threshold = energy_threshold
            print(f"能量阈值调整为: {energy_threshold}")

        if silence_threshold is not None:
            self.silence_threshold = silence_threshold
            print(f"静音阈值调整为: {silence_threshold}秒")

    def get_audio_level(self):
        """Return the RMS energy of the most recently buffered chunk.

        Returns:
            The energy of the latest chunk, or 0 if nothing is buffered.
        """
        try:
            latest_data = self.audio_buffer[-1]
        except IndexError:
            # Empty buffer (possibly just cleared by another thread).
            return 0
        return self._calculate_energy(latest_data)

    def cleanup(self):
        """Stop any active session and release PyAudio."""
        self.stop_recording()

        if self.audio:
            self.audio.terminate()
            self.audio = None


def main():
    """Interactive smoke test: record one utterance and save it as a WAV file."""
    print("🎙️ 语音录制测试")
    print("=" * 50)
    print("配置:")
    print("- 能量阈值: 500")
    print("- 静音阈值: 1.0秒")
    print("- 最小录音时间: 0.5秒")
    print("- 最大录音时间: 10秒")
    print("=" * 50)
    print("请说话测试录音功能...")
    print("按 Ctrl+C 退出")

    def on_recording_complete(audio_data):
        """Save the finished recording to a timestamped WAV file."""
        timestamp = time.strftime("%Y%m%d_%H%M%S")
        filename = f"recording_{timestamp}.wav"
        # Only report success if the file was actually written.
        if recorder.save_audio(audio_data, filename):
            print(f"✅ 录音文件已保存: {filename}")

        # 2 bytes per sample: 16-bit mono.
        duration = len(audio_data) / (recorder.sample_rate * 2)
        print(f"录音时长: {duration:.2f} 秒")

    def on_speech_detected():
        """Report that speech activity has started."""
        print("🔊 检测到语音活动...")

    recorder = VoiceRecorder(
        energy_threshold=500,
        silence_threshold=1.0,
        min_recording_time=0.5,
        max_recording_time=10.0
    )

    recorder.set_recording_complete_callback(on_recording_complete)
    recorder.set_speech_detected_callback(on_speech_detected)

    try:
        recorder.start_listening()

        # Keep the main thread alive while the worker records.
        while True:
            time.sleep(0.1)

            # Optionally show the current input level.
            level = recorder.get_audio_level()
            if level > 100:
                print(f"当前音频级别: {level:.0f}", end='\r')

    except KeyboardInterrupt:
        print("\n👋 退出录音测试")
    finally:
        recorder.cleanup()


if __name__ == "__main__":
    main()