From d4ff3fd774b646776824aa9dc9138e1607a87bd6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E6=9C=B1=E6=BD=AE?=
Date: Thu, 18 Sep 2025 21:21:34 +0800
Subject: [PATCH] Replace PyAudio audio test and assistant prototypes with
 alsa-utils test scripts

Drop the PyAudio-based smoke test and the two voice assistant
prototypes in favor of two focused scripts that test playback and
recording through the standard alsa-utils tools (aplay, arecord,
amixer).

---
 test_audio.py            | 101 --------
 test_audio_playback.py   | 122 ++++++++++
 test_audio_recording.py  | 193 +++++++++++++++
 voice_assistant_fixed.py | 483 ---------------------------------------
 voice_assistant_pi.py    | 381 ------------------------------
 5 files changed, 315 insertions(+), 965 deletions(-)
 delete mode 100644 test_audio.py
 create mode 100644 test_audio_playback.py
 create mode 100644 test_audio_recording.py
 delete mode 100644 voice_assistant_fixed.py
 delete mode 100644 voice_assistant_pi.py

diff --git a/test_audio.py b/test_audio.py
deleted file mode 100644
index fa241a1..0000000
--- a/test_audio.py
+++ /dev/null
@@ -1,101 +0,0 @@
-#!/usr/bin/env python3
-"""
-Simple audio test script for diagnosing audio problems on the Raspberry Pi
-"""
-
-import pyaudio
-import time
-import os
-
-def test_audio():
-    """Test the audio devices"""
-    print("=== Audio Device Test ===")
-
-    pa = pyaudio.PyAudio()
-
-    # List all devices
-    print("\nAvailable audio devices:")
-    for i in range(pa.get_device_count()):
-        info = pa.get_device_info_by_index(i)
-        print(f"  Device {i}: {info['name']}")
-        print(f"    Input channels: {info['maxInputChannels']}")
-        print(f"    Output channels: {info['maxOutputChannels']}")
-        print(f"    Default sample rate: {info['defaultSampleRate']}")
-        print()
-
-    # Find the default input device
-    default_input = pa.get_default_input_device_info()
-    print(f"Default input device: {default_input['name']} (index: {default_input['index']})")
-
-    # Find the default output device
-    default_output = pa.get_default_output_device_info()
-    print(f"Default output device: {default_output['name']} (index: {default_output['index']})")
-
-    pa.terminate()
-
-def test_recording():
-    """Test the recording path"""
-    print("\n=== Recording Test ===")
-
-    pa = pyaudio.PyAudio()
-
-    try:
-        # Recording parameters
-        FORMAT = pyaudio.paInt16
-        CHANNELS = 1
-        RATE = 16000  # reduced sample rate; the device default is 44100
-        CHUNK = 1024
-
-        print(f"Trying to open an audio stream at {RATE} Hz")
-
-        # Open the audio stream
-        stream = pa.open(
-            format=FORMAT,
-            channels=CHANNELS,
-            rate=RATE,
-            input=True,
-            frames_per_buffer=CHUNK
-        )
-
-        print("Recording for 5 seconds...")
-        frames = []
-
-        # Record 5 seconds
-        for i in range(0, int(RATE / CHUNK * 5)):
-            data = stream.read(CHUNK)
-            frames.append(data)
-            if i % 10 == 0:
-                print(f"Recording... {i * CHUNK / RATE:.1f} s")
-
-        print("Recording finished")
-
-        # Stop the stream
-        stream.stop_stream()
-        stream.close()
-
-        # Play back the recording
-        print("Playing back the recording...")
-        stream = pa.open(
-            format=FORMAT,
-            channels=CHANNELS,
-            rate=RATE,
-            output=True
-        )
-
-        for frame in frames:
-            stream.write(frame)
-
-        stream.stop_stream()
-        stream.close()
-
-        print("Playback finished")
-
-    except Exception as e:
-        print(f"Recording test failed: {e}")
-
-    finally:
-        pa.terminate()
-
-if __name__ == "__main__":
-    test_audio()
-    test_recording()
\ No newline at end of file
diff --git a/test_audio_playback.py b/test_audio_playback.py
new file mode 100644
index 0000000..7b92157
--- /dev/null
+++ b/test_audio_playback.py
@@ -0,0 +1,122 @@
+#!/usr/bin/env python3
+"""
+Audio playback test script
+Tests the audio playback path on the Raspberry Pi
+"""
+
+import subprocess
+import time
+import sys
+import os
+
+def test_audio_playback():
+    """Test audio playback"""
+    print("=== Audio Playback Test ===")
+
+    # Check the audio devices
+    print("\n2. Checking audio devices...")
+    try:
+        result = subprocess.run(['aplay', '-l'], capture_output=True, text=True)
+        if result.returncode == 0:
+            print("Audio device list:")
+            print(result.stdout)
+        else:
+            print("Error: could not get the audio device list")
+            return False
+    except FileNotFoundError:
+        print("Error: aplay command not found; please install alsa-utils")
+        return False
+
+    # Play the system test tone
+    print("\n3. Testing playback of the system test tone...")
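+    # NOTE: speaker-test ships with alsa-utils; '-t sine -f 440 -l 1' plays one
+    # loop of a 440 Hz sine wave per channel. The 5 s timeout below is an assumed
+    # upper bound for that run, not a value speaker-test itself requires.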
测试播放系统提示音...") + try: + # 使用系统内置的测试声音 + result = subprocess.run(['speaker-test', '-t', 'sine', '-f', '440', '-l', '1'], + capture_output=True, text=True, timeout=5) + if result.returncode == 0: + print("✓ 系统提示音播放成功") + else: + print("✗ 系统提示音播放失败") + return False + except (subprocess.TimeoutExpired, FileNotFoundError): + print("提示: speaker-test 测试跳过,尝试直接播放音频文件") + + # 创建测试音频文件并播放 + print("\n3. 创建并播放测试音频文件...") + test_audio_file = "/tmp/test_audio.wav" + + # 使用sox生成测试音频(如果可用) + if os.path.exists("/usr/bin/sox"): + try: + subprocess.run(['sox', '-n', '-r', '44100', '-c', '2', test_audio_file, + 'synth', '3', 'sine', '440'], check=True) + print("✓ 测试音频文件创建成功") + except (subprocess.CalledProcessError, FileNotFoundError): + print("无法创建测试音频文件,跳过文件播放测试") + return True + else: + print("sox 未安装,跳过文件播放测试") + return True + + # 播放测试音频文件 + try: + result = subprocess.run(['aplay', test_audio_file], capture_output=True, text=True) + if result.returncode == 0: + print("✓ 音频文件播放成功") + return True + else: + print("✗ 音频文件播放失败") + print(f"错误信息: {result.stderr}") + return False + except FileNotFoundError: + print("错误: aplay 命令未找到") + return False + finally: + # 清理测试文件 + if os.path.exists(test_audio_file): + os.remove(test_audio_file) + +def check_volume(): + """检查并设置音量""" + print("\n4. 检查音量设置...") + try: + result = subprocess.run(['amixer', 'sget', 'Master'], capture_output=True, text=True) + if result.returncode == 0: + print("当前音量设置:") + print(result.stdout) + + # 设置音量到80% + subprocess.run(['amixer', 'sset', 'Master', '80%'], check=True) + print("✓ 音量已设置为80%") + return True + else: + print("无法获取音量信息") + return False + except (subprocess.CalledProcessError, FileNotFoundError): + print("amixer 命令未找到或执行失败") + return False + +if __name__ == "__main__": + print("树莓派音频播放功能测试") + print("=" * 40) + + success = True + + # 检查音量 + if not check_volume(): + success = False + + # 测试音频播放 + if not test_audio_playback(): + success = False + + print("\n" + "=" * 40) + if success: + print("✓ 所有音频播放测试通过") + sys.exit(0) + else: + print("✗ 部分音频播放测试失败") + sys.exit(1) \ No newline at end of file diff --git a/test_audio_recording.py b/test_audio_recording.py new file mode 100644 index 0000000..e4e206d --- /dev/null +++ b/test_audio_recording.py @@ -0,0 +1,187 @@ +#!/usr/bin/env python3 +""" +音频录音测试脚本 +用于测试树莓派的音频录音功能 +""" + +import subprocess +import time +import sys +import os +import signal + +def test_audio_recording(): + """测试音频录音功能""" + print("=== 音频录音测试 ===") + + # 检查录音设备 + print("\n1. 检查录音设备...") + try: + result = subprocess.run(['arecord', '-l'], capture_output=True, text=True) + if result.returncode == 0: + print("录音设备列表:") + print(result.stdout) + else: + print("错误: 无法获取录音设备列表") + return False + except FileNotFoundError: + print("错误: arecord 命令未找到,请安装 alsa-utils") + return False + + # 录制测试音频 + print("\n2. 
录制测试音频(5秒)...") + test_record_file = "/tmp/test_record.wav" + + try: + print("请对着麦克风说话(5秒录音开始)...") + + # 录制5秒音频 + result = subprocess.run(['arecord', '-d', '5', '-f', 'cd', test_record_file], + capture_output=True, text=True) + + if result.returncode == 0: + print("✓ 音频录制成功") + + # 检查文件是否存在且大小合理 + if os.path.exists(test_record_file): + file_size = os.path.getsize(test_record_file) + print(f"录制文件大小: {file_size} 字节") + + if file_size > 1000: # 至少1KB + print("✓ 录音文件大小正常") + return True + else: + print("✗ 录音文件太小,可能录音失败") + return False + else: + print("✗ 录音文件未创建") + return False + else: + print("✗ 音频录制失败") + print(f"错误信息: {result.stderr}") + return False + + except FileNotFoundError: + print("错误: arecord 命令未找到") + return False + except KeyboardInterrupt: + print("\n录音被用户中断") + return False + +def test_audio_playback_verification(): + """播放录制的音频进行验证""" + print("\n3. 播放录制的音频进行验证...") + test_record_file = "/tmp/test_record.wav" + + if not os.path.exists(test_record_file): + print("错误: 找不到录制的音频文件") + return False + + try: + print("播放录制的音频...") + result = subprocess.run(['aplay', test_record_file], capture_output=True, text=True) + + if result.returncode == 0: + print("✓ 录音播放成功") + return True + else: + print("✗ 录音播放失败") + print(f"错误信息: {result.stderr}") + return False + + except FileNotFoundError: + print("错误: aplay 命令未找到") + return False + +def test_microphone_levels(): + """测试麦克风音量级别""" + print("\n4. 测试麦克风音量级别...") + + try: + # 获取麦克风音量 + result = subprocess.run(['amixer', 'sget', 'Capture'], capture_output=True, text=True) + + if result.returncode == 0: + print("当前麦克风音量:") + print(result.stdout) + + # 设置麦克风音量 + subprocess.run(['amixer', 'sset', 'Capture', '80%'], check=True) + print("✓ 麦克风音量已设置为80%") + return True + else: + print("无法获取麦克风音量信息") + return False + + except (subprocess.CalledProcessError, FileNotFoundError): + print("amixer 命令未找到或执行失败") + return False + +def test_realtime_monitoring(): + """实时音频监控测试""" + print("\n5. 
实时音频监控测试(3秒)...") + + try: + print("开始实时监控,请对着麦克风说话...") + + # 使用parecord进行实时监控(如果可用) + cmd = ['parecord', '--monitor', '--latency-msec', '100', '--duration', '3', '/dev/null'] + + result = subprocess.run(cmd, capture_output=True, text=True, timeout=5) + + if result.returncode == 0: + print("✓ 实时监控测试成功") + return True + else: + print("提示: 实时监控测试跳过(需要pulseaudio)") + return True + + except (subprocess.TimeoutExpired, FileNotFoundError, subprocess.CalledProcessError): + print("提示: 实时监控测试跳过") + return True + +def cleanup(): + """清理测试文件""" + test_files = ["/tmp/test_record.wav"] + + for file_path in test_files: + if os.path.exists(file_path): + try: + os.remove(file_path) + print(f"✓ 已清理测试文件: {file_path}") + except OSError: + print(f"警告: 无法清理测试文件: {file_path}") + +if __name__ == "__main__": + print("树莓派音频录音功能测试") + print("=" * 40) + + success = True + + # 测试麦克风音量 + if not test_microphone_levels(): + success = False + + # 测试音频录制 + if not test_audio_recording(): + success = False + + # 播放录制的音频 + if os.path.exists("/tmp/test_record.wav"): + if not test_audio_playback_verification(): + success = False + + # 实时监控测试 + if not test_realtime_monitoring(): + success = False + + print("\n" + "=" * 40) + if success: + print("✓ 所有音频录音测试通过") + else: + print("✗ 部分音频录音测试失败") + + # 清理测试文件 + cleanup() + + sys.exit(0 if success else 1) \ No newline at end of file diff --git a/voice_assistant_fixed.py b/voice_assistant_fixed.py deleted file mode 100644 index af88140..0000000 --- a/voice_assistant_fixed.py +++ /dev/null @@ -1,483 +0,0 @@ -#!/usr/bin/env python3 -""" -Voice Assistant: Real-Time Voice Chat - -This app runs on a Raspberry Pi (or Linux desktop) and creates a low-latency, full-duplex voice interaction -with an AI character. It uses local speech recognition -(Vosk), local text-to-speech synthesis (Piper), and a locally hosted large language model via Ollama. - -Key Features: -- Wake-free, continuous voice recognition with real-time transcription -- LLM-driven responses streamed from a selected local model (e.g., LLaMA, Qwen, Gemma) -- Audio response synthesis with a gruff custom voice using ONNX-based Piper models -- Optional noise mixing and filtering via SoX -- System volume control via ALSA -- Modular and responsive design suitable for low-latency, character-driven agents - -Ideal for embedded voice AI demos, cosplay companions, or standalone AI characters. 
-
-Copyright: M15.ai
-License: MIT
-"""
-
-import io
-import json
-import os
-import queue
-import re
-import subprocess
-import threading
-import time
-import wave
-
-import numpy as np
-import pyaudio
-import requests
-import soxr
-from pydub import AudioSegment
-from vosk import KaldiRecognizer, Model
-
-
-# ------------------- TIMING UTILITY -------------------
-class Timer:
-    def __init__(self, label):
-        self.label = label
-        self.enabled = True
-    def __enter__(self):
-        self.start = time.time()
-        return self
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        if self.enabled:
-            elapsed_ms = (time.time() - self.start) * 1000
-            print(f"[Timing] {self.label}: {elapsed_ms:.0f} ms")
-    def disable(self):
-        self.enabled = False
-
-# ------------------- FUNCTIONS -------------------
-
-def get_input_device_index(preferred_name="default"):
-    pa = pyaudio.PyAudio()
-    index = None
-    for i in range(pa.get_device_count()):
-        info = pa.get_device_info_by_index(i)
-        if preferred_name.lower() in info['name'].lower() and info['maxInputChannels'] > 0:
-            print(f"[Debug] Selected input device {i}: {info['name']}")
-            print(f"[Debug] Device sample rate: {info['defaultSampleRate']} Hz")
-            index = i
-            break
-    pa.terminate()
-    if index is None:
-        print("[Warning] Preferred mic not found. Using default.")
-        return None
-    return index
-
-def get_output_device_index(preferred_name="default"):
-    pa = pyaudio.PyAudio()
-    index = None
-    for i in range(pa.get_device_count()):
-        info = pa.get_device_info_by_index(i)
-        if preferred_name.lower() in info['name'].lower() and info['maxOutputChannels'] > 0:
-            print(f"[Debug] Selected output device {i}: {info['name']}")
-            index = i
-            break
-    pa.terminate()
-    if index is None:
-        print("[Warning] Preferred output device not found. Using default.")
-        return None
-    return index
-
-def parse_card_number(device_str):
-    """
-    Extract ALSA card number from string like 'plughw:3,0'
-    """
-    try:
-        return int(device_str.split(":")[1].split(",")[0])
-    except Exception as e:
-        print(f"[Warning] Could not parse card number from {device_str}: {e}")
-        return 0  # fallback
-
-def list_input_devices():
-    pa = pyaudio.PyAudio()
-    print("[Debug] Available input devices:")
-    for i in range(pa.get_device_count()):
-        info = pa.get_device_info_by_index(i)
-        if info['maxInputChannels'] > 0:
-            print(f"  {i}: {info['name']} ({int(info['defaultSampleRate'])} Hz, {info['maxInputChannels']}ch)")
-    pa.terminate()
-
-def resample_audio(data, orig_rate=48000, target_rate=16000):
-    # Convert byte string to numpy array
-    audio_np = np.frombuffer(data, dtype=np.int16)
-    # Resample using soxr
-    resampled_np = soxr.resample(audio_np, orig_rate, target_rate)
-    # Convert back to bytes
-    return resampled_np.astype(np.int16).tobytes()
-
-def set_output_volume(volume_level, card_id=0):
-    """
-    Set output volume using ALSA 'Speaker' control on specified card.
-    volume_level: 1–10 (user scale)
-    card_id: ALSA card number (from aplay -l)
-    """
-    percent = max(1, min(volume_level, 10)) * 10  # map to 10–100%
-    try:
-        subprocess.run(
-            ['amixer', '-c', str(card_id), 'sset', 'Speaker', str(percent) + '%'],
-            check=True,
-            stdout=subprocess.DEVNULL,
-            stderr=subprocess.DEVNULL
-        )
-        print(f"[Debug] Volume set to {percent}% on card {card_id}")
-    except Exception as e:
-        print(f"[Warning] Volume control failed on card {card_id}: {e}")
-
-# ------------------- PATHS -------------------
-
-CONFIG_PATH = os.path.expanduser("va_config.json")
-BASE_DIR = os.path.dirname(__file__)
-MODEL_PATH = os.path.join(BASE_DIR, 'vosk-model')
-CHAT_URL = 'https://open.bigmodel.cn/api/paas/v4/chat/completions'
-AUTH_TOKEN = '0c9cbaca9d2bbf864990f1e1decdf340.dXRMsZCHTUbPQ0rm'  # Replace with your actual token
-
-# ------------------- CONFIG FILE LOADING -------------------
-
-DEFAULT_CONFIG = {
-    "volume": 9,
-    "mic_name": "default",
-    "audio_output_device": "default",
-    "model_name": "glm-4.5",
-    "voice": "en_US-kathleen-low.onnx",
-    "enable_audio_processing": False,
-    "history_length": 4,
-    "system_prompt": "You are a helpful assistant."
-}
-
-def load_config():
-    # Load config from system file or fall back to defaults
-    if os.path.isfile(CONFIG_PATH):
-        try:
-            with open(CONFIG_PATH, 'r') as f:
-                user_config = json.load(f)
-            return {**DEFAULT_CONFIG, **user_config}  # merge with defaults
-        except Exception as e:
-            print(f"[Warning] Failed to load system config: {e}")
-
-    print("[Debug] Using default config.")
-    return DEFAULT_CONFIG
-
-config = load_config()
-
-# Apply loaded config values
-VOLUME = config["volume"]
-MIC_NAME = config["mic_name"]
-AUDIO_OUTPUT_DEVICE = config["audio_output_device"]
-AUDIO_OUTPUT_DEVICE_INDEX = get_output_device_index(config["audio_output_device"])
-OUTPUT_CARD = parse_card_number(AUDIO_OUTPUT_DEVICE) if AUDIO_OUTPUT_DEVICE else 0
-MODEL_NAME = config["model_name"]
-VOICE_MODEL = os.path.join("voices", config["voice"])
-ENABLE_AUDIO_PROCESSING = config["enable_audio_processing"]
-HISTORY_LENGTH = config["history_length"]
-
-# Set system volume
-set_output_volume(VOLUME, OUTPUT_CARD)
-
-# Setup messages with system prompt
-messages = [{"role": "system", "content": config["system_prompt"]}]
-
-list_input_devices()
-RATE = 48000
-CHUNK = 1024
-CHANNELS = 1
-mic_enabled = True
-DEVICE_INDEX = get_input_device_index()
-
-# SOUND EFFECTS
-NOISE_LEVEL = '0.04'
-BANDPASS_HIGHPASS = '300'
-BANDPASS_LOWPASS = '800'
-
-# ------------------- VOICE MODEL -------------------
-
-VOICE_MODELS_DIR = os.path.join(BASE_DIR, 'voices')
-if not os.path.isdir(VOICE_MODELS_DIR):
-    os.makedirs(VOICE_MODELS_DIR)
-
-VOICE_MODEL = os.path.join(VOICE_MODELS_DIR, config["voice"])
-
-print('[Debug] Available Piper voices:')
-for f in os.listdir(VOICE_MODELS_DIR):
-    if f.endswith('.onnx'):
-        print('  ', f)
-print(f'[Debug] Using VOICE_MODEL: {VOICE_MODEL}')
-print(f"[Debug] Config loaded: model={MODEL_NAME}, voice={config['voice']}, vol={VOLUME}, mic={MIC_NAME}")
-
-# ------------------- CONVERSATION STATE -------------------
-
-audio_queue = queue.Queue()
-
-# Audio callback from Shure
-def audio_callback(in_data, frame_count, time_info, status):
-    global mic_enabled
-    if not mic_enabled:
-        return (None, pyaudio.paContinue)
-    resampled_data = resample_audio(in_data, orig_rate=48000, target_rate=16000)
-    audio_queue.put(resampled_data)
-    return (None, pyaudio.paContinue)
-
-# ------------------- STREAM SETUP -------------------
-
-def start_stream():
-    pa = pyaudio.PyAudio()
-
-    stream = pa.open(
-        rate=RATE,
-        format=pyaudio.paInt16,
-        channels=CHANNELS,
-        input=True,
-        input_device_index=DEVICE_INDEX,
-        frames_per_buffer=CHUNK,
-        stream_callback=audio_callback
-    )
-    stream.start_stream()
-    print(f'[Debug] Stream @ {RATE}Hz')
-    return pa, stream
-
-# ------------------- QUERY GLM API -------------------
-
-def query_glm():
-    headers = {
-        'Authorization': 'Bearer ' + AUTH_TOKEN,
-        'Content-Type': 'application/json'
-    }
-    payload = {
-        "model": "glm-4.5",
-        "messages": [messages[0]] + messages[-HISTORY_LENGTH:],  # force system prompt at top
-        "temperature": 0.6,
-        "max_tokens": 1024,
-        "stream": False
-    }
-
-    with Timer("Inference"):  # measure inference latency
-        try:
-            resp = requests.post(CHAT_URL, json=payload, headers=headers)
-            resp.raise_for_status()  # Raise exception for HTTP errors
-        except requests.exceptions.RequestException as e:
-            print(f"[Error] GLM API request failed: {e}")
-            return ''
-
-    data = resp.json()
-    # Extract assistant message
-    reply = ''
-    if 'choices' in data and len(data['choices']) > 0:
-        choice = data['choices'][0]
-        if 'message' in choice and 'content' in choice['message']:
-            reply = choice['message']['content'].strip()
-    return reply
-
-# ------------------- TTS & DEGRADATION -------------------
-
-import tempfile
-
-def play_response(text):
-    import io
-    import tempfile
-
-    # Mute the mic during playback to avoid feedback loop
-    global mic_enabled
-    mic_enabled = False  # 🔇 mute mic
-
-    # clean the response
-    clean = re.sub(r"[\*]+", '', text)  # remove asterisks
-    clean = re.sub(r"\(.*?\)", '', clean)  # remove (stage directions)
-    clean = re.sub(r"<.*?>", '', clean)  # remove HTML-style tags
-    clean = clean.replace('\n', ' ').strip()  # normalize newlines
-    clean = re.sub(r'\s+', ' ', clean)  # collapse whitespace
-    clean = re.sub(r'[\U0001F300-\U0001FAFF\u2600-\u26FF\u2700-\u27BF]+', '', clean)  # remove emojis
-
-    piper_path = os.path.join(BASE_DIR, 'bin', 'piper', 'piper')
-
-    # 1. Generate Piper raw PCM
-    with Timer("Piper inference"):
-        try:
-            piper_proc = subprocess.Popen(
-                [piper_path, '--model', VOICE_MODEL, '--output_raw'],
-                stdin=subprocess.PIPE,
-                stdout=subprocess.PIPE,
-                stderr=subprocess.DEVNULL
-            )
-            tts_pcm, _ = piper_proc.communicate(input=clean.encode())
-        except Exception as e:
-            print(f"[Error] Piper TTS failed: {e}")
-            return
-
-    if ENABLE_AUDIO_PROCESSING:
-        # SoX timing consolidation
-        sox_start = time.time()
-
-        # 2. Convert raw PCM to WAV
-        pcm_to_wav = subprocess.Popen(
-            ['sox', '-t', 'raw', '-r', '16000', '-c', str(CHANNELS), '-b', '16',
-             '-e', 'signed-integer', '-', '-t', 'wav', '-'],
-            stdin=subprocess.PIPE,
-            stdout=subprocess.PIPE,
-            stderr=subprocess.DEVNULL
-        )
-        tts_wav_16k, _ = pcm_to_wav.communicate(input=tts_pcm)
-
-        # 3. Estimate duration
-        duration_sec = len(tts_pcm) / (RATE * 2)
-
-        # 4. Generate white noise WAV bytes
-        noise_bytes = subprocess.check_output([
-            'sox', '-n',
-            '-r', '16000',
-            '-c', str(CHANNELS),
-            '-b', '16',
-            '-e', 'signed-integer',
-            '-t', 'wav', '-',
-            'synth', str(duration_sec),
-            'whitenoise', 'vol', NOISE_LEVEL
-        ], stderr=subprocess.DEVNULL)
-
-        # 5. Write both to temp files & mix
-        with tempfile.NamedTemporaryFile(suffix='.wav') as tts_file, tempfile.NamedTemporaryFile(suffix='.wav') as noise_file:
-            tts_file.write(tts_wav_16k)
-            noise_file.write(noise_bytes)
-            tts_file.flush()
-            noise_file.flush()
-            mixer = subprocess.Popen(
-                ['sox', '-m', tts_file.name, noise_file.name, '-t', 'wav', '-'],
-                stdout=subprocess.PIPE,
-                stderr=subprocess.DEVNULL
-            )
-            mixed_bytes, _ = mixer.communicate()
-
-        # 6. Apply filter
-        filter_proc = subprocess.Popen(
-            #['sox', '-t', 'wav', '-', '-t', 'wav', '-', 'highpass', BANDPASS_HIGHPASS, 'lowpass', BANDPASS_LOWPASS],
-            ['sox', '-t', 'wav', '-', '-r', '48000', '-t', 'wav', '-',
-             'highpass', BANDPASS_HIGHPASS, 'lowpass', BANDPASS_LOWPASS],
-            stdin=subprocess.PIPE,
-            stdout=subprocess.PIPE,
-            stderr=subprocess.DEVNULL
-        )
-        final_bytes, _ = filter_proc.communicate(input=mixed_bytes)
-
-        sox_elapsed = (time.time() - sox_start) * 1000
-        print(f"[Timing] SoX (total): {int(sox_elapsed)} ms")
-
-    else:
-        # No FX: just convert raw PCM to WAV
-        pcm_to_wav = subprocess.Popen(
-            ['sox', '-t', 'raw', '-r', '16000', '-c', str(CHANNELS), '-b', '16',
-             '-e', 'signed-integer', '-', '-t', 'wav', '-'],
-            stdin=subprocess.PIPE,
-            stdout=subprocess.PIPE,
-            stderr=subprocess.DEVNULL
-        )
-        tts_wav_16k, _ = pcm_to_wav.communicate(input=tts_pcm)
-
-        resample_proc = subprocess.Popen(
-            ['sox', '-t', 'wav', '-', '-r', '48000', '-t', 'wav', '-'],
-            stdin=subprocess.PIPE,
-            stdout=subprocess.PIPE,
-            stderr=subprocess.DEVNULL
-        )
-        final_bytes, _ = resample_proc.communicate(input=tts_wav_16k)
-
-    # 7. Playback
-    with Timer("Playback"):
-        try:
-            wf = wave.open(io.BytesIO(final_bytes), 'rb')
-
-            pa = pyaudio.PyAudio()
-            stream = pa.open(
-                format=pa.get_format_from_width(wf.getsampwidth()),
-                channels=wf.getnchannels(),
-                rate=wf.getframerate(),
-                output=True,
-                output_device_index=AUDIO_OUTPUT_DEVICE_INDEX
-            )
-
-            data = wf.readframes(CHUNK)
-            while data:
-                stream.write(data)
-                data = wf.readframes(CHUNK)
-
-            stream.stop_stream()
-            stream.close()
-            pa.terminate()
-            wf.close()
-
-        except wave.Error as e:
-            print(f"[Error] Could not open final WAV: {e}")
-
-        finally:
-            mic_enabled = True  # 🔊 unmute mic
-            time.sleep(0.3)  # optional: small cooldown
-
-
-# ------------------- PROCESSING LOOP -------------------
-
-def processing_loop():
-    try:
-        model = Model(MODEL_PATH)
-    except Exception as e:
-        print(f"[Error] Failed to load Vosk model: {e}")
-        print(f"[Info] Model path: {MODEL_PATH}")
-        return
-
-    #rec = KaldiRecognizer(model, RATE)
-    rec = KaldiRecognizer(model, 16000)
-    MAX_DEBUG_LEN = 200  # optional: limit length of debug output
-    LOW_EFFORT_UTTERANCES = {"huh", "uh", "um", "erm", "hmm", "he's", "but"}
-
-    while True:
-        data = audio_queue.get()
-
-        if rec.AcceptWaveform(data):
-            start = time.time()
-            r = json.loads(rec.Result())
-            elapsed_ms = int((time.time() - start) * 1000)
-
-            user = r.get('text', '').strip()
-            if user:
-                print(f"[Timing] STT parse: {elapsed_ms} ms")
-                print("User:", user)
-
") in LOW_EFFORT_UTTERANCES: - print("[Debug] Ignored low-effort utterance.") - rec = KaldiRecognizer(model, 16000) - continue # Skip LLM response + TTS for accidental noise - - messages.append({"role": "user", "content": user}) - # Generate assistant response - resp_text = query_glm() - if resp_text: - # Clean debug print (remove newlines and carriage returns) - clean_debug_text = resp_text.replace('\n', ' ').replace('\r', ' ') - if len(clean_debug_text) > MAX_DEBUG_LEN: - clean_debug_text = clean_debug_text[:MAX_DEBUG_LEN] + '...' - - print('Assistant:', clean_debug_text) - messages.append({"role": "assistant", "content": clean_debug_text}) - - # TTS generation + playback - play_response(resp_text) - else: - print('[Debug] Empty response, skipping TTS.') - - # Reset recognizer after each full interaction - rec = KaldiRecognizer(model, 16000) - -# ------------------- MAIN ------------------- - -if __name__ == '__main__': - pa, stream = start_stream() - t = threading.Thread(target=processing_loop, daemon=True) - t.start() - try: - while stream.is_active(): - time.sleep(0.1) - except KeyboardInterrupt: - stream.stop_stream(); stream.close(); pa.terminate() \ No newline at end of file diff --git a/voice_assistant_pi.py b/voice_assistant_pi.py deleted file mode 100644 index d9cce18..0000000 --- a/voice_assistant_pi.py +++ /dev/null @@ -1,381 +0,0 @@ -#!/usr/bin/env python3 -""" -Voice Assistant: Real-Time Voice Chat (修复版) - -修复了树莓派上的音频设备问题 -""" - -import io -import json -import os -import queue -import re -import subprocess -import threading -import time -import wave - -import numpy as np -import pyaudio -import requests -import soxr -from pydub import AudioSegment -from vosk import KaldiRecognizer, Model - - -# ------------------- TIMING UTILITY ------------------- -class Timer: - def __init__(self, label): - self.label = label - self.enabled = True - def __enter__(self): - self.start = time.time() - return self - def __exit__(self, exc_type, exc_val, exc_tb): - if self.enabled: - elapsed_ms = (time.time() - self.start) * 1000 - print(f"[Timing] {self.label}: {elapsed_ms:.0f} ms") - def disable(self): - self.enabled = False - -# ------------------- FUNCTIONS ------------------- - -def get_input_device_index(preferred_name=None): - pa = pyaudio.PyAudio() - try: - # 首先尝试获取默认设备 - if preferred_name is None: - default_input = pa.get_default_input_device_info() - print(f"[Debug] Using default input device: {default_input['name']}") - return default_input['index'] - - # 如果有指定名称,尝试匹配 - for i in range(pa.get_device_count()): - info = pa.get_device_info_by_index(i) - if info['maxInputChannels'] > 0 and preferred_name.lower() in info['name'].lower(): - print(f"[Debug] Selected input device {i}: {info['name']}") - print(f"[Debug] Device sample rate: {info['defaultSampleRate']} Hz") - return i - - # 如果没找到,使用默认设备 - default_input = pa.get_default_input_device_info() - print(f"[Warning] Preferred mic not found. 
Using default: {default_input['name']}") - return default_input['index'] - finally: - pa.terminate() - -def get_output_device_index(preferred_name=None): - pa = pyaudio.PyAudio() - try: - # 首先尝试获取默认设备 - if preferred_name is None: - default_output = pa.get_default_output_device_info() - print(f"[Debug] Using default output device: {default_output['name']}") - return default_output['index'] - - # 如果有指定名称,尝试匹配 - for i in range(pa.get_device_count()): - info = pa.get_device_info_by_index(i) - if info['maxOutputChannels'] > 0 and preferred_name.lower() in info['name'].lower(): - print(f"[Debug] Selected output device {i}: {info['name']}") - return i - - # 如果没找到,使用默认设备 - default_output = pa.get_default_output_device_info() - print(f"[Warning] Preferred output device not found. Using default: {default_output['name']}") - return default_output['index'] - finally: - pa.terminate() - -def list_input_devices(): - pa = pyaudio.PyAudio() - try: - print("[Debug] Available input devices:") - for i in range(pa.get_device_count()): - info = pa.get_device_info_by_index(i) - if info['maxInputChannels'] > 0: - print(f" {i}: {info['name']} ({int(info['defaultSampleRate'])} Hz, {info['maxInputChannels']}ch)") - finally: - pa.terminate() - -def resample_audio(data, orig_rate=44100, target_rate=16000): - # Convert byte string to numpy array - audio_np = np.frombuffer(data, dtype=np.int16) - # Resample using soxr - resampled_np = soxr.resample(audio_np, orig_rate, target_rate) - # Convert back to bytes - return resampled_np.astype(np.int16).tobytes() - -# ------------------- PATHS ------------------- - -CONFIG_PATH = os.path.expanduser("va_config.json") -BASE_DIR = os.path.dirname(__file__) -MODEL_PATH = os.path.join(BASE_DIR, 'vosk-model') -CHAT_URL = 'https://open.bigmodel.cn/api/paas/v4/chat/completions' -AUTH_TOKEN = '0c9cbaca9d2bbf864990f1e1decdf340.dXRMsZCHTUbPQ0rm' - -# ------------------- CONFIG FILE LOADING ------------------- - -DEFAULT_CONFIG = { - "volume": 8, - "mic_name": None, - "audio_output_device": None, - "model_name": "glm-4.5", - "voice": "en_US-kathleen-low.onnx", - "enable_audio_processing": False, - "history_length": 4, - "system_prompt": "You are a helpful assistant." 
-}
-
-def load_config():
-    if os.path.isfile(CONFIG_PATH):
-        try:
-            with open(CONFIG_PATH, 'r') as f:
-                user_config = json.load(f)
-            return {**DEFAULT_CONFIG, **user_config}
-        except Exception as e:
-            print(f"[Warning] Failed to load system config: {e}")
-
-    print("[Debug] Using default config.")
-    return DEFAULT_CONFIG
-
-config = load_config()
-
-# Apply loaded config values
-VOLUME = config["volume"]
-MIC_NAME = config["mic_name"]
-AUDIO_OUTPUT_DEVICE = config["audio_output_device"]
-AUDIO_OUTPUT_DEVICE_INDEX = get_output_device_index(config["audio_output_device"])
-MODEL_NAME = config["model_name"]
-VOICE_MODEL = os.path.join("voices", config["voice"])
-ENABLE_AUDIO_PROCESSING = config["enable_audio_processing"]
-HISTORY_LENGTH = config["history_length"]
-
-# Setup messages with system prompt
-messages = [{"role": "system", "content": config["system_prompt"]}]
-
-list_input_devices()
-DEVICE_INDEX = get_input_device_index(config["mic_name"])
-
-# Read the sample rate from the device
-pa = pyaudio.PyAudio()
-device_info = pa.get_device_info_by_index(DEVICE_INDEX)
-INPUT_RATE = int(device_info['defaultSampleRate'])
-OUTPUT_RATE = int(device_info['defaultSampleRate'])
-pa.terminate()
-
-CHUNK = 1024
-CHANNELS = 1
-mic_enabled = True
-
-print(f"[Debug] Using sample rate: {INPUT_RATE} Hz")
-print(f"[Debug] Config loaded: model={MODEL_NAME}, voice={config['voice']}, vol={VOLUME}")
-
-# ------------------- CONVERSATION STATE -------------------
-
-audio_queue = queue.Queue()
-
-# Audio callback
-def audio_callback(in_data, frame_count, time_info, status):
-    global mic_enabled
-    if not mic_enabled:
-        return (None, pyaudio.paContinue)
-    resampled_data = resample_audio(in_data, orig_rate=INPUT_RATE, target_rate=16000)
-    audio_queue.put(resampled_data)
-    return (None, pyaudio.paContinue)
-
-# ------------------- STREAM SETUP -------------------
-
-def start_stream():
-    pa = pyaudio.PyAudio()
-
-    stream = pa.open(
-        rate=INPUT_RATE,  # use the device's default sample rate
-        format=pyaudio.paInt16,
-        channels=CHANNELS,
-        input=True,
-        input_device_index=DEVICE_INDEX,
-        frames_per_buffer=CHUNK,
-        stream_callback=audio_callback
-    )
-    stream.start_stream()
-    print(f'[Debug] Stream @ {INPUT_RATE}Hz')
-    return pa, stream
-
-# ------------------- QUERY GLM API -------------------
-
-def query_glm():
-    headers = {
-        'Authorization': 'Bearer ' + AUTH_TOKEN,
-        'Content-Type': 'application/json'
-    }
-    payload = {
-        "model": "glm-4.5",
-        "messages": [messages[0]] + messages[-HISTORY_LENGTH:],
-        "temperature": 0.6,
-        "max_tokens": 1024,
-        "stream": False
-    }
-
-    with Timer("Inference"):
-        try:
-            resp = requests.post(CHAT_URL, json=payload, headers=headers)
-            resp.raise_for_status()
-        except requests.exceptions.RequestException as e:
-            print(f"[Error] GLM API request failed: {e}")
-            return ''
-
-    data = resp.json()
-    reply = ''
-    if 'choices' in data and len(data['choices']) > 0:
-        choice = data['choices'][0]
-        if 'message' in choice and 'content' in choice['message']:
-            reply = choice['message']['content'].strip()
-    return reply
-
-# ------------------- TTS & DEGRADATION -------------------
-
-def play_response(text):
-    global mic_enabled
-    mic_enabled = False
-
-    # clean the response
-    clean = re.sub(r"[\*]+", '', text)
-    clean = re.sub(r"\(.*?\)", '', clean)
-    clean = re.sub(r"<.*?>", '', clean)
-    clean = clean.replace('\n', ' ').strip()
-    clean = re.sub(r'\s+', ' ', clean)
-    clean = re.sub(r'[\U0001F300-\U0001FAFF\u2600-\u26FF\u2700-\u27BF]+', '', clean)
-
-    piper_path = os.path.join(BASE_DIR, 'bin', 'piper', 'piper')
-
-    if not os.path.exists(piper_path):
print(f"[Error] Piper executable not found at {piper_path}") - mic_enabled = True - return - - try: - # Generate Piper raw PCM - with Timer("Piper inference"): - piper_proc = subprocess.Popen( - [piper_path, '--model', VOICE_MODEL, '--output_raw'], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.DEVNULL - ) - tts_pcm, _ = piper_proc.communicate(input=clean.encode()) - - # Convert raw PCM to WAV for playback - wav_io = io.BytesIO() - with wave.open(wav_io, 'wb') as wf: - wf.setnchannels(1) - wf.setsampwidth(2) - wf.setframerate(16000) - wf.writeframes(tts_pcm) - - wav_io.seek(0) - wf = wave.open(wav_io, 'rb') - - # Playback - with Timer("Playback"): - pa = pyaudio.PyAudio() - stream = pa.open( - format=pa.get_format_from_width(wf.getsampwidth()), - channels=wf.getnchannels(), - rate=wf.getframerate(), - output=True, - output_device_index=AUDIO_OUTPUT_DEVICE_INDEX - ) - - data = wf.readframes(CHUNK) - while data: - stream.write(data) - data = wf.readframes(CHUNK) - - stream.stop_stream() - stream.close() - pa.terminate() - wf.close() - - except Exception as e: - print(f"[Error] TTS playback failed: {e}") - finally: - mic_enabled = True - time.sleep(0.3) - -# ------------------- PROCESSING LOOP ------------------- - -def processing_loop(): - try: - model = Model(MODEL_PATH) - print("[Debug] Vosk model loaded successfully") - except Exception as e: - print(f"[Error] Failed to load Vosk model: {e}") - print(f"[Info] Model path: {MODEL_PATH}") - return - - rec = KaldiRecognizer(model, 16000) - MAX_DEBUG_LEN = 200 - LOW_EFFORT_UTTERANCES = {"huh", "uh", "um", "erm", "hmm", "he's", "but"} - - while True: - try: - data = audio_queue.get() - - if rec.AcceptWaveform(data): - start = time.time() - r = json.loads(rec.Result()) - elapsed_ms = int((time.time() - start) * 1000) - - user = r.get('text', '').strip() - if user: - print(f"[Timing] STT parse: {elapsed_ms} ms") - print("User:", user) - - if user.lower().strip(".,!? ") in LOW_EFFORT_UTTERANCES: - print("[Debug] Ignored low-effort utterance.") - rec = KaldiRecognizer(model, 16000) - continue - - messages.append({"role": "user", "content": user}) - resp_text = query_glm() - - if resp_text: - clean_debug_text = resp_text.replace('\n', ' ').replace('\r', ' ') - if len(clean_debug_text) > MAX_DEBUG_LEN: - clean_debug_text = clean_debug_text[:MAX_DEBUG_LEN] + '...' - - print('Assistant:', clean_debug_text) - messages.append({"role": "assistant", "content": clean_debug_text}) - play_response(resp_text) - else: - print('[Debug] Empty response, skipping TTS.') - - rec = KaldiRecognizer(model, 16000) - - except Exception as e: - print(f"[Error] Processing loop error: {e}") - time.sleep(1) - -# ------------------- MAIN ------------------- - -if __name__ == '__main__': - try: - pa, stream = start_stream() - t = threading.Thread(target=processing_loop, daemon=True) - t.start() - - print("[Debug] Voice assistant started. Press Ctrl+C to exit.") - while stream.is_active(): - time.sleep(0.1) - - except KeyboardInterrupt: - print("[Debug] Shutting down...") - stream.stop_stream() - stream.close() - pa.terminate() - except Exception as e: - print(f"[Error] Main loop error: {e}") - stream.stop_stream() - stream.close() - pa.terminate() \ No newline at end of file