fix audio

This commit is contained in:
朱潮 2025-09-19 19:56:53 +08:00
parent e432417299
commit 1e0fd6e234
8 changed files with 540 additions and 10 deletions

Binary file not shown.

394
doubao/audio_converter.py Normal file
View File

@ -0,0 +1,394 @@
#!/usr/bin/env python3
"""
音频格式转换工具
提供跨平台音频格式转换功能
"""
import platform
import struct
import numpy as np
from typing import Tuple, Optional
import subprocess
import tempfile
import os
import wave
class AudioConverter:
"""音频格式转换工具类"""
@staticmethod
def get_platform_info() -> dict:
"""获取平台信息"""
return {
'system': platform.system(),
'machine': platform.machine(),
'platform': platform.platform()
}
@staticmethod
def float32_to_int16(float32_data: bytes) -> bytes:
"""将Float32格式转换为Int16格式"""
if len(float32_data) % 4 != 0:
# 处理数据长度不是4的倍数的情况
float32_data = float32_data[:len(float32_data) - len(float32_data) % 4]
# 解包Float32数据
float_values = struct.unpack(f'{len(float32_data) // 4}f', float32_data)
# 转换为Int16范围
int16_values = []
for val in float_values:
# 限制在[-1.0, 1.0]范围内
val = max(-1.0, min(1.0, val))
# 转换为Int16
int16_val = int(val * 32767)
int16_values.append(int16_val)
# 打包为Int16字节数据
return struct.pack(f'{len(int16_values)}h', *int16_values)
@staticmethod
def int16_to_float32(int16_data: bytes) -> bytes:
"""将Int16格式转换为Float32格式"""
if len(int16_data) % 2 != 0:
# 处理数据长度不是2的倍数的情况
int16_data = int16_data[:len(int16_data) - len(int16_data) % 2]
# 解包Int16数据
int16_values = struct.unpack(f'{len(int16_data) // 2}h', int16_data)
# 转换为Float32
float_values = []
for val in int16_values:
# 转换为Float32范围
float_val = val / 32767.0
float_values.append(float_val)
# 打包为Float32字节数据
return struct.pack(f'{len(float_values)}f', *float_values)
@staticmethod
def resample_audio(audio_data: bytes, src_rate: int, dst_rate: int,
channels: int = 1, bit_depth: int = 16) -> bytes:
"""重采样音频数据"""
if src_rate == dst_rate:
return audio_data
try:
import librosa
import soundfile as sf
# 计算样本数
if bit_depth == 32: # Float32
samples = len(audio_data) // 4
dtype = np.float32
else: # Int16
samples = len(audio_data) // 2
dtype = np.int16
# 重塑音频数据
if bit_depth == 32:
audio_array = np.frombuffer(audio_data, dtype=np.float32)
else:
audio_array = np.frombuffer(audio_data, dtype=np.int16)
audio_array = audio_array.reshape(-1, channels)
# 使用librosa进行重采样
resampled = librosa.resample(audio_array.T, orig_sr=src_rate, target_sr=dst_rate).T
# 转换回字节数据
return resampled.astype(dtype).tobytes()
except ImportError:
print("警告: librosa 未安装,跳过重采样")
return audio_data
except Exception as e:
print(f"重采样失败: {e}")
return audio_data
class AudioDeviceDetector:
"""音频设备检测器"""
def __init__(self):
self.platform_info = AudioConverter.get_platform_info()
self.is_raspberry_pi = self._is_raspberry_pi()
self.is_mac = self.platform_info['system'] == 'Darwin'
def _is_raspberry_pi(self) -> bool:
"""检测是否为树莓派"""
try:
with open('/proc/device-tree/model', 'r') as f:
model = f.read().lower()
return 'raspberry pi' in model
except:
return False
def detect_audio_devices(self) -> dict:
"""检测可用的音频设备"""
devices = {
'input_devices': [],
'output_devices': [],
'default_input': None,
'default_output': None,
'supported_formats': [],
'recommended_format': None
}
try:
import pyaudio
p = pyaudio.PyAudio()
# 检测支持的格式
test_formats = [
{'format': pyaudio.paFloat32, 'name': 'Float32'},
{'format': pyaudio.paInt16, 'name': 'Int16'},
{'format': pyaudio.paInt32, 'name': 'Int32'}
]
for fmt in test_formats:
try:
# 尝试创建一个测试流来检测格式支持
stream = p.open(
format=fmt['format'],
channels=1,
rate=24000,
output=True,
frames_per_buffer=1024
)
stream.close()
devices['supported_formats'].append(fmt['name'])
except:
pass
# 推荐格式
if 'Float32' in devices['supported_formats']:
devices['recommended_format'] = 'Float32'
elif 'Int16' in devices['supported_formats']:
devices['recommended_format'] = 'Int16'
else:
devices['recommended_format'] = 'Int16' # 默认使用Int16
# 获取设备信息
for i in range(p.get_device_count()):
device_info = p.get_device_info_by_index(i)
if device_info['maxInputChannels'] > 0:
devices['input_devices'].append({
'index': i,
'name': device_info['name'],
'channels': device_info['maxInputChannels'],
'sample_rate': device_info['defaultSampleRate']
})
if device_info.get('isDefaultInput', False):
devices['default_input'] = i
if device_info['maxOutputChannels'] > 0:
devices['output_devices'].append({
'index': i,
'name': device_info['name'],
'channels': device_info['maxOutputChannels'],
'sample_rate': device_info['defaultSampleRate']
})
if device_info.get('isDefaultOutput', False):
devices['default_output'] = i
p.terminate()
except Exception as e:
print(f"音频设备检测失败: {e}")
# 使用默认配置
devices['supported_formats'] = ['Int16']
devices['recommended_format'] = 'Int16'
return devices
def get_platform_specific_config(self) -> dict:
"""获取平台特定的配置"""
config = {
'prefer_float32': False,
'fallback_to_aplay': False,
'recommended_sample_rate': 16000,
'recommended_channels': 1,
'recommended_bit_depth': 16
}
if self.is_mac:
config['prefer_float32'] = True
config['recommended_sample_rate'] = 24000
config['recommended_bit_depth'] = 32
elif self.is_raspberry_pi:
config['fallback_to_aplay'] = True
config['recommended_sample_rate'] = 16000
config['recommended_bit_depth'] = 16
return config
def print_device_info(self):
"""打印设备信息"""
devices = self.detect_audio_devices()
config = self.get_platform_specific_config()
print(f"=== 音频设备信息 ===")
print(f"平台: {self.platform_info['platform']}")
print(f"是否为树莓派: {self.is_raspberry_pi}")
print(f"是否为Mac: {self.is_mac}")
print()
print(f"支持的音频格式: {devices['supported_formats']}")
print(f"推荐格式: {devices['recommended_format']}")
print()
print("输出设备:")
for device in devices['output_devices']:
marker = " (默认)" if device['index'] == devices['default_output'] else ""
print(f" [{device['index']}] {device['name']}{marker}")
print(f" 通道数: {device['channels']}, 采样率: {device['sample_rate']}")
print()
print("平台特定配置:")
for key, value in config.items():
print(f" {key}: {value}")
class AudioPlayer:
"""音频播放器,支持多种播放方式"""
def __init__(self):
self.detector = AudioDeviceDetector()
self.converter = AudioConverter()
self.device_info = self.detector.detect_audio_devices()
self.config = self.detector.get_platform_specific_config()
def play_audio(self, audio_data: bytes, format_type: str = 'Float32',
sample_rate: int = 24000, channels: int = 1) -> bool:
"""播放音频数据"""
print(f"开始播放音频: 格式={format_type}, 采样率={sample_rate}Hz, 通道数={channels}")
# 尝试不同的播放方式
if self._try_pyaudio_play(audio_data, format_type, sample_rate, channels):
return True
if self.config['fallback_to_aplay'] and self._try_aplay_play(audio_data, format_type, sample_rate, channels):
return True
print("所有播放方式都失败了")
return False
def _try_pyaudio_play(self, audio_data: bytes, format_type: str,
sample_rate: int, channels: int) -> bool:
"""尝试使用pyaudio播放"""
try:
import pyaudio
# 转换格式
if format_type == 'Float32' and self.device_info['recommended_format'] != 'Float32':
print(f"转换格式: Float32 -> {self.device_info['recommended_format']}")
audio_data = self.converter.float32_to_int16(audio_data)
format_type = 'Int16'
sample_rate = self.config['recommended_sample_rate']
# 确定pyaudio格式
pyaudio_format = pyaudio.paFloat32 if format_type == 'Float32' else pyaudio.paInt16
# 创建音频流
p = pyaudio.PyAudio()
stream = p.open(
format=pyaudio_format,
channels=channels,
rate=sample_rate,
output=True,
frames_per_buffer=1024
)
# 播放音频
stream.write(audio_data)
stream.stop_stream()
stream.close()
p.terminate()
print("pyaudio播放成功")
return True
except Exception as e:
print(f"pyaudio播放失败: {e}")
return False
def _try_aplay_play(self, audio_data: bytes, format_type: str,
sample_rate: int, channels: int) -> bool:
"""尝试使用aplay播放"""
if not self.detector.is_raspberry_pi:
return False
try:
# 创建临时文件
with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp_file:
temp_path = tmp_file.name
# 转换格式并保存为WAV文件
if format_type == 'Float32':
audio_data = self.converter.float32_to_int16(audio_data)
format_type = 'Int16'
# 保存为WAV文件
with wave.open(temp_path, 'wb') as wav_file:
wav_file.setnchannels(channels)
wav_file.setsampwidth(2) # 16-bit = 2 bytes
wav_file.setframerate(sample_rate)
wav_file.writeframes(audio_data)
# 使用aplay播放
result = subprocess.run(['aplay', temp_path],
capture_output=True, text=True, timeout=30)
# 清理临时文件
os.unlink(temp_path)
if result.returncode == 0:
print("aplay播放成功")
return True
else:
print(f"aplay播放失败: {result.stderr}")
return False
except Exception as e:
print(f"aplay播放失败: {e}")
# 清理临时文件
try:
os.unlink(temp_path)
except:
pass
return False
# 测试函数
def test_audio_conversion():
"""测试音频转换功能"""
print("=== 音频转换测试 ===")
# 创建测试数据
test_float32 = struct.pack('4f', 0.5, -0.5, 1.0, -1.0)
# 测试转换
converter = AudioConverter()
# Float32 -> Int16
int16_data = converter.float32_to_int16(test_float32)
print(f"Float32 -> Int16 转换成功: {len(int16_data)} 字节")
# Int16 -> Float32
float32_data = converter.int16_to_float32(int16_data)
print(f"Int16 -> Float32 转换成功: {len(float32_data)} 字节")
# 设备检测
detector = AudioDeviceDetector()
detector.print_device_info()
if __name__ == "__main__":
test_audio_conversion()

View File

@ -13,6 +13,7 @@ from typing import Any, Dict, Optional
import config
import pyaudio
from realtime_dialog_client import RealtimeDialogClient
from audio_converter import AudioConverter, AudioDeviceDetector, AudioPlayer
@dataclass
@ -35,6 +36,62 @@ class AudioDeviceManager:
self.input_stream: Optional[pyaudio.Stream] = None
self.output_stream: Optional[pyaudio.Stream] = None
# 初始化音频转换工具
self.converter = AudioConverter()
self.detector = AudioDeviceDetector()
self.player = AudioPlayer()
# 获取设备信息
self.device_info = self.detector.detect_audio_devices()
self.platform_config = self.detector.get_platform_specific_config()
# 打印设备信息
print("=== 音频设备信息 ===")
self.detector.print_device_info()
print()
# 根据平台调整输出配置
self._adjust_output_config()
def _adjust_output_config(self):
"""根据平台能力调整输出配置"""
original_format = self.output_config.format
original_bit_size = self.output_config.bit_size
original_sample_rate = self.output_config.sample_rate
# 获取配置选项
prefer_native = config.audio_config.get('prefer_native_format', True)
fallback_int16 = config.audio_config.get('fallback_to_int16', True)
# 检查是否需要格式转换
needs_conversion = False
# 如果当前格式不被支持,且启用降级
if (self.output_config.bit_size == pyaudio.paFloat32 and
'Float32' not in self.device_info['supported_formats'] and
fallback_int16):
needs_conversion = True
# 如果优先使用原生格式,且平台推荐格式不是当前格式
elif (prefer_native and
self.device_info['recommended_format'] == 'Int16' and
self.output_config.bit_size == pyaudio.paFloat32):
print("提示: 优先使用平台原生格式Int16")
needs_conversion = True
if needs_conversion:
print(f"警告: 音频格式转换 {self.output_config.bit_size} -> Int16")
self.output_config.format = 'pcm'
self.output_config.bit_size = pyaudio.paInt16
self.output_config.sample_rate = self.platform_config['recommended_sample_rate']
print(f"输出配置调整:")
print(f" 格式: {original_format} -> {self.output_config.format}")
print(f" 比特深度: {32 if original_bit_size == pyaudio.paFloat32 else 16} -> 16")
print(f" 采样率: {original_sample_rate} -> {self.output_config.sample_rate}Hz")
else:
print(f"使用原始输出配置: 格式={original_format}, 比特深度={original_bit_size}, 采样率={original_sample_rate}Hz")
def open_input_stream(self) -> pyaudio.Stream:
"""打开音频输入流"""
# p = pyaudio.PyAudio()
@ -49,14 +106,86 @@ class AudioDeviceManager:
def open_output_stream(self) -> pyaudio.Stream:
"""打开音频输出流"""
self.output_stream = self.pyaudio.open(
format=self.output_config.bit_size,
channels=self.output_config.channels,
rate=self.output_config.sample_rate,
output=True,
frames_per_buffer=self.output_config.chunk
)
return self.output_stream
# 尝试使用默认输出设备
output_device_index = self.device_info.get('default_output')
try:
self.output_stream = self.pyaudio.open(
format=self.output_config.bit_size,
channels=self.output_config.channels,
rate=self.output_config.sample_rate,
output=True,
frames_per_buffer=self.output_config.chunk,
output_device_index=output_device_index
)
print(f"音频输出流已打开 - 设备: {output_device_index}")
return self.output_stream
except Exception as e:
print(f"打开音频输出流失败: {e}")
# 尝试使用其他设备
for device in self.device_info['output_devices']:
if device['index'] != output_device_index:
try:
self.output_stream = self.pyaudio.open(
format=self.output_config.bit_size,
channels=self.output_config.channels,
rate=self.output_config.sample_rate,
output=True,
frames_per_buffer=self.output_config.chunk,
output_device_index=device['index']
)
print(f"音频输出流已打开 - 备选设备: {device['index']}")
return self.output_stream
except Exception as e2:
print(f"设备 {device['index']} 失败: {e2}")
continue
# 如果所有设备都失败,抛出异常
raise Exception("无法打开任何音频输出设备")
def write_audio_data(self, audio_data: bytes) -> bool:
"""写入音频数据,支持格式转换和多种播放方式"""
# 如果需要格式转换
converted_data = audio_data
converted_format = None
# 检查是否需要从Float32转换为Int16
if (self.output_config.bit_size == pyaudio.paInt16 and
len(audio_data) % 4 == 0): # 可能是Float32数据
try:
# 检查是否为Float32数据通过尝试解析
import struct
test_sample = struct.unpack('f', audio_data[:4])[0]
if -1.0 <= test_sample <= 1.0: # 合理的Float32范围
print("检测到Float32数据转换为Int16格式")
converted_data = self.converter.float32_to_int16(audio_data)
converted_format = 'Int16'
except:
pass # 不是Float32数据不进行转换
# 尝试直接写入pyaudio流
try:
if self.output_stream:
self.output_stream.write(converted_data)
return True
except Exception as e:
print(f"pyaudio写入失败: {e}")
# 如果pyaudio失败使用备选播放方式
enable_aplay = config.audio_config.get('enable_aplay_fallback', True)
if enable_aplay and self.platform_config['fallback_to_aplay']:
print("尝试使用aplay播放...")
format_type = converted_format if converted_format else ('Float32' if self.output_config.bit_size == pyaudio.paFloat32 else 'Int16')
return self.player.play_audio(
audio_data=converted_data,
format_type=format_type,
sample_rate=self.output_config.sample_rate,
channels=self.output_config.channels
)
return False
def cleanup(self) -> None:
"""清理音频设备资源"""
@ -155,11 +284,12 @@ class DialogSession:
if was_not_playing:
print("播放开始前,额外发送静音数据清理管道")
for _ in range(3):
self.output_stream.write(b'\x00' * len(audio_data))
self.audio_device.write_audio_data(b'\x00' * len(audio_data))
time.sleep(0.1)
# 播放音频数据
self.output_stream.write(audio_data)
if not self.audio_device.write_audio_data(audio_data):
print("音频播放失败,但继续处理队列")
except queue.Empty:
# 队列为空,检查是否超时

View File

@ -3,6 +3,12 @@ import uuid
import pyaudio
# 配置信息
audio_config = {
"prefer_native_format": True, # 是否优先使用平台原生格式
"fallback_to_int16": True, # 是否在Float32不支持时降级到Int16
"enable_aplay_fallback": True, # 是否启用aplay作为备选播放方式
}
ws_connect_config = {
"base_url": "wss://openspeech.bytedance.com/api/v3/realtime/dialogue",
"headers": {

Binary file not shown.

Binary file not shown.