#!/usr/bin/env python3
"""
Voice Assistant: Real-Time Voice Chat

This app runs on a Raspberry Pi (or Linux desktop) and creates a low-latency,
hands-free voice interaction loop with an AI character. It uses local speech
recognition (Vosk), local text-to-speech synthesis (Piper), and a large language
model reached over a chat-completions API (configured here for Zhipu AI's GLM;
a locally hosted model served by Ollama can be substituted).

Key Features:
- Wake-word-free, continuous voice recognition with real-time transcription
- LLM-driven responses from a configurable chat model (e.g., GLM, LLaMA, Qwen, Gemma)
- Audio response synthesis with a gruff custom voice using ONNX-based Piper models
- Optional noise mixing and filtering via SoX
- System volume control via ALSA
- Modular and responsive design suitable for low-latency, character-driven agents

Ideal for embedded voice AI demos, cosplay companions, or standalone AI characters.

Copyright: M15.ai
License: MIT
"""

import io
import json
import os
import queue
import re
import subprocess
import threading
import time
import wave

import numpy as np
import pyaudio
import requests
import soxr
from pydub import AudioSegment
from vosk import KaldiRecognizer, Model


# ------------------- TIMING UTILITY -------------------

class Timer:
    def __init__(self, label):
        self.label = label
        self.enabled = True

    def __enter__(self):
        self.start = time.time()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.enabled:
            elapsed_ms = (time.time() - self.start) * 1000
            print(f"[Timing] {self.label}: {elapsed_ms:.0f} ms")

    def disable(self):
        self.enabled = False


# ------------------- FUNCTIONS -------------------

def get_input_device_index(preferred_name="Shure MVX2U"):
    pa = pyaudio.PyAudio()
    index = None
    for i in range(pa.get_device_count()):
        info = pa.get_device_info_by_index(i)
        if preferred_name.lower() in info['name'].lower() and info['maxInputChannels'] > 0:
            print(f"[Debug] Selected input device {i}: {info['name']}")
            print(f"[Debug] Device sample rate: {info['defaultSampleRate']} Hz")
            index = i
            break
    pa.terminate()
    if index is None:
        print("[Warning] Preferred mic not found. Falling back to default.")
    return index


def get_output_device_index(preferred_name):
    pa = pyaudio.PyAudio()
    for i in range(pa.get_device_count()):
        info = pa.get_device_info_by_index(i)
        if preferred_name.lower() in info['name'].lower() and info['maxOutputChannels'] > 0:
            print(f"[Debug] Selected output device {i}: {info['name']}")
            return i
    print("[Warning] Preferred output device not found. Using default index 0.")
    return 0


def parse_card_number(device_str):
    """Extract ALSA card number from a string like 'plughw:3,0'."""
    try:
        return int(device_str.split(":")[1].split(",")[0])
    except Exception as e:
        print(f"[Warning] Could not parse card number from {device_str}: {e}")
        return 0  # fallback


def list_input_devices():
    pa = pyaudio.PyAudio()
    print("[Debug] Available input devices:")
    for i in range(pa.get_device_count()):
        info = pa.get_device_info_by_index(i)
        if info['maxInputChannels'] > 0:
            print(f"  {i}: {info['name']} ({int(info['defaultSampleRate'])} Hz, {info['maxInputChannels']}ch)")
    pa.terminate()


def resample_audio(data, orig_rate=48000, target_rate=16000):
    # Convert byte string to numpy array
    audio_np = np.frombuffer(data, dtype=np.int16)
    # Resample using soxr
    resampled_np = soxr.resample(audio_np, orig_rate, target_rate)
    # Convert back to bytes
    return resampled_np.astype(np.int16).tobytes()
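
# --- Optional sanity check (illustrative sketch, not called by the app) ---------
# A minimal example of how resample_audio() behaves: feed it one second of a
# synthetic 440 Hz tone at 48 kHz and confirm roughly 16 000 samples come back.
# The tone frequency, amplitude, and tolerance below are arbitrary test values.
def _resample_audio_self_test():
    t = np.linspace(0, 1.0, 48000, endpoint=False)
    tone = (0.3 * np.iinfo(np.int16).max * np.sin(2 * np.pi * 440 * t)).astype(np.int16)
    out = resample_audio(tone.tobytes(), orig_rate=48000, target_rate=16000)
    out_samples = len(out) // 2  # int16 = 2 bytes per sample
    assert abs(out_samples - 16000) <= 1, f"unexpected output length: {out_samples}"
    print(f"[Debug] resample_audio self-test OK ({out_samples} samples at 16 kHz)")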

def set_output_volume(volume_level, card_id=3):
    """
    Set output volume using ALSA 'Speaker' control on the specified card.

    volume_level: 1–10 (user scale)
    card_id: ALSA card number (from `aplay -l`)
    """
    percent = max(1, min(volume_level, 10)) * 10  # map to 10–100%
    try:
        subprocess.run(
            ['amixer', '-c', str(card_id), 'sset', 'Speaker', f'{percent}%'],
            check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
        )
        print(f"[Debug] Volume set to {percent}% on card {card_id}")
    except Exception as e:
        print(f"[Warning] Volume control failed on card {card_id}: {e}")


# ------------------- PATHS -------------------

CONFIG_PATH = os.path.expanduser("va_config.json")
BASE_DIR = os.path.dirname(__file__)
MODEL_PATH = os.path.join(BASE_DIR, 'vosk-model')
CHAT_URL = 'https://open.bigmodel.cn/api/paas/v4/chat/completions'
AUTH_TOKEN = 'YOUR_GLM_API_TOKEN'  # Replace with your actual token; avoid committing real keys

# ------------------- CONFIG FILE LOADING -------------------

DEFAULT_CONFIG = {
    "volume": 9,
    "mic_name": "Plantronics",
    "audio_output_device": "Plantronics",
    "model_name": "qwen2.5:0.5b",
    "voice": "en_US-kathleen-low.onnx",
    "enable_audio_processing": False,
    "history_length": 4,
    "system_prompt": "You are a helpful assistant."
}


def load_config():
    # Load config from file or fall back to defaults
    if os.path.isfile(CONFIG_PATH):
        try:
            with open(CONFIG_PATH, 'r') as f:
                user_config = json.load(f)
                return {**DEFAULT_CONFIG, **user_config}  # merge with defaults
        except Exception as e:
            print(f"[Warning] Failed to load system config: {e}")
    print("[Debug] Using default config.")
    return DEFAULT_CONFIG


config = load_config()

# Apply loaded config values
VOLUME = config["volume"]
MIC_NAME = config["mic_name"]
AUDIO_OUTPUT_DEVICE = config["audio_output_device"]
AUDIO_OUTPUT_DEVICE_INDEX = get_output_device_index(config["audio_output_device"])
OUTPUT_CARD = parse_card_number(AUDIO_OUTPUT_DEVICE)
MODEL_NAME = config["model_name"]
VOICE_MODEL = os.path.join("voices", config["voice"])
ENABLE_AUDIO_PROCESSING = config["enable_audio_processing"]
HISTORY_LENGTH = config["history_length"]

# Set system volume
set_output_volume(VOLUME, OUTPUT_CARD)

# Set up messages with system prompt
messages = [{"role": "system", "content": config["system_prompt"]}]

list_input_devices()

RATE = 48000
CHUNK = 1024
CHANNELS = 1
mic_enabled = True
DEVICE_INDEX = get_input_device_index(MIC_NAME)  # use the configured mic name, not the hardcoded default

# SOUND EFFECTS
NOISE_LEVEL = '0.04'
BANDPASS_HIGHPASS = '300'
BANDPASS_LOWPASS = '800'

# ------------------- VOICE MODEL -------------------

VOICE_MODELS_DIR = os.path.join(BASE_DIR, 'voices')
if not os.path.isdir(VOICE_MODELS_DIR):
    os.makedirs(VOICE_MODELS_DIR)
VOICE_MODEL = os.path.join(VOICE_MODELS_DIR, config["voice"])

print('[Debug] Available Piper voices:')
for f in os.listdir(VOICE_MODELS_DIR):
    if f.endswith('.onnx'):
        print('  ', f)
print(f'[Debug] Using VOICE_MODEL: {VOICE_MODEL}')
print(f"[Debug] Config loaded: model={MODEL_NAME}, voice={config['voice']}, vol={VOLUME}, mic={MIC_NAME}")

# ------------------- CONVERSATION STATE -------------------

audio_queue = queue.Queue()


# Audio callback from the mic: resample 48 kHz capture to 16 kHz for Vosk
def audio_callback(in_data, frame_count, time_info, status):
    global mic_enabled
    if not mic_enabled:
        return (None, pyaudio.paContinue)
    resampled_data = resample_audio(in_data, orig_rate=48000, target_rate=16000)
    audio_queue.put(resampled_data)
    return (None, pyaudio.paContinue)
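
# --- Optional mic-level probe (illustrative sketch, not part of the main flow) ---
# If the recognizer stays silent, it can help to confirm that audio_callback() is
# actually queueing audio. This sketch drains the queue for a short window and prints
# the RMS level of whatever arrived; the 0.5 s window is an arbitrary choice, and any
# frames it consumes are lost to the recognizer, so use it only for standalone debugging.
def debug_input_level(seconds=0.5):
    frames = []
    deadline = time.time() + seconds
    while time.time() < deadline:
        try:
            frames.append(audio_queue.get(timeout=0.1))
        except queue.Empty:
            continue
    if not frames:
        print("[Debug] No audio captured; check mic selection and mic_enabled.")
        return
    samples = np.frombuffer(b''.join(frames), dtype=np.int16).astype(np.float64)
    rms = np.sqrt(np.mean(samples ** 2))
    print(f"[Debug] Input RMS over ~{seconds}s: {rms:.1f} (int16 full scale = 32767)")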

# ------------------- STREAM SETUP -------------------

def start_stream():
    pa = pyaudio.PyAudio()
    stream = pa.open(
        rate=RATE,
        format=pyaudio.paInt16,
        channels=CHANNELS,
        input=True,
        input_device_index=DEVICE_INDEX,
        frames_per_buffer=CHUNK,
        stream_callback=audio_callback
    )
    stream.start_stream()
    print(f'[Debug] Stream @ {RATE} Hz')
    return pa, stream


# ------------------- QUERY GLM CHAT ENDPOINT -------------------

def query_glm():
    headers = {
        'Authorization': f'Bearer {AUTH_TOKEN}',
        'Content-Type': 'application/json'
    }
    payload = {
        "model": "glm-4.5",  # fixed model; MODEL_NAME from the config is not used here
        "messages": [messages[0]] + messages[-HISTORY_LENGTH:],  # force system prompt at top
        "temperature": 0.6,
        "max_tokens": 1024,
        "stream": False
    }
    with Timer("Inference"):  # measure inference latency
        resp = requests.post(CHAT_URL, json=payload, headers=headers)
    if resp.status_code != 200:
        print(f'[Error] GLM API failed with status {resp.status_code}: {resp.text}')
        return ''
    data = resp.json()
    # Extract assistant message
    reply = ''
    if 'choices' in data and len(data['choices']) > 0:
        choice = data['choices'][0]
        if 'message' in choice and 'content' in choice['message']:
            reply = choice['message']['content'].strip()
    return reply
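
# --- Streaming variant (untested sketch) -----------------------------------------
# The module docstring mentions streamed responses, while query_glm() above requests a
# complete reply ("stream": False). The sketch below shows one way the call could be
# adapted, assuming the endpoint emits OpenAI-style server-sent events ("data: {...}"
# lines terminated by "data: [DONE]"). It is not wired into the processing loop.
def query_glm_streaming():
    headers = {
        'Authorization': f'Bearer {AUTH_TOKEN}',
        'Content-Type': 'application/json'
    }
    payload = {
        "model": "glm-4.5",
        "messages": [messages[0]] + messages[-HISTORY_LENGTH:],
        "temperature": 0.6,
        "max_tokens": 1024,
        "stream": True
    }
    reply_parts = []
    with requests.post(CHAT_URL, json=payload, headers=headers, stream=True, timeout=60) as resp:
        if resp.status_code != 200:
            print(f'[Error] GLM API failed with status {resp.status_code}: {resp.text}')
            return ''
        for line in resp.iter_lines(decode_unicode=True):
            if not line or not line.startswith('data:'):
                continue
            chunk = line[len('data:'):].strip()
            if chunk == '[DONE]':
                break
            try:
                delta = json.loads(chunk)['choices'][0].get('delta', {})
            except (json.JSONDecodeError, KeyError, IndexError):
                continue
            reply_parts.append(delta.get('content', '') or '')
    return ''.join(reply_parts).strip()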

# ------------------- TTS & DEGRADATION -------------------

import tempfile


def play_response(text):
    # Mute the mic during playback to avoid a feedback loop
    global mic_enabled
    mic_enabled = False  # 🔇 mute mic

    # Clean the response text before synthesis
    clean = re.sub(r"[\*]+", '', text)  # remove asterisks
    clean = re.sub(r"\(.*?\)", '', clean)  # remove (stage directions)
    clean = re.sub(r"<.*?>", '', clean)  # remove HTML-style tags
    clean = clean.replace('\n', ' ').strip()  # normalize newlines
    clean = re.sub(r'\s+', ' ', clean)  # collapse whitespace
    clean = re.sub(r'[\U0001F300-\U0001FAFF\u2600-\u26FF\u2700-\u27BF]+', '', clean)  # remove emojis

    piper_path = os.path.join(BASE_DIR, 'bin', 'piper', 'piper')

    # 1. Generate Piper raw PCM
    with Timer("Piper inference"):
        piper_proc = subprocess.Popen(
            [piper_path, '--model', VOICE_MODEL, '--output_raw'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL
        )
        tts_pcm, _ = piper_proc.communicate(input=clean.encode())

    if ENABLE_AUDIO_PROCESSING:
        # SoX timing consolidation
        sox_start = time.time()

        # 2. Convert raw PCM to WAV
        pcm_to_wav = subprocess.Popen(
            ['sox', '-t', 'raw', '-r', '16000', '-c', str(CHANNELS), '-b', '16', '-e', 'signed-integer',
             '-', '-t', 'wav', '-'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL
        )
        tts_wav_16k, _ = pcm_to_wav.communicate(input=tts_pcm)

        # 3. Estimate duration (16-bit mono PCM at the 16 kHz Piper output rate)
        duration_sec = len(tts_pcm) / (16000 * 2)

        # 4. Generate white noise WAV bytes
        noise_bytes = subprocess.check_output([
            'sox', '-n', '-r', '16000', '-c', str(CHANNELS), '-b', '16', '-e', 'signed-integer', '-t', 'wav', '-',
            'synth', str(duration_sec), 'whitenoise', 'vol', NOISE_LEVEL
        ], stderr=subprocess.DEVNULL)

        # 5. Write both to temp files & mix
        with tempfile.NamedTemporaryFile(suffix='.wav') as tts_file, \
                tempfile.NamedTemporaryFile(suffix='.wav') as noise_file:
            tts_file.write(tts_wav_16k)
            noise_file.write(noise_bytes)
            tts_file.flush()
            noise_file.flush()
            mixer = subprocess.Popen(
                ['sox', '-m', tts_file.name, noise_file.name, '-t', 'wav', '-'],
                stdout=subprocess.PIPE, stderr=subprocess.DEVNULL
            )
            mixed_bytes, _ = mixer.communicate()

        # 6. Apply band-pass filter and resample to the playback rate
        filter_proc = subprocess.Popen(
            # ['sox', '-t', 'wav', '-', '-t', 'wav', '-', 'highpass', BANDPASS_HIGHPASS, 'lowpass', BANDPASS_LOWPASS],
            ['sox', '-t', 'wav', '-', '-r', '48000', '-t', 'wav', '-',
             'highpass', BANDPASS_HIGHPASS, 'lowpass', BANDPASS_LOWPASS],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL
        )
        final_bytes, _ = filter_proc.communicate(input=mixed_bytes)

        sox_elapsed = (time.time() - sox_start) * 1000
        print(f"[Timing] SoX (total): {int(sox_elapsed)} ms")
    else:
        # No FX: just convert raw PCM to WAV
        pcm_to_wav = subprocess.Popen(
            ['sox', '-t', 'raw', '-r', '16000', '-c', str(CHANNELS), '-b', '16', '-e', 'signed-integer',
             '-', '-t', 'wav', '-'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL
        )
        tts_wav_16k, _ = pcm_to_wav.communicate(input=tts_pcm)

        resample_proc = subprocess.Popen(
            ['sox', '-t', 'wav', '-', '-r', '48000', '-t', 'wav', '-'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL
        )
        final_bytes, _ = resample_proc.communicate(input=tts_wav_16k)

    # 7. Playback
    with Timer("Playback"):
        try:
            wf = wave.open(io.BytesIO(final_bytes), 'rb')
            pa = pyaudio.PyAudio()
            stream = pa.open(
                format=pa.get_format_from_width(wf.getsampwidth()),
                channels=wf.getnchannels(),
                rate=wf.getframerate(),
                output=True,
                output_device_index=AUDIO_OUTPUT_DEVICE_INDEX
            )
            data = wf.readframes(CHUNK)
            while data:
                stream.write(data)
                data = wf.readframes(CHUNK)
            stream.stop_stream()
            stream.close()
            pa.terminate()
            wf.close()
        except wave.Error as e:
            print(f"[Error] Could not open final WAV: {e}")
        finally:
            mic_enabled = True  # 🔊 unmute mic
            time.sleep(0.3)  # optional: small cooldown


# ------------------- PROCESSING LOOP -------------------

def processing_loop():
    model = Model(MODEL_PATH)
    # The queue holds audio already resampled to 16 kHz, so the recognizer runs at 16000, not RATE
    rec = KaldiRecognizer(model, 16000)
    MAX_DEBUG_LEN = 200  # optional: limit length of debug output
    LOW_EFFORT_UTTERANCES = {"huh", "uh", "um", "erm", "hmm", "he's", "but"}

    while True:
        data = audio_queue.get()
        if rec.AcceptWaveform(data):
            start = time.time()
            r = json.loads(rec.Result())
            elapsed_ms = int((time.time() - start) * 1000)
            user = r.get('text', '').strip()
            if user:
                print(f"[Timing] STT parse: {elapsed_ms} ms")
                print("User:", user)
                if user.lower().strip(".,!? ") in LOW_EFFORT_UTTERANCES:
                    print("[Debug] Ignored low-effort utterance.")
                    rec = KaldiRecognizer(model, 16000)
                    continue  # Skip LLM response + TTS for accidental noise
                messages.append({"role": "user", "content": user})

                # Generate assistant response
                resp_text = query_glm()
                if resp_text:
                    # Clean debug print (remove newlines and carriage returns, cap length)
                    clean_debug_text = resp_text.replace('\n', ' ').replace('\r', ' ')
                    if len(clean_debug_text) > MAX_DEBUG_LEN:
                        clean_debug_text = clean_debug_text[:MAX_DEBUG_LEN] + '...'
                    print('Assistant:', clean_debug_text)
                    # Store the full reply in history, not the truncated debug string
                    messages.append({"role": "assistant", "content": resp_text})
                    # TTS generation + playback
                    play_response(resp_text)
                else:
                    print('[Debug] Empty response, skipping TTS.')

                # Reset recognizer after each full interaction
                rec = KaldiRecognizer(model, 16000)


# ------------------- MAIN -------------------

if __name__ == '__main__':
    pa, stream = start_stream()
    t = threading.Thread(target=processing_loop, daemon=True)
    t.start()
    try:
        while stream.is_active():
            time.sleep(0.1)
    except KeyboardInterrupt:
        stream.stop_stream()
        stream.close()
        pa.terminate()
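
# ---------------------------------------------------------------------------
# Example setup (illustrative; exact package names, paths, and the script name
# below are assumptions, adjust for your system):
#
#   sudo apt install sox alsa-utils          # SoX for audio FX, amixer for volume
#   pip install vosk pyaudio numpy soxr requests pydub
#
#   ./vosk-model/          a 16 kHz Vosk acoustic model
#   ./voices/*.onnx        Piper voice models
#   ./bin/piper/piper      Piper binary
#   ./va_config.json       optional overrides for DEFAULT_CONFIG (working directory)
#
#   python3 <this_file>.py
# ---------------------------------------------------------------------------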