394 lines
14 KiB
Python
394 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Audio format conversion utilities.

Provides cross-platform audio format conversion.
|
|
"""
|
|
|
|
import platform
|
|
import struct
|
|
import numpy as np
|
|
from typing import Tuple, Optional
|
|
import subprocess
|
|
import tempfile
|
|
import os
|
|
import wave
|
|
|
|
|
|
class AudioConverter:
    """Stateless helpers for converting raw PCM audio between sample formats.

    Every method is static and works on raw byte strings laid out in the
    machine's native byte order.
    """

    @staticmethod
    def get_platform_info() -> dict:
        """Return basic information about the host platform.

        Returns:
            A dict with the keys ``'system'``, ``'machine'`` and
            ``'platform'``, each filled from the ``platform`` module function
            of the same name.
        """
        return {
            'system': platform.system(),
            'machine': platform.machine(),
            'platform': platform.platform(),
        }

    @staticmethod
    def float32_to_int16(float32_data: bytes) -> bytes:
        """Convert float32 samples to int16 samples.

        Args:
            float32_data: Raw float32 samples. Trailing bytes that do not form
                a complete 4-byte sample are ignored.

        Returns:
            Raw int16 samples. Each input value is clamped to [-1.0, 1.0],
            multiplied by 32767 and truncated toward zero. NaN samples become
            0 (silence) instead of a full-scale value.
        """
        usable = len(float32_data) - len(float32_data) % 4
        if usable == 0:
            return b''

        # Widen to float64 so the scaling matches plain Python float maths.
        samples = np.frombuffer(float32_data[:usable], dtype=np.float32).astype(np.float64)
        # NaN would otherwise survive clipping and produce an undefined int16.
        samples = np.where(np.isnan(samples), 0.0, samples)
        scaled = np.clip(samples, -1.0, 1.0) * 32767
        # astype() truncates toward zero, like int() on a Python float.
        return scaled.astype(np.int16).tobytes()

    @staticmethod
    def int16_to_float32(int16_data: bytes) -> bytes:
        """Convert int16 samples to float32 samples.

        Args:
            int16_data: Raw int16 samples. A trailing odd byte is ignored.

        Returns:
            Raw float32 samples, each equal to the int16 value divided by
            32767. Note that -32768 therefore maps to slightly below -1.0.
        """
        usable = len(int16_data) - len(int16_data) % 2
        if usable == 0:
            return b''

        samples = np.frombuffer(int16_data[:usable], dtype=np.int16)
        return (samples.astype(np.float64) / 32767.0).astype(np.float32).tobytes()

    @staticmethod
    def resample_audio(audio_data: bytes, src_rate: int, dst_rate: int,
                       channels: int = 1, bit_depth: int = 16) -> bytes:
        """Resample interleaved PCM audio to a different sample rate.

        Args:
            audio_data: Raw interleaved samples.
            src_rate: Sample rate of ``audio_data`` in Hz.
            dst_rate: Desired sample rate in Hz.
            channels: Number of interleaved channels.
            bit_depth: 32 means float32 samples; any other value is treated
                as int16.

        Returns:
            The resampled audio in the same sample format as the input. The
            input is returned unchanged when the rates already match, when
            librosa is not installed, or when resampling fails; a message is
            printed in the latter two cases.
        """
        if src_rate == dst_rate:
            return audio_data

        try:
            import librosa
        except ImportError:
            print("警告: librosa 未安装,跳过重采样")
            return audio_data

        try:
            is_float = bit_depth == 32
            dtype = np.float32 if is_float else np.int16

            # Drop a trailing partial frame so the reshape below cannot fail.
            frame_bytes = np.dtype(dtype).itemsize * channels
            usable = len(audio_data) - len(audio_data) % frame_bytes
            frames = np.frombuffer(audio_data[:usable], dtype=dtype).reshape(-1, channels)

            # librosa only accepts floating-point audio, so int16 input is
            # widened first and narrowed again afterwards.
            signal = frames if is_float else frames.astype(np.float32)

            # librosa expects (channels, samples); our frames are (samples, channels).
            resampled = librosa.resample(signal.T, orig_sr=src_rate, target_sr=dst_rate).T

            if is_float:
                return resampled.astype(np.float32).tobytes()

            # Filter overshoot can exceed the int16 range; clip instead of wrapping.
            pcm = np.clip(np.rint(resampled), -32768, 32767)
            return pcm.astype(np.int16).tobytes()
        except Exception as e:
            # Best effort: report the problem and hand back the original audio.
            print(f"重采样失败: {e}")
            return audio_data
|
|
|
|
|
|
class AudioDeviceDetector:
    """Detects the host platform and the audio devices PyAudio can see."""

    def __init__(self):
        """Record platform information and the platform flags derived from it."""
        self.platform_info = AudioConverter.get_platform_info()
        self.is_raspberry_pi = self._is_raspberry_pi()
        self.is_mac = self.platform_info['system'] == 'Darwin'

    def _is_raspberry_pi(self) -> bool:
        """Return True if the device-tree model string mentions a Raspberry Pi.

        Returns False when ``/proc/device-tree/model`` cannot be read or
        decoded.
        """
        try:
            with open('/proc/device-tree/model', 'r') as f:
                return 'raspberry pi' in f.read().lower()
        except (OSError, ValueError):
            # Missing or unreadable file (OSError) or undecodable contents
            # (UnicodeDecodeError is a ValueError).
            return False

    @staticmethod
    def _probe_supported_formats(p, pyaudio) -> list:
        """Return the names of the sample formats an output stream opens with.

        Each candidate format is tested by opening and closing a mono
        24000 Hz output stream; formats that raise are skipped.
        """
        candidates = (
            (pyaudio.paFloat32, 'Float32'),
            (pyaudio.paInt16, 'Int16'),
            (pyaudio.paInt32, 'Int32'),
        )
        supported = []
        for sample_format, name in candidates:
            try:
                stream = p.open(
                    format=sample_format,
                    channels=1,
                    rate=24000,
                    output=True,
                    frames_per_buffer=1024,
                )
                stream.close()
            except Exception:
                # An unsupported format is an expected outcome of probing.
                continue
            supported.append(name)
        return supported

    @staticmethod
    def _default_device_index(lookup) -> Optional[int]:
        """Return the ``'index'`` of the device info produced by ``lookup()``.

        Returns None when the lookup raises OSError or the result has no
        ``'index'`` key.
        """
        try:
            return lookup()['index']
        except (OSError, KeyError):
            return None

    def detect_audio_devices(self) -> dict:
        """Enumerate audio devices and the sample formats that can be opened.

        Returns:
            A dict with the keys ``'input_devices'`` and ``'output_devices'``
            (lists of dicts with ``index``, ``name``, ``channels`` and
            ``sample_rate``), ``'default_input'`` and ``'default_output'``
            (device indices or None), ``'supported_formats'`` (list of format
            names) and ``'recommended_format'``. If PyAudio cannot be imported
            or detection raises, a message is printed and the format fields
            fall back to Int16.
        """
        devices = {
            'input_devices': [],
            'output_devices': [],
            'default_input': None,
            'default_output': None,
            'supported_formats': [],
            'recommended_format': None,
        }

        try:
            import pyaudio

            p = pyaudio.PyAudio()
            try:
                supported = self._probe_supported_formats(p, pyaudio)
                devices['supported_formats'] = supported
                # Prefer Float32 when available; otherwise default to Int16.
                devices['recommended_format'] = 'Float32' if 'Float32' in supported else 'Int16'

                for i in range(p.get_device_count()):
                    device_info = p.get_device_info_by_index(i)
                    # A device with both inputs and outputs is listed twice.
                    for channel_key, bucket in (('maxInputChannels', 'input_devices'),
                                                ('maxOutputChannels', 'output_devices')):
                        if device_info[channel_key] > 0:
                            devices[bucket].append({
                                'index': i,
                                'name': device_info['name'],
                                'channels': device_info[channel_key],
                                'sample_rate': device_info['defaultSampleRate'],
                            })

                # Per-device info dicts carry no "is default" flag, so the
                # defaults are queried from PyAudio directly.
                devices['default_input'] = self._default_device_index(
                    p.get_default_input_device_info)
                devices['default_output'] = self._default_device_index(
                    p.get_default_output_device_info)
            finally:
                # Release PortAudio even when enumeration fails part-way.
                p.terminate()
        except Exception as e:
            print(f"音频设备检测失败: {e}")
            # Fall back to the default configuration.
            devices['supported_formats'] = ['Int16']
            devices['recommended_format'] = 'Int16'

        return devices

    def get_platform_specific_config(self) -> dict:
        """Return playback settings chosen from the detected platform flags.

        Returns:
            A dict with ``prefer_float32``, ``fallback_to_aplay``,
            ``recommended_sample_rate``, ``recommended_channels`` and
            ``recommended_bit_depth``. The base values are 16 kHz, mono,
            16-bit; macOS switches to Float32 at 24 kHz / 32-bit, and a
            Raspberry Pi enables the aplay fallback.
        """
        config = {
            'prefer_float32': False,
            'fallback_to_aplay': False,
            'recommended_sample_rate': 16000,
            'recommended_channels': 1,
            'recommended_bit_depth': 16,
        }

        if self.is_mac:
            config['prefer_float32'] = True
            config['recommended_sample_rate'] = 24000
            config['recommended_bit_depth'] = 32
        elif self.is_raspberry_pi:
            config['fallback_to_aplay'] = True
            config['recommended_sample_rate'] = 16000
            config['recommended_bit_depth'] = 16

        return config

    def print_device_info(self):
        """Print the platform, supported formats, output devices and config."""
        devices = self.detect_audio_devices()
        config = self.get_platform_specific_config()

        print("=== 音频设备信息 ===")
        print(f"平台: {self.platform_info['platform']}")
        print(f"是否为树莓派: {self.is_raspberry_pi}")
        print(f"是否为Mac: {self.is_mac}")
        print()

        print(f"支持的音频格式: {devices['supported_formats']}")
        print(f"推荐格式: {devices['recommended_format']}")
        print()

        print("输出设备:")
        for device in devices['output_devices']:
            marker = " (默认)" if device['index'] == devices['default_output'] else ""
            print(f" [{device['index']}] {device['name']}{marker}")
            print(f" 通道数: {device['channels']}, 采样率: {device['sample_rate']}")
        print()

        print("平台特定配置:")
        for key, value in config.items():
            print(f" {key}: {value}")
|
|
|
|
|
|
class AudioPlayer:
    """Plays raw PCM audio through PyAudio, with an aplay fallback."""

    def __init__(self):
        """Detect the available devices and load the platform configuration."""
        self.detector = AudioDeviceDetector()
        self.converter = AudioConverter()
        self.device_info = self.detector.detect_audio_devices()
        self.config = self.detector.get_platform_specific_config()

    def play_audio(self, audio_data: bytes, format_type: str = 'Float32',
                   sample_rate: int = 24000, channels: int = 1) -> bool:
        """Play raw audio, trying PyAudio first and then aplay if enabled.

        Args:
            audio_data: Raw interleaved samples.
            format_type: ``'Float32'`` for float32 samples; any other value
                is played as int16.
            sample_rate: Sample rate of ``audio_data`` in Hz.
            channels: Number of interleaved channels.

        Returns:
            True if one of the playback methods succeeded, False otherwise.
        """
        print(f"开始播放音频: 格式={format_type}, 采样率={sample_rate}Hz, 通道数={channels}")

        if self._try_pyaudio_play(audio_data, format_type, sample_rate, channels):
            return True

        # aplay is only attempted when the platform configuration allows it.
        if self.config['fallback_to_aplay'] and self._try_aplay_play(
                audio_data, format_type, sample_rate, channels):
            return True

        print("所有播放方式都失败了")
        return False

    def _try_pyaudio_play(self, audio_data: bytes, format_type: str,
                          sample_rate: int, channels: int) -> bool:
        """Play the audio through a blocking PyAudio output stream.

        Float32 input is converted to int16 when the detected recommended
        format is not Float32. The stream is always opened at ``sample_rate``:
        format conversion does not change the rate of the samples, so opening
        the stream at a different rate would change playback speed and pitch.

        Returns:
            True on success; False (after printing the error) if PyAudio is
            missing or playback raises.
        """
        try:
            import pyaudio

            if format_type == 'Float32' and self.device_info['recommended_format'] != 'Float32':
                print(f"转换格式: Float32 -> {self.device_info['recommended_format']}")
                audio_data = self.converter.float32_to_int16(audio_data)
                format_type = 'Int16'

            pyaudio_format = pyaudio.paFloat32 if format_type == 'Float32' else pyaudio.paInt16

            p = pyaudio.PyAudio()
            try:
                stream = p.open(
                    format=pyaudio_format,
                    channels=channels,
                    rate=sample_rate,
                    output=True,
                    frames_per_buffer=1024,
                )
                try:
                    stream.write(audio_data)
                    stream.stop_stream()
                finally:
                    stream.close()
            finally:
                # Release PortAudio even if opening or writing the stream fails.
                p.terminate()

            print("pyaudio播放成功")
            return True

        except Exception as e:
            print(f"pyaudio播放失败: {e}")
            return False

    @staticmethod
    def _aplay_timeout(num_bytes: int, sample_rate: int, channels: int) -> float:
        """Return the aplay timeout in seconds for an int16 clip of this size.

        The timeout is the clip duration plus a 5 second margin, and never
        less than 30 seconds, so long clips are not killed before they finish.
        """
        bytes_per_second = 2 * channels * sample_rate
        if bytes_per_second <= 0:
            return 30.0
        return max(30.0, num_bytes / bytes_per_second + 5.0)

    def _try_aplay_play(self, audio_data: bytes, format_type: str,
                        sample_rate: int, channels: int) -> bool:
        """Play the audio by writing a 16-bit WAV file and running ``aplay``.

        Only attempted on a Raspberry Pi. Float32 input is converted to int16
        first. The temporary WAV file is removed whether or not playback
        succeeds.

        Returns:
            True if aplay exits with status 0, False otherwise.
        """
        if not self.detector.is_raspberry_pi:
            return False

        temp_path = None
        try:
            # Only the name is needed here; the wave module reopens the file.
            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp_file:
                temp_path = tmp_file.name

            if format_type == 'Float32':
                audio_data = self.converter.float32_to_int16(audio_data)

            with wave.open(temp_path, 'wb') as wav_file:
                wav_file.setnchannels(channels)
                wav_file.setsampwidth(2)  # 16-bit = 2 bytes
                wav_file.setframerate(sample_rate)
                wav_file.writeframes(audio_data)

            result = subprocess.run(
                ['aplay', temp_path],
                capture_output=True,
                text=True,
                timeout=self._aplay_timeout(len(audio_data), sample_rate, channels),
            )

            if result.returncode == 0:
                print("aplay播放成功")
                return True

            print(f"aplay播放失败: {result.stderr}")
            return False

        except Exception as e:
            print(f"aplay播放失败: {e}")
            return False

        finally:
            # Remove the temporary file on every exit path.
            if temp_path is not None:
                try:
                    os.unlink(temp_path)
                except OSError:
                    pass
|
|
|
|
|
|
# Test function
|
|
def test_audio_conversion():
    """Run the sample conversions in both directions and print device info.

    This is a manual smoke test: it prints the size of each converted buffer
    and the detected audio device information, and asserts nothing.
    """
    print("=== 音频转换测试 ===")

    audio_converter = AudioConverter()

    # Four float32 samples: mid-scale and full-scale, both polarities.
    sample_values = (0.5, -0.5, 1.0, -1.0)
    float_input = struct.pack('4f', *sample_values)

    # Float32 -> Int16
    as_int16 = audio_converter.float32_to_int16(float_input)
    print(f"Float32 -> Int16 转换成功: {len(as_int16)} 字节")

    # Int16 -> Float32
    back_to_float = audio_converter.int16_to_float32(as_int16)
    print(f"Int16 -> Float32 转换成功: {len(back_to_float)} 字节")

    # Device detection
    AudioDeviceDetector().print_device_info()
|
|
|
|
|
|
# Run the smoke test when this module is executed as a script.
if __name__ == '__main__':
    test_audio_conversion()