Compare commits

...

7 Commits

Author SHA1 Message Date
朱潮
cc79605405 config 2025-09-19 21:13:02 +08:00
朱潮
3958d2ff81 fix audio 2025-09-19 20:49:20 +08:00
朱潮
bc1dd7f03f fix audio 2025-09-19 20:47:18 +08:00
朱潮
e4bcce4946 fix audio 2025-09-19 20:44:35 +08:00
朱潮
d5f2957984 fix audio 2025-09-19 20:42:44 +08:00
朱潮
e4503e2d1a config 2025-09-19 20:28:43 +08:00
朱潮
38d015d3f2 fix audio 2025-09-19 20:16:39 +08:00
10 changed files with 668 additions and 46 deletions

View File

@ -11,7 +11,8 @@ from dataclasses import dataclass
from typing import Any, Dict, Optional from typing import Any, Dict, Optional
import config import config
import pyaudio import sounddevice as sd
import numpy as np
from realtime_dialog_client import RealtimeDialogClient from realtime_dialog_client import RealtimeDialogClient
@ -19,7 +20,7 @@ from realtime_dialog_client import RealtimeDialogClient
class AudioConfig: class AudioConfig:
"""音频配置数据类""" """音频配置数据类"""
format: str format: str
bit_size: int bit_size: str # 改为字符串类型
channels: int channels: int
sample_rate: int sample_rate: int
chunk: int chunk: int
@ -31,40 +32,160 @@ class AudioDeviceManager:
def __init__(self, input_config: AudioConfig, output_config: AudioConfig): def __init__(self, input_config: AudioConfig, output_config: AudioConfig):
self.input_config = input_config self.input_config = input_config
self.output_config = output_config self.output_config = output_config
self.pyaudio = pyaudio.PyAudio() self.input_stream = None
self.input_stream: Optional[pyaudio.Stream] = None self.output_stream = None
self.output_stream: Optional[pyaudio.Stream] = None self.audio_queue = None
self.recording = False
def open_input_stream(self) -> pyaudio.Stream: # 音频缓冲播放相关
self.audio_buffer = bytes() # 使用不可变的bytes而不是bytearray
self.buffer_playback_threshold = self.output_config.sample_rate * 15 # 15秒的音频数据
self.min_buffer_size = self.output_config.sample_rate * 3 # 最小缓冲3秒
self.is_buffer_playing = False
self.last_play_time = 0
def open_input_stream(self):
"""打开音频输入流""" """打开音频输入流"""
# p = pyaudio.PyAudio() try:
self.input_stream = self.pyaudio.open( import queue
format=self.input_config.bit_size, self.audio_queue = queue.Queue(maxsize=100) # 音频数据队列
channels=self.input_config.channels,
rate=self.input_config.sample_rate,
input=True,
frames_per_buffer=self.input_config.chunk
)
return self.input_stream
def open_output_stream(self) -> pyaudio.Stream: def audio_callback(indata, frames, time_info, status):
"""打开音频输出流""" """音频数据回调"""
self.output_stream = self.pyaudio.open( if status:
format=self.output_config.bit_size, print(f"音频流状态: {status}")
channels=self.output_config.channels, if self.recording and self.audio_queue:
rate=self.output_config.sample_rate, try:
output=True, # 将numpy数组转换为字节数据
frames_per_buffer=self.output_config.chunk audio_bytes = indata.tobytes()
self.audio_queue.put_nowait(audio_bytes)
except queue.Full:
print("警告: 音频队列已满,丢弃数据")
self.input_stream = sd.InputStream(
samplerate=self.input_config.sample_rate,
channels=self.input_config.channels,
dtype='int16', # 16-bit PCM
blocksize=self.input_config.chunk,
callback=audio_callback,
device=None # 使用默认设备
) )
self.input_stream.start()
self.recording = True
return self.input_stream
except Exception as e:
print(f"打开输入流失败: {e}")
return None
def open_output_stream(self):
"""打开音频输出流"""
try:
self.output_stream = sd.OutputStream(
samplerate=self.output_config.sample_rate,
channels=self.output_config.channels,
dtype='int16', # 16-bit PCM
blocksize=self.output_config.chunk,
device=None # 使用默认设备
)
self.output_stream.start()
return self.output_stream return self.output_stream
except Exception as e:
print(f"打开输出流失败: {e}")
return None
def play_audio(self, audio_data: bytes) -> None:
    """Play one chunk of 16-bit PCM audio, blocking until it is handed off.

    The chunk is written to the stream created by ``open_output_stream`` when
    one is open, so consecutive chunks go through a single device handle
    instead of starting a new playback per chunk.  When no output stream is
    available the chunk is played with ``sd.play`` followed by ``sd.wait``.

    Args:
        audio_data: Raw interleaved int16 PCM bytes.  A trailing partial
            frame is dropped; empty input is a no-op.

    Errors are printed rather than raised so the caller's playback loop
    keeps running.
    """
    try:
        channels = self.output_config.channels
        frame_bytes = 2 * channels  # int16 -> 2 bytes per sample
        usable = len(audio_data) - (len(audio_data) % frame_bytes)
        if usable <= 0:
            return

        # `count` limits the view to whole frames without copying the bytes.
        audio_array = np.frombuffer(audio_data, dtype=np.int16, count=usable // 2)
        audio_array = audio_array.reshape(-1, channels)

        stream = getattr(self, "output_stream", None)
        if stream is not None:
            stream.write(audio_array)
            return

        # Direct playback fallback when the output stream could not be opened.
        sd.play(audio_array, samplerate=self.output_config.sample_rate)
        sd.wait()
    except Exception as e:
        print(f"音频播放失败: {e}")
def buffer_audio(self, audio_data: bytes) -> bool:
    """Append a chunk to the playback buffer and report whether to flush it.

    Returns True when the buffer has reached ``buffer_playback_threshold``
    bytes, or when it holds at least ``min_buffer_size`` bytes and more than
    5 seconds have passed since ``last_play_time``.  Returns False otherwise,
    and also when appending fails (the error is printed).
    """
    try:
        # Build a new bytes object rather than growing one in place.
        self.audio_buffer = self.audio_buffer + audio_data
        now = time.time()
        buffered = len(self.audio_buffer)

        if buffered >= self.buffer_playback_threshold:
            return True
        if buffered < self.min_buffer_size:
            return False
        # Minimum amount buffered: flush only if playback has been idle a while.
        return now - self.last_play_time > 5.0
    except Exception as e:
        print(f"音频缓冲失败: {e}")
        return False
def play_buffered_audio(self) -> None:
    """Start non-blocking playback of the audio accumulated in the buffer.

    Only whole int16 frames are played.  A trailing partial frame is kept in
    the buffer so the next chunk can complete it; previously such a buffer
    made the conversion fail on every call and the buffer was never drained.

    On success the played bytes are removed from ``audio_buffer``,
    ``last_play_time`` is refreshed and ``is_buffer_playing`` is set.
    Errors are printed rather than raised.
    """
    try:
        pending = self.audio_buffer
        if not pending:
            return

        channels = self.output_config.channels
        frame_bytes = 2 * channels  # int16 -> 2 bytes per sample
        usable = len(pending) - (len(pending) % frame_bytes)
        if usable == 0:
            return  # less than one full frame buffered so far

        audio_array = np.frombuffer(pending, dtype=np.int16, count=usable // 2)
        audio_array = audio_array.reshape(-1, channels)

        # No sd.wait(): playback continues while more audio is buffered.
        sd.play(audio_array, samplerate=self.output_config.sample_rate)

        # Keep only the bytes that did not form a complete frame.
        self.audio_buffer = pending[usable:]
        self.last_play_time = time.time()
        self.is_buffer_playing = True
    except Exception as e:
        print(f"缓冲音频播放失败: {e}")
def clear_audio_buffer(self) -> None:
    """Discard any buffered audio and mark buffered playback as idle."""
    self.is_buffer_playing = False
    self.audio_buffer = b""
def read_audio_data(self, frames: int) -> bytes:
    """Return the next captured audio block, or silence when none is ready.

    Args:
        frames: Frame count used to size the silence fallback
            (``frames * 2`` zero bytes).

    Returns:
        The next item from ``audio_queue`` (waiting up to 100 ms), or zero
        bytes when recording is off, the queue is missing or empty, or
        reading fails (the error is printed).
    """
    silence = b'\x00' * (frames * 2)
    try:
        if self.recording and self.audio_queue is not None:
            try:
                return self.audio_queue.get(timeout=0.1)
            except queue.Empty:
                # Nothing captured within the timeout.
                return silence
        return silence
    except Exception as e:
        print(f"读取音频数据失败: {e}")
        return silence
def stop_recording(self) -> None:
    """Clear the ``recording`` flag.

    Only the flag is changed here; the input stream itself is not stopped
    or closed by this method.
    """
    self.recording = False
def cleanup(self) -> None: def cleanup(self) -> None:
"""清理音频设备资源""" """清理音频设备资源"""
for stream in [self.input_stream, self.output_stream]: try:
if stream: self.recording = False
stream.stop_stream() if self.input_stream:
stream.close() self.input_stream.stop()
self.pyaudio.terminate() self.input_stream.close()
if self.output_stream:
self.output_stream.stop()
self.output_stream.close()
sd.stop() # 停止所有音频播放
except Exception as e:
print(f"清理音频设备失败: {e}")
class DialogSession: class DialogSession:
@ -73,7 +194,7 @@ class DialogSession:
mod: str mod: str
def __init__(self, ws_config: Dict[str, Any], output_audio_format: str = "pcm", audio_file_path: str = "", def __init__(self, ws_config: Dict[str, Any], output_audio_format: str = "pcm", audio_file_path: str = "",
mod: str = "audio", recv_timeout: int = 10): mod: str = "audio", recv_timeout: int = 10, use_buffered_playback: bool = False):
self.audio_file_path = audio_file_path self.audio_file_path = audio_file_path
self.recv_timeout = recv_timeout self.recv_timeout = recv_timeout
self.is_audio_file_input = self.audio_file_path != "" self.is_audio_file_input = self.audio_file_path != ""
@ -88,7 +209,7 @@ class DialogSession:
output_audio_format=output_audio_format, mod=mod, recv_timeout=recv_timeout) output_audio_format=output_audio_format, mod=mod, recv_timeout=recv_timeout)
if output_audio_format == "pcm_s16le": if output_audio_format == "pcm_s16le":
config.output_audio_config["format"] = "pcm_s16le" config.output_audio_config["format"] = "pcm_s16le"
config.output_audio_config["bit_size"] = pyaudio.paInt16 config.output_audio_config["bit_size"] = "int16" # 使用字符串标识符
self.is_running = True self.is_running = True
self.is_session_finished = False self.is_session_finished = False
@ -104,6 +225,10 @@ class DialogSession:
self.last_recording_state = False # 上次录音状态 self.last_recording_state = False # 上次录音状态
self.say_hello_completed = False # say hello 是否已完成 self.say_hello_completed = False # say hello 是否已完成
# 音频缓冲播放相关
self.use_buffered_playback = use_buffered_playback # 根据参数启用缓冲播放模式
self.buffer_check_interval = 0.1 # 缓冲检查间隔
# 新增:音频输入流控制 # 新增:音频输入流控制
self.input_stream_paused = False # 输入流是否被暂停 self.input_stream_paused = False # 输入流是否被暂停
self.force_silence_mode = False # 强制静音模式 self.force_silence_mode = False # 强制静音模式
@ -118,17 +243,24 @@ class DialogSession:
) )
# 初始化音频队列和输出流 # 初始化音频队列和输出流
print(f"输出音频配置: {config.output_audio_config}") print(f"输出音频配置: {config.output_audio_config}")
self.output_stream = self.audio_device.open_output_stream() output_stream = self.audio_device.open_output_stream()
if output_stream:
print("音频输出流已打开") print("音频输出流已打开")
self.output_stream = output_stream
else:
print("警告:音频输出流打开失败,将使用直接播放模式")
# 启动播放线程 # 启动播放线程
self.is_recording = True self.is_recording = True
self.is_playing = True self.is_playing = True
if self.use_buffered_playback:
self.player_thread = threading.Thread(target=self._buffered_audio_player_thread)
else:
self.player_thread = threading.Thread(target=self._audio_player_thread) self.player_thread = threading.Thread(target=self._audio_player_thread)
self.player_thread.daemon = True self.player_thread.daemon = True
self.player_thread.start() self.player_thread.start()
def _audio_player_thread(self): def _audio_player_thread(self):
"""音频播放线程""" """音频播放线程 - 原有的实时播放模式"""
audio_playing_timeout = 1.0 # 1秒没有音频数据认为播放结束 audio_playing_timeout = 1.0 # 1秒没有音频数据认为播放结束
queue_check_interval = 0.1 # 每100ms检查一次队列状态 queue_check_interval = 0.1 # 每100ms检查一次队列状态
@ -155,11 +287,15 @@ class DialogSession:
if was_not_playing: if was_not_playing:
print("播放开始前,额外发送静音数据清理管道") print("播放开始前,额外发送静音数据清理管道")
for _ in range(3): for _ in range(3):
self.output_stream.write(b'\x00' * len(audio_data)) # 播放静音数据
self.audio_device.play_audio(b'\x00' * len(audio_data))
time.sleep(0.1) time.sleep(0.1)
# 播放音频数据 # 播放音频数据
self.output_stream.write(audio_data) try:
self.audio_device.play_audio(audio_data)
except Exception as e:
print(f"音频播放错误: {e}")
except queue.Empty: except queue.Empty:
# 队列为空,检查是否超时 # 队列为空,检查是否超时
@ -197,6 +333,93 @@ class DialogSession:
self.is_recording_paused = False self.is_recording_paused = False
time.sleep(0.1) time.sleep(0.1)
def _buffered_audio_player_thread(self) -> None:
    """Playback worker for buffered mode.

    Loops while ``self.is_playing`` is true.  Each pass polls
    ``self.audio_queue`` for up to 50 ms.  When a chunk arrives it marks
    playback as active, pauses recording, and hands the chunk to
    ``audio_device.buffer_audio``; the buffer is flushed with
    ``play_buffered_audio`` when that call returns True.  When no chunk
    arrives and the last one is older than 2 seconds, any remaining buffered
    audio is played and the recording-related flags are reset.

    NOTE(review): the block nesting below was reconstructed from a flattened
    diff view - confirm the lock scope and the placement of the trailing
    ``time.sleep`` against the real file.
    """
    audio_playing_timeout = 2.0  # no audio for 2 s means playback has ended
    buffer_check_interval = 0.05  # poll the queue/buffer every 50 ms

    print("启动缓冲音频播放线程")

    while self.is_playing:
        try:
            current_time = time.time()

            # Check for a new audio chunk.
            audio_data = None
            try:
                audio_data = self.audio_queue.get(timeout=buffer_check_interval)
            except queue.Empty:
                pass

            if audio_data is not None:
                with self.audio_queue_lock:
                    # A chunk arrived: update the playback state.
                    was_not_playing = not self.is_playing_audio
                    if was_not_playing:
                        # Transition from idle to playing.
                        self.is_playing_audio = True
                        if not self.is_recording_paused:
                            self.is_recording_paused = True
                            print("缓冲播放开始,确认暂停录音")

                    # Remember when audio was last received.
                    self.last_audio_time = current_time

                # Flush the pipeline with silence before the first chunk.
                # NOTE(review): assumed to run outside the lock - confirm.
                if was_not_playing:
                    print("缓冲播放开始前,清理管道")
                    for _ in range(2):
                        self.audio_device.play_audio(b'\x00' * len(audio_data))
                        time.sleep(0.05)

                # Add the chunk to the device-side buffer.
                should_play = self.audio_device.buffer_audio(audio_data)

                # Play the buffer once the flush condition is met.
                if should_play:
                    print(f"播放缓冲音频,缓冲大小: {len(self.audio_device.audio_buffer)} 字节")
                    self.audio_device.play_buffered_audio()

            else:
                # No new chunk: check whether playback has timed out.
                with self.audio_queue_lock:
                    if self.is_playing_audio:
                        if hasattr(self, 'last_audio_time') and current_time - self.last_audio_time > audio_playing_timeout:
                            # Timed out: play whatever is still buffered first.
                            if len(self.audio_device.audio_buffer) > 0:
                                print("播放超时,播放剩余缓冲音频")
                                self.audio_device.play_buffered_audio()

                            # Then restore the recording state.
                            self.is_playing_audio = False
                            self.is_recording_paused = False
                            self.force_silence_mode = False
                            self.input_stream_paused = False

                            # Mark the "say hello" playback as finished.
                            if hasattr(self, 'say_hello_completed') and not self.say_hello_completed:
                                self.say_hello_completed = True
                                print("缓冲播放 say hello 音频播放完成")

                            print("缓冲播放超时,恢复录音")

                            # Set the flags that request silence frames to be sent.
                            try:
                                # silence_data is assigned but not used within this method.
                                silence_data = b'\x00' * config.input_audio_config["chunk"]
                                self.silence_send_count = 2
                                self.should_send_silence = True
                            except Exception as e:
                                print(f"准备静音数据失败: {e}")

                # NOTE(review): placement inside the no-data branch is assumed - confirm.
                time.sleep(buffer_check_interval)

        except Exception as e:
            print(f"缓冲音频播放错误: {e}")
            # Reset the playback flags so recording is not left paused.
            with self.audio_queue_lock:
                self.is_playing_audio = False
                self.is_recording_paused = False
            time.sleep(0.1)
# 移除了静音检测函数,避免干扰正常的音频处理 # 移除了静音检测函数,避免干扰正常的音频处理
async def _send_silence_on_playback_end(self): async def _send_silence_on_playback_end(self):
@ -263,6 +486,10 @@ class DialogSession:
self.audio_queue.get_nowait() self.audio_queue.get_nowait()
except queue.Empty: except queue.Empty:
continue continue
# 如果是缓冲播放模式,也要清空音频设备缓冲区
if self.use_buffered_playback:
self.audio_device.clear_audio_buffer()
print("缓冲播放:清空音频设备缓冲区")
self.is_user_querying = True self.is_user_querying = True
print("服务器准备接收用户输入") print("服务器准备接收用户输入")
@ -303,6 +530,12 @@ class DialogSession:
self.is_playing_audio = False self.is_playing_audio = False
self.force_silence_mode = False # 关闭强制静音模式 self.force_silence_mode = False # 关闭强制静音模式
self.input_stream_paused = False # 恢复输入流 self.input_stream_paused = False # 恢复输入流
# 如果是缓冲播放模式,清空缓冲区
if self.use_buffered_playback:
self.audio_device.clear_audio_buffer()
print("缓冲播放:服务器响应完成,清空音频缓冲区")
if was_paused: if was_paused:
print("服务器响应完成,立即恢复录音") print("服务器响应完成,立即恢复录音")
# 设置标志发送静音数据 # 设置标志发送静音数据
@ -614,8 +847,8 @@ class DialogSession:
# 非播放期间:正常录音 # 非播放期间:正常录音
last_silence_time = current_time last_silence_time = current_time
# 添加exception_on_overflow=False参数来忽略溢出错误 # 使用AudioDeviceManager的专用读取方法
audio_data = stream.read(config.input_audio_config["chunk"], exception_on_overflow=False) audio_data = self.audio_device.read_audio_data(config.input_audio_config["chunk"])
# 在发送前再次检查是否应该发送静音数据(最后一道防线) # 在发送前再次检查是否应该发送静音数据(最后一道防线)
with self.audio_queue_lock: with self.audio_queue_lock:
@ -671,6 +904,7 @@ class DialogSession:
print(f"会话错误: {e}") print(f"会话错误: {e}")
finally: finally:
if not self.is_audio_file_input: if not self.is_audio_file_input:
self.audio_device.stop_recording() # 先停止录音
self.audio_device.cleanup() self.audio_device.cleanup()

View File

@ -1,7 +1,5 @@
import uuid import uuid
import pyaudio
# 配置信息 # 配置信息
ws_connect_config = { ws_connect_config = {
"base_url": "wss://openspeech.bytedance.com/api/v3/realtime/dialogue", "base_url": "wss://openspeech.bytedance.com/api/v3/realtime/dialogue",
@ -48,7 +46,7 @@ input_audio_config = {
"format": "pcm", "format": "pcm",
"channels": 1, "channels": 1,
"sample_rate": 16000, "sample_rate": 16000,
"bit_size": pyaudio.paInt16, "bit_size": "int16",
} }
output_audio_config = { output_audio_config = {
@ -56,5 +54,5 @@ output_audio_config = {
"format": "pcm", "format": "pcm",
"channels": 1, "channels": 1,
"sample_rate": 24000, "sample_rate": 24000,
"bit_size": pyaudio.paFloat32, "bit_size": "int16",
} }

Binary file not shown.

View File

@ -6,14 +6,15 @@ from audio_manager import DialogSession
async def main() -> None: async def main() -> None:
parser = argparse.ArgumentParser(description="Real-time Dialog Client") parser = argparse.ArgumentParser(description="Real-time Dialog Client")
parser.add_argument("--format", type=str, default="pcm", help="The audio format (e.g., pcm, pcm_s16le).") parser.add_argument("--format", type=str, default="pcm_s16le", help="The audio format (e.g., pcm, pcm_s16le).")
parser.add_argument("--audio", type=str, default="", help="audio file send to server, if not set, will use microphone input.") parser.add_argument("--audio", type=str, default="", help="audio file send to server, if not set, will use microphone input.")
parser.add_argument("--mod",type=str,default="audio",help="Use mod to select plain text input mode or audio mode, the default is audio mode") parser.add_argument("--mod",type=str,default="audio",help="Use mod to select plain text input mode or audio mode, the default is audio mode")
parser.add_argument("--recv_timeout",type=int,default=10,help="Timeout for receiving messages,value range [10,120]") parser.add_argument("--recv_timeout",type=int,default=10,help="Timeout for receiving messages,value range [10,120]")
parser.add_argument("--buffered_playback",action="store_true",help="Enable buffered audio playback mode for better performance on low-end devices")
args = parser.parse_args() args = parser.parse_args()
session = DialogSession(ws_config=config.ws_connect_config, output_audio_format=args.format, audio_file_path=args.audio,mod=args.mod,recv_timeout=args.recv_timeout) session = DialogSession(ws_config=config.ws_connect_config, output_audio_format=args.format, audio_file_path=args.audio,mod=args.mod,recv_timeout=args.recv_timeout, use_buffered_playback=args.buffered_playback)
await session.start() await session.start()
if __name__ == "__main__": if __name__ == "__main__":

Binary file not shown.

177
doubao/test_microphone.py Normal file
View File

@ -0,0 +1,177 @@
#!/usr/bin/env python3
"""
测试sounddevice麦克风录音功能
用于验证新的麦克风输入实现是否正常工作
"""
import numpy as np
import sounddevice as sd
import time
import threading
import queue
import sys
def test_microphone():
    """Check that the default microphone can record and play back audio.

    Steps: list the input-capable devices, record 5 seconds through a
    callback-driven ``sd.InputStream``, play the recording back, then try to
    save it as ``test_recording.wav`` (skipped with a hint if scipy is not
    installed).

    Returns:
        bool: True when audio was captured and played back; False when no
        input device exists, nothing was captured, or any step raised.
    """
    print("=== SoundDevice麦克风录音测试 ===")

    # 1. Enumerate input-capable devices.
    print("\n1. 检查音频输入设备...")
    try:
        devices = sd.query_devices()
        # Keep each device's real index so the printed number is the one
        # that can be passed as `device=`.
        input_devices = [
            (index, dev)
            for index, dev in enumerate(devices)
            if dev['max_input_channels'] > 0
        ]

        print(f"找到 {len(input_devices)} 个输入设备:")
        for index, dev in input_devices:
            print(f" [{index}] {dev['name']} (输入通道: {dev['max_input_channels']})")

        if not input_devices:
            print("错误: 没有找到可用的音频输入设备")
            return False

        # sd.default.device is an (input, output) pair; slot 0 is the input.
        default_input = sd.default.device[0]
        print(f"默认输入设备: {default_input}")

    except Exception as e:
        print(f"音频设备检查失败: {e}")
        return False

    # 2. Record for a fixed duration.
    print("\n2. 测试录音5秒...")
    try:
        sample_rate = 16000
        channels = 1
        duration = 5
        chunk_size = 3200

        print(f"录音参数: 采样率={sample_rate}Hz, 通道={channels}, 时长={duration}秒")
        print("开始录音,请说话...")

        audio_queue = queue.Queue()

        def audio_callback(indata, frames, time_info, status):
            """Queue a copy of every block delivered by the input stream."""
            if status:
                print(f"音频流状态: {status}")
            # Copy: the callback's buffer must not be kept past the call.
            audio_queue.put(indata.copy())

        audio_data = []
        with sd.InputStream(
            samplerate=sample_rate,
            channels=channels,
            dtype='int16',
            blocksize=chunk_size,
            callback=audio_callback,
        ):
            start_time = time.time()
            while time.time() - start_time < duration:
                try:
                    audio_data.append(audio_queue.get(timeout=1.0))
                except queue.Empty:
                    # A full second without data: stop waiting.
                    print("警告: 音频队列为空")
                    break

        print(f"录音完成,共收集到 {len(audio_data)} 个音频块")

        if not audio_data:
            print("警告: 没有录制到音频数据")
            return False

        # 3. Play the recording back.
        print("\n3. 播放录制的音频...")
        recorded_audio = np.concatenate(audio_data, axis=0)
        print(f"录制音频形状: {recorded_audio.shape}")

        print("开始播放录制的音频...")
        sd.play(recorded_audio, sample_rate)
        sd.wait()
        print("✓ 音频播放完成")

        # 4. Save to disk when scipy is available.
        print("\n4. 保存音频文件...")
        try:
            from scipy.io import wavfile
            wavfile.write('test_recording.wav', sample_rate, recorded_audio)
            print("✓ 音频已保存为 test_recording.wav")
        except ImportError:
            print("提示: 安装scipy可保存WAV文件: pip install scipy")

    except Exception as e:
        print(f"录音测试失败: {e}")
        return False

    return True
def test_stream_reading():
    """Read ten blocks from a blocking (callback-free) input stream.

    Prints the shape, dtype and byte length of each block.

    Returns:
        bool: True when all ten reads succeed, False if anything raises.
    """
    print("\n5. 测试流式读取...")

    try:
        sample_rate = 16000
        channels = 1
        chunk_size = 3200

        with sd.InputStream(
            samplerate=sample_rate,
            channels=channels,
            dtype='int16',
            blocksize=chunk_size,
        ) as stream:
            print("开始流式读取测试...")

            for i in range(10):
                # read() returns a (data, overflowed) tuple, not a bare array.
                audio_data, _overflowed = stream.read(chunk_size)
                print(f"读取第 {i+1} 块数据: 形状={audio_data.shape}, 类型={audio_data.dtype}")

                # Convert to raw bytes, as the dialog client would send them.
                byte_data = audio_data.tobytes()
                print(f"字节数据长度: {len(byte_data)} 字节")

                time.sleep(0.1)  # simulate the real processing interval

        print("✓ 流式读取测试完成")

    except Exception as e:
        print(f"流式读取测试失败: {e}")
        return False

    return True
if __name__ == "__main__":
    print("SoundDevice麦克风录音功能测试")
    print("=" * 50)

    # Run both checks unconditionally; a failure in the first must not skip
    # the second.
    outcomes = [test_microphone(), test_stream_reading()]
    success = all(outcomes)

    print("\n" + "=" * 50)
    if not success:
        for line in (
            "✗ 部分测试失败,需要检查音频设备和权限",
            "请确保:",
            "1. 麦克风已正确连接",
            "2. 用户有音频设备访问权限",
            "3. 没有其他程序占用音频设备",
        ):
            print(line)
    else:
        print("✓ 所有麦克风测试通过")
        print("树莓派应该可以正常录音了!")

View File

@ -0,0 +1,69 @@
#!/usr/bin/env python3
"""
快速测试修复后的录音功能
验证回调模式是否解决了元组数据问题
"""
import sys
import time
import threading
def test_recording_fix():
    """Smoke-test the callback-based recording path of ``AudioDeviceManager``.

    Opens the input stream, performs ten reads spaced 100 ms apart, then
    stops recording and releases the device.

    Returns:
        bool: True when every step completes, False when the input stream
        cannot be opened or any step raises (the traceback is printed).
    """
    print("=== 测试录音修复 ===")

    try:
        # Make the modified project modules importable.
        sys.path.append('/home/zhuchaowe/Local-Voice/doubao')
        import audio_manager
        import config

        # Build the device manager from the project's audio settings.
        input_cfg = audio_manager.AudioConfig(**config.input_audio_config)
        output_cfg = audio_manager.AudioConfig(**config.output_audio_config)
        audio_device = audio_manager.AudioDeviceManager(input_cfg, output_cfg)

        print("1. 打开音频输入流...")
        input_stream = audio_device.open_input_stream()
        if not input_stream:
            print("✗ 音频输入流打开失败")
            return False
        print("✓ 音频输入流打开成功")

        print("2. 测试读取音频数据...")
        # Ten reads at 100 ms intervals.
        for i in range(10):
            audio_data = audio_device.read_audio_data(config.input_audio_config["chunk"])
            if not audio_data:
                print(f"{i+1}次读取失败")
            else:
                print(f"{i+1}次读取成功: {len(audio_data)} 字节")
                if i == 0:
                    print(f"音频数据类型: {type(audio_data)}")
            time.sleep(0.1)

        print("3. 停止录音...")
        audio_device.stop_recording()

        print("4. 清理资源...")
        audio_device.cleanup()

        print("✓ 所有测试通过!")
        return True

    except Exception as e:
        print(f"✗ 测试失败: {e}")
        import traceback
        traceback.print_exc()
        return False
if __name__ == "__main__":
    success = test_recording_fix()
    # Report the outcome and the suggested next step.
    if not success:
        print("\n❌ 录音功能仍有问题")
        print("请检查错误信息并调整代码")
    else:
        print("\n🎉 录音功能修复成功!")
        print("现在可以运行 main.py 测试完整功能")

143
doubao/test_sounddevice.py Normal file
View File

@ -0,0 +1,143 @@
#!/usr/bin/env python3
"""
测试sounddevice音频播放功能
用于验证新的音频实现是否正常工作
"""
import numpy as np
import sounddevice as sd
import time
def test_sounddevice():
    """Exercise sounddevice playback with a generated 440 Hz tone.

    Steps: list all audio devices, play a 1-second int16 sine wave, play the
    same samples after a bytes round-trip, then play a two-channel copy.

    Returns:
        bool: True when every step succeeds, False as soon as one raises.
    """
    print("=== SoundDevice音频播放测试 ===")

    # 1. Enumerate audio devices.
    print("\n1. 检查音频设备...")
    try:
        devices = sd.query_devices()
        print(f"找到 {len(devices)} 个音频设备:")
        for i, dev in enumerate(devices):
            print(f" [{i}] {dev['name']} (输入: {dev['max_input_channels']}, 输出: {dev['max_output_channels']})")

        # sd.default.device is an (input, output) pair; slot 1 is the output.
        default_output = sd.default.device[1]
        print(f"默认输出设备: {default_output}")

    except Exception as e:
        print(f"音频设备检查失败: {e}")
        return False

    # 2. Generate and play a tone.
    print("\n2. 测试生成和播放音频...")
    try:
        sample_rate = 24000
        duration = 1.0
        frequency = 440

        t = np.linspace(0, duration, int(sample_rate * duration), endpoint=False)
        audio_data = np.sin(2 * np.pi * frequency * t) * 0.3  # 30% volume

        # Scale to the 16-bit integer range.
        audio_data_int16 = (audio_data * 32767).astype(np.int16)

        print(f"生成音频数据: 采样率={sample_rate}Hz, 时长={duration}秒, 频率={frequency}Hz")
        print(f"音频数据形状: {audio_data_int16.shape}, 数据类型: {audio_data_int16.dtype}")

        print("开始播放测试音频...")
        sd.play(audio_data_int16, sample_rate)
        sd.wait()  # block until playback finishes
        print("✓ 音频播放成功")

    except Exception as e:
        print(f"音频播放失败: {e}")
        return False

    # 3. Play the same samples after a bytes round-trip.
    print("\n3. 测试直接播放字节数据...")
    try:
        byte_data = audio_data_int16.tobytes()
        print(f"字节数据长度: {len(byte_data)} 字节")

        audio_array = np.frombuffer(byte_data, dtype=np.int16)

        print("开始播放字节数据...")
        sd.play(audio_array, sample_rate)
        sd.wait()
        print("✓ 字节数据播放成功")

    except Exception as e:
        print(f"字节数据播放失败: {e}")
        return False

    # 4. Play a two-channel copy of the tone.
    print("\n4. 测试立体声播放...")
    try:
        stereo_data = np.column_stack([audio_data_int16, audio_data_int16])
        print(f"立体声数据形状: {stereo_data.shape}")

        print("开始播放立体声音频...")
        sd.play(stereo_data, sample_rate)
        sd.wait()
        print("✓ 立体声播放成功")

    except Exception as e:
        print(f"立体声播放失败: {e}")
        return False

    return True
def test_numpy_conversion():
    """Round-trip a few PCM bytes through a NumPy int16 array.

    Prints the intermediate array, its single-channel reshape and the bytes
    produced by converting back.

    Returns:
        bool: True when the conversions complete, False if any step raises.
    """
    print("\n5. 测试数据类型转换...")

    # Eight bytes standing in for 16-bit PCM returned by the speech service.
    test_data = b'\x00\x00\x7f\x7f\x80\x00\xff\xff'

    try:
        # bytes -> int16 samples
        samples = np.frombuffer(test_data, dtype=np.int16)
        print(f"原始字节数据: {test_data}")
        print(f"转换后numpy数组: {samples}")
        print(f"数组形状: {samples.shape}, 数据类型: {samples.dtype}")

        # one column per channel (mono)
        mono = samples.reshape(-1, 1)
        print(f"重塑后形状: {mono.shape}")

        # int16 samples -> bytes
        round_trip = samples.tobytes()
        print(f"转回字节数据: {round_trip}")

        print("✓ 数据类型转换测试成功")
    except Exception as e:
        print(f"数据类型转换失败: {e}")
        return False
    else:
        return True
if __name__ == "__main__":
    print("SoundDevice音频播放功能测试")
    print("=" * 50)

    # Run both checks unconditionally so each one reports its own result.
    outcomes = [test_sounddevice(), test_numpy_conversion()]
    success = all(outcomes)

    print("\n" + "=" * 50)
    if not success:
        print("✗ 部分测试失败,需要进一步调试")
    else:
        print("✓ 所有SoundDevice测试通过")
        print("树莓派应该可以正常播放音频了!")