Compare commits

...

7 Commits

Author SHA1 Message Date
朱潮
7eff24a175 fix audio 2025-09-19 20:58:35 +08:00
朱潮
3958d2ff81 fix audio 2025-09-19 20:49:20 +08:00
朱潮
bc1dd7f03f fix audio 2025-09-19 20:47:18 +08:00
朱潮
e4bcce4946 fix audio 2025-09-19 20:44:35 +08:00
朱潮
d5f2957984 fix audio 2025-09-19 20:42:44 +08:00
朱潮
e4503e2d1a config 2025-09-19 20:28:43 +08:00
朱潮
38d015d3f2 fix audio 2025-09-19 20:16:39 +08:00
10 changed files with 664 additions and 44 deletions

View File

@ -11,7 +11,8 @@ from dataclasses import dataclass
from typing import Any, Dict, Optional from typing import Any, Dict, Optional
import config import config
import pyaudio import sounddevice as sd
import numpy as np
from realtime_dialog_client import RealtimeDialogClient from realtime_dialog_client import RealtimeDialogClient
@ -19,7 +20,7 @@ from realtime_dialog_client import RealtimeDialogClient
class AudioConfig: class AudioConfig:
"""音频配置数据类""" """音频配置数据类"""
format: str format: str
bit_size: int bit_size: str # 改为字符串类型
channels: int channels: int
sample_rate: int sample_rate: int
chunk: int chunk: int
@ -31,40 +32,263 @@ class AudioDeviceManager:
def __init__(self, input_config: AudioConfig, output_config: AudioConfig): def __init__(self, input_config: AudioConfig, output_config: AudioConfig):
self.input_config = input_config self.input_config = input_config
self.output_config = output_config self.output_config = output_config
self.pyaudio = pyaudio.PyAudio() self.input_stream = None
self.input_stream: Optional[pyaudio.Stream] = None self.output_stream = None
self.output_stream: Optional[pyaudio.Stream] = None self.audio_queue = None
self.playback_queue = None # 播放队列
self.recording = False
self.playing = False
def open_input_stream(self) -> pyaudio.Stream: # 预缓冲机制
self.pre_buffer = []
self.pre_buffer_size = 5 # 预缓冲5个音频块
self.buffer_threshold = 3 # 缓冲阈值,低于此值开始预缓冲
# 静音检测和回声消除
self.silence_threshold = 500 # 静音阈值
self.echo_suppression_enabled = True
self.last_audio_level = 0
self.audio_level_history = []
def open_input_stream(self):
"""打开音频输入流""" """打开音频输入流"""
# p = pyaudio.PyAudio() try:
self.input_stream = self.pyaudio.open( import queue
format=self.input_config.bit_size, self.audio_queue = queue.Queue(maxsize=100) # 增大队列大小,提供更多缓冲
channels=self.input_config.channels,
rate=self.input_config.sample_rate,
input=True,
frames_per_buffer=self.input_config.chunk
)
return self.input_stream
def open_output_stream(self) -> pyaudio.Stream: def audio_callback(indata, frames, time_info, status):
"""音频数据回调"""
if status:
print(f"音频流状态: {status}")
if self.recording and self.audio_queue:
try:
# 将numpy数组转换为字节数据
audio_bytes = indata.tobytes()
# 添加音频数据预处理,提高质量
if hasattr(self, '_audio_processor'):
audio_bytes = self._audio_processor(audio_bytes)
self.audio_queue.put_nowait(audio_bytes)
except queue.Full:
pass # 静默丢弃,避免阻塞
self.input_stream = sd.InputStream(
samplerate=self.input_config.sample_rate,
channels=self.input_config.channels,
dtype='int16',
blocksize=self.input_config.chunk,
callback=audio_callback,
device=None,
latency='low' # 低延迟模式
)
self.input_stream.start()
self.recording = True
return self.input_stream
except Exception as e:
print(f"打开输入流失败: {e}")
return None
def open_output_stream(self):
"""打开音频输出流""" """打开音频输出流"""
self.output_stream = self.pyaudio.open( try:
format=self.output_config.bit_size, import queue
channels=self.output_config.channels, self.playback_queue = queue.Queue(maxsize=50) # 增大播放队列,提供更多缓冲
rate=self.output_config.sample_rate,
output=True, def playback_callback(outdata, frames, time_info, status):
frames_per_buffer=self.output_config.chunk """音频播放回调"""
) if status:
return self.output_stream print(f"播放状态: {status}")
try:
# 从队列获取音频数据
audio_data = self.playback_queue.get_nowait()
# 转换字节数据为numpy数组
audio_array = np.frombuffer(audio_data, dtype=np.int16)
audio_array = audio_array.reshape(-1, self.output_config.channels)
# 应用音频淡入淡出效果,减少爆音
if hasattr(self, '_apply_volume_fade'):
audio_array = self._apply_volume_fade(audio_array)
# 确保数据大小匹配
if len(audio_array) < frames:
# 数据不足用0填充
padded = np.zeros((frames, self.output_config.channels), dtype=np.int16)
padded[:len(audio_array)] = audio_array
outdata[:] = padded
else:
outdata[:] = audio_array[:frames]
except queue.Empty:
# 队列为空,输出静音
outdata.fill(0)
except Exception as e:
print(f"播放回调错误: {e}")
outdata.fill(0)
self.output_stream = sd.OutputStream(
samplerate=self.output_config.sample_rate,
channels=self.output_config.channels,
dtype='int16',
blocksize=self.output_config.chunk,
callback=playback_callback,
device=None,
latency='low' # 低延迟模式
)
self.output_stream.start()
self.playing = True
return self.output_stream
except Exception as e:
print(f"打开输出流失败: {e}")
return None
def play_audio(self, audio_data: bytes) -> None:
"""播放音频数据"""
try:
if self.playing and self.playback_queue:
# 音频数据预缓冲:将大数据块分成更小的块以获得更流畅的播放
chunk_size = self.output_config.chunk * 2 # 每个样本2字节
# 预处理音频数据
if hasattr(self, '_playback_processor'):
audio_data = self._playback_processor(audio_data)
# 预缓冲机制:在播放前积累一些音频块
if len(self.pre_buffer) < self.pre_buffer_size:
chunk_size = self.output_config.chunk * 2
for i in range(0, len(audio_data), chunk_size):
chunk = audio_data[i:i+chunk_size]
self.pre_buffer.append(chunk)
if len(self.pre_buffer) >= self.pre_buffer_size:
break
# 如果预缓冲已满,开始播放
if len(self.pre_buffer) >= self.pre_buffer_size:
self._flush_pre_buffer()
# 分块处理音频数据,避免单个数据块过大
for i in range(0, len(audio_data), chunk_size):
chunk = audio_data[i:i+chunk_size]
try:
# 使用阻塞式put确保不丢失数据
self.playback_queue.put(chunk, timeout=0.1)
except queue.Full:
print("警告: 播放队列已满,丢弃音频数据")
# 如果队列满,尝试清空一些旧数据
try:
self.playback_queue.get_nowait()
self.playback_queue.put(chunk, timeout=0.05)
except:
pass
break
except Exception as e:
print(f"音频播放失败: {e}")
def read_audio_data(self, frames: int) -> bytes:
"""读取音频数据"""
try:
if not self.recording or self.audio_queue is None:
return b'\x00' * (frames * 2) # 返回静音数据
# 使用更长的超时时间,提高音频数据获取成功率
try:
audio_data = self.audio_queue.get(timeout=0.1) # 增加超时时间
return audio_data
except queue.Empty:
# 队列为空,返回静音数据
return b'\x00' * (frames * 2)
except Exception as e:
print(f"读取音频数据失败: {e}")
return b'\x00' * (frames * 2) # 返回静音数据
def stop_recording(self):
"""停止录音"""
self.recording = False
def stop_playing(self):
"""停止播放"""
self.playing = False
if self.playback_queue:
# 清空播放队列
while not self.playback_queue.empty():
try:
self.playback_queue.get_nowait()
except queue.Empty:
break
def _flush_pre_buffer(self):
"""刷新预缓冲区到播放队列"""
if hasattr(self, 'pre_buffer') and self.pre_buffer:
for chunk in self.pre_buffer:
try:
self.playback_queue.put(chunk, timeout=0.1)
except queue.Full:
print("警告: 播放队列已满,丢弃预缓冲数据")
break
self.pre_buffer.clear()
def _apply_volume_fade(self, audio_array):
"""应用音量淡入淡出效果,减少爆音"""
try:
# 简单的淡入淡出效果
fade_samples = min(100, len(audio_array) // 10) # 淡入淡出样本数
# 淡入
for i in range(fade_samples):
factor = i / fade_samples
audio_array[i] = int(audio_array[i] * factor)
# 淡出
for i in range(fade_samples):
factor = (fade_samples - i) / fade_samples
audio_array[-(i+1)] = int(audio_array[-(i+1)] * factor)
return audio_array
except Exception as e:
print(f"音量淡入淡出失败: {e}")
return audio_array
def _detect_silence(self, audio_data):
"""检测静音"""
try:
audio_array = np.frombuffer(audio_data, dtype=np.int16)
audio_level = np.abs(audio_array).mean()
# 更新音频电平历史
self.audio_level_history.append(audio_level)
if len(self.audio_level_history) > 10:
self.audio_level_history.pop(0)
# 计算平均音频电平
avg_level = np.mean(self.audio_level_history) if self.audio_level_history else 0
# 检测静音
is_silence = audio_level < self.silence_threshold
return is_silence, audio_level, avg_level
except Exception as e:
print(f"静音检测失败: {e}")
return False, 0, 0
def cleanup(self) -> None: def cleanup(self) -> None:
"""清理音频设备资源""" """清理音频设备资源"""
for stream in [self.input_stream, self.output_stream]: try:
if stream: self.stop_recording()
stream.stop_stream() self.stop_playing()
stream.close() if self.input_stream:
self.pyaudio.terminate() self.input_stream.stop()
self.input_stream.close()
if self.output_stream:
self.output_stream.stop()
self.output_stream.close()
sd.stop() # 停止所有音频播放
# 清空预缓冲区
if hasattr(self, 'pre_buffer'):
self.pre_buffer.clear()
except Exception as e:
print(f"清理音频设备失败: {e}")
class DialogSession: class DialogSession:
@ -88,7 +312,7 @@ class DialogSession:
output_audio_format=output_audio_format, mod=mod, recv_timeout=recv_timeout) output_audio_format=output_audio_format, mod=mod, recv_timeout=recv_timeout)
if output_audio_format == "pcm_s16le": if output_audio_format == "pcm_s16le":
config.output_audio_config["format"] = "pcm_s16le" config.output_audio_config["format"] = "pcm_s16le"
config.output_audio_config["bit_size"] = pyaudio.paInt16 config.output_audio_config["bit_size"] = "int16" # 使用字符串标识符
self.is_running = True self.is_running = True
self.is_session_finished = False self.is_session_finished = False
@ -118,8 +342,12 @@ class DialogSession:
) )
# 初始化音频队列和输出流 # 初始化音频队列和输出流
print(f"输出音频配置: {config.output_audio_config}") print(f"输出音频配置: {config.output_audio_config}")
self.output_stream = self.audio_device.open_output_stream() output_stream = self.audio_device.open_output_stream()
print("音频输出流已打开") if output_stream:
print("音频输出流已打开")
self.output_stream = output_stream
else:
print("警告:音频输出流打开失败,将使用直接播放模式")
# 启动播放线程 # 启动播放线程
self.is_recording = True self.is_recording = True
self.is_playing = True self.is_playing = True
@ -155,11 +383,15 @@ class DialogSession:
if was_not_playing: if was_not_playing:
print("播放开始前,额外发送静音数据清理管道") print("播放开始前,额外发送静音数据清理管道")
for _ in range(3): for _ in range(3):
self.output_stream.write(b'\x00' * len(audio_data)) # 播放静音数据
self.audio_device.play_audio(b'\x00' * len(audio_data))
time.sleep(0.1) time.sleep(0.1)
# 播放音频数据 # 播放音频数据
self.output_stream.write(audio_data) try:
self.audio_device.play_audio(audio_data)
except Exception as e:
print(f"音频播放错误: {e}")
except queue.Empty: except queue.Empty:
# 队列为空,检查是否超时 # 队列为空,检查是否超时
@ -614,8 +846,8 @@ class DialogSession:
# 非播放期间:正常录音 # 非播放期间:正常录音
last_silence_time = current_time last_silence_time = current_time
# 添加exception_on_overflow=False参数来忽略溢出错误 # 使用AudioDeviceManager的专用读取方法
audio_data = stream.read(config.input_audio_config["chunk"], exception_on_overflow=False) audio_data = self.audio_device.read_audio_data(config.input_audio_config["chunk"])
# 在发送前再次检查是否应该发送静音数据(最后一道防线) # 在发送前再次检查是否应该发送静音数据(最后一道防线)
with self.audio_queue_lock: with self.audio_queue_lock:
@ -671,6 +903,7 @@ class DialogSession:
print(f"会话错误: {e}") print(f"会话错误: {e}")
finally: finally:
if not self.is_audio_file_input: if not self.is_audio_file_input:
self.audio_device.stop_recording() # 先停止录音
self.audio_device.cleanup() self.audio_device.cleanup()

View File

@ -1,7 +1,5 @@
import uuid import uuid
import pyaudio
# 配置信息 # 配置信息
ws_connect_config = { ws_connect_config = {
"base_url": "wss://openspeech.bytedance.com/api/v3/realtime/dialogue", "base_url": "wss://openspeech.bytedance.com/api/v3/realtime/dialogue",
@ -44,17 +42,17 @@ start_session_req = {
} }
input_audio_config = { input_audio_config = {
"chunk": 3200, "chunk": 6400, # 增大缓冲区大小,减少处理频率
"format": "pcm", "format": "pcm",
"channels": 1, "channels": 1,
"sample_rate": 16000, "sample_rate": 16000,
"bit_size": pyaudio.paInt16, "bit_size": "int16",
} }
output_audio_config = { output_audio_config = {
"chunk": 3200, "chunk": 6400, # 增大缓冲区大小,减少处理频率
"format": "pcm", "format": "pcm",
"channels": 1, "channels": 1,
"sample_rate": 24000, "sample_rate": 24000,
"bit_size": pyaudio.paFloat32, "bit_size": "int16",
} }

Binary file not shown.

View File

@ -6,7 +6,7 @@ from audio_manager import DialogSession
async def main() -> None: async def main() -> None:
parser = argparse.ArgumentParser(description="Real-time Dialog Client") parser = argparse.ArgumentParser(description="Real-time Dialog Client")
parser.add_argument("--format", type=str, default="pcm", help="The audio format (e.g., pcm, pcm_s16le).") parser.add_argument("--format", type=str, default="pcm_s16le", help="The audio format (e.g., pcm, pcm_s16le).")
parser.add_argument("--audio", type=str, default="", help="audio file send to server, if not set, will use microphone input.") parser.add_argument("--audio", type=str, default="", help="audio file send to server, if not set, will use microphone input.")
parser.add_argument("--mod",type=str,default="audio",help="Use mod to select plain text input mode or audio mode, the default is audio mode") parser.add_argument("--mod",type=str,default="audio",help="Use mod to select plain text input mode or audio mode, the default is audio mode")
parser.add_argument("--recv_timeout",type=int,default=10,help="Timeout for receiving messages,value range [10,120]") parser.add_argument("--recv_timeout",type=int,default=10,help="Timeout for receiving messages,value range [10,120]")

Binary file not shown.

177
doubao/test_microphone.py Normal file
View File

@ -0,0 +1,177 @@
#!/usr/bin/env python3
"""
测试sounddevice麦克风录音功能
用于验证新的麦克风输入实现是否正常工作
"""
import numpy as np
import sounddevice as sd
import time
import threading
import queue
import sys
def test_microphone():
"""测试麦克风录音"""
print("=== SoundDevice麦克风录音测试 ===")
# 1. 检查音频输入设备
print("\n1. 检查音频输入设备...")
try:
devices = sd.query_devices()
input_devices = [dev for dev in devices if dev['max_input_channels'] > 0]
print(f"找到 {len(input_devices)} 个输入设备:")
for i, dev in enumerate(input_devices):
print(f" [{i}] {dev['name']} (输入通道: {dev['max_input_channels']})")
if not input_devices:
print("错误: 没有找到可用的音频输入设备")
return False
# 查找默认输入设备
default_input = sd.default.device[0] if isinstance(sd.default.device, tuple) else sd.default.device
print(f"默认输入设备: {default_input}")
except Exception as e:
print(f"音频设备检查失败: {e}")
return False
# 2. 测试录音5秒
print("\n2. 测试录音5秒...")
try:
sample_rate = 16000
channels = 1
duration = 5
chunk_size = 3200
print(f"录音参数: 采样率={sample_rate}Hz, 通道={channels}, 时长={duration}")
print("开始录音,请说话...")
# 创建音频队列
audio_queue = queue.Queue()
recording = True
def audio_callback(indata, frames, time_info, status):
"""音频数据回调"""
if status:
print(f"音频流状态: {status}")
if recording:
audio_queue.put(indata.copy())
# 创建输入流
with sd.InputStream(
samplerate=sample_rate,
channels=channels,
dtype='int16',
blocksize=chunk_size,
callback=audio_callback
) as stream:
# 录音指定时长
start_time = time.time()
audio_data = []
while time.time() - start_time < duration:
try:
data = audio_queue.get(timeout=1.0)
audio_data.append(data)
except queue.Empty:
print("警告: 音频队列为空")
break
print(f"录音完成,共收集到 {len(audio_data)} 个音频块")
# 3. 播放录制的音频
if audio_data:
print("\n3. 播放录制的音频...")
# 合并音频数据
recorded_audio = np.concatenate(audio_data, axis=0)
print(f"录制音频形状: {recorded_audio.shape}")
# 播放
print("开始播放录制的音频...")
sd.play(recorded_audio, sample_rate)
sd.wait()
print("✓ 音频播放完成")
# 保存音频文件
print("\n4. 保存音频文件...")
try:
from scipy.io import wavfile
wavfile.write('test_recording.wav', sample_rate, recorded_audio)
print("✓ 音频已保存为 test_recording.wav")
except ImportError:
print("提示: 安装scipy可保存WAV文件: pip install scipy")
else:
print("警告: 没有录制到音频数据")
return False
except Exception as e:
print(f"录音测试失败: {e}")
return False
return True
def test_stream_reading():
"""测试流式读取"""
print("\n5. 测试流式读取...")
try:
sample_rate = 16000
channels = 1
chunk_size = 3200
# 创建输入流
with sd.InputStream(
samplerate=sample_rate,
channels=channels,
dtype='int16',
blocksize=chunk_size
) as stream:
print("开始流式读取测试...")
# 读取10个数据块
for i in range(10):
audio_data = stream.read(chunk_size)
print(f"读取第 {i+1} 块数据: 形状={audio_data.shape}, 类型={audio_data.dtype}")
# 转换为字节数据
byte_data = audio_data.tobytes()
print(f"字节数据长度: {len(byte_data)} 字节")
time.sleep(0.1) # 模拟实际处理间隔
print("✓ 流式读取测试完成")
except Exception as e:
print(f"流式读取测试失败: {e}")
return False
return True
if __name__ == "__main__":
print("SoundDevice麦克风录音功能测试")
print("=" * 50)
success = True
# 测试麦克风
if not test_microphone():
success = False
# 测试流式读取
if not test_stream_reading():
success = False
print("\n" + "=" * 50)
if success:
print("✓ 所有麦克风测试通过")
print("树莓派应该可以正常录音了!")
else:
print("✗ 部分测试失败,需要检查音频设备和权限")
print("请确保:")
print("1. 麦克风已正确连接")
print("2. 用户有音频设备访问权限")
print("3. 没有其他程序占用音频设备")

View File

@ -0,0 +1,69 @@
#!/usr/bin/env python3
"""
快速测试修复后的录音功能
验证回调模式是否解决了元组数据问题
"""
import sys
import time
import threading
def test_recording_fix():
"""测试录音修复"""
print("=== 测试录音修复 ===")
try:
# 导入修改后的模块
sys.path.append('/home/zhuchaowe/Local-Voice/doubao')
import audio_manager
import config
# 创建音频设备管理器
audio_device = audio_manager.AudioDeviceManager(
audio_manager.AudioConfig(**config.input_audio_config),
audio_manager.AudioConfig(**config.output_audio_config)
)
print("1. 打开音频输入流...")
input_stream = audio_device.open_input_stream()
if input_stream:
print("✓ 音频输入流打开成功")
else:
print("✗ 音频输入流打开失败")
return False
print("2. 测试读取音频数据...")
# 读取几秒钟的音频数据
for i in range(10):
audio_data = audio_device.read_audio_data(config.input_audio_config["chunk"])
if audio_data:
print(f"{i+1}次读取成功: {len(audio_data)} 字节")
if i == 0:
print(f"音频数据类型: {type(audio_data)}")
else:
print(f"{i+1}次读取失败")
time.sleep(0.1)
print("3. 停止录音...")
audio_device.stop_recording()
print("4. 清理资源...")
audio_device.cleanup()
print("✓ 所有测试通过!")
return True
except Exception as e:
print(f"✗ 测试失败: {e}")
import traceback
traceback.print_exc()
return False
if __name__ == "__main__":
success = test_recording_fix()
if success:
print("\n🎉 录音功能修复成功!")
print("现在可以运行 main.py 测试完整功能")
else:
print("\n❌ 录音功能仍有问题")
print("请检查错误信息并调整代码")

143
doubao/test_sounddevice.py Normal file
View File

@ -0,0 +1,143 @@
#!/usr/bin/env python3
"""
测试sounddevice音频播放功能
用于验证新的音频实现是否正常工作
"""
import numpy as np
import sounddevice as sd
import time
def test_sounddevice():
"""测试sounddevice音频播放"""
print("=== SoundDevice音频播放测试 ===")
# 1. 检查音频设备
print("\n1. 检查音频设备...")
try:
devices = sd.query_devices()
print(f"找到 {len(devices)} 个音频设备:")
for i, dev in enumerate(devices):
print(f" [{i}] {dev['name']} (输入: {dev['max_input_channels']}, 输出: {dev['max_output_channels']})")
# 查找默认输出设备
default_output = sd.default.device
print(f"默认输出设备: {default_output}")
except Exception as e:
print(f"音频设备检查失败: {e}")
return False
# 2. 测试生成和播放音频
print("\n2. 测试生成和播放音频...")
try:
# 生成1秒的440Hz正弦波
sample_rate = 24000
duration = 1.0
frequency = 440
t = np.linspace(0, duration, int(sample_rate * duration), False)
audio_data = np.sin(2 * np.pi * frequency * t) * 0.3 # 30%音量
# 转换为16-bit整数
audio_data_int16 = (audio_data * 32767).astype(np.int16)
print(f"生成音频数据: 采样率={sample_rate}Hz, 时长={duration}秒, 频率={frequency}Hz")
print(f"音频数据形状: {audio_data_int16.shape}, 数据类型: {audio_data_int16.dtype}")
# 播放音频
print("开始播放测试音频...")
sd.play(audio_data_int16, sample_rate)
sd.wait() # 等待播放完成
print("✓ 音频播放成功")
except Exception as e:
print(f"音频播放失败: {e}")
return False
# 3. 测试直接播放字节数据
print("\n3. 测试直接播放字节数据...")
try:
# 将numpy数组转换为字节数据
byte_data = audio_data_int16.tobytes()
print(f"字节数据长度: {len(byte_data)} 字节")
# 将字节数据转换回numpy数组
audio_array = np.frombuffer(byte_data, dtype=np.int16)
# 播放
print("开始播放字节数据...")
sd.play(audio_array, sample_rate)
sd.wait()
print("✓ 字节数据播放成功")
except Exception as e:
print(f"字节数据播放失败: {e}")
return False
# 4. 测试立体声
print("\n4. 测试立体声播放...")
try:
# 创建立体声数据
stereo_data = np.column_stack([audio_data_int16, audio_data_int16])
print(f"立体声数据形状: {stereo_data.shape}")
print("开始播放立体声音频...")
sd.play(stereo_data, sample_rate)
sd.wait()
print("✓ 立体声播放成功")
except Exception as e:
print(f"立体声播放失败: {e}")
return False
return True
def test_numpy_conversion():
"""测试numpy数组转换"""
print("\n5. 测试数据类型转换...")
# 模拟火山引擎返回的16bit PCM数据
test_data = b'\x00\x00\x7f\x7f\x80\x00\xff\xff' # 一些测试音频数据
try:
# 字节数据转numpy数组
audio_array = np.frombuffer(test_data, dtype=np.int16)
print(f"原始字节数据: {test_data}")
print(f"转换后numpy数组: {audio_array}")
print(f"数组形状: {audio_array.shape}, 数据类型: {audio_array.dtype}")
# 重塑为单声道
audio_reshaped = audio_array.reshape(-1, 1)
print(f"重塑后形状: {audio_reshaped.shape}")
# 转回字节数据
byte_data = audio_array.tobytes()
print(f"转回字节数据: {byte_data}")
print("✓ 数据类型转换测试成功")
return True
except Exception as e:
print(f"数据类型转换失败: {e}")
return False
if __name__ == "__main__":
print("SoundDevice音频播放功能测试")
print("=" * 50)
success = True
# 测试sounddevice
if not test_sounddevice():
success = False
# 测试数据转换
if not test_numpy_conversion():
success = False
print("\n" + "=" * 50)
if success:
print("✓ 所有SoundDevice测试通过")
print("树莓派应该可以正常播放音频了!")
else:
print("✗ 部分测试失败,需要进一步调试")