#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Simplified wake-word + recording test.
Focused on resolving the audio-conflict problem.
"""

import sys
import os
import time
import json
import wave

import numpy as np
import pyaudio

# Add the current directory to the module search path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

try:
    from vosk import Model, KaldiRecognizer
    VOSK_AVAILABLE = True
except ImportError:
    VOSK_AVAILABLE = False
    print("⚠️ Vosk is not installed. Please run: pip install vosk")


class SimpleWakeAndRecord:
    """Simplified wake-word detection + recording system."""

    def __init__(self, model_path="model", wake_words=None):
        self.model_path = model_path
        # Avoid a mutable default argument; the defaults are Chinese wake
        # words ("你好" = hello, "助手" = assistant) matched against Vosk output.
        self.wake_words = wake_words if wake_words is not None else ["你好", "助手"]
        self.model = None
        self.recognizer = None
        self.audio = None
        self.stream = None
        self.running = False

        # Audio parameters
        self.FORMAT = pyaudio.paInt16
        self.CHANNELS = 1
        self.RATE = 16000
        self.CHUNK_SIZE = 1024

        # Recording state
        self.recording = False
        self.recorded_frames = []
        self.last_text_time = None        # time text was last recognized
        self.recording_start_time = None
        self.recording_recognizer = None  # recognizer dedicated to recording

        # Thresholds
        self.text_silence_threshold = 3.0  # stop after 3 s with no recognized text
        self.min_recording_time = 2.0      # minimum recording time
        self.max_recording_time = 30.0     # maximum recording time

        self._setup_model()
        self._setup_audio()

    def _setup_model(self):
        """Set up the Vosk model."""
        if not VOSK_AVAILABLE:
            return
        try:
            if not os.path.exists(self.model_path):
                print(f"Model path does not exist: {self.model_path}")
                return
            self.model = Model(self.model_path)
            self.recognizer = KaldiRecognizer(self.model, self.RATE)
            self.recognizer.SetWords(True)
            print("✅ Vosk model loaded successfully")
        except Exception as e:
            print(f"Model initialization failed: {e}")

    def _setup_audio(self):
        """Set up the audio input device."""
        try:
            if self.audio is None:
                self.audio = pyaudio.PyAudio()
            if self.stream is None:
                self.stream = self.audio.open(
                    format=self.FORMAT,
                    channels=self.CHANNELS,
                    rate=self.RATE,
                    input=True,
                    frames_per_buffer=self.CHUNK_SIZE
                )
            print("✅ Audio device initialized successfully")
        except Exception as e:
            print(f"Audio device initialization failed: {e}")

    def _calculate_energy(self, audio_data):
        """Compute the RMS energy of an audio chunk."""
        if len(audio_data) == 0:
            return 0
        audio_array = np.frombuffer(audio_data, dtype=np.int16)
        # Cast to float before squaring so int16 samples cannot overflow
        rms = np.sqrt(np.mean(audio_array.astype(np.float64) ** 2))
        return rms

    def _check_wake_word(self, text):
        """Check whether the recognized text contains a wake word."""
        if not text or not self.wake_words:
            return False, None
        text_lower = text.lower()
        for wake_word in self.wake_words:
            if wake_word.lower() in text_lower:
                return True, wake_word
        return False, None

    def _save_recording(self, audio_data):
        """Save the recording to a timestamped WAV file."""
        timestamp = time.strftime("%Y%m%d_%H%M%S")
        filename = f"recording_{timestamp}.wav"
        try:
            with wave.open(filename, 'wb') as wf:
                wf.setnchannels(self.CHANNELS)
                wf.setsampwidth(self.audio.get_sample_size(self.FORMAT))
                wf.setframerate(self.RATE)
                wf.writeframes(audio_data)
            print(f"✅ Recording saved: {filename}")
            return True, filename
        except Exception as e:
            print(f"Failed to save recording: {e}")
            return False, None

    def _play_audio(self, filename):
        """Play back an audio file through PyAudio."""
        try:
            # Open the audio file and read its parameters
            with wave.open(filename, 'rb') as wf:
                channels = wf.getnchannels()
                width = wf.getsampwidth()
                rate = wf.getframerate()
                total_frames = wf.getnframes()

                # Read the audio data in chunks to avoid memory issues
                chunk_size = 1024
                frames = []
                for _ in range(0, total_frames, chunk_size):
                    chunk = wf.readframes(chunk_size)
                    if chunk:
                        frames.append(chunk)
                    else:
                        break

            # Create a playback stream
            playback_stream = self.audio.open(
                format=self.audio.get_format_from_width(width),
                channels=channels,
                rate=rate,
                output=True
            )
            print(f"🔊 Starting playback: {filename}")

            # Play the audio chunk by chunk
            for chunk in frames:
                playback_stream.write(chunk)

            # Wait for playback to finish, then release the stream
            playback_stream.stop_stream()
            playback_stream.close()
            print("✅ Playback finished")
        except Exception as e:
            print(f"❌ Playback failed: {e}")
            # If PyAudio playback fails, fall back to a system command
            self._play_with_system_player(filename)

    def _play_with_system_player(self, filename):
        """Play the audio file with the operating system's player."""
        try:
            import platform
            import subprocess

            system = platform.system()
            if system == 'Darwin':    # macOS
                cmd = ['afplay', filename]
            elif system == 'Windows':
                # 'start' is a cmd.exe builtin, so it has to run through a
                # shell; the empty "" is the required window-title argument.
                cmd = f'start /min "" "{filename}"'
            else:                     # assume Linux with ALSA's aplay
                cmd = ['aplay', filename]

            print(f"🔊 Using the system player: {cmd if isinstance(cmd, str) else ' '.join(cmd)}")
            subprocess.run(cmd, shell=isinstance(cmd, str), check=True)
            print("✅ Playback finished")
        except Exception as e:
            print(f"❌ The system player failed too: {e}")
            print(f"💡 The file has been saved; please play it manually: {filename}")

    def _start_recording(self):
        """Start recording after a wake word was detected."""
        print("🎙️ Recording started, please speak...")
        self.recording = True
        self.recorded_frames = []
        self.last_text_time = None
        self.recording_start_time = time.time()
        # Create a fresh recognizer dedicated to this recording
        if self.model:
            self.recording_recognizer = KaldiRecognizer(self.model, self.RATE)
            self.recording_recognizer.SetWords(True)

    def _stop_recording(self):
        """Stop recording, save the audio, and play it back."""
        if len(self.recorded_frames) > 0:
            audio_data = b''.join(self.recorded_frames)
            # 16-bit mono audio: 2 bytes per sample
            duration = len(audio_data) / (self.RATE * 2)
            print(f"📝 Recording finished, duration: {duration:.2f}s")

            # Save the recording
            success, filename = self._save_recording(audio_data)

            # If the save succeeded, play the recording back
            if success and filename:
                print("=" * 50)
                print("🔊 Playing back the audio you just recorded...")
                self._play_audio(filename)
                print("=" * 50)

        self.recording = False
        self.recorded_frames = []
        self.last_text_time = None
        self.recording_start_time = None
        self.recording_recognizer = None

    def start(self):
        """Run wake-word detection and recording."""
        if not self.stream:
            print("❌ Audio device is not initialized")
            return

        self.running = True
        print("🎤 Listening...")
        print(f"Wake words: {', '.join(self.wake_words)}")

        try:
            while self.running:
                # Read one chunk of audio data
                data = self.stream.read(self.CHUNK_SIZE, exception_on_overflow=False)
                if len(data) == 0:
                    continue

                if self.recording:
                    # Recording mode
                    self.recorded_frames.append(data)
                    recording_duration = time.time() - self.recording_start_time

                    # Run live recognition with the recording-only recognizer
                    if self.recording_recognizer:
                        if self.recording_recognizer.AcceptWaveform(data):
                            # Final recognition result for this utterance
                            result = json.loads(self.recording_recognizer.Result())
                            text = result.get('text', '').strip()
                            if text:
                                # Text was recognized, update the timestamp
                                self.last_text_time = time.time()
                                print(f"\n📝 Recognized: {text}")
                        else:
                            # Partial recognition result
                            partial_result = json.loads(self.recording_recognizer.PartialResult())
                            partial_text = partial_result.get('partial', '').strip()
                            if partial_text:
                                # Partial results also count as speech
                                self.last_text_time = time.time()
                                status = f"Recording... {recording_duration:.1f}s | {partial_text}"
                                print(f"\r{status}", end='', flush=True)

                    # Check whether recording should stop
                    current_time = time.time()

                    # Stop if text recognition has been silent for too long
                    if self.last_text_time is not None:
                        text_silence_duration = current_time - self.last_text_time
                        if (text_silence_duration > self.text_silence_threshold
                                and recording_duration >= self.min_recording_time):
                            print(f"\n\nNo text recognized for {self.text_silence_threshold:.0f}s, stopping recording")
                            self._stop_recording()
                            continue
                    else:
                        # Nothing recognized yet; give up after 5 seconds
                        if recording_duration > 5.0:
                            print("\n\nNo text recognized within 5s, stopping recording")
                            self._stop_recording()
                            continue

                    # Enforce the maximum recording time
                    if recording_duration > self.max_recording_time:
                        print(f"\n\nReached the maximum recording time of {self.max_recording_time}s")
                        self._stop_recording()
                        continue

                    # Show recording status while waiting for speech
                    if self.last_text_time is None:
                        status = f"Waiting for speech... {recording_duration:.1f}s"
                        print(f"\r{status}", end='', flush=True)

                elif self.model and self.recognizer:
                    # Wake-word detection mode
                    if self.recognizer.AcceptWaveform(data):
                        result = json.loads(self.recognizer.Result())
                        text = result.get('text', '').strip()
                        if text:
                            print(f"Recognized: {text}")
                            # Check for a wake word
                            is_wake_word, detected_word = self._check_wake_word(text)
                            if is_wake_word:
                                print(f"🎯 Wake word detected: {detected_word}")
                                self._start_recording()
                    else:
                        # Show the live audio level
                        energy = self._calculate_energy(data)
                        if energy > 50:  # only show meaningful audio levels
                            partial_result = json.loads(self.recognizer.PartialResult())
                            partial_text = partial_result.get('partial', '')
                            if partial_text:
                                status = f"Listening... energy: {energy:.0f} | {partial_text}"
                            else:
                                status = f"Listening... energy: {energy:.0f}"
                            print(status, end='\r')

                time.sleep(0.01)
        except KeyboardInterrupt:
            print("\n👋 Exiting")
        except Exception as e:
            print(f"Error: {e}")
        finally:
            self.stop()

    def stop(self):
        """Stop and release the audio resources."""
        self.running = False
        if self.recording:
            self._stop_recording()
        if self.stream:
            self.stream.stop_stream()
            self.stream.close()
            self.stream = None
        if self.audio:
            self.audio.terminate()
            self.audio = None


def main():
    """Entry point."""
    print("🚀 Simplified wake-word + recording test")
    print("=" * 50)

    # Check that the model directory exists
    model_dir = "model"
    if not os.path.exists(model_dir):
        print("⚠️ Model directory not found")
        print("Please download a Vosk model into the 'model' directory")
        return

    # Create the system (wake words are Chinese: hello / assistant / Xiao Ai)
    system = SimpleWakeAndRecord(
        model_path=model_dir,
        wake_words=["你好", "助手", "小爱"]
    )
    if not system.model:
        print("❌ Model failed to load")
        return

    print("✅ System initialized successfully")
    print("📖 Usage:")
    print("1. Say a wake word to start recording")
    print("2. Recording stops once speech recognition sees no text for 3 seconds")
    print("3. Recording lasts at least 2 seconds and at most 30 seconds")
    print("4. Recognition results are shown live while recording")
    print("5. Recordings are saved automatically")
    print("6. After recording finishes, the audio is played back automatically")
    print("7. Press Ctrl+C to exit")
    print("=" * 50)

    # Run
    system.start()


if __name__ == "__main__":
    main()
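

# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original script): driving the class
# programmatically with a custom wake-word list. The wake words and the
# "model" path below are assumptions; any Chinese Vosk model unpacked into
# ./model should work, since wake words are substring-matched against the
# recognizer's text output (models are listed at
# https://alphacephei.com/vosk/models). Nothing here runs automatically;
# call _demo_custom_wake_words() yourself to try it.
# ---------------------------------------------------------------------------
def _demo_custom_wake_words():
    """Illustrative only: run detection with a custom wake-word list."""
    system = SimpleWakeAndRecord(model_path="model", wake_words=["你好", "小助手"])
    if not system.model:
        print("Model not loaded; download a Vosk model into ./model first")
        return
    try:
        system.start()   # blocks until Ctrl+C
    finally:
        system.stop()    # releases the PyAudio stream and device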