fix audio

This commit is contained in:
朱潮 2025-09-19 20:42:44 +08:00
parent e4503e2d1a
commit d5f2957984
5 changed files with 211 additions and 33 deletions

View File

@ -11,7 +11,8 @@ from dataclasses import dataclass
from typing import Any, Dict, Optional
import config
import pyaudio
import sounddevice as sd
import numpy as np
from realtime_dialog_client import RealtimeDialogClient
@ -31,40 +32,66 @@ class AudioDeviceManager:
def __init__(self, input_config: AudioConfig, output_config: AudioConfig):
self.input_config = input_config
self.output_config = output_config
self.pyaudio = pyaudio.PyAudio()
self.input_stream: Optional[pyaudio.Stream] = None
self.output_stream: Optional[pyaudio.Stream] = None
self.input_stream = None
self.output_stream = None
def open_input_stream(self) -> pyaudio.Stream:
def open_input_stream(self):
"""打开音频输入流"""
# p = pyaudio.PyAudio()
self.input_stream = self.pyaudio.open(
format=self.input_config.bit_size,
try:
self.input_stream = sd.InputStream(
samplerate=self.input_config.sample_rate,
channels=self.input_config.channels,
rate=self.input_config.sample_rate,
input=True,
frames_per_buffer=self.input_config.chunk
dtype='int16', # 16-bit PCM
blocksize=self.input_config.chunk,
device=None # 使用默认设备
)
self.input_stream.start()
return self.input_stream
except Exception as e:
print(f"打开输入流失败: {e}")
return None
def open_output_stream(self) -> pyaudio.Stream:
def open_output_stream(self):
"""打开音频输出流"""
self.output_stream = self.pyaudio.open(
format=self.output_config.bit_size,
try:
self.output_stream = sd.OutputStream(
samplerate=self.output_config.sample_rate,
channels=self.output_config.channels,
rate=self.output_config.sample_rate,
output=True,
frames_per_buffer=self.output_config.chunk
dtype='int16', # 16-bit PCM
blocksize=self.output_config.chunk,
device=None # 使用默认设备
)
self.output_stream.start()
return self.output_stream
except Exception as e:
print(f"打开输出流失败: {e}")
return None
def play_audio(self, audio_data: bytes) -> None:
    """Play one chunk of raw 16-bit PCM audio.

    The bytes are interpreted as interleaved int16 samples with
    ``self.output_config.channels`` channels. If an output stream is
    already open on this object, the chunk is written to it so that
    consecutive chunks share one stream. Otherwise the chunk is played
    through ``sd.play`` and the call blocks until playback has finished.

    Errors are reported on stdout and never propagate to the caller.

    Args:
        audio_data: Raw PCM bytes. Trailing bytes that do not make up a
            complete frame are dropped; input without a single complete
            frame is a no-op.
    """
    try:
        channels = self.output_config.channels
        frame_bytes = 2 * channels  # int16 -> two bytes per sample
        # Keep only whole frames; a ragged tail would make reshape() raise
        # and the entire chunk would be lost.
        usable = len(audio_data) - len(audio_data) % frame_bytes
        if usable <= 0:
            return

        audio_array = np.frombuffer(
            audio_data, dtype=np.int16, count=usable // 2
        ).reshape(-1, channels)

        # Prefer the stream opened by open_output_stream(); starting a new
        # playback per chunk via sd.play() is only the fallback path.
        stream = getattr(self, "output_stream", None)
        if stream is not None:
            stream.write(audio_array)
            return

        sd.play(audio_array, samplerate=self.output_config.sample_rate)
        sd.wait()  # block until this chunk has finished playing
    except Exception as e:
        print(f"音频播放失败: {e}")
def cleanup(self) -> None:
"""清理音频设备资源"""
for stream in [self.input_stream, self.output_stream]:
if stream:
stream.stop_stream()
stream.close()
self.pyaudio.terminate()
try:
if self.input_stream:
self.input_stream.stop()
self.input_stream.close()
if self.output_stream:
self.output_stream.stop()
self.output_stream.close()
sd.stop() # 停止所有音频播放
except Exception as e:
print(f"清理音频设备失败: {e}")
class DialogSession:
@ -118,8 +145,12 @@ class DialogSession:
)
# 初始化音频队列和输出流
print(f"输出音频配置: {config.output_audio_config}")
self.output_stream = self.audio_device.open_output_stream()
output_stream = self.audio_device.open_output_stream()
if output_stream:
print("音频输出流已打开")
self.output_stream = output_stream
else:
print("警告:音频输出流打开失败,将使用直接播放模式")
# 启动播放线程
self.is_recording = True
self.is_playing = True
@ -155,11 +186,15 @@ class DialogSession:
if was_not_playing:
print("播放开始前,额外发送静音数据清理管道")
for _ in range(3):
self.output_stream.write(b'\x00' * len(audio_data))
# 播放静音数据
self.audio_device.play_audio(b'\x00' * len(audio_data))
time.sleep(0.1)
# 播放音频数据
self.output_stream.write(audio_data)
try:
self.audio_device.play_audio(audio_data)
except Exception as e:
print(f"音频播放错误: {e}")
except queue.Empty:
# 队列为空,检查是否超时

Binary file not shown.

Binary file not shown.

143
doubao/test_sounddevice.py Normal file
View File

@ -0,0 +1,143 @@
#!/usr/bin/env python3
"""
测试sounddevice音频播放功能
用于验证新的音频实现是否正常工作
"""
import numpy as np
import sounddevice as sd
import time
def test_sounddevice():
    """Run a manual smoke test of sounddevice playback.

    Four steps run in order: list the audio devices, play a generated
    440 Hz tone, play the same tone after a bytes round trip, and play it
    as two-channel audio. Each step prints its progress, and the first
    step that raises aborts the run.

    Returns:
        bool: True if every step completed without raising, else False.
    """
    print("=== SoundDevice音频播放测试 ===")

    # 1. Enumerate the audio devices
    print("\n1. 检查音频设备...")
    try:
        devices = sd.query_devices()
        print(f"找到 {len(devices)} 个音频设备:")
        for i, dev in enumerate(devices):
            print(f" [{i}] {dev['name']} (输入: {dev['max_input_channels']}, 输出: {dev['max_output_channels']})")
        # sd.default.device is an (input, output) pair; the label below is
        # about the output device, so report only that entry.
        default_output = sd.default.device[1]
        print(f"默认输出设备: {default_output}")
    except Exception as e:
        print(f"音频设备检查失败: {e}")
        return False

    # 2. Generate and play a tone
    print("\n2. 测试生成和播放音频...")
    try:
        # One second of a 440 Hz sine wave
        sample_rate = 24000
        duration = 1.0
        frequency = 440
        t = np.linspace(0, duration, int(sample_rate * duration), endpoint=False)
        audio_data = np.sin(2 * np.pi * frequency * t) * 0.3  # 30% volume
        # Scale to the 16-bit integer range
        audio_data_int16 = (audio_data * 32767).astype(np.int16)
        print(f"生成音频数据: 采样率={sample_rate}Hz, 时长={duration}秒, 频率={frequency}Hz")
        print(f"音频数据形状: {audio_data_int16.shape}, 数据类型: {audio_data_int16.dtype}")
        print("开始播放测试音频...")
        sd.play(audio_data_int16, sample_rate)
        sd.wait()  # block until playback has finished
        print("✓ 音频播放成功")
    except Exception as e:
        print(f"音频播放失败: {e}")
        return False

    # 3. Play the tone after a bytes round trip
    print("\n3. 测试直接播放字节数据...")
    try:
        byte_data = audio_data_int16.tobytes()
        print(f"字节数据长度: {len(byte_data)} 字节")
        audio_array = np.frombuffer(byte_data, dtype=np.int16)
        print("开始播放字节数据...")
        sd.play(audio_array, sample_rate)
        sd.wait()
        print("✓ 字节数据播放成功")
    except Exception as e:
        print(f"字节数据播放失败: {e}")
        return False

    # 4. Play the tone as two identical channels
    print("\n4. 测试立体声播放...")
    try:
        stereo_data = np.column_stack([audio_data_int16, audio_data_int16])
        print(f"立体声数据形状: {stereo_data.shape}")
        print("开始播放立体声音频...")
        sd.play(stereo_data, sample_rate)
        sd.wait()
        print("✓ 立体声播放成功")
    except Exception as e:
        print(f"立体声播放失败: {e}")
        return False

    return True
def test_numpy_conversion():
    """Check a bytes -> int16 array -> bytes round trip on sample PCM data.

    Each intermediate value is printed. The converted bytes are compared
    with the input so that a lossy conversion is reported as a failure
    rather than passing silently.

    Returns:
        bool: True if the conversion ran and reproduced the input bytes,
        False if it raised or the round trip changed the data.
    """
    print("\n5. 测试数据类型转换...")
    # Sample bytes standing in for the 16-bit PCM audio the service returns
    test_data = b'\x00\x00\x7f\x7f\x80\x00\xff\xff'
    try:
        # bytes -> numpy array
        audio_array = np.frombuffer(test_data, dtype=np.int16)
        print(f"原始字节数据: {test_data}")
        print(f"转换后numpy数组: {audio_array}")
        print(f"数组形状: {audio_array.shape}, 数据类型: {audio_array.dtype}")

        # Reshape into a single-channel (frames, 1) layout
        audio_reshaped = audio_array.reshape(-1, 1)
        print(f"重塑后形状: {audio_reshaped.shape}")

        # numpy array -> bytes
        byte_data = audio_array.tobytes()
        print(f"转回字节数据: {byte_data}")

        # The round trip must be lossless, otherwise playback data is corrupted
        if byte_data != test_data:
            print("数据类型转换失败: 往返转换后的字节数据与原始数据不一致")
            return False

        print("✓ 数据类型转换测试成功")
        return True
    except Exception as e:
        print(f"数据类型转换失败: {e}")
        return False
if __name__ == "__main__":
    # Manual entry point: run both checks and print an overall verdict.
    banner = "=" * 50
    print("SoundDevice音频播放功能测试")
    print(banner)

    # Build a list rather than chaining with `and`, so that both checks
    # always run (playback first, then conversion) even if the first fails.
    outcomes = [test_sounddevice(), test_numpy_conversion()]

    print("\n" + banner)
    if all(outcomes):
        print("✓ 所有SoundDevice测试通过")
        print("树莓派应该可以正常播放音频了!")
    else:
        print("✗ 部分测试失败,需要进一步调试")