fix audio

This commit is contained in:
朱潮 2025-09-19 20:49:20 +08:00
parent bc1dd7f03f
commit 3958d2ff81
3 changed files with 278 additions and 8 deletions

View File

@ -34,18 +34,37 @@ class AudioDeviceManager:
self.output_config = output_config self.output_config = output_config
self.input_stream = None self.input_stream = None
self.output_stream = None self.output_stream = None
self.audio_queue = None
self.recording = False
def open_input_stream(self): def open_input_stream(self):
"""打开音频输入流""" """打开音频输入流"""
try: try:
import queue
self.audio_queue = queue.Queue(maxsize=100) # 音频数据队列
def audio_callback(indata, frames, time_info, status):
"""音频数据回调"""
if status:
print(f"音频流状态: {status}")
if self.recording and self.audio_queue:
try:
# 将numpy数组转换为字节数据
audio_bytes = indata.tobytes()
self.audio_queue.put_nowait(audio_bytes)
except queue.Full:
print("警告: 音频队列已满,丢弃数据")
self.input_stream = sd.InputStream( self.input_stream = sd.InputStream(
samplerate=self.input_config.sample_rate, samplerate=self.input_config.sample_rate,
channels=self.input_config.channels, channels=self.input_config.channels,
dtype='int16', # 16-bit PCM dtype='int16', # 16-bit PCM
blocksize=self.input_config.chunk, blocksize=self.input_config.chunk,
callback=audio_callback,
device=None # 使用默认设备 device=None # 使用默认设备
) )
self.input_stream.start() self.input_stream.start()
self.recording = True
return self.input_stream return self.input_stream
except Exception as e: except Exception as e:
print(f"打开输入流失败: {e}") print(f"打开输入流失败: {e}")
@ -83,25 +102,29 @@ class AudioDeviceManager:
def read_audio_data(self, frames: int) -> bytes:
    """Return the next captured audio chunk, or silence when none is available.

    Args:
        frames: Number of frames used to size the silence fallback
            (``frames * 2`` zero bytes, i.e. one 16-bit sample per frame).
            It does not limit the size of a chunk taken from the queue.

    Returns:
        The next item from ``self.audio_queue`` as bytes. Zero-filled bytes
        are returned instead when recording is off, the queue has not been
        created, no chunk arrives within 100 ms, or any error occurs.
    """
    # Imported here because the only other visible import of ``queue`` is
    # local to open_input_stream(); without it ``queue.Empty`` below would
    # raise NameError and every empty poll would be reported as a read error.
    import queue

    silence = b'\x00' * (frames * 2)
    try:
        if not self.recording or self.audio_queue is None:
            return silence

        try:
            chunk = self.audio_queue.get(timeout=0.1)  # wait at most 100 ms
        except queue.Empty:
            # Nothing captured yet: hand back silence instead of blocking.
            return silence

        # The stream callback enqueues bytes already; array-like items
        # (anything exposing tobytes()) are still converted for safety.
        to_bytes = getattr(chunk, "tobytes", None)
        return to_bytes() if callable(to_bytes) else chunk
    except Exception as e:
        # Best effort: callers always receive a bytes payload.
        print(f"读取音频数据失败: {e}")
        return silence
def stop_recording(self):
    """Clear the ``recording`` flag.

    Only the flag is changed here; the input stream is neither stopped nor
    closed by this method.
    """
    self.recording = False
def cleanup(self) -> None: def cleanup(self) -> None:
"""清理音频设备资源""" """清理音频设备资源"""
try: try:
self.recording = False
if self.input_stream: if self.input_stream:
self.input_stream.stop() self.input_stream.stop()
self.input_stream.close() self.input_stream.close()
@ -725,6 +748,7 @@ class DialogSession:
print(f"会话错误: {e}") print(f"会话错误: {e}")
finally: finally:
if not self.is_audio_file_input: if not self.is_audio_file_input:
self.audio_device.stop_recording() # 先停止录音
self.audio_device.cleanup() self.audio_device.cleanup()

177
doubao/test_microphone.py Normal file
View File

@ -0,0 +1,177 @@
#!/usr/bin/env python3
"""
测试sounddevice麦克风录音功能
用于验证新的麦克风输入实现是否正常工作
"""
import numpy as np
import sounddevice as sd
import time
import threading
import queue
import sys
def test_microphone():
    """Check that the microphone can be recorded through sounddevice.

    Lists the input-capable devices, records up to five seconds of 16 kHz
    mono int16 audio through a callback-driven ``sd.InputStream``, plays the
    recording back and, when scipy is installed, saves it to
    ``test_recording.wav``.

    Returns:
        bool: True when audio was captured and played back; False when no
        input device exists, nothing was recorded, or any step raised.
    """
    print("=== SoundDevice麦克风录音测试 ===")

    # 1. Enumerate the input-capable devices.
    print("\n1. 检查音频输入设备...")
    try:
        # Keep each device's real index so the listing can be matched
        # against the default-device index printed below.
        input_devices = [
            (index, dev)
            for index, dev in enumerate(sd.query_devices())
            if dev['max_input_channels'] > 0
        ]
        print(f"找到 {len(input_devices)} 个输入设备:")
        for index, dev in input_devices:
            print(f" [{index}] {dev['name']} (输入通道: {dev['max_input_channels']})")
        if not input_devices:
            print("错误: 没有找到可用的音频输入设备")
            return False

        # sd.default.device is an input/output pair rather than a plain
        # tuple, so index it directly; fall back to the raw value if it
        # turns out not to be subscriptable.
        default_device = sd.default.device
        try:
            default_input = default_device[0]
        except TypeError:
            default_input = default_device
        print(f"默认输入设备: {default_input}")
    except Exception as e:
        print(f"音频设备检查失败: {e}")
        return False

    # 2. Record for a fixed duration.
    print("\n2. 测试录音5秒...")
    try:
        sample_rate = 16000
        channels = 1
        duration = 5
        chunk_size = 3200
        print(f"录音参数: 采样率={sample_rate}Hz, 通道={channels}, 时长={duration}")
        print("开始录音,请说话...")

        audio_queue = queue.Queue()

        def audio_callback(indata, frames, time_info, status):
            """Queue a copy of every block delivered by the input stream."""
            if status:
                print(f"音频流状态: {status}")
            # Copy before queuing; presumably the buffer is only valid for
            # the duration of the callback.
            audio_queue.put(indata.copy())

        audio_data = []
        with sd.InputStream(
            samplerate=sample_rate,
            channels=channels,
            dtype='int16',
            blocksize=chunk_size,
            callback=audio_callback,
        ):
            # Monotonic clock: a system clock adjustment must not shorten
            # or lengthen the recording.
            deadline = time.monotonic() + duration
            while time.monotonic() < deadline:
                try:
                    audio_data.append(audio_queue.get(timeout=1.0))
                except queue.Empty:
                    print("警告: 音频队列为空")
                    break

        # The input stream is closed before playback starts.
        print(f"录音完成,共收集到 {len(audio_data)} 个音频块")
        if not audio_data:
            print("警告: 没有录制到音频数据")
            return False

        # 3. Play back what was recorded.
        print("\n3. 播放录制的音频...")
        recorded_audio = np.concatenate(audio_data, axis=0)
        print(f"录制音频形状: {recorded_audio.shape}")
        print("开始播放录制的音频...")
        sd.play(recorded_audio, sample_rate)
        sd.wait()
        print("✓ 音频播放完成")

        # 4. Saving is optional and depends on scipy being installed.
        print("\n4. 保存音频文件...")
        try:
            from scipy.io import wavfile
            wavfile.write('test_recording.wav', sample_rate, recorded_audio)
            print("✓ 音频已保存为 test_recording.wav")
        except ImportError:
            print("提示: 安装scipy可保存WAV文件: pip install scipy")
    except Exception as e:
        print(f"录音测试失败: {e}")
        return False

    return True
def test_stream_reading():
    """Check blocking reads from a sounddevice input stream.

    Opens a 16 kHz mono int16 input stream without a callback and reads ten
    blocks of 3200 frames, printing the shape, dtype and byte length of each.

    Returns:
        bool: True when all ten reads succeed, False when any step raises.
    """
    print("\n5. 测试流式读取...")
    try:
        sample_rate = 16000
        channels = 1
        chunk_size = 3200

        with sd.InputStream(
            samplerate=sample_rate,
            channels=channels,
            dtype='int16',
            blocksize=chunk_size,
        ) as stream:
            print("开始流式读取测试...")
            for block_number in range(1, 11):
                # read() returns (data, overflowed); unpack it so the array
                # attributes below are read from the data, not the tuple.
                audio_data, _overflowed = stream.read(chunk_size)
                print(f"读取第 {block_number} 块数据: 形状={audio_data.shape}, 类型={audio_data.dtype}")
                byte_data = audio_data.tobytes()
                print(f"字节数据长度: {len(byte_data)} 字节")
                time.sleep(0.1)  # simulate the processing gap between reads
        print("✓ 流式读取测试完成")
    except Exception as e:
        print(f"流式读取测试失败: {e}")
        return False
    return True
if __name__ == "__main__":
    print("SoundDevice麦克风录音功能测试")
    print("=" * 50)

    # Run both checks unconditionally so a microphone failure does not hide
    # the result of the stream-reading check.
    results = [test_microphone(), test_stream_reading()]
    success = all(results)

    print("\n" + "=" * 50)
    if success:
        print("✓ 所有麦克风测试通过")
        print("树莓派应该可以正常录音了!")
    else:
        print("✗ 部分测试失败,需要检查音频设备和权限")
        print("请确保:")
        print("1. 麦克风已正确连接")
        print("2. 用户有音频设备访问权限")
        print("3. 没有其他程序占用音频设备")

    # Report the outcome through the exit status so shells and scripts can
    # detect a failure, not only a human reading the output.
    sys.exit(0 if success else 1)

View File

@ -0,0 +1,69 @@
#!/usr/bin/env python3
"""
快速测试修复后的录音功能
验证回调模式是否解决了元组数据问题
"""
import sys
import time
import threading
def test_recording_fix():
    """Smoke-test the callback-based recording path of AudioDeviceManager.

    Builds an ``AudioDeviceManager`` from the project's ``config`` module,
    opens the input stream, reads ten chunks, then stops recording and
    releases the device.

    Returns:
        bool: True when every step completed, False when the input stream
        could not be opened or any step raised.
    """
    print("=== 测试录音修复 ===")
    try:
        # NOTE(review): machine-specific absolute path; the modules cannot
        # be imported on any other checkout location.
        sys.path.append('/home/zhuchaowe/Local-Voice/doubao')
        import audio_manager
        import config

        audio_device = audio_manager.AudioDeviceManager(
            audio_manager.AudioConfig(**config.input_audio_config),
            audio_manager.AudioConfig(**config.output_audio_config)
        )

        print("1. 打开音频输入流...")
        try:
            if not audio_device.open_input_stream():
                print("✗ 音频输入流打开失败")
                return False
            print("✓ 音频输入流打开成功")

            print("2. 测试读取音频数据...")
            chunk_frames = config.input_audio_config["chunk"]
            for attempt in range(1, 11):
                audio_data = audio_device.read_audio_data(chunk_frames)
                # NOTE(review): read_audio_data appears to return zero-filled
                # bytes instead of an empty value on failure, so this check
                # may never take the failure branch; confirm.
                if audio_data:
                    print(f"{attempt}次读取成功: {len(audio_data)} 字节")
                    if attempt == 1:
                        print(f"音频数据类型: {type(audio_data)}")
                else:
                    print(f"{attempt}次读取失败")
                time.sleep(0.1)

            print("3. 停止录音...")
            audio_device.stop_recording()
        finally:
            # Always release the device, including when a step above raised,
            # so a failed run does not leave the stream open.
            print("4. 清理资源...")
            audio_device.cleanup()

        print("✓ 所有测试通过!")
        return True
    except Exception as e:
        print(f"✗ 测试失败: {e}")
        import traceback
        traceback.print_exc()
        return False
if __name__ == "__main__":
    success = test_recording_fix()
    if success:
        print("\n🎉 录音功能修复成功!")
        print("现在可以运行 main.py 测试完整功能")
    else:
        print("\n❌ 录音功能仍有问题")
        print("请检查错误信息并调整代码")

    # Report the outcome through the exit status so shells and scripts can
    # detect a failure, not only a human reading the output.
    sys.exit(0 if success else 1)