This commit is contained in:
朱潮 2025-09-18 21:21:34 +08:00
parent 711df77d38
commit d4ff3fd774
5 changed files with 306 additions and 965 deletions

View File

@ -1,101 +0,0 @@
#!/usr/bin/env python3
"""
简单的音频测试脚本用于诊断树莓派上的音频问题
"""
import pyaudio
import time
import os
def test_audio():
"""测试音频设备"""
print("=== 音频设备测试 ===")
pa = pyaudio.PyAudio()
# 列出所有设备
print("\n可用的音频设备:")
for i in range(pa.get_device_count()):
info = pa.get_device_info_by_index(i)
print(f" 设备 {i}: {info['name']}")
print(f" 输入通道: {info['maxInputChannels']}")
print(f" 输出通道: {info['maxOutputChannels']}")
print(f" 默认采样率: {info['defaultSampleRate']}")
print()
# 查找默认输入设备
default_input = pa.get_default_input_device_info()
print(f"默认输入设备: {default_input['name']} (索引: {default_input['index']})")
# 查找默认输出设备
default_output = pa.get_default_output_device_info()
print(f"默认输出设备: {default_output['name']} (索引: {default_output['index']})")
pa.terminate()
def test_recording():
"""测试录音功能"""
print("\n=== 录音测试 ===")
pa = pyaudio.PyAudio()
try:
# 设置录音参数
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000 # 降低采样率使用设备默认的44100
CHUNK = 1024
print(f"尝试打开音频流,采样率: {RATE}")
# 打开音频流
stream = pa.open(
format=FORMAT,
channels=CHANNELS,
rate=RATE,
input=True,
frames_per_buffer=CHUNK
)
print("开始录音5秒...")
frames = []
# 录音5秒
for i in range(0, int(RATE / CHUNK * 5)):
data = stream.read(CHUNK)
frames.append(data)
if i % 10 == 0:
print(f"录音中... {i * CHUNK / RATE:.1f}")
print("录音完成")
# 停止流
stream.stop_stream()
stream.close()
# 播放录音
print("播放录音...")
stream = pa.open(
format=FORMAT,
channels=CHANNELS,
rate=RATE,
output=True
)
for frame in frames:
stream.write(frame)
stream.stop_stream()
stream.close()
print("播放完成")
except Exception as e:
print(f"录音测试失败: {e}")
finally:
pa.terminate()
if __name__ == "__main__":
test_audio()
test_recording()

119
test_audio_playback.py Normal file
View File

@ -0,0 +1,119 @@
#!/usr/bin/env python3
"""
音频播放测试脚本
用于测试树莓派的音频播放功能
"""
import subprocess
import time
import sys
import os
def test_audio_playback():
"""测试音频播放功能"""
print("=== 音频播放测试 ===")
# 检查音频设备
print("\n1. 检查音频设备...")
try:
result = subprocess.run(['aplay', '-l'], capture_output=True, text=True)
if result.returncode == 0:
print("音频设备列表:")
print(result.stdout)
else:
print("错误: 无法获取音频设备列表")
return False
except FileNotFoundError:
print("错误: aplay 命令未找到,请安装 alsa-utils")
return False
# 测试播放系统声音
print("\n2. 测试播放系统提示音...")
try:
# 使用系统内置的测试声音
result = subprocess.run(['speaker-test', '-t', 'sine', '-f', '440', '-l', '1'],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
print("✓ 系统提示音播放成功")
else:
print("✗ 系统提示音播放失败")
return False
except (subprocess.TimeoutExpired, FileNotFoundError):
print("提示: speaker-test 测试跳过,尝试直接播放音频文件")
# 创建测试音频文件并播放
print("\n3. 创建并播放测试音频文件...")
test_audio_file = "/tmp/test_audio.wav"
# 使用sox生成测试音频如果可用
if os.path.exists("/usr/bin/sox"):
try:
subprocess.run(['sox', '-n', '-r', '44100', '-c', '2', test_audio_file,
'synth', '3', 'sine', '440'], check=True)
print("✓ 测试音频文件创建成功")
except (subprocess.CalledProcessError, FileNotFoundError):
print("无法创建测试音频文件,跳过文件播放测试")
return True
else:
print("sox 未安装,跳过文件播放测试")
return True
# 播放测试音频文件
try:
result = subprocess.run(['aplay', test_audio_file], capture_output=True, text=True)
if result.returncode == 0:
print("✓ 音频文件播放成功")
return True
else:
print("✗ 音频文件播放失败")
print(f"错误信息: {result.stderr}")
return False
except FileNotFoundError:
print("错误: aplay 命令未找到")
return False
finally:
# 清理测试文件
if os.path.exists(test_audio_file):
os.remove(test_audio_file)
def check_volume():
"""检查并设置音量"""
print("\n4. 检查音量设置...")
try:
result = subprocess.run(['amixer', 'sget', 'Master'], capture_output=True, text=True)
if result.returncode == 0:
print("当前音量设置:")
print(result.stdout)
# 设置音量到80%
subprocess.run(['amixer', 'sset', 'Master', '80%'], check=True)
print("✓ 音量已设置为80%")
return True
else:
print("无法获取音量信息")
return False
except (subprocess.CalledProcessError, FileNotFoundError):
print("amixer 命令未找到或执行失败")
return False
if __name__ == "__main__":
print("树莓派音频播放功能测试")
print("=" * 40)
success = True
# 检查音量
if not check_volume():
success = False
# 测试音频播放
if not test_audio_playback():
success = False
print("\n" + "=" * 40)
if success:
print("✓ 所有音频播放测试通过")
sys.exit(0)
else:
print("✗ 部分音频播放测试失败")
sys.exit(1)

187
test_audio_recording.py Normal file
View File

@ -0,0 +1,187 @@
#!/usr/bin/env python3
"""
音频录音测试脚本
用于测试树莓派的音频录音功能
"""
import subprocess
import time
import sys
import os
import signal
def test_audio_recording():
"""测试音频录音功能"""
print("=== 音频录音测试 ===")
# 检查录音设备
print("\n1. 检查录音设备...")
try:
result = subprocess.run(['arecord', '-l'], capture_output=True, text=True)
if result.returncode == 0:
print("录音设备列表:")
print(result.stdout)
else:
print("错误: 无法获取录音设备列表")
return False
except FileNotFoundError:
print("错误: arecord 命令未找到,请安装 alsa-utils")
return False
# 录制测试音频
print("\n2. 录制测试音频5秒...")
test_record_file = "/tmp/test_record.wav"
try:
print("请对着麦克风说话5秒录音开始...")
# 录制5秒音频
result = subprocess.run(['arecord', '-d', '5', '-f', 'cd', test_record_file],
capture_output=True, text=True)
if result.returncode == 0:
print("✓ 音频录制成功")
# 检查文件是否存在且大小合理
if os.path.exists(test_record_file):
file_size = os.path.getsize(test_record_file)
print(f"录制文件大小: {file_size} 字节")
if file_size > 1000: # 至少1KB
print("✓ 录音文件大小正常")
return True
else:
print("✗ 录音文件太小,可能录音失败")
return False
else:
print("✗ 录音文件未创建")
return False
else:
print("✗ 音频录制失败")
print(f"错误信息: {result.stderr}")
return False
except FileNotFoundError:
print("错误: arecord 命令未找到")
return False
except KeyboardInterrupt:
print("\n录音被用户中断")
return False
def test_audio_playback_verification():
"""播放录制的音频进行验证"""
print("\n3. 播放录制的音频进行验证...")
test_record_file = "/tmp/test_record.wav"
if not os.path.exists(test_record_file):
print("错误: 找不到录制的音频文件")
return False
try:
print("播放录制的音频...")
result = subprocess.run(['aplay', test_record_file], capture_output=True, text=True)
if result.returncode == 0:
print("✓ 录音播放成功")
return True
else:
print("✗ 录音播放失败")
print(f"错误信息: {result.stderr}")
return False
except FileNotFoundError:
print("错误: aplay 命令未找到")
return False
def test_microphone_levels():
"""测试麦克风音量级别"""
print("\n4. 测试麦克风音量级别...")
try:
# 获取麦克风音量
result = subprocess.run(['amixer', 'sget', 'Capture'], capture_output=True, text=True)
if result.returncode == 0:
print("当前麦克风音量:")
print(result.stdout)
# 设置麦克风音量
subprocess.run(['amixer', 'sset', 'Capture', '80%'], check=True)
print("✓ 麦克风音量已设置为80%")
return True
else:
print("无法获取麦克风音量信息")
return False
except (subprocess.CalledProcessError, FileNotFoundError):
print("amixer 命令未找到或执行失败")
return False
def test_realtime_monitoring():
"""实时音频监控测试"""
print("\n5. 实时音频监控测试3秒...")
try:
print("开始实时监控,请对着麦克风说话...")
# 使用parecord进行实时监控如果可用
cmd = ['parecord', '--monitor', '--latency-msec', '100', '--duration', '3', '/dev/null']
result = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
if result.returncode == 0:
print("✓ 实时监控测试成功")
return True
else:
print("提示: 实时监控测试跳过需要pulseaudio")
return True
except (subprocess.TimeoutExpired, FileNotFoundError, subprocess.CalledProcessError):
print("提示: 实时监控测试跳过")
return True
def cleanup():
"""清理测试文件"""
test_files = ["/tmp/test_record.wav"]
for file_path in test_files:
if os.path.exists(file_path):
try:
os.remove(file_path)
print(f"✓ 已清理测试文件: {file_path}")
except OSError:
print(f"警告: 无法清理测试文件: {file_path}")
if __name__ == "__main__":
print("树莓派音频录音功能测试")
print("=" * 40)
success = True
# 测试麦克风音量
if not test_microphone_levels():
success = False
# 测试音频录制
if not test_audio_recording():
success = False
# 播放录制的音频
if os.path.exists("/tmp/test_record.wav"):
if not test_audio_playback_verification():
success = False
# 实时监控测试
if not test_realtime_monitoring():
success = False
print("\n" + "=" * 40)
if success:
print("✓ 所有音频录音测试通过")
else:
print("✗ 部分音频录音测试失败")
# 清理测试文件
cleanup()
sys.exit(0 if success else 1)

View File

@ -1,483 +0,0 @@
#!/usr/bin/env python3
"""
Voice Assistant: Real-Time Voice Chat
This app runs on a Raspberry Pi (or Linux desktop) and creates a low-latency, full-duplex voice interaction
with an AI character. It uses local speech recognition
(Vosk), local text-to-speech synthesis (Piper), and a locally hosted large language model via Ollama.
Key Features:
- Wake-free, continuous voice recognition with real-time transcription
- LLM-driven responses streamed from a selected local model (e.g., LLaMA, Qwen, Gemma)
- Audio response synthesis with a gruff custom voice using ONNX-based Piper models
- Optional noise mixing and filtering via SoX
- System volume control via ALSA
- Modular and responsive design suitable for low-latency, character-driven agents
Ideal for embedded voice AI demos, cosplay companions, or standalone AI characters.
Copyright: M15.ai
License: MIT
"""
import io
import json
import os
import queue
import re
import subprocess
import threading
import time
import wave
import numpy as np
import pyaudio
import requests
import soxr
from pydub import AudioSegment
from vosk import KaldiRecognizer, Model
# ------------------- TIMING UTILITY -------------------
class Timer:
def __init__(self, label):
self.label = label
self.enabled = True
def __enter__(self):
self.start = time.time()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.enabled:
elapsed_ms = (time.time() - self.start) * 1000
print(f"[Timing] {self.label}: {elapsed_ms:.0f} ms")
def disable(self):
self.enabled = False
# ------------------- FUNCTIONS -------------------
def get_input_device_index(preferred_name="default"):
pa = pyaudio.PyAudio()
index = None
for i in range(pa.get_device_count()):
info = pa.get_device_info_by_index(i)
if preferred_name.lower() in info['name'].lower() and info['maxInputChannels'] > 0:
print(f"[Debug] Selected input device {i}: {info['name']}")
print(f"[Debug] Device sample rate: {info['defaultSampleRate']} Hz")
index = i
break
pa.terminate()
if index is None:
print("[Warning] Preferred mic not found. Using default.")
return None
return index
def get_output_device_index(preferred_name="default"):
pa = pyaudio.PyAudio()
index = None
for i in range(pa.get_device_count()):
info = pa.get_device_info_by_index(i)
if preferred_name.lower() in info['name'].lower() and info['maxOutputChannels'] > 0:
print(f"[Debug] Selected output device {i}: {info['name']}")
index = i
break
pa.terminate()
if index is None:
print("[Warning] Preferred output device not found. Using default.")
return None
return index
def parse_card_number(device_str):
"""
Extract ALSA card number from string like 'plughw:3,0'
"""
try:
return int(device_str.split(":")[1].split(",")[0])
except Exception as e:
print(f"[Warning] Could not parse card number from {device_str}: {e}")
return 0 # fallback
def list_input_devices():
pa = pyaudio.PyAudio()
print("[Debug] Available input devices:")
for i in range(pa.get_device_count()):
info = pa.get_device_info_by_index(i)
if info['maxInputChannels'] > 0:
print(f" {i}: {info['name']} ({int(info['defaultSampleRate'])} Hz, {info['maxInputChannels']}ch)")
pa.terminate()
def resample_audio(data, orig_rate=48000, target_rate=16000):
# Convert byte string to numpy array
audio_np = np.frombuffer(data, dtype=np.int16)
# Resample using soxr
resampled_np = soxr.resample(audio_np, orig_rate, target_rate)
# Convert back to bytes
return resampled_np.astype(np.int16).tobytes()
def set_output_volume(volume_level, card_id=0):
"""
Set output volume using ALSA 'Speaker' control on specified card.
volume_level: 110 (user scale)
card_id: ALSA card number (from aplay -l)
"""
percent = max(1, min(volume_level, 10)) * 10 # map to 10100%
try:
subprocess.run(
['amixer', '-c', str(card_id), 'sset', 'Speaker', str(percent) + '%'],
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
print(f"[Debug] Volume set to {percent}% on card {card_id}")
except Exception as e:
print(f"[Warning] Volume control failed on card {card_id}: {e}")
# ------------------- PATHS -------------------
CONFIG_PATH = os.path.expanduser("va_config.json")
BASE_DIR = os.path.dirname(__file__)
MODEL_PATH = os.path.join(BASE_DIR, 'vosk-model')
CHAT_URL = 'https://open.bigmodel.cn/api/paas/v4/chat/completions'
AUTH_TOKEN = '0c9cbaca9d2bbf864990f1e1decdf340.dXRMsZCHTUbPQ0rm' # Replace with your actual token
# ------------------- CONFIG FILE LOADING -------------------
DEFAULT_CONFIG = {
"volume": 9,
"mic_name": "default",
"audio_output_device": "default",
"model_name": "glm-4.5",
"voice": "en_US-kathleen-low.onnx",
"enable_audio_processing": False,
"history_length": 4,
"system_prompt": "You are a helpful assistant."
}
def load_config():
# Load config from system file or fall back to defaults
if os.path.isfile(CONFIG_PATH):
try:
with open(CONFIG_PATH, 'r') as f:
user_config = json.load(f)
return {**DEFAULT_CONFIG, **user_config} # merge with defaults
except Exception as e:
print(f"[Warning] Failed to load system config: {e}")
print("[Debug] Using default config.")
return DEFAULT_CONFIG
config = load_config()
# Apply loaded config values
VOLUME = config["volume"]
MIC_NAME = config["mic_name"]
AUDIO_OUTPUT_DEVICE = config["audio_output_device"]
AUDIO_OUTPUT_DEVICE_INDEX = get_output_device_index(config["audio_output_device"])
OUTPUT_CARD = parse_card_number(AUDIO_OUTPUT_DEVICE) if AUDIO_OUTPUT_DEVICE else 0
MODEL_NAME = config["model_name"]
VOICE_MODEL = os.path.join("voices", config["voice"])
ENABLE_AUDIO_PROCESSING = config["enable_audio_processing"]
HISTORY_LENGTH = config["history_length"]
# Set system volume
set_output_volume(VOLUME, OUTPUT_CARD)
# Setup messages with system prompt
messages = [{"role": "system", "content": config["system_prompt"]}]
list_input_devices()
RATE = 48000
CHUNK = 1024
CHANNELS = 1
mic_enabled = True
DEVICE_INDEX = get_input_device_index()
# SOUND EFFECTS
NOISE_LEVEL = '0.04'
BANDPASS_HIGHPASS = '300'
BANDPASS_LOWPASS = '800'
# ------------------- VOICE MODEL -------------------
VOICE_MODELS_DIR = os.path.join(BASE_DIR, 'voices')
if not os.path.isdir(VOICE_MODELS_DIR):
os.makedirs(VOICE_MODELS_DIR)
VOICE_MODEL = os.path.join(VOICE_MODELS_DIR, config["voice"])
print('[Debug] Available Piper voices:')
for f in os.listdir(VOICE_MODELS_DIR):
if f.endswith('.onnx'):
print(' ', f)
print(f'[Debug] Using VOICE_MODEL: {VOICE_MODEL}')
print(f"[Debug] Config loaded: model={MODEL_NAME}, voice={config['voice']}, vol={VOLUME}, mic={MIC_NAME}")
# ------------------- CONVERSATION STATE -------------------
audio_queue = queue.Queue()
# Audio callback form Shure
def audio_callback(in_data, frame_count, time_info, status):
global mic_enabled
if not mic_enabled:
return (None, pyaudio.paContinue)
resampled_data = resample_audio(in_data, orig_rate=48000, target_rate=16000)
audio_queue.put(resampled_data)
return (None, pyaudio.paContinue)
# ------------------- STREAM SETUP -------------------
def start_stream():
pa = pyaudio.PyAudio()
stream = pa.open(
rate=RATE,
format=pyaudio.paInt16,
channels=CHANNELS,
input=True,
input_device_index=DEVICE_INDEX,
frames_per_buffer=CHUNK,
stream_callback=audio_callback
)
stream.start_stream()
print(f'[Debug] Stream @ {RATE}Hz')
return pa, stream
# ------------------- QUERY GLM API -------------------
def query_glm():
headers = {
'Authorization': 'Bearer ' + AUTH_TOKEN,
'Content-Type': 'application/json'
}
payload = {
"model": "glm-4.5",
"messages": [messages[0]] + messages[-HISTORY_LENGTH:], # force system prompt at top
"temperature": 0.6,
"max_tokens": 1024,
"stream": False
}
with Timer("Inference"): # measure inference latency
try:
resp = requests.post(CHAT_URL, json=payload, headers=headers)
resp.raise_for_status() # Raise exception for HTTP errors
except requests.exceptions.RequestException as e:
print(f"[Error] GLM API request failed: {e}")
return ''
data = resp.json()
# Extract assistant message
reply = ''
if 'choices' in data and len(data['choices']) > 0:
choice = data['choices'][0]
if 'message' in choice and 'content' in choice['message']:
reply = choice['message']['content'].strip()
return reply
# ------------------- TTS & DEGRADATION -------------------
import tempfile
def play_response(text):
import io
import tempfile
# Mute the mic during playback to avoid feedback loop
global mic_enabled
mic_enabled = False # 🔇 mute mic
# clean the response
clean = re.sub(r"[\*]+", '', text) # remove asterisks
clean = re.sub(r"\(.*?\)", '', clean) # remove (stage directions)
clean = re.sub(r"<.*?>", '', clean) # remove HTML-style tags
clean = clean.replace('\n', ' ').strip() # normalize newlines
clean = re.sub(r'\s+', ' ', clean) # collapse whitespace
clean = re.sub(r'[\U0001F300-\U0001FAFF\u2600-\u26FF\u2700-\u27BF]+', '', clean) # remove emojis
piper_path = os.path.join(BASE_DIR, 'bin', 'piper', 'piper')
# 1. Generate Piper raw PCM
with Timer("Piper inference"):
try:
piper_proc = subprocess.Popen(
[piper_path, '--model', VOICE_MODEL, '--output_raw'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL
)
tts_pcm, _ = piper_proc.communicate(input=clean.encode())
except Exception as e:
print(f"[Error] Piper TTS failed: {e}")
return
if ENABLE_AUDIO_PROCESSING:
# SoX timing consolidation
sox_start = time.time()
# 2. Convert raw PCM to WAV
pcm_to_wav = subprocess.Popen(
['sox', '-t', 'raw', '-r', '16000', '-c', str(CHANNELS), '-b', '16',
'-e', 'signed-integer', '-', '-t', 'wav', '-'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL
)
tts_wav_16k, _ = pcm_to_wav.communicate(input=tts_pcm)
# 3. Estimate duration
duration_sec = len(tts_pcm) / (RATE * 2)
# 4. Generate white noise WAV bytes
noise_bytes = subprocess.check_output([
'sox', '-n',
'-r', '16000',
'-c', str(CHANNELS),
'-b', '16',
'-e', 'signed-integer',
'-t', 'wav', '-',
'synth', str(duration_sec),
'whitenoise', 'vol', NOISE_LEVEL
], stderr=subprocess.DEVNULL)
# 5. Write both to temp files & mix
with tempfile.NamedTemporaryFile(suffix='.wav') as tts_file, tempfile.NamedTemporaryFile(suffix='.wav') as noise_file:
tts_file.write(tts_wav_16k)
noise_file.write(noise_bytes)
tts_file.flush()
noise_file.flush()
mixer = subprocess.Popen(
['sox', '-m', tts_file.name, noise_file.name, '-t', 'wav', '-'],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL
)
mixed_bytes, _ = mixer.communicate()
# 6. Apply filter
filter_proc = subprocess.Popen(
#['sox', '-t', 'wav', '-', '-t', 'wav', '-', 'highpass', BANDPASS_HIGHPASS, 'lowpass', BANDPASS_LOWPASS],
['sox', '-t', 'wav', '-', '-r', '48000', '-t', 'wav', '-',
'highpass', BANDPASS_HIGHPASS, 'lowpass', BANDPASS_LOWPASS],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL
)
final_bytes, _ = filter_proc.communicate(input=mixed_bytes)
sox_elapsed = (time.time() - sox_start) * 1000
print(f"[Timing] SoX (total): {int(sox_elapsed)} ms")
else:
# No FX: just convert raw PCM to WAV
pcm_to_wav = subprocess.Popen(
['sox', '-t', 'raw', '-r', '16000', '-c', str(CHANNELS), '-b', '16',
'-e', 'signed-integer', '-', '-t', 'wav', '-'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL
)
tts_wav_16k, _ = pcm_to_wav.communicate(input=tts_pcm)
resample_proc = subprocess.Popen(
['sox', '-t', 'wav', '-', '-r', '48000', '-t', 'wav', '-'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL
)
final_bytes, _ = resample_proc.communicate(input=tts_wav_16k)
# 7. Playback
with Timer("Playback"):
try:
wf = wave.open(io.BytesIO(final_bytes), 'rb')
pa = pyaudio.PyAudio()
stream = pa.open(
format=pa.get_format_from_width(wf.getsampwidth()),
channels=wf.getnchannels(),
rate=wf.getframerate(),
output=True,
output_device_index=AUDIO_OUTPUT_DEVICE_INDEX
)
data = wf.readframes(CHUNK)
while data:
stream.write(data)
data = wf.readframes(CHUNK)
stream.stop_stream()
stream.close()
pa.terminate()
wf.close()
except wave.Error as e:
print(f"[Error] Could not open final WAV: {e}")
finally:
mic_enabled = True # 🔊 unmute mic
time.sleep(0.3) # optional: small cooldown
# ------------------- PROCESSING LOOP -------------------
def processing_loop():
try:
model = Model(MODEL_PATH)
except Exception as e:
print(f"[Error] Failed to load Vosk model: {e}")
print(f"[Info] Model path: {MODEL_PATH}")
return
#rec = KaldiRecognizer(model, RATE)
rec = KaldiRecognizer(model, 16000)
MAX_DEBUG_LEN = 200 # optional: limit length of debug output
LOW_EFFORT_UTTERANCES = {"huh", "uh", "um", "erm", "hmm", "he's", "but"}
while True:
data = audio_queue.get()
if rec.AcceptWaveform(data):
start = time.time()
r = json.loads(rec.Result())
elapsed_ms = int((time.time() - start) * 1000)
user = r.get('text', '').strip()
if user:
print(f"[Timing] STT parse: {elapsed_ms} ms")
print("User:", user)
if user.lower().strip(".,!? ") in LOW_EFFORT_UTTERANCES:
print("[Debug] Ignored low-effort utterance.")
rec = KaldiRecognizer(model, 16000)
continue # Skip LLM response + TTS for accidental noise
messages.append({"role": "user", "content": user})
# Generate assistant response
resp_text = query_glm()
if resp_text:
# Clean debug print (remove newlines and carriage returns)
clean_debug_text = resp_text.replace('\n', ' ').replace('\r', ' ')
if len(clean_debug_text) > MAX_DEBUG_LEN:
clean_debug_text = clean_debug_text[:MAX_DEBUG_LEN] + '...'
print('Assistant:', clean_debug_text)
messages.append({"role": "assistant", "content": clean_debug_text})
# TTS generation + playback
play_response(resp_text)
else:
print('[Debug] Empty response, skipping TTS.')
# Reset recognizer after each full interaction
rec = KaldiRecognizer(model, 16000)
# ------------------- MAIN -------------------
if __name__ == '__main__':
pa, stream = start_stream()
t = threading.Thread(target=processing_loop, daemon=True)
t.start()
try:
while stream.is_active():
time.sleep(0.1)
except KeyboardInterrupt:
stream.stop_stream(); stream.close(); pa.terminate()

View File

@ -1,381 +0,0 @@
#!/usr/bin/env python3
"""
Voice Assistant: Real-Time Voice Chat (修复版)
修复了树莓派上的音频设备问题
"""
import io
import json
import os
import queue
import re
import subprocess
import threading
import time
import wave
import numpy as np
import pyaudio
import requests
import soxr
from pydub import AudioSegment
from vosk import KaldiRecognizer, Model
# ------------------- TIMING UTILITY -------------------
class Timer:
def __init__(self, label):
self.label = label
self.enabled = True
def __enter__(self):
self.start = time.time()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.enabled:
elapsed_ms = (time.time() - self.start) * 1000
print(f"[Timing] {self.label}: {elapsed_ms:.0f} ms")
def disable(self):
self.enabled = False
# ------------------- FUNCTIONS -------------------
def get_input_device_index(preferred_name=None):
pa = pyaudio.PyAudio()
try:
# 首先尝试获取默认设备
if preferred_name is None:
default_input = pa.get_default_input_device_info()
print(f"[Debug] Using default input device: {default_input['name']}")
return default_input['index']
# 如果有指定名称,尝试匹配
for i in range(pa.get_device_count()):
info = pa.get_device_info_by_index(i)
if info['maxInputChannels'] > 0 and preferred_name.lower() in info['name'].lower():
print(f"[Debug] Selected input device {i}: {info['name']}")
print(f"[Debug] Device sample rate: {info['defaultSampleRate']} Hz")
return i
# 如果没找到,使用默认设备
default_input = pa.get_default_input_device_info()
print(f"[Warning] Preferred mic not found. Using default: {default_input['name']}")
return default_input['index']
finally:
pa.terminate()
def get_output_device_index(preferred_name=None):
pa = pyaudio.PyAudio()
try:
# 首先尝试获取默认设备
if preferred_name is None:
default_output = pa.get_default_output_device_info()
print(f"[Debug] Using default output device: {default_output['name']}")
return default_output['index']
# 如果有指定名称,尝试匹配
for i in range(pa.get_device_count()):
info = pa.get_device_info_by_index(i)
if info['maxOutputChannels'] > 0 and preferred_name.lower() in info['name'].lower():
print(f"[Debug] Selected output device {i}: {info['name']}")
return i
# 如果没找到,使用默认设备
default_output = pa.get_default_output_device_info()
print(f"[Warning] Preferred output device not found. Using default: {default_output['name']}")
return default_output['index']
finally:
pa.terminate()
def list_input_devices():
pa = pyaudio.PyAudio()
try:
print("[Debug] Available input devices:")
for i in range(pa.get_device_count()):
info = pa.get_device_info_by_index(i)
if info['maxInputChannels'] > 0:
print(f" {i}: {info['name']} ({int(info['defaultSampleRate'])} Hz, {info['maxInputChannels']}ch)")
finally:
pa.terminate()
def resample_audio(data, orig_rate=44100, target_rate=16000):
# Convert byte string to numpy array
audio_np = np.frombuffer(data, dtype=np.int16)
# Resample using soxr
resampled_np = soxr.resample(audio_np, orig_rate, target_rate)
# Convert back to bytes
return resampled_np.astype(np.int16).tobytes()
# ------------------- PATHS -------------------
CONFIG_PATH = os.path.expanduser("va_config.json")
BASE_DIR = os.path.dirname(__file__)
MODEL_PATH = os.path.join(BASE_DIR, 'vosk-model')
CHAT_URL = 'https://open.bigmodel.cn/api/paas/v4/chat/completions'
AUTH_TOKEN = '0c9cbaca9d2bbf864990f1e1decdf340.dXRMsZCHTUbPQ0rm'
# ------------------- CONFIG FILE LOADING -------------------
DEFAULT_CONFIG = {
"volume": 8,
"mic_name": None,
"audio_output_device": None,
"model_name": "glm-4.5",
"voice": "en_US-kathleen-low.onnx",
"enable_audio_processing": False,
"history_length": 4,
"system_prompt": "You are a helpful assistant."
}
def load_config():
if os.path.isfile(CONFIG_PATH):
try:
with open(CONFIG_PATH, 'r') as f:
user_config = json.load(f)
return {**DEFAULT_CONFIG, **user_config}
except Exception as e:
print(f"[Warning] Failed to load system config: {e}")
print("[Debug] Using default config.")
return DEFAULT_CONFIG
config = load_config()
# Apply loaded config values
VOLUME = config["volume"]
MIC_NAME = config["mic_name"]
AUDIO_OUTPUT_DEVICE = config["audio_output_device"]
AUDIO_OUTPUT_DEVICE_INDEX = get_output_device_index(config["audio_output_device"])
MODEL_NAME = config["model_name"]
VOICE_MODEL = os.path.join("voices", config["voice"])
ENABLE_AUDIO_PROCESSING = config["enable_audio_processing"]
HISTORY_LENGTH = config["history_length"]
# Setup messages with system prompt
messages = [{"role": "system", "content": config["system_prompt"]}]
list_input_devices()
DEVICE_INDEX = get_input_device_index(config["mic_name"])
# 从设备获取采样率
pa = pyaudio.PyAudio()
device_info = pa.get_device_info_by_index(DEVICE_INDEX)
INPUT_RATE = int(device_info['defaultSampleRate'])
OUTPUT_RATE = int(device_info['defaultSampleRate'])
pa.terminate()
CHUNK = 1024
CHANNELS = 1
mic_enabled = True
print(f"[Debug] Using sample rate: {INPUT_RATE} Hz")
print(f"[Debug] Config loaded: model={MODEL_NAME}, voice={config['voice']}, vol={VOLUME}")
# ------------------- CONVERSATION STATE -------------------
audio_queue = queue.Queue()
# Audio callback
def audio_callback(in_data, frame_count, time_info, status):
global mic_enabled
if not mic_enabled:
return (None, pyaudio.paContinue)
resampled_data = resample_audio(in_data, orig_rate=INPUT_RATE, target_rate=16000)
audio_queue.put(resampled_data)
return (None, pyaudio.paContinue)
# ------------------- STREAM SETUP -------------------
def start_stream():
pa = pyaudio.PyAudio()
stream = pa.open(
rate=INPUT_RATE, # 使用设备的默认采样率
format=pyaudio.paInt16,
channels=CHANNELS,
input=True,
input_device_index=DEVICE_INDEX,
frames_per_buffer=CHUNK,
stream_callback=audio_callback
)
stream.start_stream()
print(f'[Debug] Stream @ {INPUT_RATE}Hz')
return pa, stream
# ------------------- QUERY GLM API -------------------
def query_glm():
headers = {
'Authorization': 'Bearer ' + AUTH_TOKEN,
'Content-Type': 'application/json'
}
payload = {
"model": "glm-4.5",
"messages": [messages[0]] + messages[-HISTORY_LENGTH:],
"temperature": 0.6,
"max_tokens": 1024,
"stream": False
}
with Timer("Inference"):
try:
resp = requests.post(CHAT_URL, json=payload, headers=headers)
resp.raise_for_status()
except requests.exceptions.RequestException as e:
print(f"[Error] GLM API request failed: {e}")
return ''
data = resp.json()
reply = ''
if 'choices' in data and len(data['choices']) > 0:
choice = data['choices'][0]
if 'message' in choice and 'content' in choice['message']:
reply = choice['message']['content'].strip()
return reply
# ------------------- TTS & DEGRADATION -------------------
def play_response(text):
global mic_enabled
mic_enabled = False
# clean the response
clean = re.sub(r"[\*]+", '', text)
clean = re.sub(r"\(.*?\)", '', clean)
clean = re.sub(r"<.*?>", '', clean)
clean = clean.replace('\n', ' ').strip()
clean = re.sub(r'\s+', ' ', clean)
clean = re.sub(r'[\U0001F300-\U0001FAFF\u2600-\u26FF\u2700-\u27BF]+', '', clean)
piper_path = os.path.join(BASE_DIR, 'bin', 'piper', 'piper')
if not os.path.exists(piper_path):
print(f"[Error] Piper executable not found at {piper_path}")
mic_enabled = True
return
try:
# Generate Piper raw PCM
with Timer("Piper inference"):
piper_proc = subprocess.Popen(
[piper_path, '--model', VOICE_MODEL, '--output_raw'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL
)
tts_pcm, _ = piper_proc.communicate(input=clean.encode())
# Convert raw PCM to WAV for playback
wav_io = io.BytesIO()
with wave.open(wav_io, 'wb') as wf:
wf.setnchannels(1)
wf.setsampwidth(2)
wf.setframerate(16000)
wf.writeframes(tts_pcm)
wav_io.seek(0)
wf = wave.open(wav_io, 'rb')
# Playback
with Timer("Playback"):
pa = pyaudio.PyAudio()
stream = pa.open(
format=pa.get_format_from_width(wf.getsampwidth()),
channels=wf.getnchannels(),
rate=wf.getframerate(),
output=True,
output_device_index=AUDIO_OUTPUT_DEVICE_INDEX
)
data = wf.readframes(CHUNK)
while data:
stream.write(data)
data = wf.readframes(CHUNK)
stream.stop_stream()
stream.close()
pa.terminate()
wf.close()
except Exception as e:
print(f"[Error] TTS playback failed: {e}")
finally:
mic_enabled = True
time.sleep(0.3)
# ------------------- PROCESSING LOOP -------------------
def processing_loop():
try:
model = Model(MODEL_PATH)
print("[Debug] Vosk model loaded successfully")
except Exception as e:
print(f"[Error] Failed to load Vosk model: {e}")
print(f"[Info] Model path: {MODEL_PATH}")
return
rec = KaldiRecognizer(model, 16000)
MAX_DEBUG_LEN = 200
LOW_EFFORT_UTTERANCES = {"huh", "uh", "um", "erm", "hmm", "he's", "but"}
while True:
try:
data = audio_queue.get()
if rec.AcceptWaveform(data):
start = time.time()
r = json.loads(rec.Result())
elapsed_ms = int((time.time() - start) * 1000)
user = r.get('text', '').strip()
if user:
print(f"[Timing] STT parse: {elapsed_ms} ms")
print("User:", user)
if user.lower().strip(".,!? ") in LOW_EFFORT_UTTERANCES:
print("[Debug] Ignored low-effort utterance.")
rec = KaldiRecognizer(model, 16000)
continue
messages.append({"role": "user", "content": user})
resp_text = query_glm()
if resp_text:
clean_debug_text = resp_text.replace('\n', ' ').replace('\r', ' ')
if len(clean_debug_text) > MAX_DEBUG_LEN:
clean_debug_text = clean_debug_text[:MAX_DEBUG_LEN] + '...'
print('Assistant:', clean_debug_text)
messages.append({"role": "assistant", "content": clean_debug_text})
play_response(resp_text)
else:
print('[Debug] Empty response, skipping TTS.')
rec = KaldiRecognizer(model, 16000)
except Exception as e:
print(f"[Error] Processing loop error: {e}")
time.sleep(1)
# ------------------- MAIN -------------------
if __name__ == '__main__':
try:
pa, stream = start_stream()
t = threading.Thread(target=processing_loop, daemon=True)
t.start()
print("[Debug] Voice assistant started. Press Ctrl+C to exit.")
while stream.is_active():
time.sleep(0.1)
except KeyboardInterrupt:
print("[Debug] Shutting down...")
stream.stop_stream()
stream.close()
pa.terminate()
except Exception as e:
print(f"[Error] Main loop error: {e}")
stream.stop_stream()
stream.close()
pa.terminate()