Compare commits

...

3 Commits

Author SHA1 Message Date
朱潮
93a0b0a446 audio 2025-09-19 20:04:09 +08:00
朱潮
1bb6a32dc2 audio 2025-09-19 20:01:03 +08:00
朱潮
1e0fd6e234 fix audio 2025-09-19 19:56:53 +08:00
9 changed files with 706 additions and 15 deletions

Binary file not shown.

424
doubao/audio_converter.py Normal file
View File

@ -0,0 +1,424 @@
#!/usr/bin/env python3
"""
音频格式转换工具
提供跨平台音频格式转换功能
"""
import platform
import struct
import numpy as np
from typing import Tuple, Optional
import subprocess
import tempfile
import os
import wave
class AudioConverter:
    """Audio sample-format conversion helpers.

    Converts raw PCM byte strings between Float32 and Int16 samples (native
    byte order, as produced by ``struct``/``numpy`` defaults) and can resample
    PCM data with librosa when that package is installed.
    """

    # Lazily created converter reused by the static ``float32_to_int16``
    # wrapper, so the wrapper does not build a new instance (and its 64 KB
    # buffer) on every call.
    _shared_instance = None

    def __init__(self):
        # NOTE(review): ``_conversion_cache`` and ``_conversion_buffer`` are not
        # read by any method of this class; kept in case other code uses them.
        self._conversion_cache = {}
        self._numpy_available = self._check_numpy()
        self._conversion_buffer = bytearray(65536)  # 64 KB pre-allocated buffer

    def _check_numpy(self) -> bool:
        """Return True when numpy can be imported."""
        try:
            import numpy  # noqa: F401
            return True
        except ImportError:
            return False

    @staticmethod
    def get_platform_info() -> dict:
        """Return the ``system``, ``machine`` and ``platform`` strings of the host."""
        return {
            'system': platform.system(),
            'machine': platform.machine(),
            'platform': platform.platform()
        }

    def float32_to_int16_fast(self, float32_data: bytes) -> bytes:
        """Convert Float32 PCM bytes to Int16 PCM bytes.

        Trailing bytes that do not form a whole 4-byte sample are ignored.
        Each sample is clamped to [-1.0, 1.0] (NaN becomes 0) *before* it is
        scaled by 32767 and truncated toward zero, so the numpy path and the
        pure-Python fallback produce the same result for out-of-range input.

        Args:
            float32_data: Raw Float32 samples.

        Returns:
            Raw Int16 samples, two bytes per input sample.
        """
        usable = len(float32_data) - len(float32_data) % 4
        if usable == 0:
            return b''
        if usable != len(float32_data):
            float32_data = float32_data[:usable]

        # Vectorised path when numpy is importable.
        if self._numpy_available:
            try:
                import numpy as np
                samples = np.frombuffer(float32_data, dtype=np.float32)
                # NaN cannot be clipped; map it to silence. Infinities are
                # mapped onto the clamp limits.
                samples = np.nan_to_num(samples, nan=0.0, posinf=1.0, neginf=-1.0)
                samples = np.clip(samples, -1.0, 1.0)
                return (samples * 32767).astype(np.int16).tobytes()
            except Exception as e:
                print(f"numpy转换失败使用备用方法: {e}")

        # Pure-Python fallback: one unpack, one pack.
        num_samples = usable // 4
        float_values = struct.unpack(f'{num_samples}f', float32_data)
        int16_values = [
            # ``val != val`` is only true for NaN.
            0 if val != val else int(max(-1.0, min(1.0, val)) * 32767)
            for val in float_values
        ]
        return struct.pack(f'{num_samples}h', *int16_values)

    @staticmethod
    def float32_to_int16(float32_data: bytes) -> bytes:
        """Backward-compatible static wrapper around ``float32_to_int16_fast``."""
        if AudioConverter._shared_instance is None:
            AudioConverter._shared_instance = AudioConverter()
        return AudioConverter._shared_instance.float32_to_int16_fast(float32_data)

    @staticmethod
    def int16_to_float32(int16_data: bytes) -> bytes:
        """Convert Int16 PCM bytes to Float32 PCM bytes.

        A trailing odd byte is ignored. Samples are divided by 32767 (the
        inverse of the scale used by ``float32_to_int16_fast``); -32768 would
        land slightly below -1.0 and is therefore clamped to -1.0.
        """
        usable = len(int16_data) - len(int16_data) % 2
        count = usable // 2
        int16_values = struct.unpack(f'{count}h', int16_data[:usable])
        float_values = [max(-1.0, val / 32767.0) for val in int16_values]
        return struct.pack(f'{count}f', *float_values)

    @staticmethod
    def resample_audio(audio_data: bytes, src_rate: int, dst_rate: int,
                       channels: int = 1, bit_depth: int = 16) -> bytes:
        """Resample interleaved PCM data from ``src_rate`` to ``dst_rate``.

        Args:
            audio_data: Raw interleaved samples.
            src_rate: Sample rate of ``audio_data`` in Hz.
            dst_rate: Desired sample rate in Hz.
            channels: Number of interleaved channels.
            bit_depth: 32 is treated as Float32; any other value as Int16.

        Returns:
            The resampled bytes in the same sample format. The input is
            returned unchanged when the rates already match, when librosa is
            not installed, or when resampling fails (a message is printed).
        """
        if src_rate == dst_rate:
            return audio_data

        try:
            import librosa
        except ImportError:
            print("警告: librosa 未安装,跳过重采样")
            return audio_data

        try:
            is_float = bit_depth == 32
            dtype = np.float32 if is_float else np.int16

            # Drop a trailing partial frame so the reshape below cannot fail.
            frame_bytes = np.dtype(dtype).itemsize * channels
            usable = len(audio_data) - len(audio_data) % frame_bytes
            frames = np.frombuffer(audio_data[:usable], dtype=dtype).reshape(-1, channels)

            # Work in float, channels first: (channels, samples).
            work = frames.T.astype(np.float32)
            if not is_float:
                work /= 32768.0

            resampled = librosa.resample(work, orig_sr=src_rate, target_sr=dst_rate).T

            if not is_float:
                # Scale back and clip so filter overshoot cannot wrap around
                # when cast to int16.
                resampled = np.clip(np.rint(resampled * 32768.0), -32768, 32767)
            return resampled.astype(dtype).tobytes()
        except Exception as e:
            print(f"重采样失败: {e}")
            return audio_data
class AudioDeviceDetector:
    """Detect the host platform and the audio devices PyAudio exposes."""

    def __init__(self):
        self.platform_info = AudioConverter.get_platform_info()
        self.is_raspberry_pi = self._is_raspberry_pi()
        self.is_mac = self.platform_info['system'] == 'Darwin'

    def _is_raspberry_pi(self) -> bool:
        """Return True when /proc/device-tree/model mentions "raspberry pi"."""
        try:
            with open('/proc/device-tree/model', 'r') as f:
                return 'raspberry pi' in f.read().lower()
        except (OSError, UnicodeDecodeError):
            # File missing or unreadable: not a Raspberry Pi as far as we can tell.
            return False

    @staticmethod
    def _probe_output_formats(p, pyaudio) -> list:
        """Return the names of sample formats for which a test output stream opens.

        A mono 24 kHz output stream is opened and immediately closed for each
        candidate format; formats whose stream cannot be opened are skipped.
        """
        candidates = (
            ('Float32', pyaudio.paFloat32),
            ('Int16', pyaudio.paInt16),
            ('Int32', pyaudio.paInt32),
        )
        supported = []
        for name, fmt in candidates:
            try:
                stream = p.open(
                    format=fmt,
                    channels=1,
                    rate=24000,
                    output=True,
                    frames_per_buffer=1024
                )
                stream.close()
            except Exception:
                continue
            supported.append(name)
        return supported

    @staticmethod
    def _default_device_index(query):
        """Return ``query()['index']`` or None when no default device exists."""
        try:
            return query()['index']
        except (OSError, KeyError):
            return None

    def detect_audio_devices(self) -> dict:
        """Enumerate audio devices and supported output formats.

        Returns:
            A dict with the keys ``input_devices``, ``output_devices``,
            ``default_input``, ``default_output``, ``supported_formats`` and
            ``recommended_format``. If PyAudio is unavailable or detection
            fails, a message is printed and the format fields fall back to
            Int16.
        """
        devices = {
            'input_devices': [],
            'output_devices': [],
            'default_input': None,
            'default_output': None,
            'supported_formats': [],
            'recommended_format': None
        }
        p = None
        try:
            import pyaudio
            p = pyaudio.PyAudio()

            devices['supported_formats'] = self._probe_output_formats(p, pyaudio)
            # Prefer Float32 when it works; otherwise default to Int16.
            devices['recommended_format'] = (
                'Float32' if 'Float32' in devices['supported_formats'] else 'Int16'
            )

            for i in range(p.get_device_count()):
                device_info = p.get_device_info_by_index(i)
                for direction, channel_key in (('input_devices', 'maxInputChannels'),
                                               ('output_devices', 'maxOutputChannels')):
                    if device_info[channel_key] > 0:
                        devices[direction].append({
                            'index': i,
                            'name': device_info['name'],
                            'channels': device_info[channel_key],
                            'sample_rate': device_info['defaultSampleRate']
                        })

            # PyAudio device-info dicts carry no "is default" flag; the default
            # devices have to be queried explicitly.
            devices['default_input'] = self._default_device_index(
                p.get_default_input_device_info)
            devices['default_output'] = self._default_device_index(
                p.get_default_output_device_info)
        except Exception as e:
            print(f"音频设备检测失败: {e}")
            # Fall back to the default configuration.
            devices['supported_formats'] = ['Int16']
            devices['recommended_format'] = 'Int16'
        finally:
            # Release PortAudio even when enumeration failed part-way.
            if p is not None:
                p.terminate()
        return devices

    def get_platform_specific_config(self) -> dict:
        """Return playback settings recommended for the detected platform.

        macOS: Float32 at 24 kHz / 32-bit. Raspberry Pi: aplay fallback enabled
        at 16 kHz / 16-bit. Anything else: 16 kHz / 16-bit mono without fallback.
        """
        config = {
            'prefer_float32': False,
            'fallback_to_aplay': False,
            'recommended_sample_rate': 16000,
            'recommended_channels': 1,
            'recommended_bit_depth': 16
        }
        if self.is_mac:
            config['prefer_float32'] = True
            config['recommended_sample_rate'] = 24000
            config['recommended_bit_depth'] = 32
        elif self.is_raspberry_pi:
            config['fallback_to_aplay'] = True
            config['recommended_sample_rate'] = 16000
            config['recommended_bit_depth'] = 16
        return config

    def print_device_info(self):
        """Print platform, format, output-device and configuration details."""
        devices = self.detect_audio_devices()
        config = self.get_platform_specific_config()

        print("=== 音频设备信息 ===")
        print(f"平台: {self.platform_info['platform']}")
        print(f"是否为树莓派: {self.is_raspberry_pi}")
        print(f"是否为Mac: {self.is_mac}")
        print()
        print(f"支持的音频格式: {devices['supported_formats']}")
        print(f"推荐格式: {devices['recommended_format']}")
        print()
        print("输出设备:")
        for device in devices['output_devices']:
            marker = " (默认)" if device['index'] == devices['default_output'] else ""
            print(f" [{device['index']}] {device['name']}{marker}")
            print(f" 通道数: {device['channels']}, 采样率: {device['sample_rate']}")
        print()
        print("平台特定配置:")
        for key, value in config.items():
            print(f" {key}: {value}")
class AudioPlayer:
    """Play raw PCM audio through PyAudio, with an optional aplay fallback."""

    def __init__(self):
        self.detector = AudioDeviceDetector()
        self.converter = AudioConverter()
        self.device_info = self.detector.detect_audio_devices()
        self.config = self.detector.get_platform_specific_config()

    def play_audio(self, audio_data: bytes, format_type: str = 'Float32',
                   sample_rate: int = 24000, channels: int = 1) -> bool:
        """Play ``audio_data``, trying PyAudio first and then aplay.

        Args:
            audio_data: Raw PCM samples.
            format_type: ``'Float32'`` or ``'Int16'``.
            sample_rate: Sample rate of ``audio_data`` in Hz.
            channels: Number of interleaved channels.

        Returns:
            True if one of the playback paths succeeded, False otherwise.
        """
        print(f"开始播放音频: 格式={format_type}, 采样率={sample_rate}Hz, 通道数={channels}")
        if self._try_pyaudio_play(audio_data, format_type, sample_rate, channels):
            return True
        if self.config['fallback_to_aplay'] and self._try_aplay_play(
                audio_data, format_type, sample_rate, channels):
            return True
        print("所有播放方式都失败了")
        return False

    def _try_pyaudio_play(self, audio_data: bytes, format_type: str,
                          sample_rate: int, channels: int) -> bool:
        """Try to play through a PyAudio output stream; return True on success."""
        p = None
        stream = None
        try:
            import pyaudio

            # Down-convert when the device does not recommend Float32.
            if format_type == 'Float32' and self.device_info['recommended_format'] != 'Float32':
                print(f"转换格式: Float32 -> {self.device_info['recommended_format']}")
                audio_data = self.converter.float32_to_int16(audio_data)
                format_type = 'Int16'
                # The sample rate is deliberately left untouched: only the
                # sample format changed, the data was not resampled, so opening
                # the stream at another rate would alter pitch and speed.

            pyaudio_format = pyaudio.paFloat32 if format_type == 'Float32' else pyaudio.paInt16

            p = pyaudio.PyAudio()
            stream = p.open(
                format=pyaudio_format,
                channels=channels,
                rate=sample_rate,
                output=True,
                frames_per_buffer=1024
            )
            stream.write(audio_data)
            stream.stop_stream()
            print("pyaudio播放成功")
            return True
        except Exception as e:
            print(f"pyaudio播放失败: {e}")
            return False
        finally:
            # Always release the stream and PortAudio, including on failure.
            try:
                if stream is not None:
                    stream.close()
                if p is not None:
                    p.terminate()
            except Exception as e:
                print(f"pyaudio播放失败: {e}")

    def _try_aplay_play(self, audio_data: bytes, format_type: str,
                        sample_rate: int, channels: int) -> bool:
        """Try to play via the ``aplay`` command (Raspberry Pi only).

        The data is written to a temporary 16-bit WAV file, which is removed
        again whether or not playback succeeds.
        """
        if not self.detector.is_raspberry_pi:
            return False

        temp_path = None
        try:
            # Reserve a file name; the file itself is written with ``wave`` below.
            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp_file:
                temp_path = tmp_file.name

            # The WAV file is written as 16-bit, so Float32 must be converted.
            if format_type == 'Float32':
                audio_data = self.converter.float32_to_int16(audio_data)

            with wave.open(temp_path, 'wb') as wav_file:
                wav_file.setnchannels(channels)
                wav_file.setsampwidth(2)  # 16-bit = 2 bytes
                wav_file.setframerate(sample_rate)
                wav_file.writeframes(audio_data)

            result = subprocess.run(['aplay', temp_path],
                                    capture_output=True, text=True, timeout=30)
            if result.returncode == 0:
                print("aplay播放成功")
                return True
            print(f"aplay播放失败: {result.stderr}")
            return False
        except Exception as e:
            print(f"aplay播放失败: {e}")
            return False
        finally:
            if temp_path is not None:
                try:
                    os.unlink(temp_path)
                except OSError:
                    # Best-effort cleanup; the file may already be gone.
                    pass
# Self-test helper
def test_audio_conversion():
    """Smoke-test the converters, then print the detected device information.

    Packs four Float32 samples, converts them to Int16 and back, and reports
    the byte length after each step.
    """
    print("=== 音频转换测试 ===")

    converter = AudioConverter()
    sample_values = (0.5, -0.5, 1.0, -1.0)
    test_float32 = struct.pack('4f', *sample_values)

    # Float32 -> Int16
    int16_data = converter.float32_to_int16(test_float32)
    print(f"Float32 -> Int16 转换成功: {len(int16_data)} 字节")

    # Int16 -> Float32
    float32_data = converter.int16_to_float32(int16_data)
    print(f"Int16 -> Float32 转换成功: {len(float32_data)} 字节")

    # Device detection report
    AudioDeviceDetector().print_device_info()


if __name__ == "__main__":
    test_audio_conversion()

View File

@ -13,6 +13,7 @@ from typing import Any, Dict, Optional
import config import config
import pyaudio import pyaudio
from realtime_dialog_client import RealtimeDialogClient from realtime_dialog_client import RealtimeDialogClient
from audio_converter import AudioConverter, AudioDeviceDetector, AudioPlayer
@dataclass @dataclass
@ -34,6 +35,63 @@ class AudioDeviceManager:
self.pyaudio = pyaudio.PyAudio() self.pyaudio = pyaudio.PyAudio()
self.input_stream: Optional[pyaudio.Stream] = None self.input_stream: Optional[pyaudio.Stream] = None
self.output_stream: Optional[pyaudio.Stream] = None self.output_stream: Optional[pyaudio.Stream] = None
# 初始化音频转换工具
self.converter = AudioConverter()
self.detector = AudioDeviceDetector()
self.player = AudioPlayer()
# 获取设备信息
self.device_info = self.detector.detect_audio_devices()
self.platform_config = self.detector.get_platform_specific_config()
# 打印设备信息
print("=== 音频设备信息 ===")
self.detector.print_device_info()
print()
# 根据平台调整输出配置
self._adjust_output_config()
def _adjust_output_config(self):
    """Decide whether Float32 output must be converted to Int16 at playback.

    The output configuration itself is not modified; the decision is stored
    in ``self.needs_conversion`` and a summary is printed.
    """
    out_cfg = self.output_config
    original_format = out_cfg.format
    original_bit_size = out_cfg.bit_size
    original_sample_rate = out_cfg.sample_rate

    # User-tunable options.
    prefer_native = config.audio_config.get('prefer_native_format', True)
    fallback_int16 = config.audio_config.get('fallback_to_int16', True)

    convert = False
    if (self.output_config.bit_size == pyaudio.paFloat32
            and 'Float32' not in self.device_info['supported_formats']):
        # The configured format is unsupported: convert if that is allowed.
        if not fallback_int16:
            print(f"警告: 平台不支持Float32格式但转换已禁用")
        else:
            convert = True
            print(f"检测到平台不支持Float32格式将进行实时转换")
    elif (prefer_native
          and self.device_info['recommended_format'] == 'Int16'
          and self.output_config.bit_size == pyaudio.paFloat32):
        # The native format is preferred and differs from the configured one.
        print("提示: 优先使用平台原生格式Int16")
        convert = True

    if not convert:
        print(f"使用原始输出配置: 格式={original_format}, 比特深度={original_bit_size}, 采样率={original_sample_rate}Hz")
        self.needs_conversion = False
        return

    print(f"将使用实时格式转换: Float32 -> Int16")
    print(f" 保持原始采样率: {original_sample_rate}Hz")
    print(f" 转换将在播放时进行")
    # The configuration is left untouched; conversion happens at playback time.
    self.needs_conversion = True
def open_input_stream(self) -> pyaudio.Stream: def open_input_stream(self) -> pyaudio.Stream:
"""打开音频输入流""" """打开音频输入流"""
@ -49,14 +107,90 @@ class AudioDeviceManager:
def open_output_stream(self) -> pyaudio.Stream: def open_output_stream(self) -> pyaudio.Stream:
"""打开音频输出流""" """打开音频输出流"""
self.output_stream = self.pyaudio.open( # 尝试使用默认输出设备
format=self.output_config.bit_size, output_device_index = self.device_info.get('default_output')
channels=self.output_config.channels,
rate=self.output_config.sample_rate, try:
output=True, self.output_stream = self.pyaudio.open(
frames_per_buffer=self.output_config.chunk format=self.output_config.bit_size,
) channels=self.output_config.channels,
return self.output_stream rate=self.output_config.sample_rate,
output=True,
frames_per_buffer=self.output_config.chunk,
output_device_index=output_device_index
)
print(f"音频输出流已打开 - 设备: {output_device_index}")
return self.output_stream
except Exception as e:
print(f"打开音频输出流失败: {e}")
# 尝试使用其他设备
for device in self.device_info['output_devices']:
if device['index'] != output_device_index:
try:
self.output_stream = self.pyaudio.open(
format=self.output_config.bit_size,
channels=self.output_config.channels,
rate=self.output_config.sample_rate,
output=True,
frames_per_buffer=self.output_config.chunk,
output_device_index=device['index']
)
print(f"音频输出流已打开 - 备选设备: {device['index']}")
return self.output_stream
except Exception as e2:
print(f"设备 {device['index']} 失败: {e2}")
continue
# 如果所有设备都失败,抛出异常
raise Exception("无法打开任何音频输出设备")
def write_audio_data(self, audio_data: bytes, flush: bool = False) -> bool:
    """Buffer ``audio_data`` and write it to the output stream once enough is queued.

    Data is accumulated in ``self._audio_buffer`` until at least 2048 bytes
    are pending, then written in one go. When ``self.needs_conversion`` is
    set, the pending Float32 data is converted to Int16 first; bytes that do
    not yet form a whole 4-byte sample stay in the buffer so later samples
    remain aligned.

    Args:
        audio_data: Raw PCM bytes to queue.
        flush: When True, play whatever is pending even if it is below the
            minimum buffer size (an incomplete trailing sample is discarded).

    Returns:
        True if the data was buffered or played, False if playback failed and
        no fallback succeeded.
    """
    min_buffer_size = 2048  # minimum number of pending bytes before playing

    pending = getattr(self, '_audio_buffer', b'') + audio_data
    if len(pending) < min_buffer_size and not flush:
        # Keep waiting for more data.
        self._audio_buffer = pending
        return True

    converting = bool(getattr(self, 'needs_conversion', False))
    carry = b''
    if converting:
        # Only whole Float32 samples can be converted.
        usable = len(pending) - len(pending) % 4
        if not flush:
            carry = pending[usable:]
        pending = pending[:usable]
    self._audio_buffer = carry

    if not pending:
        return True

    final_data = self.converter.float32_to_int16_fast(pending) if converting else pending

    # Preferred path: the already opened PyAudio stream.
    try:
        if self.output_stream:
            self.output_stream.write(final_data)
            return True
    except Exception as e:
        print(f"pyaudio写入失败: {e}")

    # Fallback path when the stream is missing or the write failed.
    enable_aplay = config.audio_config.get('enable_aplay_fallback', True)
    if enable_aplay and self.platform_config['fallback_to_aplay']:
        print("尝试使用aplay播放...")
        # Label the data with the format it really has: Int16 only if it was
        # converted above, otherwise whatever the output configuration says.
        if converting or self.output_config.bit_size != pyaudio.paFloat32:
            fallback_format = 'Int16'
        else:
            fallback_format = 'Float32'
        return self.player.play_audio(
            audio_data=final_data,
            format_type=fallback_format,
            sample_rate=self.output_config.sample_rate,
            channels=self.output_config.channels
        )

    return False
def cleanup(self) -> None: def cleanup(self) -> None:
"""清理音频设备资源""" """清理音频设备资源"""
@ -128,9 +262,9 @@ class DialogSession:
self.player_thread.start() self.player_thread.start()
def _audio_player_thread(self): def _audio_player_thread(self):
"""音频播放线程""" """音频播放线程 - 性能优化版本"""
audio_playing_timeout = 1.0 # 1秒没有音频数据认为播放结束 audio_playing_timeout = 0.5 # 0.5秒没有音频数据认为播放结束
queue_check_interval = 0.1 # 每100ms检查一次队列状态 queue_check_interval = 0.05 # 每50ms检查一次队列状态更低的延迟
while self.is_playing: while self.is_playing:
try: try:
@ -155,11 +289,12 @@ class DialogSession:
if was_not_playing: if was_not_playing:
print("播放开始前,额外发送静音数据清理管道") print("播放开始前,额外发送静音数据清理管道")
for _ in range(3): for _ in range(3):
self.output_stream.write(b'\x00' * len(audio_data)) self.audio_device.write_audio_data(b'\x00' * len(audio_data))
time.sleep(0.1) time.sleep(0.1)
# 播放音频数据 # 播放音频数据
self.output_stream.write(audio_data) if not self.audio_device.write_audio_data(audio_data):
print("音频播放失败,但继续处理队列")
except queue.Empty: except queue.Empty:
# 队列为空,检查是否超时 # 队列为空,检查是否超时
@ -177,6 +312,14 @@ class DialogSession:
self.say_hello_completed = True self.say_hello_completed = True
print("say hello 音频播放完成") print("say hello 音频播放完成")
print("音频播放超时,恢复录音") print("音频播放超时,恢复录音")
# 刷新音频缓冲区
if hasattr(self.audio_device, '_audio_buffer') and self.audio_device._audio_buffer:
print("刷新剩余音频缓冲区")
remaining_buffer = self.audio_device._audio_buffer
self.audio_device._audio_buffer = b''
self.audio_device.write_audio_data(remaining_buffer)
# 直接发送静音数据,而不是在协程中发送 # 直接发送静音数据,而不是在协程中发送
try: try:
silence_data = b'\x00' * config.input_audio_config["chunk"] silence_data = b'\x00' * config.input_audio_config["chunk"]

View File

@ -3,6 +3,12 @@ import uuid
import pyaudio import pyaudio
# 配置信息 # 配置信息
# Audio playback options.
audio_config = {
"prefer_native_format": True, # Whether to prefer the platform's native sample format
"fallback_to_int16": True, # Whether to fall back to Int16 when Float32 is not supported
"enable_aplay_fallback": True, # Whether aplay may be used as a fallback playback path
}
ws_connect_config = { ws_connect_config = {
"base_url": "wss://openspeech.bytedance.com/api/v3/realtime/dialogue", "base_url": "wss://openspeech.bytedance.com/api/v3/realtime/dialogue",
"headers": { "headers": {
@ -52,9 +58,9 @@ input_audio_config = {
} }
output_audio_config = { output_audio_config = {
"chunk": 3200, "chunk": 4096, # 增加缓冲区大小
"format": "pcm", "format": "pcm",
"channels": 1, "channels": 1,
"sample_rate": 24000, "sample_rate": 24000,
"bit_size": pyaudio.paFloat32, "bit_size": pyaudio.paFloat32, # 服务器返回的是Float32格式
} }

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,118 @@
#!/usr/bin/env python3
"""
音频转换测试脚本
用于测试高性能音频格式转换
"""
import time
import struct
from audio_converter import AudioConverter
def test_conversion_performance():
    """Time Float32 -> Int16 conversion of a 1 s, 24 kHz, 440 Hz sine wave.

    Prints the size ratio, elapsed time and throughput, then compares the
    first ten converted samples with ``int(sample * 32767)``, allowing a
    difference of 1.
    """
    import math  # local import so this function does not depend on a header change

    print("=== 音频转换性能测试 ===")
    converter = AudioConverter()

    # Test signal: 1 second of 24 kHz Float32 audio.
    sample_rate = 24000
    duration = 1.0
    num_samples = int(sample_rate * duration)
    tone_hz = 440

    # A real sine wave with amplitude 0.5, so every sample lies in [-0.5, 0.5].
    angular_step = 2 * math.pi * tone_hz / sample_rate
    samples = [0.5 * math.sin(angular_step * i) for i in range(num_samples)]
    test_data = struct.pack(f'{num_samples}f', *samples)
    print(f"生成了 {len(test_data)} 字节的测试数据")

    # Time the conversion with a monotonic high-resolution clock.
    start_time = time.perf_counter()
    converted_data = converter.float32_to_int16_fast(test_data)
    conversion_time = time.perf_counter() - start_time

    data_ratio = len(converted_data) / len(test_data)
    # Guard against a zero reading on coarse timers.
    throughput_kb = len(test_data) / max(conversion_time, 1e-9) / 1024
    print(f"转换结果:")
    print(f" 原始数据: {len(test_data)} 字节")
    print(f" 转换后: {len(converted_data)} 字节")
    print(f" 数据比例: {data_ratio:.2f}")
    print(f" 转换时间: {conversion_time:.4f}")
    print(f" 转换速度: {throughput_kb:.1f} KB/s")

    # Spot-check conversion quality on the first ten samples.
    print("\n=== 转换质量验证 ===")
    original_samples = struct.unpack('10f', test_data[:40])
    converted_samples = struct.unpack('10h', converted_data[:20])
    print("前10个样本的转换结果:")
    errors = 0
    for i, (orig, conv) in enumerate(zip(original_samples, converted_samples)):
        expected = int(orig * 32767)
        print(f" 样本{i}: {orig:.6f} -> {conv} (期望: {expected})")
        if abs(conv - expected) > 1:  # tolerate a difference of 1
            errors += 1

    if errors == 0:
        print("✓ 转换质量验证通过")
    else:
        print(f"✗ 转换质量验证失败,{errors}个样本有误差")
def test_numpy_vs_python():
    """Compare the numpy conversion path with the pure-Python fallback.

    Converts 2 s of random 24 kHz Float32 audio ten times per path and prints
    the elapsed times and the resulting speed-up.
    """
    import random

    print("\n=== NumPy vs Python 性能对比 ===")
    converter = AudioConverter()

    # Larger test signal: 2 seconds at 24 kHz.
    sample_rate = 24000
    duration = 2.0
    num_samples = int(sample_rate * duration)
    test_data = struct.pack(
        f'{num_samples}f',
        *(random.uniform(-1.0, 1.0) for _ in range(num_samples)),
    )
    repeats = 10

    # NumPy path.
    if converter._numpy_available:
        print("测试NumPy版本...")
        start_time = time.perf_counter()
        for _ in range(repeats):
            converter.float32_to_int16_fast(test_data)
        numpy_time = time.perf_counter() - start_time
        print(f"NumPy版本: {numpy_time:.4f} 秒 (10次)")
    else:
        print("NumPy不可用")
        numpy_time = None

    # Pure-Python path: force the fallback by switching numpy off on a
    # dedicated converter. Going through AudioConverter.float32_to_int16 would
    # silently use numpy again and measure the wrong thing.
    python_converter = AudioConverter()
    python_converter._numpy_available = False
    print("测试纯Python版本...")
    start_time = time.perf_counter()
    for _ in range(repeats):
        python_converter.float32_to_int16_fast(test_data)
    python_time = time.perf_counter() - start_time
    print(f"纯Python版本: {python_time:.4f} 秒 (10次)")

    # Skip the ratio when numpy was not measured or the timer read zero.
    if numpy_time is not None and numpy_time > 0:
        speedup = python_time / numpy_time
        print(f"NumPy加速比: {speedup:.2f}x")


if __name__ == "__main__":
    test_conversion_performance()
    test_numpy_vs_python()