Local-Voice/voice_recorder.py
2025-09-20 00:39:42 +08:00

344 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
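Voice recording module.

Built on PyAudio. Uses energy-based voice activity detection (VAD) to decide
automatically when a recording has ended.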
"""
import pyaudio
import wave
import numpy as np
import time
import os
import threading
from collections import deque
class VoiceRecorder:
    """Microphone recorder that starts and stops on detected voice activity.

    A background thread reads fixed-size chunks of 16-bit mono PCM from the
    default input device. A chunk whose RMS energy exceeds
    ``energy_threshold`` counts as speech. Recording starts on the first
    speech chunk (prefixed with roughly two seconds of buffered pre-roll) and
    ends after ``silence_threshold`` seconds of continuous silence or after
    ``max_recording_time`` seconds, whichever comes first.

    Callbacks run on whichever thread calls :meth:`stop_recording`; for
    automatic stops that is the background recording thread.
    """

    def __init__(self,
                 energy_threshold=500,
                 silence_threshold=1.0,
                 min_recording_time=0.5,
                 max_recording_time=10.0,
                 sample_rate=16000,
                 chunk_size=1024,
                 defer_audio_init=False):
        """Create a recorder.

        Args:
            energy_threshold: RMS level above which a chunk counts as speech.
            silence_threshold: Seconds of continuous silence that end a
                recording.
            min_recording_time: Minimum recording length in seconds; a
                silence timeout before this only restarts the silence timer.
            max_recording_time: Hard upper bound on recording length (seconds).
            sample_rate: Sampling rate in Hz.
            chunk_size: Number of frames read from the stream per iteration.
            defer_audio_init: When True, PyAudio is not initialised here but
                lazily the first time a stream is opened.

        Raises:
            Exception: Re-raised from PyAudio when the audio device cannot be
                initialised (only if ``defer_audio_init`` is False).
        """
        self.energy_threshold = energy_threshold
        self.silence_threshold = silence_threshold
        self.min_recording_time = min_recording_time
        self.max_recording_time = max_recording_time
        self.sample_rate = sample_rate
        self.chunk_size = chunk_size
        self.defer_audio_init = defer_audio_init

        # Audio format: 16-bit signed samples, mono.
        self.FORMAT = pyaudio.paInt16
        self.CHANNELS = 1

        # Runtime state.
        self.audio = None
        self.stream = None
        self.recording = False
        self.recorded_frames = []

        # Voice-activity bookkeeping.
        self.silence_start_time = None
        self.recording_start_time = None
        # About two seconds of pre-roll; at least one slot so the chunk that
        # triggers recording is always retained.
        self.audio_buffer = deque(
            maxlen=max(1, int(sample_rate / chunk_size * 2))
        )

        # User callbacks.
        self.on_recording_complete = None
        self.on_speech_detected = None

        # Worker thread handle and a lock that makes the recording -> stopped
        # transition happen exactly once even when two threads race to stop.
        self._record_thread = None
        self._stop_lock = threading.Lock()

        if not defer_audio_init:
            self._setup_audio()

    def _setup_audio(self):
        """Initialise PyAudio and report the default input device.

        Raises:
            Exception: Whatever PyAudio raised; the partially created PyAudio
                instance is terminated first so it is not leaked.
        """
        try:
            self.audio = pyaudio.PyAudio()
            device_info = self.audio.get_default_input_device_info()
            print(f"使用音频设备: {device_info['name']}")
        except Exception as e:
            print(f"音频设备初始化失败: {e}")
            if self.audio is not None:
                self.audio.terminate()
                self.audio = None
            raise

    def _calculate_energy(self, audio_data):
        """Return the RMS energy of a buffer of 16-bit PCM samples.

        Args:
            audio_data: Raw bytes of native-endian int16 samples.

        Returns:
            The root-mean-square amplitude as a float; 0 for an empty buffer.
        """
        # Ignore a dangling odd byte instead of letting frombuffer raise.
        sample_count = len(audio_data) // 2
        if sample_count == 0:
            return 0.0
        samples = np.frombuffer(audio_data, dtype=np.int16, count=sample_count)
        # Widen before squaring: int16 ** 2 silently wraps around for any
        # amplitude above ~181, which corrupts the mean.
        samples = samples.astype(np.float64)
        return float(np.sqrt(np.mean(samples ** 2)))

    def _is_speech(self, audio_data):
        """Return True when the chunk's energy exceeds ``energy_threshold``."""
        return self._calculate_energy(audio_data) > self.energy_threshold

    def _open_stream(self):
        """Open the input stream if it is not already open."""
        if self.stream is not None:
            return
        # Honour defer_audio_init: create PyAudio on first use.
        if self.audio is None:
            self._setup_audio()
        self.stream = self.audio.open(
            format=self.FORMAT,
            channels=self.CHANNELS,
            rate=self.sample_rate,
            input=True,
            frames_per_buffer=self.chunk_size
        )

    def _close_stream(self):
        """Stop and close the input stream, if any."""
        if not self.stream:
            return
        stream, self.stream = self.stream, None
        try:
            stream.stop_stream()
        finally:
            # Always release the device, even if stopping failed.
            stream.close()

    def start_listening(self):
        """Start the background thread that waits for and records speech.

        Does nothing (other than printing a notice) when already recording.
        """
        if self.recording:
            print("正在录音中...")
            return
        self._open_stream()
        self.recording = True
        self.recorded_frames = []
        self.silence_start_time = None
        self.recording_start_time = None
        # Drop pre-roll left over from a previous session.
        self.audio_buffer.clear()
        print("开始监听语音...")
        # Record on a daemon thread so it never blocks interpreter exit.
        self._record_thread = threading.Thread(
            target=self._record_loop, daemon=True
        )
        self._record_thread.start()

    def _record_loop(self):
        """Worker loop: read chunks, run VAD, and stop on silence or timeout."""
        try:
            while self.recording:
                # Blocking read; this paces the loop, so no extra sleep is
                # needed (sleeping would only risk input overflow).
                data = self.stream.read(
                    self.chunk_size, exception_on_overflow=False
                )
                if len(data) == 0:
                    continue
                self.audio_buffer.append(data)

                if self._is_speech(data):
                    if self.recording_start_time is None:
                        # First speech chunk: start recording, seeded with the
                        # pre-roll buffer. The buffer already ends with
                        # ``data``, so it must not be appended a second time.
                        self.recording_start_time = time.time()
                        self.recorded_frames = list(self.audio_buffer)
                        print("🎤 检测到语音,开始录音...")
                        if self.on_speech_detected:
                            self.on_speech_detected()
                    else:
                        self.recorded_frames.append(data)
                    # Any speech resets the silence timer.
                    self.silence_start_time = None
                elif self.recording_start_time is not None:
                    # Silence after speech: keep recording while timing it.
                    if self.silence_start_time is None:
                        self.silence_start_time = time.time()
                    self.recorded_frames.append(data)
                    silence_duration = time.time() - self.silence_start_time
                    if silence_duration > self.silence_threshold:
                        recording_duration = (
                            time.time() - self.recording_start_time
                        )
                        if recording_duration >= self.min_recording_time:
                            print(f"静音 {silence_duration:.1f}s结束录音")
                            self.stop_recording()
                            break
                        else:
                            print(f"录音时间太短 ({recording_duration:.1f}s),继续等待...")
                            self.silence_start_time = time.time()

                # Enforce the hard upper bound on recording length.
                if self.recording_start_time is not None:
                    recording_duration = time.time() - self.recording_start_time
                    if recording_duration > self.max_recording_time:
                        print(f"达到最大录音时间 {self.max_recording_time}s结束录音")
                        self.stop_recording()
                        break
        except Exception as e:
            print(f"录音过程中发生错误: {e}")
            self.stop_recording()

    def stop_recording(self):
        """Stop recording, close the stream, and deliver any captured audio.

        Safe to call from the recording thread or from another thread; only
        the first caller performs the stop. If frames were captured,
        ``on_recording_complete`` is called with the joined PCM bytes.
        """
        with self._stop_lock:
            if not self.recording:
                return
            self.recording = False

        # When stopping from another thread, let the worker finish its
        # in-flight read before the stream is closed underneath it.
        worker = self._record_thread
        if (worker is not None
                and worker is not threading.current_thread()
                and worker.is_alive()):
            worker.join(timeout=self.chunk_size / self.sample_rate + 1.0)

        self._close_stream()

        # Reset state before the callback runs so that a callback which
        # restarts listening does not have its new session clobbered.
        frames = self.recorded_frames
        self.recorded_frames = []
        self.silence_start_time = None
        self.recording_start_time = None

        if len(frames) > 0:
            audio_data = b''.join(frames)
            print(f"录音完成,共 {len(frames)}")
            print(f"录音时长: {len(audio_data) / (self.sample_rate * 2):.2f}")
            if self.on_recording_complete:
                self.on_recording_complete(audio_data)

    def save_audio(self, audio_data, filename):
        """Write PCM bytes to ``filename`` as a WAV file.

        Args:
            audio_data: Raw PCM bytes in this recorder's format.
            filename: Destination path.

        Returns:
            True on success, False if writing failed (the error is printed).
        """
        try:
            # Module-level helper: works even when PyAudio has not been
            # initialised (deferred init) or was already terminated.
            sample_width = pyaudio.get_sample_size(self.FORMAT)
            with wave.open(filename, 'wb') as wf:
                wf.setnchannels(self.CHANNELS)
                wf.setsampwidth(sample_width)
                wf.setframerate(self.sample_rate)
                wf.writeframes(audio_data)
            print(f"音频已保存到: {filename}")
            return True
        except Exception as e:
            print(f"保存音频失败: {e}")
            return False

    def set_recording_complete_callback(self, callback):
        """Set the callable invoked with the PCM bytes when a recording ends."""
        self.on_recording_complete = callback

    def set_speech_detected_callback(self, callback):
        """Set the zero-argument callable invoked when speech is first detected."""
        self.on_speech_detected = callback

    def adjust_sensitivity(self, energy_threshold=None, silence_threshold=None):
        """Update the energy and/or silence thresholds; None leaves a value as is."""
        if energy_threshold is not None:
            self.energy_threshold = energy_threshold
            print(f"能量阈值调整为: {energy_threshold}")
        if silence_threshold is not None:
            self.silence_threshold = silence_threshold
            print(f"静音阈值调整为: {silence_threshold}")

    def get_audio_level(self):
        """Return the RMS energy of the most recently read chunk, or 0 if none."""
        try:
            # EAFP: the worker thread may clear or refill the buffer at any time.
            latest_data = self.audio_buffer[-1]
        except IndexError:
            return 0
        return self._calculate_energy(latest_data)

    def cleanup(self):
        """Stop any recording and release the stream and PyAudio instance."""
        self.stop_recording()
        # Covers a stream that was opened without recording ever starting.
        self._close_stream()
        if self.audio:
            self.audio.terminate()
            self.audio = None
def main():
    """Interactive smoke test: record one utterance and save it as a WAV file.

    Prints the test configuration, starts a VoiceRecorder on the default
    input device, and keeps the process alive (showing the current input
    level) until the user presses Ctrl+C. The recorder is always cleaned up
    on exit.
    """
    print("🎙️ 语音录制测试")
    print("=" * 50)
    print("配置:")
    print("- 能量阈值: 500")
    print("- 静音阈值: 1.0秒")
    print("- 最小录音时间: 0.5秒")
    print("- 最大录音时间: 10秒")
    print("=" * 50)
    print("请说话测试录音功能...")
    print("按 Ctrl+C 退出")

    def on_recording_complete(audio_data):
        """Save the finished recording to a timestamped WAV file."""
        timestamp = time.strftime("%Y%m%d_%H%M%S")
        filename = f"recording_{timestamp}.wav"
        # Only claim success when the file was actually written; on failure
        # save_audio has already printed the reason.
        if recorder.save_audio(audio_data, filename):
            print(f"✅ 录音文件已保存: {filename}")
        # 2 bytes per sample (16-bit mono).
        duration = len(audio_data) / (recorder.sample_rate * 2)
        print(f"录音时长: {duration:.2f}")

    def on_speech_detected():
        """Announce that speech has been detected."""
        print("🔊 检测到语音活动...")

    # Build the recorder with the configuration announced above.
    recorder = VoiceRecorder(
        energy_threshold=500,
        silence_threshold=1.0,
        min_recording_time=0.5,
        max_recording_time=10.0
    )
    recorder.set_recording_complete_callback(on_recording_complete)
    recorder.set_speech_detected_callback(on_speech_detected)

    try:
        recorder.start_listening()
        # Keep the main thread alive; recording happens on a daemon thread.
        while True:
            time.sleep(0.1)
            # Optional live level meter, overwritten in place via '\r'.
            level = recorder.get_audio_level()
            if level > 100:
                print(f"当前音频级别: {level:.0f}", end='\r')
    except KeyboardInterrupt:
        print("\n👋 退出录音测试")
    finally:
        recorder.cleanup()
# Run the interactive recording test only when executed as a script,
# not when this module is imported.
if __name__ == "__main__":
    main()