Compare commits

...

7 Commits

Author SHA1 Message Date
朱潮
7eff24a175 fix audio 2025-09-19 20:58:35 +08:00
朱潮
3958d2ff81 fix audio 2025-09-19 20:49:20 +08:00
朱潮
bc1dd7f03f fix audio 2025-09-19 20:47:18 +08:00
朱潮
e4bcce4946 fix audio 2025-09-19 20:44:35 +08:00
朱潮
d5f2957984 fix audio 2025-09-19 20:42:44 +08:00
朱潮
e4503e2d1a config 2025-09-19 20:28:43 +08:00
朱潮
38d015d3f2 fix audio 2025-09-19 20:16:39 +08:00
10 changed files with 664 additions and 44 deletions

View File

@ -11,7 +11,8 @@ from dataclasses import dataclass
from typing import Any, Dict, Optional
import config
import pyaudio
import sounddevice as sd
import numpy as np
from realtime_dialog_client import RealtimeDialogClient
@ -19,7 +20,7 @@ from realtime_dialog_client import RealtimeDialogClient
class AudioConfig:
"""音频配置数据类"""
format: str
bit_size: int
bit_size: str # 改为字符串类型
channels: int
sample_rate: int
chunk: int
@ -31,40 +32,263 @@ class AudioDeviceManager:
def __init__(self, input_config: AudioConfig, output_config: AudioConfig):
self.input_config = input_config
self.output_config = output_config
self.pyaudio = pyaudio.PyAudio()
self.input_stream: Optional[pyaudio.Stream] = None
self.output_stream: Optional[pyaudio.Stream] = None
self.input_stream = None
self.output_stream = None
self.audio_queue = None
self.playback_queue = None # 播放队列
self.recording = False
self.playing = False
def open_input_stream(self) -> pyaudio.Stream:
# 预缓冲机制
self.pre_buffer = []
self.pre_buffer_size = 5 # 预缓冲5个音频块
self.buffer_threshold = 3 # 缓冲阈值,低于此值开始预缓冲
# 静音检测和回声消除
self.silence_threshold = 500 # 静音阈值
self.echo_suppression_enabled = True
self.last_audio_level = 0
self.audio_level_history = []
def open_input_stream(self):
"""打开音频输入流"""
# p = pyaudio.PyAudio()
self.input_stream = self.pyaudio.open(
format=self.input_config.bit_size,
channels=self.input_config.channels,
rate=self.input_config.sample_rate,
input=True,
frames_per_buffer=self.input_config.chunk
)
return self.input_stream
try:
import queue
self.audio_queue = queue.Queue(maxsize=100) # 增大队列大小,提供更多缓冲
def open_output_stream(self) -> pyaudio.Stream:
"""打开音频输出流"""
self.output_stream = self.pyaudio.open(
format=self.output_config.bit_size,
channels=self.output_config.channels,
rate=self.output_config.sample_rate,
output=True,
frames_per_buffer=self.output_config.chunk
def audio_callback(indata, frames, time_info, status):
"""音频数据回调"""
if status:
print(f"音频流状态: {status}")
if self.recording and self.audio_queue:
try:
# 将numpy数组转换为字节数据
audio_bytes = indata.tobytes()
# 添加音频数据预处理,提高质量
if hasattr(self, '_audio_processor'):
audio_bytes = self._audio_processor(audio_bytes)
self.audio_queue.put_nowait(audio_bytes)
except queue.Full:
pass # 静默丢弃,避免阻塞
self.input_stream = sd.InputStream(
samplerate=self.input_config.sample_rate,
channels=self.input_config.channels,
dtype='int16',
blocksize=self.input_config.chunk,
callback=audio_callback,
device=None,
latency='low' # 低延迟模式
)
self.input_stream.start()
self.recording = True
return self.input_stream
except Exception as e:
print(f"打开输入流失败: {e}")
return None
def open_output_stream(self):
    """Create and start the callback-driven sounddevice output stream.

    A bounded playback queue (50 entries) is created first; the stream
    callback pulls one queued byte chunk per invocation and writes it to the
    device buffer as int16 frames.

    Returns:
        The started ``sd.OutputStream``, or None when opening it failed.
    """
    try:
        import queue
        # Bounded queue: producers feed it through play_audio().
        self.playback_queue = queue.Queue(maxsize=50)

        def playback_callback(outdata, frames, time_info, status):
            """Fill ``outdata`` with the next queued chunk, or with silence."""
            if status:
                print(f"播放状态: {status}")
            try:
                raw = self.playback_queue.get_nowait()
                # Interpret the bytes as int16 samples, one column per channel.
                samples = np.frombuffer(raw, dtype=np.int16).reshape(
                    -1, self.output_config.channels
                )
                if hasattr(self, '_apply_volume_fade'):
                    samples = self._apply_volume_fade(samples)
                available = len(samples)
                if available >= frames:
                    # Chunk is long enough: anything beyond `frames` is not played.
                    outdata[:] = samples[:frames]
                else:
                    # Short chunk: copy what exists and zero-pad the remainder.
                    block = np.zeros(
                        (frames, self.output_config.channels), dtype=np.int16
                    )
                    block[:available] = samples
                    outdata[:] = block
            except queue.Empty:
                # Nothing queued: emit silence.
                outdata.fill(0)
            except Exception as e:
                print(f"播放回调错误: {e}")
                outdata.fill(0)

        stream = sd.OutputStream(
            samplerate=self.output_config.sample_rate,
            channels=self.output_config.channels,
            dtype='int16',
            blocksize=self.output_config.chunk,
            callback=playback_callback,
            device=None,
            latency='low',  # request low-latency mode
        )
        self.output_stream = stream
        self.output_stream.start()
        self.playing = True
        return self.output_stream
    except Exception as e:
        print(f"打开输出流失败: {e}")
        return None
def play_audio(self, audio_data: bytes) -> None:
    """Queue ``audio_data`` for playback, split into device-sized chunks.

    Each chunk is enqueued exactly once and in order. The previous version
    also copied chunks into ``self.pre_buffer`` and flushed that buffer later,
    so the same audio reached the playback queue twice and out of order.
    Chunks left in ``self.pre_buffer`` from earlier calls are drained first so
    ordering is preserved.

    Args:
        audio_data: Raw PCM bytes (2 bytes per sample, as assumed by the
            chunk-size computation below).

    Nothing is raised: failures are printed and the data is dropped.
    """
    import queue  # local import keeps this method self-contained

    try:
        if not (self.playing and self.playback_queue):
            return

        # One queue entry per output block: `chunk` samples * 2 bytes each.
        chunk_size = self.output_config.chunk * 2

        # Optional hook for pre-processing playback data.
        if hasattr(self, '_playback_processor'):
            audio_data = self._playback_processor(audio_data)

        # Drain leftovers from earlier calls before queueing new data.
        if getattr(self, 'pre_buffer', None):
            self._flush_pre_buffer()

        for start in range(0, len(audio_data), chunk_size):
            chunk = audio_data[start:start + chunk_size]
            try:
                # Short blocking put so brief congestion does not drop data.
                self.playback_queue.put(chunk, timeout=0.1)
            except queue.Full:
                print("警告: 播放队列已满,丢弃音频数据")
                # Make room by discarding the oldest entry, retry once, then
                # give up on the rest of this buffer.
                try:
                    self.playback_queue.get_nowait()
                    self.playback_queue.put(chunk, timeout=0.05)
                except (queue.Empty, queue.Full):
                    pass
                break
    except Exception as e:
        print(f"音频播放失败: {e}")
def read_audio_data(self, frames: int) -> bytes:
    """Return the next captured audio block, or silence when none is ready.

    Args:
        frames: Number of frames the caller expects; only used to size the
            silent fallback buffer.

    Returns:
        The next byte block from the capture queue (waiting up to 0.1 s), or
        zero bytes sized ``frames * channels * 2`` (int16 samples) when not
        recording, when no data arrives in time, or on any error.
    """
    import queue  # local import keeps this method self-contained

    # Size the fallback per channel so it matches the capture callback, which
    # emits frames * channels int16 samples per block.
    channels = getattr(getattr(self, 'input_config', None), 'channels', 1)
    silence = b'\x00' * (frames * 2 * channels)
    try:
        if not self.recording or self.audio_queue is None:
            return silence
        return self.audio_queue.get(timeout=0.1)
    except queue.Empty:
        # Capture has not produced a block in time.
        return silence
    except Exception as e:
        print(f"读取音频数据失败: {e}")
        return silence
def stop_recording(self):
    """Mark recording as stopped.

    Only the ``recording`` flag is cleared; the input stream itself is not
    stopped or closed here.
    """
    self.recording = False
def stop_playing(self):
    """Stop playback and discard all audio that has not been played yet.

    Clears the ``playing`` flag, empties the playback queue, and clears the
    pre-buffer. The pre-buffer was previously left untouched, so stale chunks
    could be flushed into the next playback.
    """
    import queue  # local import keeps this method self-contained

    self.playing = False
    if self.playback_queue:
        # Drain until the queue reports empty.
        while True:
            try:
                self.playback_queue.get_nowait()
            except queue.Empty:
                break
    pending = getattr(self, 'pre_buffer', None)
    if pending:
        pending.clear()
def _flush_pre_buffer(self):
"""刷新预缓冲区到播放队列"""
if hasattr(self, 'pre_buffer') and self.pre_buffer:
for chunk in self.pre_buffer:
try:
self.playback_queue.put(chunk, timeout=0.1)
except queue.Full:
print("警告: 播放队列已满,丢弃预缓冲数据")
break
self.pre_buffer.clear()
def _apply_volume_fade(self, audio_array):
"""应用音量淡入淡出效果,减少爆音"""
try:
# 简单的淡入淡出效果
fade_samples = min(100, len(audio_array) // 10) # 淡入淡出样本数
# 淡入
for i in range(fade_samples):
factor = i / fade_samples
audio_array[i] = int(audio_array[i] * factor)
# 淡出
for i in range(fade_samples):
factor = (fade_samples - i) / fade_samples
audio_array[-(i+1)] = int(audio_array[-(i+1)] * factor)
return audio_array
except Exception as e:
print(f"音量淡入淡出失败: {e}")
return audio_array
def _detect_silence(self, audio_data):
"""检测静音"""
try:
audio_array = np.frombuffer(audio_data, dtype=np.int16)
audio_level = np.abs(audio_array).mean()
# 更新音频电平历史
self.audio_level_history.append(audio_level)
if len(self.audio_level_history) > 10:
self.audio_level_history.pop(0)
# 计算平均音频电平
avg_level = np.mean(self.audio_level_history) if self.audio_level_history else 0
# 检测静音
is_silence = audio_level < self.silence_threshold
return is_silence, audio_level, avg_level
except Exception as e:
print(f"静音检测失败: {e}")
return False, 0, 0
def cleanup(self) -> None:
"""清理音频设备资源"""
for stream in [self.input_stream, self.output_stream]:
if stream:
stream.stop_stream()
stream.close()
self.pyaudio.terminate()
try:
self.stop_recording()
self.stop_playing()
if self.input_stream:
self.input_stream.stop()
self.input_stream.close()
if self.output_stream:
self.output_stream.stop()
self.output_stream.close()
sd.stop() # 停止所有音频播放
# 清空预缓冲区
if hasattr(self, 'pre_buffer'):
self.pre_buffer.clear()
except Exception as e:
print(f"清理音频设备失败: {e}")
class DialogSession:
@ -88,7 +312,7 @@ class DialogSession:
output_audio_format=output_audio_format, mod=mod, recv_timeout=recv_timeout)
if output_audio_format == "pcm_s16le":
config.output_audio_config["format"] = "pcm_s16le"
config.output_audio_config["bit_size"] = pyaudio.paInt16
config.output_audio_config["bit_size"] = "int16" # 使用字符串标识符
self.is_running = True
self.is_session_finished = False
@ -118,8 +342,12 @@ class DialogSession:
)
# 初始化音频队列和输出流
print(f"输出音频配置: {config.output_audio_config}")
self.output_stream = self.audio_device.open_output_stream()
output_stream = self.audio_device.open_output_stream()
if output_stream:
print("音频输出流已打开")
self.output_stream = output_stream
else:
print("警告:音频输出流打开失败,将使用直接播放模式")
# 启动播放线程
self.is_recording = True
self.is_playing = True
@ -155,11 +383,15 @@ class DialogSession:
if was_not_playing:
print("播放开始前,额外发送静音数据清理管道")
for _ in range(3):
self.output_stream.write(b'\x00' * len(audio_data))
# 播放静音数据
self.audio_device.play_audio(b'\x00' * len(audio_data))
time.sleep(0.1)
# 播放音频数据
self.output_stream.write(audio_data)
try:
self.audio_device.play_audio(audio_data)
except Exception as e:
print(f"音频播放错误: {e}")
except queue.Empty:
# 队列为空,检查是否超时
@ -614,8 +846,8 @@ class DialogSession:
# 非播放期间:正常录音
last_silence_time = current_time
# 添加exception_on_overflow=False参数来忽略溢出错误
audio_data = stream.read(config.input_audio_config["chunk"], exception_on_overflow=False)
# 使用AudioDeviceManager的专用读取方法
audio_data = self.audio_device.read_audio_data(config.input_audio_config["chunk"])
# 在发送前再次检查是否应该发送静音数据(最后一道防线)
with self.audio_queue_lock:
@ -671,6 +903,7 @@ class DialogSession:
print(f"会话错误: {e}")
finally:
if not self.is_audio_file_input:
self.audio_device.stop_recording() # 先停止录音
self.audio_device.cleanup()

View File

@ -1,7 +1,5 @@
import uuid
import pyaudio
# 配置信息
ws_connect_config = {
"base_url": "wss://openspeech.bytedance.com/api/v3/realtime/dialogue",
@ -44,17 +42,17 @@ start_session_req = {
}
input_audio_config = {
"chunk": 3200,
"chunk": 6400, # 增大缓冲区大小,减少处理频率
"format": "pcm",
"channels": 1,
"sample_rate": 16000,
"bit_size": pyaudio.paInt16,
"bit_size": "int16",
}
output_audio_config = {
"chunk": 3200,
"chunk": 6400, # 增大缓冲区大小,减少处理频率
"format": "pcm",
"channels": 1,
"sample_rate": 24000,
"bit_size": pyaudio.paFloat32,
"bit_size": "int16",
}

Binary file not shown.

View File

@ -6,7 +6,7 @@ from audio_manager import DialogSession
async def main() -> None:
parser = argparse.ArgumentParser(description="Real-time Dialog Client")
parser.add_argument("--format", type=str, default="pcm", help="The audio format (e.g., pcm, pcm_s16le).")
parser.add_argument("--format", type=str, default="pcm_s16le", help="The audio format (e.g., pcm, pcm_s16le).")
parser.add_argument("--audio", type=str, default="", help="audio file send to server, if not set, will use microphone input.")
parser.add_argument("--mod",type=str,default="audio",help="Use mod to select plain text input mode or audio mode, the default is audio mode")
parser.add_argument("--recv_timeout",type=int,default=10,help="Timeout for receiving messages,value range [10,120]")

Binary file not shown.

177
doubao/test_microphone.py Normal file
View File

@ -0,0 +1,177 @@
#!/usr/bin/env python3
"""
测试sounddevice麦克风录音功能
用于验证新的麦克风输入实现是否正常工作
"""
import numpy as np
import sounddevice as sd
import time
import threading
import queue
import sys
def test_microphone():
    """Record five seconds from the microphone, play it back and save it.

    Steps: list the input-capable devices, record through a callback-driven
    ``sd.InputStream``, replay the capture with ``sd.play`` and, when SciPy
    is installed, write it to ``test_recording.wav``.

    Returns:
        True when audio was captured and played back; False when no input
        device exists, nothing was recorded, or any step raised.
    """
    print("=== SoundDevice麦克风录音测试 ===")

    # 1. Enumerate the input devices.
    print("\n1. 检查音频输入设备...")
    try:
        # Keep each device's real sounddevice index so the printed number can
        # be used as a `device=` argument (a filtered list would renumber).
        input_devices = [
            (index, dev)
            for index, dev in enumerate(sd.query_devices())
            if dev['max_input_channels'] > 0
        ]
        print(f"找到 {len(input_devices)} 个输入设备:")
        for index, dev in input_devices:
            print(f" [{index}] {dev['name']} (输入通道: {dev['max_input_channels']})")
        if not input_devices:
            print("错误: 没有找到可用的音频输入设备")
            return False

        # sd.default.device is an (input, output) pair; element 0 is the input.
        default_input = sd.default.device[0]
        print(f"默认输入设备: {default_input}")
    except Exception as e:
        print(f"音频设备检查失败: {e}")
        return False

    # 2. Record for five seconds.
    print("\n2. 测试录音5秒...")
    try:
        sample_rate = 16000
        channels = 1
        duration = 5
        chunk_size = 3200
        print(f"录音参数: 采样率={sample_rate}Hz, 通道={channels}, 时长={duration}秒")
        print("开始录音,请说话...")

        audio_queue = queue.Queue()

        def audio_callback(indata, frames, time_info, status):
            """Hand each captured block to the main thread via the queue."""
            if status:
                print(f"音频流状态: {status}")
            # Copy so the queued block does not alias the callback's buffer.
            audio_queue.put(indata.copy())

        with sd.InputStream(
            samplerate=sample_rate,
            channels=channels,
            dtype='int16',
            blocksize=chunk_size,
            callback=audio_callback,
        ):
            start_time = time.time()
            audio_data = []
            while time.time() - start_time < duration:
                try:
                    audio_data.append(audio_queue.get(timeout=1.0))
                except queue.Empty:
                    # No block for a full second: treat capture as stalled.
                    print("警告: 音频队列为空")
                    break

        print(f"录音完成,共收集到 {len(audio_data)} 个音频块")

        if not audio_data:
            print("警告: 没有录制到音频数据")
            return False

        # 3. Play the recording back.
        print("\n3. 播放录制的音频...")
        recorded_audio = np.concatenate(audio_data, axis=0)
        print(f"录制音频形状: {recorded_audio.shape}")
        print("开始播放录制的音频...")
        sd.play(recorded_audio, sample_rate)
        sd.wait()
        print("✓ 音频播放完成")

        # 4. Save to a WAV file (optional: needs SciPy).
        print("\n4. 保存音频文件...")
        try:
            from scipy.io import wavfile
            wavfile.write('test_recording.wav', sample_rate, recorded_audio)
            print("✓ 音频已保存为 test_recording.wav")
        except ImportError:
            print("提示: 安装scipy可保存WAV文件: pip install scipy")
    except Exception as e:
        print(f"录音测试失败: {e}")
        return False
    return True
def test_stream_reading():
    """Read ten blocks from a blocking (callback-free) input stream.

    Returns:
        True when all ten reads succeeded, False when anything raised.
    """
    print("\n5. 测试流式读取...")
    try:
        sample_rate = 16000
        channels = 1
        chunk_size = 3200

        with sd.InputStream(
            samplerate=sample_rate,
            channels=channels,
            dtype='int16',
            blocksize=chunk_size,
        ) as stream:
            print("开始流式读取测试...")
            for i in range(10):
                # read() returns (data, overflowed), not a bare array; using
                # the tuple directly made `.shape` raise AttributeError.
                audio_data, overflowed = stream.read(chunk_size)
                if overflowed:
                    print("警告: 输入缓冲区溢出")
                print(f"读取第 {i+1} 块数据: 形状={audio_data.shape}, 类型={audio_data.dtype}")

                byte_data = audio_data.tobytes()
                print(f"字节数据长度: {len(byte_data)} 字节")
                time.sleep(0.1)  # simulate the processing interval of real use
        print("✓ 流式读取测试完成")
    except Exception as e:
        print(f"流式读取测试失败: {e}")
        return False
    return True
if __name__ == "__main__":
    # Script entry point: run both microphone checks and print a summary.
    print("SoundDevice麦克风录音功能测试")
    print("=" * 50)

    # Build the list first so both tests always run, even if the first fails.
    success = all([test_microphone(), test_stream_reading()])

    print("\n" + "=" * 50)
    if success:
        summary = (
            "✓ 所有麦克风测试通过",
            "树莓派应该可以正常录音了!",
        )
    else:
        summary = (
            "✗ 部分测试失败,需要检查音频设备和权限",
            "请确保:",
            "1. 麦克风已正确连接",
            "2. 用户有音频设备访问权限",
            "3. 没有其他程序占用音频设备",
        )
    for line in summary:
        print(line)

View File

@ -0,0 +1,69 @@
#!/usr/bin/env python3
"""
快速测试修复后的录音功能
验证回调模式是否解决了元组数据问题
"""
import sys
import time
import threading
def test_recording_fix():
    """Smoke-test AudioDeviceManager's callback-based recording path.

    Opens the input stream, performs ten reads through ``read_audio_data``,
    then stops recording and cleans up.

    Returns:
        True when every step completed, False when the stream could not be
        opened or anything raised.
    """
    print("=== 测试录音修复 ===")
    audio_device = None
    cleaned_up = False
    try:
        # NOTE(review): machine-specific path; confirm it matches the
        # deployment layout before running elsewhere.
        sys.path.append('/home/zhuchaowe/Local-Voice/doubao')
        import audio_manager
        import config

        audio_device = audio_manager.AudioDeviceManager(
            audio_manager.AudioConfig(**config.input_audio_config),
            audio_manager.AudioConfig(**config.output_audio_config)
        )

        print("1. 打开音频输入流...")
        input_stream = audio_device.open_input_stream()
        if input_stream:
            print("✓ 音频输入流打开成功")
        else:
            print("✗ 音频输入流打开失败")
            return False

        print("2. 测试读取音频数据...")
        for i in range(10):
            audio_data = audio_device.read_audio_data(config.input_audio_config["chunk"])
            if audio_data:
                print(f"第{i+1}次读取成功: {len(audio_data)} 字节")
                if i == 0:
                    print(f"音频数据类型: {type(audio_data)}")
            else:
                print(f"第{i+1}次读取失败")
            time.sleep(0.1)

        print("3. 停止录音...")
        audio_device.stop_recording()

        print("4. 清理资源...")
        audio_device.cleanup()
        cleaned_up = True

        print("✓ 所有测试通过!")
        return True
    except Exception as e:
        print(f"✗ 测试失败: {e}")
        import traceback
        traceback.print_exc()
        return False
    finally:
        # Release the audio device on failure paths too, so a failed run does
        # not leave the stream open.
        if audio_device is not None and not cleaned_up:
            audio_device.cleanup()
if __name__ == "__main__":
    # Script entry point: run the recording check and report the outcome.
    success = test_recording_fix()
    outcome = (
        ("\n🎉 录音功能修复成功!", "现在可以运行 main.py 测试完整功能")
        if success
        else ("\n❌ 录音功能仍有问题", "请检查错误信息并调整代码")
    )
    for line in outcome:
        print(line)

143
doubao/test_sounddevice.py Normal file
View File

@ -0,0 +1,143 @@
#!/usr/bin/env python3
"""
测试sounddevice音频播放功能
用于验证新的音频实现是否正常工作
"""
import numpy as np
import sounddevice as sd
import time
def test_sounddevice():
    """Exercise sounddevice playback with a generated 440 Hz test tone.

    Steps: list all audio devices, play the tone as an int16 array, play it
    again after a bytes round-trip, then play it as two-channel audio.

    Returns:
        True when every step succeeded, False as soon as one raised.
    """
    print("=== SoundDevice音频播放测试 ===")

    # 1. List the audio devices.
    print("\n1. 检查音频设备...")
    try:
        devices = sd.query_devices()
        print(f"找到 {len(devices)} 个音频设备:")
        for i, dev in enumerate(devices):
            print(f" [{i}] {dev['name']} (输入: {dev['max_input_channels']}, 输出: {dev['max_output_channels']})")

        # sd.default.device is an (input, output) pair; element 1 is the
        # output. Printing the whole pair mislabelled it as the output device.
        default_output = sd.default.device[1]
        print(f"默认输出设备: {default_output}")
    except Exception as e:
        print(f"音频设备检查失败: {e}")
        return False

    # 2. Generate and play one second of a 440 Hz sine wave.
    print("\n2. 测试生成和播放音频...")
    try:
        sample_rate = 24000
        duration = 1.0
        frequency = 440
        t = np.linspace(0, duration, int(sample_rate * duration), False)
        audio_data = np.sin(2 * np.pi * frequency * t) * 0.3  # 30% of full scale

        # Scale to the 16-bit integer range.
        audio_data_int16 = (audio_data * 32767).astype(np.int16)
        print(f"生成音频数据: 采样率={sample_rate}Hz, 时长={duration}秒, 频率={frequency}Hz")
        print(f"音频数据形状: {audio_data_int16.shape}, 数据类型: {audio_data_int16.dtype}")

        print("开始播放测试音频...")
        sd.play(audio_data_int16, sample_rate)
        sd.wait()  # block until playback has finished
        print("✓ 音频播放成功")
    except Exception as e:
        print(f"音频播放失败: {e}")
        return False

    # 3. Round-trip through bytes and play again.
    print("\n3. 测试直接播放字节数据...")
    try:
        byte_data = audio_data_int16.tobytes()
        print(f"字节数据长度: {len(byte_data)} 字节")

        audio_array = np.frombuffer(byte_data, dtype=np.int16)

        print("开始播放字节数据...")
        sd.play(audio_array, sample_rate)
        sd.wait()
        print("✓ 字节数据播放成功")
    except Exception as e:
        print(f"字节数据播放失败: {e}")
        return False

    # 4. Two-channel playback: the same tone in both columns.
    print("\n4. 测试立体声播放...")
    try:
        stereo_data = np.column_stack([audio_data_int16, audio_data_int16])
        print(f"立体声数据形状: {stereo_data.shape}")
        print("开始播放立体声音频...")
        sd.play(stereo_data, sample_rate)
        sd.wait()
        print("✓ 立体声播放成功")
    except Exception as e:
        print(f"立体声播放失败: {e}")
        return False
    return True
def test_numpy_conversion():
    """Round-trip a small 16-bit PCM byte string through NumPy.

    Converts the bytes to an int16 array, reshapes it to a single column,
    converts the array back to bytes, and prints each intermediate result.

    Returns:
        True when every conversion succeeded, False if one raised.
    """
    print("\n5. 测试数据类型转换...")

    # Four sample values standing in for 16-bit PCM data from the service.
    test_data = b'\x00\x00\x7f\x7f\x80\x00\xff\xff'
    try:
        # bytes -> int16 samples
        samples = np.frombuffer(test_data, dtype=np.int16)
        print(f"原始字节数据: {test_data}")
        print(f"转换后numpy数组: {samples}")
        print(f"数组形状: {samples.shape}, 数据类型: {samples.dtype}")

        # One column per channel (mono).
        mono = samples.reshape((-1, 1))
        print(f"重塑后形状: {mono.shape}")

        # int16 samples -> bytes
        round_tripped = samples.tobytes()
        print(f"转回字节数据: {round_tripped}")

        print("✓ 数据类型转换测试成功")
        return True
    except Exception as e:
        print(f"数据类型转换失败: {e}")
        return False
if __name__ == "__main__":
    # Script entry point: run both playback checks and print a summary.
    print("SoundDevice音频播放功能测试")
    print("=" * 50)

    # Build the list first so both tests always run, even if the first fails.
    success = all([test_sounddevice(), test_numpy_conversion()])

    print("\n" + "=" * 50)
    if not success:
        print("✗ 部分测试失败,需要进一步调试")
    else:
        print("✓ 所有SoundDevice测试通过")
        print("树莓派应该可以正常播放音频了!")