From 313e5555894f697869a9e85b2020550892eddc0b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E6=9C=B1=E6=BD=AE?=
Date: Thu, 18 Sep 2025 20:13:08 +0800
Subject: [PATCH] fix

---
 test_audio.py         | 102 +++++++++++
 voice_assistant_pi.py | 398 ++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 500 insertions(+)
 create mode 100644 test_audio.py
 create mode 100644 voice_assistant_pi.py

diff --git a/test_audio.py b/test_audio.py
new file mode 100644
index 0000000..fa241a1
--- /dev/null
+++ b/test_audio.py
@@ -0,0 +1,102 @@
+#!/usr/bin/env python3
+"""
+Simple audio test script used to diagnose audio problems on a Raspberry Pi.
+"""
+
+import pyaudio
+import time
+import os
+
+def test_audio():
+    """List every audio device plus the system default input/output devices."""
+    print("=== 音频设备测试 ===")
+
+    pa = pyaudio.PyAudio()
+
+    # Enumerate all devices PortAudio can see.
+    print("\n可用的音频设备:")
+    for i in range(pa.get_device_count()):
+        info = pa.get_device_info_by_index(i)
+        print(f"  设备 {i}: {info['name']}")
+        print(f"    输入通道: {info['maxInputChannels']}")
+        print(f"    输出通道: {info['maxOutputChannels']}")
+        print(f"    默认采样率: {info['defaultSampleRate']}")
+        print()
+
+    # Default input device.
+    default_input = pa.get_default_input_device_info()
+    print(f"默认输入设备: {default_input['name']} (索引: {default_input['index']})")
+
+    # Default output device.
+    default_output = pa.get_default_output_device_info()
+    print(f"默认输出设备: {default_output['name']} (索引: {default_output['index']})")
+
+    pa.terminate()
+
+def test_recording():
+    """Record five seconds from the default input device and play it back."""
+    print("\n=== 录音测试 ===")
+
+    pa = pyaudio.PyAudio()
+
+    try:
+        # Recording parameters.
+        FORMAT = pyaudio.paInt16
+        CHANNELS = 1
+        RATE = 16000  # target rate; some devices only accept their own default (e.g. 44100)
+        CHUNK = 1024
+
+        print(f"尝试打开音频流,采样率: {RATE}")
+
+        # Open the input stream.
+        stream = pa.open(
+            format=FORMAT,
+            channels=CHANNELS,
+            rate=RATE,
+            input=True,
+            frames_per_buffer=CHUNK
+        )
+
+        print("开始录音5秒...")
+        frames = []
+
+        # Record for five seconds.  exception_on_overflow=False keeps an ALSA
+        # buffer overrun (common on the Pi) from aborting the diagnostic.
+        for i in range(0, int(RATE / CHUNK * 5)):
+            data = stream.read(CHUNK, exception_on_overflow=False)
+            frames.append(data)
+            if i % 10 == 0:
+                print(f"录音中... {i * CHUNK / RATE:.1f}秒")
+
+        print("录音完成")
+
+        # Stop the input stream.
+        stream.stop_stream()
+        stream.close()
+
+        # Play the recording back.
+        print("播放录音...")
+        stream = pa.open(
+            format=FORMAT,
+            channels=CHANNELS,
+            rate=RATE,
+            output=True
+        )
+
+        for frame in frames:
+            stream.write(frame)
+
+        stream.stop_stream()
+        stream.close()
+
+        print("播放完成")
+
+    except Exception as e:
+        print(f"录音测试失败: {e}")
+
+    finally:
+        pa.terminate()
+
+if __name__ == "__main__":
+    test_audio()
+    test_recording()
diff --git a/voice_assistant_pi.py b/voice_assistant_pi.py
new file mode 100644
index 0000000..d9cce18
--- /dev/null
+++ b/voice_assistant_pi.py
@@ -0,0 +1,398 @@
+#!/usr/bin/env python3
+"""
+Voice Assistant: Real-Time Voice Chat (fixed version)
+
+Fixes the audio device problems seen on the Raspberry Pi.
+"""
+
+import io
+import json
+import os
+import queue
+import re
+import subprocess
+import threading
+import time
+import wave
+
+import numpy as np
+import pyaudio
+import requests
+import soxr
+from pydub import AudioSegment
+from vosk import KaldiRecognizer, Model
+
+
+# ------------------- TIMING UTILITY -------------------
+class Timer:
+    """Context manager that prints the wall-clock time spent in its block."""
+    def __init__(self, label):
+        self.label = label
+        self.enabled = True
+    def __enter__(self):
+        self.start = time.time()
+        return self
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        if self.enabled:
+            elapsed_ms = (time.time() - self.start) * 1000
+            print(f"[Timing] {self.label}: {elapsed_ms:.0f} ms")
+    def disable(self):
+        self.enabled = False
+
+# ------------------- FUNCTIONS -------------------
+
+def get_input_device_index(preferred_name=None):
+    """Return the PyAudio index of the preferred input device, or the default."""
+    pa = pyaudio.PyAudio()
+    try:
+        # No preference given: use the system default input device.
+        if preferred_name is None:
+            default_input = pa.get_default_input_device_info()
+            print(f"[Debug] Using default input device: {default_input['name']}")
+            return default_input['index']
+
+        # Otherwise pick the first input-capable device whose name matches.
+        for i in range(pa.get_device_count()):
+            info = pa.get_device_info_by_index(i)
+            if info['maxInputChannels'] > 0 and preferred_name.lower() in info['name'].lower():
+                print(f"[Debug] Selected input device {i}: {info['name']}")
+                print(f"[Debug] Device sample rate: {info['defaultSampleRate']} Hz")
+                return i
+
+        # Nothing matched: fall back to the default device.
+        default_input = pa.get_default_input_device_info()
+        print(f"[Warning] Preferred mic not found. Using default: {default_input['name']}")
+        return default_input['index']
+    finally:
+        pa.terminate()
+
+def get_output_device_index(preferred_name=None):
+    """Return the PyAudio index of the preferred output device, or the default."""
+    pa = pyaudio.PyAudio()
+    try:
+        # No preference given: use the system default output device.
+        if preferred_name is None:
+            default_output = pa.get_default_output_device_info()
+            print(f"[Debug] Using default output device: {default_output['name']}")
+            return default_output['index']
+
+        # Otherwise pick the first output-capable device whose name matches.
+        for i in range(pa.get_device_count()):
+            info = pa.get_device_info_by_index(i)
+            if info['maxOutputChannels'] > 0 and preferred_name.lower() in info['name'].lower():
+                print(f"[Debug] Selected output device {i}: {info['name']}")
+                return i
+
+        # Nothing matched: fall back to the default device.
+        default_output = pa.get_default_output_device_info()
+        print(f"[Warning] Preferred output device not found. Using default: {default_output['name']}")
+        return default_output['index']
+    finally:
+        pa.terminate()
+
+def list_input_devices():
+    """Print every input-capable audio device, for debugging."""
+    pa = pyaudio.PyAudio()
+    try:
+        print("[Debug] Available input devices:")
+        for i in range(pa.get_device_count()):
+            info = pa.get_device_info_by_index(i)
+            if info['maxInputChannels'] > 0:
+                print(f"  {i}: {info['name']} ({int(info['defaultSampleRate'])} Hz, {info['maxInputChannels']}ch)")
+    finally:
+        pa.terminate()
+
+def resample_audio(data, orig_rate=44100, target_rate=16000):
+    """Resample 16-bit mono PCM bytes from orig_rate to target_rate."""
+    # Convert byte string to numpy array
+    audio_np = np.frombuffer(data, dtype=np.int16)
+    # Resample using soxr
+    resampled_np = soxr.resample(audio_np, orig_rate, target_rate)
+    # Convert back to bytes
+    return resampled_np.astype(np.int16).tobytes()
+
+# ------------------- PATHS -------------------
+
+CONFIG_PATH = os.path.expanduser("va_config.json")
+BASE_DIR = os.path.dirname(__file__)
+MODEL_PATH = os.path.join(BASE_DIR, 'vosk-model')
+CHAT_URL = 'https://open.bigmodel.cn/api/paas/v4/chat/completions'
+# SECURITY: never commit a live API key; read it from the environment and keep
+# the literal only as a last-resort fallback (the leaked key must be rotated).
+AUTH_TOKEN = os.environ.get('GLM_AUTH_TOKEN', '0c9cbaca9d2bbf864990f1e1decdf340.dXRMsZCHTUbPQ0rm')
+
+# ------------------- CONFIG FILE LOADING -------------------
+
+DEFAULT_CONFIG = {
+    "volume": 8,
+    "mic_name": None,
+    "audio_output_device": None,
+    "model_name": "glm-4.5",
+    "voice": "en_US-kathleen-low.onnx",
+    "enable_audio_processing": False,
+    "history_length": 4,
+    "system_prompt": "You are a helpful assistant."
+}
+
+def load_config():
+    """Load va_config.json, overlaying user values on DEFAULT_CONFIG."""
+    if os.path.isfile(CONFIG_PATH):
+        try:
+            with open(CONFIG_PATH, 'r') as f:
+                user_config = json.load(f)
+                return {**DEFAULT_CONFIG, **user_config}
+        except Exception as e:
+            print(f"[Warning] Failed to load system config: {e}")
+
+    print("[Debug] Using default config.")
+    return DEFAULT_CONFIG
+
+config = load_config()
+
+# Apply loaded config values
+VOLUME = config["volume"]
+MIC_NAME = config["mic_name"]
+AUDIO_OUTPUT_DEVICE = config["audio_output_device"]
+AUDIO_OUTPUT_DEVICE_INDEX = get_output_device_index(config["audio_output_device"])
+MODEL_NAME = config["model_name"]
+VOICE_MODEL = os.path.join("voices", config["voice"])
+ENABLE_AUDIO_PROCESSING = config["enable_audio_processing"]
+HISTORY_LENGTH = config["history_length"]
+
+# Seed the conversation with the system prompt
+messages = [{"role": "system", "content": config["system_prompt"]}]
+
+list_input_devices()
+DEVICE_INDEX = get_input_device_index(config["mic_name"])
+
+# Take the capture rate from the device itself (many Pi mics reject 16 kHz)
+pa = pyaudio.PyAudio()
+device_info = pa.get_device_info_by_index(DEVICE_INDEX)
+INPUT_RATE = int(device_info['defaultSampleRate'])
+OUTPUT_RATE = int(device_info['defaultSampleRate'])
+pa.terminate()
+
+CHUNK = 1024
+CHANNELS = 1
+mic_enabled = True
+
+print(f"[Debug] Using sample rate: {INPUT_RATE} Hz")
+print(f"[Debug] Config loaded: model={MODEL_NAME}, voice={config['voice']}, vol={VOLUME}")
+
+# ------------------- CONVERSATION STATE -------------------
+
+audio_queue = queue.Queue()
+
+# Audio callback: resample captured audio down to the 16 kHz Vosk expects
+def audio_callback(in_data, frame_count, time_info, status):
+    global mic_enabled
+    if not mic_enabled:
+        return (None, pyaudio.paContinue)
+    resampled_data = resample_audio(in_data, orig_rate=INPUT_RATE, target_rate=16000)
+    audio_queue.put(resampled_data)
+    return (None, pyaudio.paContinue)
+
+# ------------------- STREAM SETUP -------------------
+
+def start_stream():
+    """Open the microphone stream at the device's native sample rate."""
+    pa = pyaudio.PyAudio()
+
+    stream = pa.open(
+        rate=INPUT_RATE,  # use the device's default sample rate
+        format=pyaudio.paInt16,
+        channels=CHANNELS,
+        input=True,
+        input_device_index=DEVICE_INDEX,
+        frames_per_buffer=CHUNK,
+        stream_callback=audio_callback
+    )
+    stream.start_stream()
+    print(f'[Debug] Stream @ {INPUT_RATE}Hz')
+    return pa, stream
+
+# ------------------- QUERY GLM API -------------------
+
+def query_glm():
+    """Send the trimmed conversation to the GLM API and return its reply ('' on error)."""
+    headers = {
+        'Authorization': 'Bearer ' + AUTH_TOKEN,
+        'Content-Type': 'application/json'
+    }
+    payload = {
+        # Honor the configured model (was hard-coded to "glm-4.5").
+        "model": MODEL_NAME,
+        # Slice past index 0 so the system prompt can never appear twice.
+        "messages": [messages[0]] + messages[1:][-HISTORY_LENGTH:],
+        "temperature": 0.6,
+        "max_tokens": 1024,
+        "stream": False
+    }
+
+    with Timer("Inference"):
+        try:
+            # A timeout keeps the assistant from hanging forever on a dead link.
+            resp = requests.post(CHAT_URL, json=payload, headers=headers, timeout=60)
+            resp.raise_for_status()
+        except requests.exceptions.RequestException as e:
+            print(f"[Error] GLM API request failed: {e}")
+            return ''
+
+    data = resp.json()
+    reply = ''
+    if 'choices' in data and len(data['choices']) > 0:
+        choice = data['choices'][0]
+        if 'message' in choice and 'content' in choice['message']:
+            reply = choice['message']['content'].strip()
+    return reply
+
+# ------------------- TTS & DEGRADATION -------------------
+
+def play_response(text):
+    """Clean the reply text, synthesize it with Piper, and play it back."""
+    global mic_enabled
+    mic_enabled = False  # mute the mic so the assistant does not hear itself
+
+    # clean the response
+    clean = re.sub(r"[\*]+", '', text)
+    clean = re.sub(r"\(.*?\)", '', clean)
+    clean = re.sub(r"<.*?>", '', clean)
+    clean = clean.replace('\n', ' ').strip()
+    clean = re.sub(r'\s+', ' ', clean)
+    clean = re.sub(r'[\U0001F300-\U0001FAFF\u2600-\u26FF\u2700-\u27BF]+', '', clean)
+
+    piper_path = os.path.join(BASE_DIR, 'bin', 'piper', 'piper')
+
+    if not os.path.exists(piper_path):
+        print(f"[Error] Piper executable not found at {piper_path}")
+        mic_enabled = True
+        return
+
+    try:
+        # Generate Piper raw PCM
+        with Timer("Piper inference"):
+            piper_proc = subprocess.Popen(
+                [piper_path, '--model', VOICE_MODEL, '--output_raw'],
+                stdin=subprocess.PIPE,
+                stdout=subprocess.PIPE,
+                stderr=subprocess.DEVNULL
+            )
+            tts_pcm, _ = piper_proc.communicate(input=clean.encode())
+
+        # Convert raw PCM to WAV for playback
+        wav_io = io.BytesIO()
+        with wave.open(wav_io, 'wb') as wf:
+            wf.setnchannels(1)
+            wf.setsampwidth(2)
+            wf.setframerate(16000)
+            wf.writeframes(tts_pcm)
+
+        wav_io.seek(0)
+        wf = wave.open(wav_io, 'rb')
+
+        # Playback
+        with Timer("Playback"):
+            pa = pyaudio.PyAudio()
+            stream = pa.open(
+                format=pa.get_format_from_width(wf.getsampwidth()),
+                channels=wf.getnchannels(),
+                rate=wf.getframerate(),
+                output=True,
+                output_device_index=AUDIO_OUTPUT_DEVICE_INDEX
+            )
+
+            data = wf.readframes(CHUNK)
+            while data:
+                stream.write(data)
+                data = wf.readframes(CHUNK)
+
+            stream.stop_stream()
+            stream.close()
+            pa.terminate()
+            wf.close()
+
+    except Exception as e:
+        print(f"[Error] TTS playback failed: {e}")
+    finally:
+        mic_enabled = True
+        time.sleep(0.3)
+
+# ------------------- PROCESSING LOOP -------------------
+
+def processing_loop():
+    """Consume resampled audio, run STT, query GLM, and speak the answer."""
+    try:
+        model = Model(MODEL_PATH)
+        print("[Debug] Vosk model loaded successfully")
+    except Exception as e:
+        print(f"[Error] Failed to load Vosk model: {e}")
+        print(f"[Info] Model path: {MODEL_PATH}")
+        return
+
+    rec = KaldiRecognizer(model, 16000)
+    MAX_DEBUG_LEN = 200
+    LOW_EFFORT_UTTERANCES = {"huh", "uh", "um", "erm", "hmm", "he's", "but"}
+
+    while True:
+        try:
+            data = audio_queue.get()
+
+            if rec.AcceptWaveform(data):
+                start = time.time()
+                r = json.loads(rec.Result())
+                elapsed_ms = int((time.time() - start) * 1000)
+
+                user = r.get('text', '').strip()
+                if user:
+                    print(f"[Timing] STT parse: {elapsed_ms} ms")
+                    print("User:", user)
+
+                    if user.lower().strip(".,!? ") in LOW_EFFORT_UTTERANCES:
+                        print("[Debug] Ignored low-effort utterance.")
+                        rec = KaldiRecognizer(model, 16000)
+                        continue
+
+                    messages.append({"role": "user", "content": user})
+                    resp_text = query_glm()
+
+                    if resp_text:
+                        clean_debug_text = resp_text.replace('\n', ' ').replace('\r', ' ')
+                        if len(clean_debug_text) > MAX_DEBUG_LEN:
+                            clean_debug_text = clean_debug_text[:MAX_DEBUG_LEN] + '...'
+
+                        print('Assistant:', clean_debug_text)
+                        # Store the FULL reply in the history, not the truncated
+                        # debug string (that corrupted the conversation context).
+                        messages.append({"role": "assistant", "content": resp_text})
+                        play_response(resp_text)
+                    else:
+                        print('[Debug] Empty response, skipping TTS.')
+
+                    rec = KaldiRecognizer(model, 16000)
+
+        except Exception as e:
+            print(f"[Error] Processing loop error: {e}")
+            time.sleep(1)
+
+# ------------------- MAIN -------------------
+
+if __name__ == '__main__':
+    try:
+        pa, stream = start_stream()
+        t = threading.Thread(target=processing_loop, daemon=True)
+        t.start()
+
+        print("[Debug] Voice assistant started. Press Ctrl+C to exit.")
+        while stream.is_active():
+            time.sleep(0.1)
+
+    except KeyboardInterrupt:
+        print("[Debug] Shutting down...")
+        stream.stop_stream()
+        stream.close()
+        pa.terminate()
+    except Exception as e:
+        print(f"[Error] Main loop error: {e}")
+        stream.stop_stream()
+        stream.close()
+        pa.terminate()