Local-Voice/doubao/audio_converter.py
2025-09-19 20:04:09 +08:00

424 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
音频格式转换工具
提供跨平台音频格式转换功能
"""
import platform
import struct
import numpy as np
from typing import Tuple, Optional
import subprocess
import tempfile
import os
import wave
class AudioConverter:
    """Audio sample-format conversion helpers.

    Converts raw PCM byte strings between 32-bit float samples and 16-bit
    signed integer samples (native byte order, as used by ``struct`` and
    ``numpy.frombuffer``) and can resample PCM data through librosa.
    """

    def __init__(self):
        # The cache and the 64 KiB scratch buffer are allocated here but are
        # not read by any method of this class; they are kept so existing
        # code that touches these attributes keeps working.
        self._conversion_cache = {}
        self._numpy_available = self._check_numpy()
        self._conversion_buffer = bytearray(65536)  # 64 KiB preallocated buffer

    def _check_numpy(self) -> bool:
        """Return True when numpy can be imported, False otherwise."""
        try:
            import numpy  # noqa: F401
            return True
        except ImportError:
            return False

    @staticmethod
    def get_platform_info() -> dict:
        """Return the system name, machine type and platform string."""
        return {
            'system': platform.system(),
            'machine': platform.machine(),
            'platform': platform.platform()
        }

    @staticmethod
    def _float_to_int16_sample(value: float) -> int:
        """Convert one float sample to an int16 value (pure-Python path).

        NaN maps to silence (0); everything else is clamped to [-1.0, 1.0]
        before scaling so infinities cannot overflow ``int()`` and the
        result matches the numpy path (range -32767..32767, truncation
        toward zero).
        """
        if value != value:  # NaN is the only value not equal to itself
            return 0
        return int(max(-1.0, min(1.0, value)) * 32767)

    def float32_to_int16_fast(self, float32_data: bytes) -> bytes:
        """Convert Float32 PCM bytes to Int16 PCM bytes.

        Args:
            float32_data: Raw 32-bit float samples. A trailing partial
                sample (length not a multiple of 4) is dropped.

        Returns:
            The samples scaled by 32767 and packed as 16-bit signed
            integers. Values outside [-1.0, 1.0] are clamped, NaN becomes 0.
        """
        remainder = len(float32_data) % 4
        if remainder:
            float32_data = float32_data[:len(float32_data) - remainder]
        if not float32_data:
            return b''

        # Vectorised conversion when numpy is available.
        if self._numpy_available:
            try:
                import numpy as np
                samples = np.frombuffer(float32_data, dtype=np.float32)
                # Casting NaN/inf to an integer type is undefined, so map
                # them to well-defined values before clipping and scaling.
                samples = np.nan_to_num(samples, nan=0.0, posinf=1.0, neginf=-1.0)
                samples = np.clip(samples, -1.0, 1.0)
                return (samples * 32767).astype(np.int16).tobytes()
            except Exception as e:
                # Best effort: fall through to the pure-Python conversion.
                print(f"numpy转换失败使用备用方法: {e}")

        # Pure-Python fallback: unpack everything at once, convert, repack.
        num_samples = len(float32_data) // 4
        float_values = struct.unpack(f'{num_samples}f', float32_data)
        int16_values = [self._float_to_int16_sample(val) for val in float_values]
        return struct.pack(f'{num_samples}h', *int16_values)

    @staticmethod
    def float32_to_int16(float32_data: bytes) -> bytes:
        """Convert Float32 PCM bytes to Int16 (backward-compatible entry point).

        Delegates to ``float32_to_int16_fast`` on a fresh converter instance.
        """
        converter = AudioConverter()
        return converter.float32_to_int16_fast(float32_data)

    @staticmethod
    def int16_to_float32(int16_data: bytes) -> bytes:
        """Convert Int16 PCM bytes to Float32 PCM bytes.

        Args:
            int16_data: Raw 16-bit signed samples. A trailing odd byte is
                dropped.

        Returns:
            The samples divided by 32767.0 and packed as 32-bit floats.
            The divisor mirrors the 32767 scale used by the Float32 -> Int16
            direction, so -32768 maps to a value slightly below -1.0.
        """
        remainder = len(int16_data) % 2
        if remainder:
            int16_data = int16_data[:len(int16_data) - remainder]
        count = len(int16_data) // 2
        int16_values = struct.unpack(f'{count}h', int16_data)
        float_values = [val / 32767.0 for val in int16_values]
        return struct.pack(f'{count}f', *float_values)

    @staticmethod
    def resample_audio(audio_data: bytes, src_rate: int, dst_rate: int,
                       channels: int = 1, bit_depth: int = 16) -> bytes:
        """Resample interleaved PCM data from ``src_rate`` to ``dst_rate``.

        Args:
            audio_data: Raw interleaved PCM bytes.
            src_rate: Sample rate of ``audio_data`` in Hz.
            dst_rate: Desired sample rate in Hz.
            channels: Number of interleaved channels.
            bit_depth: 32 selects Float32 samples; any other value is
                treated as Int16.

        Returns:
            The resampled bytes in the same sample format. The input is
            returned unchanged when the rates already match, when librosa
            is not installed, or when resampling fails (best effort).
        """
        if src_rate == dst_rate:
            return audio_data
        try:
            import librosa

            is_float = bit_depth == 32
            dtype = np.float32 if is_float else np.int16

            # Drop a trailing partial frame so the reshape below cannot fail.
            frame_bytes = np.dtype(dtype).itemsize * channels
            usable = len(audio_data) - len(audio_data) % frame_bytes
            if usable == 0:
                return audio_data
            frames = np.frombuffer(audio_data[:usable], dtype=dtype).reshape(-1, channels)

            # librosa only accepts floating-point audio, so Int16 input is
            # promoted to float32 first; it is laid out as (channels, frames).
            working = frames.T.astype(np.float32)
            resampled = librosa.resample(working, orig_sr=src_rate, target_sr=dst_rate).T

            if not is_float:
                # Round and clamp so filter overshoot cannot wrap around
                # when casting back to int16.
                resampled = np.clip(np.rint(resampled), -32768, 32767)
            return resampled.astype(dtype).tobytes()
        except ImportError:
            print("警告: librosa 未安装,跳过重采样")
            return audio_data
        except Exception as e:
            print(f"重采样失败: {e}")
            return audio_data
class AudioDeviceDetector:
    """Detects the host platform and the audio devices PyAudio exposes."""

    def __init__(self):
        self.platform_info = AudioConverter.get_platform_info()
        self.is_raspberry_pi = self._is_raspberry_pi()
        self.is_mac = self.platform_info['system'] == 'Darwin'

    def _is_raspberry_pi(self) -> bool:
        """Return True when /proc/device-tree/model mentions a Raspberry Pi."""
        try:
            with open('/proc/device-tree/model', 'r') as f:
                return 'raspberry pi' in f.read().lower()
        except (OSError, ValueError):
            # File missing/unreadable (not such a board) or undecodable text.
            return False

    @staticmethod
    def _default_device_index(query):
        """Return the ``'index'`` of the device dict produced by ``query()``.

        ``query`` is a zero-argument callable such as
        ``PyAudio.get_default_output_device_info``. ``None`` is returned when
        the call raises ``OSError`` (no default device) or the dict has no
        ``'index'`` entry.
        """
        try:
            return query()['index']
        except (OSError, KeyError):
            return None

    @staticmethod
    def _probe_output_formats(p, pyaudio) -> list:
        """Return the names of sample formats the default output accepts.

        Each candidate is tested by opening (and immediately closing) a
        mono 24 kHz output stream on the PyAudio instance ``p``.
        """
        candidates = [
            (pyaudio.paFloat32, 'Float32'),
            (pyaudio.paInt16, 'Int16'),
            (pyaudio.paInt32, 'Int32'),
        ]
        supported = []
        for sample_format, name in candidates:
            try:
                stream = p.open(
                    format=sample_format,
                    channels=1,
                    rate=24000,
                    output=True,
                    frames_per_buffer=1024
                )
                stream.close()
            except Exception:
                # Format rejected by the device/backend: simply not listed.
                continue
            supported.append(name)
        return supported

    def detect_audio_devices(self) -> dict:
        """Enumerate audio devices and supported output formats.

        Returns:
            A dict with the keys ``input_devices``, ``output_devices``
            (lists of ``index``/``name``/``channels``/``sample_rate`` dicts),
            ``default_input``, ``default_output`` (device index or ``None``),
            ``supported_formats`` and ``recommended_format``. When PyAudio is
            unavailable or detection fails, the formats fall back to Int16.
        """
        devices = {
            'input_devices': [],
            'output_devices': [],
            'default_input': None,
            'default_output': None,
            'supported_formats': [],
            'recommended_format': None
        }
        try:
            import pyaudio
            p = pyaudio.PyAudio()
            try:
                devices['supported_formats'] = self._probe_output_formats(p, pyaudio)
                # Prefer Float32 when available; otherwise default to Int16.
                if 'Float32' in devices['supported_formats']:
                    devices['recommended_format'] = 'Float32'
                else:
                    devices['recommended_format'] = 'Int16'

                # The per-device info dict carries no "is default" flag, so
                # ask PortAudio for the default devices explicitly.
                devices['default_input'] = self._default_device_index(
                    p.get_default_input_device_info)
                devices['default_output'] = self._default_device_index(
                    p.get_default_output_device_info)

                for i in range(p.get_device_count()):
                    device_info = p.get_device_info_by_index(i)
                    if device_info['maxInputChannels'] > 0:
                        devices['input_devices'].append({
                            'index': i,
                            'name': device_info['name'],
                            'channels': device_info['maxInputChannels'],
                            'sample_rate': device_info['defaultSampleRate']
                        })
                    if device_info['maxOutputChannels'] > 0:
                        devices['output_devices'].append({
                            'index': i,
                            'name': device_info['name'],
                            'channels': device_info['maxOutputChannels'],
                            'sample_rate': device_info['defaultSampleRate']
                        })
            finally:
                # Always release PortAudio, even when enumeration fails.
                p.terminate()
        except Exception as e:
            print(f"音频设备检测失败: {e}")
            # Fall back to the default configuration.
            devices['supported_formats'] = ['Int16']
            devices['recommended_format'] = 'Int16'
        return devices

    def get_platform_specific_config(self) -> dict:
        """Return playback defaults tuned for the detected platform.

        macOS prefers Float32 at 24 kHz / 32-bit; Raspberry Pi enables the
        ``aplay`` fallback at 16 kHz / 16-bit; everything else keeps the
        16 kHz / 16-bit mono defaults.
        """
        config = {
            'prefer_float32': False,
            'fallback_to_aplay': False,
            'recommended_sample_rate': 16000,
            'recommended_channels': 1,
            'recommended_bit_depth': 16
        }
        if self.is_mac:
            config['prefer_float32'] = True
            config['recommended_sample_rate'] = 24000
            config['recommended_bit_depth'] = 32
        elif self.is_raspberry_pi:
            config['fallback_to_aplay'] = True
            config['recommended_sample_rate'] = 16000
            config['recommended_bit_depth'] = 16
        return config

    def print_device_info(self):
        """Print platform details, output devices and the platform config."""
        devices = self.detect_audio_devices()
        config = self.get_platform_specific_config()
        print("=== 音频设备信息 ===")
        print(f"平台: {self.platform_info['platform']}")
        print(f"是否为树莓派: {self.is_raspberry_pi}")
        print(f"是否为Mac: {self.is_mac}")
        print()
        print(f"支持的音频格式: {devices['supported_formats']}")
        print(f"推荐格式: {devices['recommended_format']}")
        print()
        print("输出设备:")
        for device in devices['output_devices']:
            marker = " (默认)" if device['index'] == devices['default_output'] else ""
            print(f" [{device['index']}] {device['name']}{marker}")
            print(f" 通道数: {device['channels']}, 采样率: {device['sample_rate']}")
        print()
        print("平台特定配置:")
        for key, value in config.items():
            print(f" {key}: {value}")
class AudioPlayer:
    """Audio player that tries PyAudio first and ``aplay`` as a fallback."""

    def __init__(self):
        self.detector = AudioDeviceDetector()
        self.converter = AudioConverter()
        self.device_info = self.detector.detect_audio_devices()
        self.config = self.detector.get_platform_specific_config()

    def play_audio(self, audio_data: bytes, format_type: str = 'Float32',
                   sample_rate: int = 24000, channels: int = 1) -> bool:
        """Play raw PCM data, trying each available backend in turn.

        Args:
            audio_data: Raw PCM bytes.
            format_type: ``'Float32'`` or ``'Int16'``.
            sample_rate: Sample rate of ``audio_data`` in Hz.
            channels: Number of interleaved channels.

        Returns:
            True when one backend played the audio, False otherwise.
        """
        print(f"开始播放音频: 格式={format_type}, 采样率={sample_rate}Hz, 通道数={channels}")
        if self._try_pyaudio_play(audio_data, format_type, sample_rate, channels):
            return True
        if self.config['fallback_to_aplay'] and self._try_aplay_play(
                audio_data, format_type, sample_rate, channels):
            return True
        print("所有播放方式都失败了")
        return False

    def _try_pyaudio_play(self, audio_data: bytes, format_type: str,
                          sample_rate: int, channels: int) -> bool:
        """Play through PyAudio; return True on success, False on any error."""
        try:
            import pyaudio

            # Convert Float32 to Int16 when the device does not recommend
            # Float32. Only the sample format changes: the data is not
            # resampled, so the stream keeps the caller's sample rate.
            if format_type == 'Float32' and self.device_info['recommended_format'] != 'Float32':
                print(f"转换格式: Float32 -> {self.device_info['recommended_format']}")
                audio_data = self.converter.float32_to_int16(audio_data)
                format_type = 'Int16'

            pyaudio_format = pyaudio.paFloat32 if format_type == 'Float32' else pyaudio.paInt16

            p = pyaudio.PyAudio()
            try:
                stream = p.open(
                    format=pyaudio_format,
                    channels=channels,
                    rate=sample_rate,
                    output=True,
                    frames_per_buffer=1024
                )
                try:
                    stream.write(audio_data)
                    stream.stop_stream()
                finally:
                    stream.close()
            finally:
                # Release PortAudio even when opening or writing fails.
                p.terminate()
            print("pyaudio播放成功")
            return True
        except Exception as e:
            print(f"pyaudio播放失败: {e}")
            return False

    @staticmethod
    def _aplay_timeout(num_bytes: int, sample_rate: int, channels: int) -> float:
        """Return the ``aplay`` timeout in seconds for 16-bit PCM data.

        At least 30 seconds; for longer clips, the clip duration plus a
        5 second margin so playback is not killed before it finishes.
        """
        bytes_per_second = sample_rate * channels * 2
        if bytes_per_second <= 0:
            return 30.0
        return max(30.0, num_bytes / bytes_per_second + 5.0)

    def _try_aplay_play(self, audio_data: bytes, format_type: str,
                        sample_rate: int, channels: int) -> bool:
        """Play through the ``aplay`` command (Raspberry Pi only).

        The data is written to a temporary 16-bit WAV file which is removed
        afterwards. Returns True when ``aplay`` exits with status 0.
        """
        if not self.detector.is_raspberry_pi:
            return False
        temp_path = None
        try:
            # Reserve a temp file name; the file is rewritten by wave below.
            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp_file:
                temp_path = tmp_file.name

            # The WAV file is always written as 16-bit PCM.
            if format_type == 'Float32':
                audio_data = self.converter.float32_to_int16(audio_data)
                format_type = 'Int16'

            with wave.open(temp_path, 'wb') as wav_file:
                wav_file.setnchannels(channels)
                wav_file.setsampwidth(2)  # 16-bit = 2 bytes
                wav_file.setframerate(sample_rate)
                wav_file.writeframes(audio_data)

            result = subprocess.run(
                ['aplay', temp_path],
                capture_output=True, text=True,
                timeout=self._aplay_timeout(len(audio_data), sample_rate, channels))
            if result.returncode == 0:
                print("aplay播放成功")
                return True
            print(f"aplay播放失败: {result.stderr}")
            return False
        except Exception as e:
            print(f"aplay播放失败: {e}")
            return False
        finally:
            # Single cleanup point for both the success and failure paths.
            if temp_path is not None:
                try:
                    os.unlink(temp_path)
                except OSError:
                    pass
# Smoke test
def test_audio_conversion():
    """Exercise the Float32 <-> Int16 round trip and print device info.

    Nothing is asserted: the function only prints the size of each converted
    buffer and then the detected audio device information.
    """
    print("=== 音频转换测试 ===")

    # Four Float32 samples covering mid-range and full-scale values.
    sample_values = (0.5, -0.5, 1.0, -1.0)
    packed_floats = struct.pack('4f', *sample_values)

    audio_converter = AudioConverter()

    # Float32 -> Int16
    as_int16 = audio_converter.float32_to_int16(packed_floats)
    print(f"Float32 -> Int16 转换成功: {len(as_int16)} 字节")

    # Int16 -> Float32
    back_to_float = audio_converter.int16_to_float32(as_int16)
    print(f"Int16 -> Float32 转换成功: {len(back_to_float)} 字节")

    # Device detection report
    AudioDeviceDetector().print_device_info()
if __name__ == "__main__":
    # Run the smoke test only when this file is executed as a script.
    test_audio_conversion()