#!/usr/bin/env python3
"""
Voice Assistant: Real-Time Voice Chat (fixed version).

Fixes audio device problems on the Raspberry Pi.

Pipeline: microphone (PyAudio callback) -> resample to 16 kHz (soxr) ->
speech-to-text (Vosk) -> chat completion (GLM HTTP API) -> text-to-speech
(Piper subprocess) -> speaker (PyAudio).
"""

import io
import json
import os
import queue
import re
import subprocess
import threading
import time
import wave

import numpy as np
import pyaudio
import requests
import soxr
from pydub import AudioSegment
from vosk import KaldiRecognizer, Model


# ------------------- TIMING UTILITY -------------------

class Timer:
    """Context manager that prints the elapsed time of its block in ms."""

    def __init__(self, label):
        self.label = label
        self.enabled = True
        self.start = 0.0

    def __enter__(self):
        # perf_counter is monotonic, so the measurement cannot be skewed by
        # wall-clock adjustments (e.g. NTP) while the block runs.
        self.start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.enabled:
            elapsed_ms = (time.perf_counter() - self.start) * 1000
            print(f"[Timing] {self.label}: {elapsed_ms:.0f} ms")

    def disable(self):
        """Suppress the timing printout for this timer."""
        self.enabled = False


# ------------------- FUNCTIONS -------------------

def get_input_device_index(preferred_name=None):
    """Return the PyAudio index of the input device to record from.

    Args:
        preferred_name: Case-insensitive substring of the wanted device
            name, or None to use the system default input device.

    Returns:
        The index of the first input-capable device whose name contains
        ``preferred_name``; the default input device's index otherwise.
    """
    pa = pyaudio.PyAudio()
    try:
        # First, try the default device when no name was requested.
        if preferred_name is None:
            default_input = pa.get_default_input_device_info()
            print(f"[Debug] Using default input device: {default_input['name']}")
            return default_input['index']

        # A name was given: look for an input-capable device that matches.
        wanted = preferred_name.lower()
        for i in range(pa.get_device_count()):
            info = pa.get_device_info_by_index(i)
            if info['maxInputChannels'] > 0 and wanted in info['name'].lower():
                print(f"[Debug] Selected input device {i}: {info['name']}")
                print(f"[Debug] Device sample rate: {info['defaultSampleRate']} Hz")
                return i

        # Nothing matched: fall back to the default device.
        default_input = pa.get_default_input_device_info()
        print(f"[Warning] Preferred mic not found. Using default: {default_input['name']}")
        return default_input['index']
    finally:
        pa.terminate()


def get_output_device_index(preferred_name=None):
    """Return the PyAudio index of the output device to play through.

    Args:
        preferred_name: Case-insensitive substring of the wanted device
            name, or None to use the system default output device.

    Returns:
        The index of the first output-capable device whose name contains
        ``preferred_name``; the default output device's index otherwise.
    """
    pa = pyaudio.PyAudio()
    try:
        # First, try the default device when no name was requested.
        if preferred_name is None:
            default_output = pa.get_default_output_device_info()
            print(f"[Debug] Using default output device: {default_output['name']}")
            return default_output['index']

        # A name was given: look for an output-capable device that matches.
        wanted = preferred_name.lower()
        for i in range(pa.get_device_count()):
            info = pa.get_device_info_by_index(i)
            if info['maxOutputChannels'] > 0 and wanted in info['name'].lower():
                print(f"[Debug] Selected output device {i}: {info['name']}")
                return i

        # Nothing matched: fall back to the default device.
        default_output = pa.get_default_output_device_info()
        print(f"[Warning] Preferred output device not found. Using default: {default_output['name']}")
        return default_output['index']
    finally:
        pa.terminate()


def list_input_devices():
    """Print every input-capable audio device with its rate and channels."""
    pa = pyaudio.PyAudio()
    try:
        print("[Debug] Available input devices:")
        for i in range(pa.get_device_count()):
            info = pa.get_device_info_by_index(i)
            if info['maxInputChannels'] > 0:
                print(f"  {i}: {info['name']} ({int(info['defaultSampleRate'])} Hz, {info['maxInputChannels']}ch)")
    finally:
        pa.terminate()


def resample_audio(data, orig_rate=44100, target_rate=16000):
    """Resample a buffer of 16-bit PCM samples to ``target_rate``.

    Args:
        data: Raw bytes holding int16 samples.
        orig_rate: Sample rate of ``data`` in Hz.
        target_rate: Desired sample rate in Hz.

    Returns:
        The resampled int16 samples as bytes.
    """
    if not data:
        return b''
    if orig_rate == target_rate:
        # Nothing to convert; skip the resampler entirely.
        return data
    # Convert byte string to numpy array
    audio_np = np.frombuffer(data, dtype=np.int16)
    # Resample using soxr
    resampled_np = soxr.resample(audio_np, orig_rate, target_rate)
    # Convert back to bytes
    return resampled_np.astype(np.int16).tobytes()


# ------------------- PATHS -------------------

CONFIG_PATH = os.path.expanduser("va_config.json")
BASE_DIR = os.path.dirname(__file__)
MODEL_PATH = os.path.join(BASE_DIR, 'vosk-model')
CHAT_URL = 'https://open.bigmodel.cn/api/paas/v4/chat/completions'
# SECURITY NOTE(review): an API credential is hard-coded below. Prefer setting
# the ZHIPUAI_API_KEY environment variable; the literal is kept only so that
# existing deployments keep working and should be rotated and removed.
AUTH_TOKEN = os.environ.get(
    'ZHIPUAI_API_KEY',
    '0c9cbaca9d2bbf864990f1e1decdf340.dXRMsZCHTUbPQ0rm',
)
# (connect, read) timeouts in seconds so a stalled request cannot block the
# processing thread forever.
REQUEST_TIMEOUT = (10, 120)

# ------------------- CONFIG FILE LOADING -------------------

DEFAULT_CONFIG = {
    "volume": 8,
    "mic_name": None,
    "audio_output_device": None,
    "model_name": "glm-4.5",
    "voice": "en_US-kathleen-low.onnx",
    "enable_audio_processing": False,
    "history_length": 4,
    "system_prompt": "You are a helpful assistant."
}


def load_config():
    """Load ``CONFIG_PATH`` and overlay it on ``DEFAULT_CONFIG``.

    Returns:
        A new dict with the defaults, overridden by any keys found in the
        config file. If the file is missing or unreadable the defaults are
        returned (as a copy, so callers cannot mutate ``DEFAULT_CONFIG``).
    """
    if os.path.isfile(CONFIG_PATH):
        try:
            with open(CONFIG_PATH, 'r', encoding='utf-8') as f:
                user_config = json.load(f)
            return {**DEFAULT_CONFIG, **user_config}
        except (OSError, ValueError, TypeError) as e:
            # ValueError covers malformed JSON; TypeError covers a top-level
            # JSON value that is not an object.
            print(f"[Warning] Failed to load system config: {e}")
    print("[Debug] Using default config.")
    return dict(DEFAULT_CONFIG)


def _voice_sample_rate(model_path, default=16000):
    """Return the output sample rate declared by a Piper voice.

    Reads ``audio.sample_rate`` from the ``<model>.json`` file next to the
    voice model and falls back to ``default`` when it cannot be read.
    """
    try:
        with open(model_path + '.json', 'r', encoding='utf-8') as f:
            return int(json.load(f)['audio']['sample_rate'])
    except (OSError, ValueError, KeyError, TypeError):
        return default


config = load_config()

# Apply loaded config values
VOLUME = config["volume"]  # NOTE(review): not read anywhere in this file.
MIC_NAME = config["mic_name"]
AUDIO_OUTPUT_DEVICE = config["audio_output_device"]
AUDIO_OUTPUT_DEVICE_INDEX = get_output_device_index(config["audio_output_device"])
MODEL_NAME = config["model_name"]
VOICE_MODEL = os.path.join("voices", config["voice"])
VOICE_SAMPLE_RATE = _voice_sample_rate(VOICE_MODEL)
ENABLE_AUDIO_PROCESSING = config["enable_audio_processing"]
HISTORY_LENGTH = config["history_length"]

# Setup messages with system prompt
messages = [{"role": "system", "content": config["system_prompt"]}]

list_input_devices()
DEVICE_INDEX = get_input_device_index(config["mic_name"])

# Read the sample rate from the selected input device.
pa = pyaudio.PyAudio()
device_info = pa.get_device_info_by_index(DEVICE_INDEX)
INPUT_RATE = int(device_info['defaultSampleRate'])
# NOTE(review): taken from the *input* device and not read anywhere in this
# file; confirm before relying on it for playback.
OUTPUT_RATE = int(device_info['defaultSampleRate'])
pa.terminate()

CHUNK = 1024
CHANNELS = 1
mic_enabled = True

print(f"[Debug] Using sample rate: {INPUT_RATE} Hz")
print(f"[Debug] Config loaded: model={MODEL_NAME}, voice={config['voice']}, vol={VOLUME}")

# ------------------- CONVERSATION STATE -------------------

audio_queue = queue.Queue()


# Audio callback
def audio_callback(in_data, frame_count, time_info, status):
    """PyAudio input callback: queue captured audio resampled to 16 kHz.

    Audio is dropped while ``mic_enabled`` is False (set during playback).
    """
    if not mic_enabled:
        return (None, pyaudio.paContinue)
    resampled_data = resample_audio(in_data, orig_rate=INPUT_RATE, target_rate=16000)
    audio_queue.put(resampled_data)
    return (None, pyaudio.paContinue)


# ------------------- STREAM SETUP -------------------

def start_stream():
    """Open and start the microphone stream.

    Returns:
        A ``(pa, stream)`` tuple; the caller owns both and must stop/close
        the stream and terminate the PyAudio instance.

    Raises:
        Whatever PyAudio raises when the stream cannot be opened; the
        PyAudio instance is terminated before the error propagates.
    """
    pa = pyaudio.PyAudio()
    try:
        stream = pa.open(
            rate=INPUT_RATE,  # use the device's default sample rate
            format=pyaudio.paInt16,
            channels=CHANNELS,
            input=True,
            input_device_index=DEVICE_INDEX,
            frames_per_buffer=CHUNK,
            stream_callback=audio_callback
        )
        stream.start_stream()
    except Exception:
        pa.terminate()
        raise
    print(f'[Debug] Stream @ {INPUT_RATE}Hz')
    return pa, stream


# ------------------- QUERY GLM API -------------------

def query_glm():
    """Send the recent conversation to the GLM chat API.

    The request contains the system prompt followed by at most
    ``HISTORY_LENGTH`` of the most recent non-system messages.

    Returns:
        The assistant's reply text, stripped, or '' when the request fails
        or the response carries no content.
    """
    headers = {
        'Authorization': 'Bearer ' + AUTH_TOKEN,
        'Content-Type': 'application/json'
    }
    # Slice the history *without* the system prompt so it is never sent
    # twice, and treat a non-positive limit as "no history" (a plain
    # messages[-0:] would select the whole list).
    history = messages[1:][-HISTORY_LENGTH:] if HISTORY_LENGTH > 0 else []
    payload = {
        "model": MODEL_NAME,
        "messages": [messages[0], *history],
        "temperature": 0.6,
        "max_tokens": 1024,
        "stream": False
    }

    with Timer("Inference"):
        try:
            resp = requests.post(CHAT_URL, json=payload, headers=headers,
                                 timeout=REQUEST_TIMEOUT)
            resp.raise_for_status()
            data = resp.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a response body that is not valid JSON.
            print(f"[Error] GLM API request failed: {e}")
            return ''

    reply = ''
    if 'choices' in data and len(data['choices']) > 0:
        choice = data['choices'][0]
        if 'message' in choice and 'content' in choice['message']:
            reply = choice['message']['content'].strip()
    return reply


# ------------------- TTS & DEGRADATION -------------------

def _clean_for_tts(text):
    """Strip markup, parentheticals and emoji that should not be spoken."""
    clean = re.sub(r"[\*]+", '', text)
    clean = re.sub(r"\(.*?\)", '', clean)
    clean = re.sub(r"<.*?>", '', clean)
    # Remove emoji before normalising whitespace so no stray gaps remain.
    clean = re.sub(r'[\U0001F300-\U0001FAFF\u2600-\u26FF\u2700-\u27BF]+', '', clean)
    clean = clean.replace('\n', ' ')
    return re.sub(r'\s+', ' ', clean).strip()


def play_response(text):
    """Synthesize ``text`` with Piper and play it on the output device.

    The microphone is muted (``mic_enabled = False``) for the duration and
    re-enabled afterwards, even when synthesis or playback fails.
    """
    global mic_enabled
    mic_enabled = False

    clean = _clean_for_tts(text)

    piper_path = os.path.join(BASE_DIR, 'bin', 'piper', 'piper')
    if not os.path.exists(piper_path):
        print(f"[Error] Piper executable not found at {piper_path}")
        mic_enabled = True
        return

    pa = None
    stream = None
    try:
        if not clean:
            print('[Debug] Nothing speakable after cleanup, skipping TTS.')
            return

        # Generate Piper raw PCM (16-bit mono)
        with Timer("Piper inference"):
            piper_proc = subprocess.Popen(
                [piper_path, '--model', VOICE_MODEL, '--output_raw'],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL
            )
            tts_pcm, _ = piper_proc.communicate(input=clean.encode())

        if piper_proc.returncode != 0 or not tts_pcm:
            print(f"[Error] Piper produced no audio (exit code {piper_proc.returncode})")
            return

        # Playback
        with Timer("Playback"):
            pa = pyaudio.PyAudio()
            stream = pa.open(
                format=pyaudio.paInt16,
                channels=1,
                rate=VOICE_SAMPLE_RATE,
                output=True,
                output_device_index=AUDIO_OUTPUT_DEVICE_INDEX
            )
            step = CHUNK * 2  # CHUNK frames of 2-byte mono samples
            for offset in range(0, len(tts_pcm), step):
                stream.write(tts_pcm[offset:offset + step])
            stream.stop_stream()
    except Exception as e:
        print(f"[Error] TTS playback failed: {e}")
    finally:
        # Release audio resources on every path, including failures.
        if stream is not None:
            stream.close()
        if pa is not None:
            pa.terminate()
        mic_enabled = True
        # NOTE(review): the pause happens after the mic is re-enabled;
        # confirm whether it was meant to come first.
        time.sleep(0.3)


# ------------------- PROCESSING LOOP -------------------

def processing_loop():
    """Consume queued audio, transcribe it, query the LLM and speak replies.

    Runs forever. Returns early only if the Vosk model cannot be loaded.
    """
    try:
        model = Model(MODEL_PATH)
        print("[Debug] Vosk model loaded successfully")
    except Exception as e:
        print(f"[Error] Failed to load Vosk model: {e}")
        print(f"[Info] Model path: {MODEL_PATH}")
        return

    rec = KaldiRecognizer(model, 16000)
    MAX_DEBUG_LEN = 200
    LOW_EFFORT_UTTERANCES = {"huh", "uh", "um", "erm", "hmm", "he's", "but"}

    while True:
        try:
            data = audio_queue.get()

            if not rec.AcceptWaveform(data):
                continue

            start = time.time()
            r = json.loads(rec.Result())
            elapsed_ms = int((time.time() - start) * 1000)

            user = r.get('text', '').strip()
            if not user:
                continue

            print(f"[Timing] STT parse: {elapsed_ms} ms")
            print("User:", user)

            if user.lower().strip(".,!? ") in LOW_EFFORT_UTTERANCES:
                print("[Debug] Ignored low-effort utterance.")
                rec = KaldiRecognizer(model, 16000)
                continue

            messages.append({"role": "user", "content": user})
            resp_text = query_glm()

            if resp_text:
                # The shortened, single-line form is for console output only.
                clean_debug_text = resp_text.replace('\n', ' ').replace('\r', ' ')
                if len(clean_debug_text) > MAX_DEBUG_LEN:
                    clean_debug_text = clean_debug_text[:MAX_DEBUG_LEN] + '...'
                print('Assistant:', clean_debug_text)
                # Keep the full reply in the history so later requests see
                # what the assistant actually said.
                messages.append({"role": "assistant", "content": resp_text})
                play_response(resp_text)
            else:
                print('[Debug] Empty response, skipping TTS.')

            rec = KaldiRecognizer(model, 16000)

        except Exception as e:
            print(f"[Error] Processing loop error: {e}")
            time.sleep(1)


# ------------------- MAIN -------------------

if __name__ == '__main__':
    # Reset so the cleanup below never touches the already-terminated
    # PyAudio instance used during setup, or an unbound ``stream``.
    pa = None
    stream = None
    try:
        pa, stream = start_stream()
        t = threading.Thread(target=processing_loop, daemon=True)
        t.start()

        print("[Debug] Voice assistant started. Press Ctrl+C to exit.")
        # Stop when the worker dies too (e.g. the Vosk model failed to
        # load); otherwise captured audio would pile up unprocessed.
        while stream.is_active() and t.is_alive():
            time.sleep(0.1)
        if not t.is_alive():
            print("[Error] Processing thread exited; shutting down.")

    except KeyboardInterrupt:
        print("[Debug] Shutting down...")
    except Exception as e:
        print(f"[Error] Main loop error: {e}")
    finally:
        if stream is not None:
            stream.stop_stream()
            stream.close()
        if pa is not None:
            pa.terminate()