Local-Voice/voice_assistant.py
2025-09-18 19:10:26 +08:00

470 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Voice Assistant: Real-Time Voice Chat
This app runs on a Raspberry Pi (or Linux desktop) and creates a low-latency, full-duplex voice interaction
with an AI character. It uses local speech recognition
(Vosk), local text-to-speech synthesis (Piper), and a large language model reached over HTTP (this version calls the hosted GLM chat-completions API rather than a local Ollama server).
Key Features:
- Wake-free, continuous voice recognition with real-time transcription
- LLM-driven responses from a chat-completions model (GLM-4.5 in this version, requested without streaming)
- Audio response synthesis with a gruff custom voice using ONNX-based Piper models
- Optional noise mixing and filtering via SoX
- System volume control via ALSA
- Modular and responsive design suitable for low-latency, character-driven agents
Ideal for embedded voice AI demos, cosplay companions, or standalone AI characters.
Copyright: M15.ai
License: MIT
"""
import io
import json
import os
import queue
import re
import subprocess
import threading
import time
import wave
import numpy as np
import pyaudio
import requests
import soxr
from pydub import AudioSegment
from vosk import KaldiRecognizer, Model
# ------------------- TIMING UTILITY -------------------
class Timer:
    """Context manager that prints how long its ``with`` body took.

    On exit it prints ``[Timing] <label>: <N> ms`` unless the timer has been
    switched off with :meth:`disable`.

    Attributes:
        label: Text printed in front of the measurement.
        enabled: When False, nothing is printed on exit.
        start: Clock reading taken when the ``with`` block was entered.
    """

    def __init__(self, label):
        self.label = label
        self.enabled = True

    def __enter__(self):
        # perf_counter() is monotonic, so the interval cannot be skewed (or go
        # negative) if the wall clock is adjusted while the block is running.
        self.start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.enabled:
            elapsed_ms = (time.perf_counter() - self.start) * 1000
            print(f"[Timing] {self.label}: {elapsed_ms:.0f} ms")
        # Returning None lets any exception raised in the body propagate.

    def disable(self):
        """Suppress the timing line for this timer."""
        self.enabled = False
# ------------------- FUNCTIONS -------------------
def get_input_device_index(preferred_name="Shure MVX2U"):
    """Find the first capture device whose name contains *preferred_name*.

    The match is a case-insensitive substring test and only devices that
    report at least one input channel are considered.

    Returns:
        The PyAudio device index, or None (after printing a warning) when no
        device matches.
    """
    audio = pyaudio.PyAudio()
    selected = None
    for device_id in range(audio.get_device_count()):
        details = audio.get_device_info_by_index(device_id)
        name_matches = preferred_name.lower() in details['name'].lower()
        if not (name_matches and details['maxInputChannels'] > 0):
            continue
        print(f"[Debug] Selected input device {device_id}: {details['name']}")
        print(f"[Debug] Device sample rate: {details['defaultSampleRate']} Hz")
        selected = device_id
        break
    audio.terminate()
    if selected is None:
        print("[Warning] Preferred mic not found. Falling back to default.")
    return selected
def get_output_device_index(preferred_name):
    """Find the first playback device whose name contains *preferred_name*.

    The match is a case-insensitive substring test and only devices that
    report at least one output channel are considered.

    Returns:
        The PyAudio device index of the match, or 0 (after printing a
        warning) when no device matches.
    """
    pa = pyaudio.PyAudio()
    try:
        for i in range(pa.get_device_count()):
            info = pa.get_device_info_by_index(i)
            if preferred_name.lower() in info['name'].lower() and info['maxOutputChannels'] > 0:
                print(f"[Debug] Selected output device {i}: {info['name']}")
                return i
    finally:
        # Release PortAudio on every path, as get_input_device_index does.
        pa.terminate()
    print("[Warning] Preferred output device not found. Using default index 0.")
    return 0
def parse_card_number(device_str):
    """Return the ALSA card number embedded in a device string.

    For a string such as 'plughw:3,0' the text between the first ':' and the
    following ',' is converted to an int. Anything that cannot be parsed this
    way produces a warning and the fallback value 0.
    """
    try:
        after_colon = device_str.split(":")[1]
        card_field = after_colon.split(",")[0]
        card_number = int(card_field)
    except Exception as e:
        print(f"[Warning] Could not parse card number from {device_str}: {e}")
        card_number = 0  # fallback
    return card_number
def list_input_devices():
    """Print every PyAudio device that reports at least one input channel.

    Each line shows the device index, name, default sample rate (as an int)
    and the number of input channels.
    """
    audio = pyaudio.PyAudio()
    print("[Debug] Available input devices:")
    for idx in range(audio.get_device_count()):
        dev = audio.get_device_info_by_index(idx)
        if not dev['maxInputChannels'] > 0:
            continue  # output-only device
        print(f" {idx}: {dev['name']} ({int(dev['defaultSampleRate'])} Hz, {dev['maxInputChannels']}ch)")
    audio.terminate()
def resample_audio(data, orig_rate=48000, target_rate=16000):
    """Resample a buffer of 16-bit PCM from *orig_rate* to *target_rate*.

    Args:
        data: Raw bytes interpreted as int16 samples.
        orig_rate: Sample rate of *data* in Hz.
        target_rate: Sample rate of the returned audio in Hz.

    Returns:
        The resampled samples as int16 bytes.
    """
    samples = np.frombuffer(data, dtype=np.int16)
    converted = soxr.resample(samples, orig_rate, target_rate)
    return converted.astype(np.int16).tobytes()
def set_output_volume(volume_level, card_id=3):
    """Set the ALSA 'Speaker' mixer control on a sound card via ``amixer``.

    Args:
        volume_level: Loudness on a 1-10 user scale; values outside that
            range are clamped, then mapped to 10-100%.
        card_id: ALSA card number (from ``aplay -l``).

    Any failure to run ``amixer`` is reported as a warning, not raised.
    """
    level = min(volume_level, 10)
    level = max(1, level)
    percent = level * 10
    command = ['amixer', '-c', str(card_id), 'sset', 'Speaker', f'{percent}%']
    try:
        subprocess.run(
            command,
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL
        )
    except Exception as e:
        print(f"[Warning] Volume control failed on card {card_id}: {e}")
    else:
        print(f"[Debug] Volume set to {percent}% on card {card_id}")
# ------------------- PATHS -------------------
# The config file is looked up relative to the current working directory
# (expanduser is a no-op here because the path contains no "~").
CONFIG_PATH = os.path.expanduser("va_config.json")
# Bundled assets (Vosk model, voices, piper binary) live next to this script.
BASE_DIR = os.path.dirname(__file__)
MODEL_PATH = os.path.join(BASE_DIR, 'vosk-model')
CHAT_URL = 'https://open.bigmodel.cn/api/paas/v4/chat/completions'
# The API token is a credential and must not live in source control, so it is
# read from the environment instead of being hard-coded.
# NOTE(review): the token that used to be committed here should be treated as
# leaked and revoked.
AUTH_TOKEN = os.environ.get('GLM_API_KEY', '')
if not AUTH_TOKEN:
    print("[Warning] GLM_API_KEY is not set; chat requests will be sent without a valid token.")
# ------------------- CONFIG FILE LOADING -------------------
# Settings used when the config file is missing, unreadable, or omits a key.
DEFAULT_CONFIG = {
    "volume": 9,
    "mic_name": "Plantronics",
    "audio_output_device": "Plantronics",
    "model_name": "qwen2.5:0.5b",
    "voice": "en_US-kathleen-low.onnx",
    "enable_audio_processing": False,
    "history_length": 4,
    "system_prompt": "You are a helpful assistant.",
}


def load_config():
    """Load settings from CONFIG_PATH, falling back to DEFAULT_CONFIG.

    Keys present in the file override the defaults; keys it omits keep their
    default value. If the file is missing, unreadable, or does not contain a
    JSON object, a warning is printed and the defaults are used.

    Returns:
        A new dict on every call, so callers may mutate the result without
        altering DEFAULT_CONFIG.
    """
    if os.path.isfile(CONFIG_PATH):
        try:
            # JSON text is UTF-8; do not depend on the platform's locale.
            with open(CONFIG_PATH, 'r', encoding='utf-8') as f:
                user_config = json.load(f)
            if not isinstance(user_config, dict):
                raise ValueError("top-level JSON value must be an object")
            return {**DEFAULT_CONFIG, **user_config}  # merge with defaults
        except (OSError, ValueError, TypeError) as e:
            print(f"[Warning] Failed to load system config: {e}")
    print("[Debug] Using default config.")
    # Hand back a copy: returning DEFAULT_CONFIG itself would let any caller
    # mutation leak into the shared defaults.
    return dict(DEFAULT_CONFIG)
# Module-level start-up: read the configuration once and expose its values as
# constants. Every statement below runs at import time, in this order.
config = load_config()
# Apply loaded config values
VOLUME = config["volume"]
MIC_NAME = config["mic_name"]
AUDIO_OUTPUT_DEVICE = config["audio_output_device"]
# Resolve the configured output device name to a PyAudio device index.
AUDIO_OUTPUT_DEVICE_INDEX = get_output_device_index(config["audio_output_device"])
# parse_card_number expects a string such as 'plughw:3,0'; for any other
# string it prints a warning and yields card 0.
OUTPUT_CARD = parse_card_number(AUDIO_OUTPUT_DEVICE)
MODEL_NAME = config["model_name"]
# NOTE(review): VOICE_MODEL is assigned again in the VOICE MODEL section
# further down, which replaces this relative path.
VOICE_MODEL = os.path.join("voices", config["voice"])
ENABLE_AUDIO_PROCESSING = config["enable_audio_processing"]
HISTORY_LENGTH = config["history_length"]
# Set system volume
set_output_volume(VOLUME, OUTPUT_CARD)
# Setup messages with system prompt
# messages[0] is always the system prompt; later turns are appended to it.
messages = [{"role": "system", "content": config["system_prompt"]}]
# Print the available capture devices to help with choosing a microphone.
list_input_devices()
# Capture format: the microphone stream is opened at RATE Hz with CHANNELS
# channel(s) and CHUNK frames per buffer.
RATE = 48000
CHUNK = 1024
CHANNELS = 1
# Cleared while a response is being played so the assistant does not hear itself.
mic_enabled = True
# Look up the microphone named in the config. Calling the helper without an
# argument silently ignored "mic_name" and always searched for its built-in
# default device name.
DEVICE_INDEX = get_input_device_index(MIC_NAME)
# SOUND EFFECTS
# SoX arguments used only when audio processing is enabled: white-noise volume
# and the band-pass corner frequencies in Hz.
NOISE_LEVEL = '0.04'
BANDPASS_HIGHPASS = '300'
BANDPASS_LOWPASS = '800'
# ------------------- VOICE MODEL -------------------
# Folder next to this script that holds the Piper .onnx voices; it is created
# on first run so the listing below never fails on a missing directory.
VOICE_MODELS_DIR = os.path.join(BASE_DIR, 'voices')
os.makedirs(VOICE_MODELS_DIR, exist_ok=True)
# Absolute path of the voice selected in the config.
VOICE_MODEL = os.path.join(VOICE_MODELS_DIR, config["voice"])
print('[Debug] Available Piper voices:')
for voice_file in os.listdir(VOICE_MODELS_DIR):
    if not voice_file.endswith('.onnx'):
        continue
    print(' ', voice_file)
print(f'[Debug] Using VOICE_MODEL: {VOICE_MODEL}')
print(f"[Debug] Config loaded: model={MODEL_NAME}, voice={config['voice']}, vol={VOLUME}, mic={MIC_NAME}")
# ------------------- CONVERSATION STATE -------------------
# Hand-off between the capture callback (producer) and processing_loop
# (consumer); each item is one chunk of 16 kHz int16 PCM bytes.
audio_queue = queue.Queue()


# Audio callback from the microphone stream
def audio_callback(in_data, frame_count, time_info, status):
    """PyAudio input callback: resample the captured chunk and queue it.

    While ``mic_enabled`` is False the chunk is dropped. The stream is always
    told to continue, and no output data is returned.
    """
    if mic_enabled:
        audio_queue.put(resample_audio(in_data, orig_rate=48000, target_rate=16000))
    return (None, pyaudio.paContinue)
# ------------------- STREAM SETUP -------------------
def start_stream():
    """Open and start the microphone capture stream.

    The stream delivers audio to ``audio_callback`` using the module-level
    RATE, CHANNELS, CHUNK and DEVICE_INDEX settings.

    Returns:
        A ``(pa, stream)`` tuple; the caller is responsible for stopping and
        closing the stream and terminating the PyAudio instance.

    Raises:
        Whatever PyAudio raises when the device cannot be opened or started;
        the PyAudio instance is terminated before the error propagates.
    """
    pa = pyaudio.PyAudio()
    try:
        stream = pa.open(
            rate=RATE,
            format=pyaudio.paInt16,
            channels=CHANNELS,
            input=True,
            input_device_index=DEVICE_INDEX,
            frames_per_buffer=CHUNK,
            stream_callback=audio_callback
        )
        stream.start_stream()
    except Exception:
        # Do not leak the PortAudio session when the device cannot be used.
        pa.terminate()
        raise
    print(f'[Debug] Stream @ {RATE}Hz')
    return pa, stream
# ------------------- QUERY GLM CHAT ENDPOINT -------------------
def query_glm():
    """Send the recent conversation to the GLM chat endpoint.

    The request contains the system prompt (``messages[0]``) followed by the
    most recent ``HISTORY_LENGTH`` non-system messages.

    Returns:
        The assistant's reply with surrounding whitespace removed, or an
        empty string when the request fails, the server answers with a
        non-200 status, or the response carries no usable text.
    """
    headers = {
        'Authorization': f'Bearer {AUTH_TOKEN}',
        'Content-Type': 'application/json'
    }
    # Slice the history WITHOUT the system prompt: slicing the full list
    # duplicated the prompt whenever the conversation was shorter than
    # HISTORY_LENGTH. Keep at least one message so the current user turn is
    # always sent (a length of 0 would otherwise select the whole list).
    keep = max(int(HISTORY_LENGTH), 1)
    recent = messages[1:][-keep:]
    payload = {
        "model": "glm-4.5",
        "messages": [messages[0]] + recent,  # force system prompt at top
        "temperature": 0.6,
        "max_tokens": 1024,
        "stream": False
    }
    with Timer("Inference"):  # measure inference latency
        try:
            # (connect, read) timeouts so a stalled connection cannot block
            # the processing thread forever.
            resp = requests.post(CHAT_URL, json=payload, headers=headers, timeout=(10, 120))
        except requests.RequestException as e:
            print(f'[Error] GLM API request failed: {e}')
            return ''
    if resp.status_code != 200:
        print(f'[Error] GLM API failed with status {resp.status_code}: {resp.text}')
        return ''
    try:
        data = resp.json()
    except ValueError as e:
        print(f'[Error] GLM API returned invalid JSON: {e}')
        return ''
    # Extract assistant message
    choices = data.get('choices') if isinstance(data, dict) else None
    if not choices:
        return ''
    message = choices[0].get('message') if isinstance(choices[0], dict) else None
    content = message.get('content') if isinstance(message, dict) else None
    # The content may be absent or null; only text can be spoken.
    return content.strip() if isinstance(content, str) else ''
# ------------------- TTS & DEGRADATION -------------------
import tempfile
def play_response(text):
    """Speak *text* through Piper (optionally degraded with SoX) and play it.

    The microphone is muted for the whole call and is always unmuted again,
    even when synthesis, processing or playback fails.

    Steps: clean the text, synthesize raw PCM with Piper, convert it to WAV
    (adding white noise and a band-pass filter when ENABLE_AUDIO_PROCESSING
    is set), resample to 48 kHz, and play it on AUDIO_OUTPUT_DEVICE_INDEX.
    """
    import io
    import tempfile

    global mic_enabled

    # The Piper output is treated as 16 kHz 16-bit PCM throughout.
    tts_rate = 16000
    raw_pcm_to_wav = [
        'sox', '-t', 'raw', '-r', str(tts_rate), '-c', str(CHANNELS), '-b', '16',
        '-e', 'signed-integer', '-', '-t', 'wav', '-',
    ]

    # Mute the mic during playback to avoid a feedback loop.
    mic_enabled = False
    try:
        clean = _clean_tts_text(text)
        if not clean:
            print('[Debug] Nothing speakable in response, skipping TTS.')
            return

        piper_path = os.path.join(BASE_DIR, 'bin', 'piper', 'piper')

        # 1. Generate Piper raw PCM
        with Timer("Piper inference"):
            tts_pcm = _pipe_through(
                [piper_path, '--model', VOICE_MODEL, '--output_raw'],
                clean.encode(),
            )

        if ENABLE_AUDIO_PROCESSING:
            # SoX timing consolidation
            sox_start = time.time()

            # 2. Convert raw PCM to WAV
            tts_wav_16k = _pipe_through(raw_pcm_to_wav, tts_pcm)

            # 3. Estimate duration from the TTS sample rate (2 bytes per
            # sample). Using the 48 kHz capture rate here made the noise bed
            # only a third as long as the speech.
            duration_sec = len(tts_pcm) / (tts_rate * 2 * CHANNELS)

            # 4. Generate white noise WAV bytes
            noise_bytes = _pipe_through([
                'sox', '-n',
                '-r', str(tts_rate),
                '-c', str(CHANNELS),
                '-b', '16',
                '-e', 'signed-integer',
                '-t', 'wav', '-',
                'synth', str(duration_sec),
                'whitenoise', 'vol', NOISE_LEVEL
            ], check=True)

            # 5. Write both to temp files & mix (sox -m needs real files)
            with tempfile.NamedTemporaryFile(suffix='.wav') as tts_file, \
                    tempfile.NamedTemporaryFile(suffix='.wav') as noise_file:
                tts_file.write(tts_wav_16k)
                noise_file.write(noise_bytes)
                tts_file.flush()
                noise_file.flush()
                mixed_bytes = _pipe_through(
                    ['sox', '-m', tts_file.name, noise_file.name, '-t', 'wav', '-']
                )

            # 6. Apply band-pass filter and resample to the playback rate
            final_bytes = _pipe_through(
                ['sox', '-t', 'wav', '-', '-r', '48000', '-t', 'wav', '-',
                 'highpass', BANDPASS_HIGHPASS, 'lowpass', BANDPASS_LOWPASS],
                mixed_bytes,
            )

            sox_elapsed = (time.time() - sox_start) * 1000
            print(f"[Timing] SoX (total): {int(sox_elapsed)} ms")
        else:
            # No FX: just convert raw PCM to WAV and resample
            tts_wav_16k = _pipe_through(raw_pcm_to_wav, tts_pcm)
            final_bytes = _pipe_through(
                ['sox', '-t', 'wav', '-', '-r', '48000', '-t', 'wav', '-'],
                tts_wav_16k,
            )

        # 7. Playback
        with Timer("Playback"):
            pa = None
            stream = None
            try:
                with wave.open(io.BytesIO(final_bytes), 'rb') as wf:
                    pa = pyaudio.PyAudio()
                    stream = pa.open(
                        format=pa.get_format_from_width(wf.getsampwidth()),
                        channels=wf.getnchannels(),
                        rate=wf.getframerate(),
                        output=True,
                        output_device_index=AUDIO_OUTPUT_DEVICE_INDEX
                    )
                    data = wf.readframes(CHUNK)
                    while data:
                        stream.write(data)
                        data = wf.readframes(CHUNK)
                    stream.stop_stream()
            except (wave.Error, EOFError) as e:
                # EOFError is what wave raises for empty input, e.g. when an
                # earlier pipeline stage produced no audio.
                print(f"[Error] Could not open final WAV: {e}")
            finally:
                # Release the audio device even if playback was interrupted.
                if stream is not None:
                    stream.close()
                if pa is not None:
                    pa.terminate()
    finally:
        # Always unmute: a failure anywhere above must not leave the
        # assistant deaf for the rest of the session.
        mic_enabled = True
        time.sleep(0.3)  # optional: small cooldown


def _clean_tts_text(text):
    """Remove markup the TTS engine should not read aloud and tidy whitespace."""
    clean = re.sub(r"[\*]+", '', text)  # remove asterisks
    clean = re.sub(r"\(.*?\)", '', clean)  # remove (stage directions)
    clean = re.sub(r"<.*?>", '', clean)  # remove HTML-style tags
    clean = clean.replace('\n', ' ').strip()  # normalize newlines
    clean = re.sub(r'\s+', ' ', clean)  # collapse whitespace
    clean = re.sub(r'[\U0001F300-\U0001FAFF\u2600-\u26FF\u2700-\u27BF]+', '', clean)  # remove emojis
    # Emoji removal can leave only spaces behind; strip so that case is empty.
    return clean.strip()


def _pipe_through(cmd, data=None, check=False):
    """Run *cmd*, feed *data* to its stdin when given, and return its stdout bytes."""
    completed = subprocess.run(
        cmd,
        input=data,
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        check=check,
    )
    return completed.stdout
# ------------------- PROCESSING LOOP -------------------
def processing_loop():
    """Consume queued microphone audio and run the listen/think/speak cycle.

    Runs forever: each chunk from ``audio_queue`` is fed to a Vosk
    recognizer. When an utterance is complete and non-empty it is appended
    to ``messages``, sent to the LLM via ``query_glm``, and the reply is
    spoken with ``play_response``. Filler utterances are ignored.
    """
    stt_rate = 16000  # rate of the audio placed on audio_queue
    MAX_DEBUG_LEN = 200  # optional: limit length of debug output
    LOW_EFFORT_UTTERANCES = {"huh", "uh", "um", "erm", "hmm", "he's", "but"}

    model = Model(MODEL_PATH)
    rec = KaldiRecognizer(model, stt_rate)

    while True:
        data = audio_queue.get()
        if not rec.AcceptWaveform(data):
            continue  # utterance not finished yet

        start = time.time()
        r = json.loads(rec.Result())
        elapsed_ms = int((time.time() - start) * 1000)
        user = r.get('text', '').strip()
        if not user:
            continue

        print(f"[Timing] STT parse: {elapsed_ms} ms")
        print("User:", user)

        if user.lower().strip(".,!? ") in LOW_EFFORT_UTTERANCES:
            print("[Debug] Ignored low-effort utterance.")
            rec = KaldiRecognizer(model, stt_rate)
            continue  # Skip LLM response + TTS for accidental noise

        messages.append({"role": "user", "content": user})

        # Generate assistant response
        resp_text = query_glm()
        if resp_text:
            # Clean debug print (remove newlines and carriage returns)
            clean_debug_text = resp_text.replace('\n', ' ').replace('\r', ' ')
            if len(clean_debug_text) > MAX_DEBUG_LEN:
                clean_debug_text = clean_debug_text[:MAX_DEBUG_LEN] + '...'
            print('Assistant:', clean_debug_text)
            # Keep the full reply in the history; the shortened string above
            # is only for console output and would feed truncated context
            # back to the model on the next turn.
            messages.append({"role": "assistant", "content": resp_text})
            # TTS generation + playback
            play_response(resp_text)
        else:
            print('[Debug] Empty response, skipping TTS.')

        # Reset recognizer after each full interaction
        rec = KaldiRecognizer(model, stt_rate)
# ------------------- MAIN -------------------
if __name__ == '__main__':
    # Capture runs in PyAudio's callback; recognition, LLM and TTS run in a
    # daemon thread so the process can exit as soon as the main loop ends.
    pa, stream = start_stream()
    t = threading.Thread(target=processing_loop, daemon=True)
    t.start()
    try:
        while stream.is_active():
            time.sleep(0.1)
    except KeyboardInterrupt:
        pass
    finally:
        # Release the audio device on every exit path, including the stream
        # stopping on its own, not only on Ctrl+C.
        stream.stop_stream()
        stream.close()
        pa.terminate()