Add files via upload

This commit is contained in:
m15-ai 2025-05-13 18:15:47 -05:00 committed by GitHub
parent adef09c069
commit ec7f2786ba
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

485
voice_assistant.py Normal file
View File

@ -0,0 +1,485 @@
#!/usr/bin/env python3
"""
Voice Assistant: Real-Time Voice Chat
This app runs on a Raspberry Pi (or Linux desktop) and creates a low-latency, full-duplex voice interaction
with an AI character. It uses local speech recognition
(Vosk), local text-to-speech synthesis (Piper), and a locally hosted large language model via Ollama.
Key Features:
- Wake-free, continuous voice recognition with real-time transcription
- LLM-driven responses streamed from a selected local model (e.g., LLaMA, Qwen, Gemma)
- Audio response synthesis with a gruff custom voice using ONNX-based Piper models
- Optional noise mixing and filtering via SoX
- System volume control via ALSA
- Modular and responsive design suitable for low-latency, character-driven agents
Ideal for embedded voice AI demos, cosplay companions, or standalone AI characters.
Copyright: M15.ai
License: MIT
"""
import os
import json
import queue
import threading
import time
import wave
import io
import re
import subprocess
from vosk import Model, KaldiRecognizer
import pyaudio
import requests
from pydub import AudioSegment
import soxr
import numpy as np
# ------------------- TIMING UTILITY -------------------
class Timer:
def __init__(self, label):
self.label = label
self.enabled = True
def __enter__(self):
self.start = time.time()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.enabled:
elapsed_ms = (time.time() - self.start) * 1000
print(f"[Timing] {self.label}: {elapsed_ms:.0f} ms")
def disable(self):
self.enabled = False
# ------------------- FUNCTIONS -------------------
def get_input_device_index(preferred_name="Shure MVX2U"):
pa = pyaudio.PyAudio()
index = None
for i in range(pa.get_device_count()):
info = pa.get_device_info_by_index(i)
if preferred_name.lower() in info['name'].lower() and info['maxInputChannels'] > 0:
print(f"[Debug] Selected input device {i}: {info['name']}")
print(f"[Debug] Device sample rate: {info['defaultSampleRate']} Hz")
index = i
break
pa.terminate()
if index is None:
print("[Warning] Preferred mic not found. Falling back to default.")
return index
def get_output_device_index(preferred_name):
pa = pyaudio.PyAudio()
for i in range(pa.get_device_count()):
info = pa.get_device_info_by_index(i)
if preferred_name.lower() in info['name'].lower() and info['maxOutputChannels'] > 0:
print(f"[Debug] Selected output device {i}: {info['name']}")
return i
print("[Warning] Preferred output device not found. Using default index 0.")
return 0
def parse_card_number(device_str):
"""
Extract ALSA card number from string like 'plughw:3,0'
"""
try:
return int(device_str.split(":")[1].split(",")[0])
except Exception as e:
print(f"[Warning] Could not parse card number from {device_str}: {e}")
return 0 # fallback
def list_input_devices():
pa = pyaudio.PyAudio()
print("[Debug] Available input devices:")
for i in range(pa.get_device_count()):
info = pa.get_device_info_by_index(i)
if info['maxInputChannels'] > 0:
print(f" {i}: {info['name']} ({int(info['defaultSampleRate'])} Hz, {info['maxInputChannels']}ch)")
pa.terminate()
def resample_audio(data, orig_rate=48000, target_rate=16000):
# Convert byte string to numpy array
audio_np = np.frombuffer(data, dtype=np.int16)
# Resample using soxr
resampled_np = soxr.resample(audio_np, orig_rate, target_rate)
# Convert back to bytes
return resampled_np.astype(np.int16).tobytes()
def set_output_volume(volume_level, card_id=3):
"""
Set output volume using ALSA 'Speaker' control on specified card.
volume_level: 110 (user scale)
card_id: ALSA card number (from aplay -l)
"""
percent = max(1, min(volume_level, 10)) * 10 # map to 10100%
try:
subprocess.run(
['amixer', '-c', str(card_id), 'sset', 'Speaker', f'{percent}%'],
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
print(f"[Debug] Volume set to {percent}% on card {card_id}")
except Exception as e:
print(f"[Warning] Volume control failed on card {card_id}: {e}")
# ------------------- PATHS -------------------
CONFIG_PATH = os.path.expanduser("va_config.json")
BASE_DIR = os.path.dirname(__file__)
MODEL_PATH = os.path.join(BASE_DIR, 'vosk-model')
CHAT_URL = 'http://localhost:11434/api/chat'
# ------------------- CONFIG FILE LOADING -------------------
DEFAULT_CONFIG = {
"volume": 9,
"mic_name": "Plantronics",
"audio_output_device": "Plantronics",
"model_name": "qwen2.5:0.5b",
"voice": "en_US-kathleen-low.onnx",
"enable_audio_processing": False,
"history_length": 4,
"system_prompt": "You are a helpful assistant."
}
def load_config():
# Load config from system file or fall back to defaults
if os.path.isfile(CONFIG_PATH):
try:
with open(CONFIG_PATH, 'r') as f:
user_config = json.load(f)
return {**DEFAULT_CONFIG, **user_config} # merge with defaults
except Exception as e:
print(f"[Warning] Failed to load system config: {e}")
print("[Debug] Using default config.")
return DEFAULT_CONFIG
config = load_config()
# Apply loaded config values
VOLUME = config["volume"]
MIC_NAME = config["mic_name"]
# Choose your audio output device
# "plughw:2,0" for Plantronics headset
# "plughw:3,0" for USB PnP Sound Device
AUDIO_OUTPUT_DEVICE = config["audio_output_device"]
AUDIO_OUTPUT_DEVICE_INDEX = get_output_device_index(config["audio_output_device"])
OUTPUT_CARD = parse_card_number(AUDIO_OUTPUT_DEVICE)
MODEL_NAME = config["model_name"]
VOICE_MODEL = os.path.join("voices", config["voice"])
ENABLE_AUDIO_PROCESSING = config["enable_audio_processing"]
HISTORY_LENGTH = config["history_length"]
# Set system volume
set_output_volume(VOLUME, OUTPUT_CARD)
# Setup messages with system prompt
messages = [{"role": "system", "content": config["system_prompt"]}]
list_input_devices()
RATE = 48000
CHUNK = 1024
CHANNELS = 1
mic_enabled = True
DEVICE_INDEX = get_input_device_index()
# ------------------- OLLAMA MODEL SELECTION -------------------
# Uncomment the model you wish to use:
# MODEL_NAME = "qwen2.5:0.5b"
# MODEL_NAME = "qwen3:0.6b"
# MODEL_NAME = "tinyllama"
# MODEL_NAME = "gemma3:1b"
NOISE_LEVEL = '0.04'
BANDPASS_HIGHPASS = '300'
BANDPASS_LOWPASS = '800'
# ------------------- VOICE MODEL -------------------
VOICE_MODELS_DIR = os.path.join(BASE_DIR, 'voices')
if not os.path.isdir(VOICE_MODELS_DIR):
os.makedirs(VOICE_MODELS_DIR)
VOICE_MODEL = os.path.join(VOICE_MODELS_DIR, config["voice"])
print('[Debug] Available Piper voices:')
for f in os.listdir(VOICE_MODELS_DIR):
if f.endswith('.onnx'):
print(' ', f)
print(f'[Debug] Using VOICE_MODEL: {VOICE_MODEL}')
print(f"[Debug] Config loaded: model={MODEL_NAME}, voice={config['voice']}, vol={VOLUME}, mic={MIC_NAME}")
# ------------------- CONVERSATION STATE -------------------
audio_queue = queue.Queue()
# Audio callback for plantronics / generic mics
#def audio_callback(in_data, frame_count, time_info, status):
# audio_queue.put(in_data)
# return (None, pyaudio.paContinue)
# Audio callback form Shure
def audio_callback(in_data, frame_count, time_info, status):
global mic_enabled
if not mic_enabled:
return (None, pyaudio.paContinue)
resampled_data = resample_audio(in_data, orig_rate=48000, target_rate=16000)
audio_queue.put(resampled_data)
return (None, pyaudio.paContinue)
# ------------------- STREAM SETUP -------------------
def start_stream():
pa = pyaudio.PyAudio()
#print('[Debug] Input devices:')
#for i in range(pa.get_device_count()):
# d = pa.get_device_info_by_index(i)
# if d['maxInputChannels'] > 0:
# print(f" {i}: {d['name']} ({d['maxInputChannels']}ch @ {d['defaultSampleRate']}Hz)")
#print(f'[Debug] Using DEVICE_INDEX={DEVICE_INDEX}')
stream = pa.open(
rate=RATE,
format=pyaudio.paInt16,
channels=CHANNELS,
input=True,
input_device_index=DEVICE_INDEX,
frames_per_buffer=CHUNK,
stream_callback=audio_callback
)
stream.start_stream()
print(f'[Debug] Stream @ {RATE}Hz')
return pa, stream
# ------------------- QUERY OLLAMA CHAT ENDPOINT -------------------
def query_ollama():
#payload = {
# "model": MODEL_NAME,
# "messages": messages,
# "stream": False}
payload = {
"model": MODEL_NAME,
"messages": [messages[0]] + messages[-HISTORY_LENGTH:], # force system prompt at top
"stream": False}
#payload = {
# "model": MODEL_NAME,
# "messages": messages[-(HISTORY_LENGTH + 1):],
# "stream": False}
#print('[Debug] Sending messages to Ollama chat:')
#for m in messages[-(HISTORY_LENGTH+1):]:
# print(f" {m['role']}: {m['content']}")
with Timer("Inference"): # measure inference latency
resp = requests.post(CHAT_URL, json=payload)
#print(f'[Debug] Ollama status: {resp.status_code}')
data = resp.json()
# Extract assistant message
reply = ''
if 'message' in data and 'content' in data['message']:
reply = data['message']['content'].strip()
#print('[Debug] Reply:', reply)
return reply
# ------------------- TTS & DEGRADATION -------------------
import tempfile
def play_response(text):
import io
import tempfile
# Mute the mic during playback to avoid feedback loop
global mic_enabled
mic_enabled = False # 🔇 mute mic
# clean the response
clean = re.sub(r"[\*]+", '', text) # remove asterisks
clean = re.sub(r"\(.*?\)", '', clean) # remove (stage directions)
clean = re.sub(r"<.*?>", '', clean) # remove HTML-style tags
clean = clean.replace('\n', ' ').strip() # normalize newlines
clean = re.sub(r'\s+', ' ', clean) # collapse whitespace
clean = re.sub(r'[\U0001F300-\U0001FAFF\u2600-\u26FF\u2700-\u27BF]+', '', clean) # remove emojis
piper_path = os.path.join(BASE_DIR, 'bin', 'piper', 'piper')
# 1. Generate Piper raw PCM
with Timer("Piper inference"):
piper_proc = subprocess.Popen(
[piper_path, '--model', VOICE_MODEL, '--output_raw'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL
)
tts_pcm, _ = piper_proc.communicate(input=clean.encode())
if ENABLE_AUDIO_PROCESSING:
# SoX timing consolidation
sox_start = time.time()
# 2. Convert raw PCM to WAV
pcm_to_wav = subprocess.Popen(
['sox', '-t', 'raw', '-r', '16000', '-c', str(CHANNELS), '-b', '16',
'-e', 'signed-integer', '-', '-t', 'wav', '-'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL
)
tts_wav_16k, _ = pcm_to_wav.communicate(input=tts_pcm)
# 3. Estimate duration
duration_sec = len(tts_pcm) / (RATE * 2)
# 4. Generate white noise WAV bytes
noise_bytes = subprocess.check_output([
'sox', '-n',
'-r', '16000',
'-c', str(CHANNELS),
'-b', '16',
'-e', 'signed-integer',
'-t', 'wav', '-',
'synth', str(duration_sec),
'whitenoise', 'vol', NOISE_LEVEL
], stderr=subprocess.DEVNULL)
# 5. Write both to temp files & mix
with tempfile.NamedTemporaryFile(suffix='.wav') as tts_file, tempfile.NamedTemporaryFile(suffix='.wav') as noise_file:
tts_file.write(tts_wav_16k)
noise_file.write(noise_bytes)
tts_file.flush()
noise_file.flush()
mixer = subprocess.Popen(
['sox', '-m', tts_file.name, noise_file.name, '-t', 'wav', '-'],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL
)
mixed_bytes, _ = mixer.communicate()
# 6. Apply filter
filter_proc = subprocess.Popen(
#['sox', '-t', 'wav', '-', '-t', 'wav', '-', 'highpass', BANDPASS_HIGHPASS, 'lowpass', BANDPASS_LOWPASS],
['sox', '-t', 'wav', '-', '-r', '48000', '-t', 'wav', '-',
'highpass', BANDPASS_HIGHPASS, 'lowpass', BANDPASS_LOWPASS],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL
)
final_bytes, _ = filter_proc.communicate(input=mixed_bytes)
sox_elapsed = (time.time() - sox_start) * 1000
print(f"[Timing] SoX (total): {int(sox_elapsed)} ms")
else:
# No FX: just convert raw PCM to WAV
pcm_to_wav = subprocess.Popen(
['sox', '-t', 'raw', '-r', '16000', '-c', str(CHANNELS), '-b', '16',
'-e', 'signed-integer', '-', '-t', 'wav', '-'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL
)
tts_wav_16k, _ = pcm_to_wav.communicate(input=tts_pcm)
resample_proc = subprocess.Popen(
['sox', '-t', 'wav', '-', '-r', '48000', '-t', 'wav', '-'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL
)
final_bytes, _ = resample_proc.communicate(input=tts_wav_16k)
# 7. Playback
with Timer("Playback"):
try:
wf = wave.open(io.BytesIO(final_bytes), 'rb')
pa = pyaudio.PyAudio()
stream = pa.open(
format=pa.get_format_from_width(wf.getsampwidth()),
channels=wf.getnchannels(),
rate=wf.getframerate(),
output=True,
output_device_index=AUDIO_OUTPUT_DEVICE_INDEX
)
data = wf.readframes(CHUNK)
while data:
stream.write(data)
data = wf.readframes(CHUNK)
stream.stop_stream()
stream.close()
pa.terminate()
wf.close()
except wave.Error as e:
print(f"[Error] Could not open final WAV: {e}")
finally:
mic_enabled = True # 🔊 unmute mic
time.sleep(0.3) # optional: small cooldown
# ------------------- PROCESSING LOOP -------------------
def processing_loop():
model = Model(MODEL_PATH)
#rec = KaldiRecognizer(model, RATE)
rec = KaldiRecognizer(model, 16000)
MAX_DEBUG_LEN = 200 # optional: limit length of debug output
LOW_EFFORT_UTTERANCES = {"huh", "uh", "um", "erm", "hmm", "he's", "but"}
while True:
data = audio_queue.get()
if rec.AcceptWaveform(data):
start = time.time()
r = json.loads(rec.Result())
elapsed_ms = int((time.time() - start) * 1000)
user = r.get('text', '').strip()
if user:
print(f"[Timing] STT parse: {elapsed_ms} ms")
print("User:", user)
if user.lower().strip(".,!? ") in LOW_EFFORT_UTTERANCES:
print("[Debug] Ignored low-effort utterance.")
rec = KaldiRecognizer(model, 16000)
continue # Skip LLM response + TTS for accidental noise
messages.append({"role": "user", "content": user})
# Generate assistant response
resp_text = query_ollama()
if resp_text:
# Clean debug print (remove newlines and carriage returns)
clean_debug_text = resp_text.replace('\n', ' ').replace('\r', ' ')
if len(clean_debug_text) > MAX_DEBUG_LEN:
clean_debug_text = clean_debug_text[:MAX_DEBUG_LEN] + '...'
print('Assistant:', clean_debug_text)
messages.append({"role": "assistant", "content": clean_debug_text})
# TTS generation + playback
play_response(resp_text)
else:
print('[Debug] Empty response, skipping TTS.')
# Reset recognizer after each full interaction
#rec = KaldiRecognizer(model, RATE)
rec = KaldiRecognizer(model, 16000)
# ------------------- MAIN -------------------
if __name__ == '__main__':
pa, stream = start_stream()
t = threading.Thread(target=processing_loop, daemon=True)
t.start()
try:
while stream.is_active():
time.sleep(0.1)
except KeyboardInterrupt:
stream.stop_stream(); stream.close(); pa.terminate()