#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Wake-word + recording system with integrated speech recognition.

Based on simple_wake_and_record.py, with speech recognition added: the
microphone is monitored with Vosk for a wake word, the following utterance
is recorded to a WAV file, played back, and handed to ``SpeechRecognizer``
for recognition on a background thread.
"""

import asyncio
import json
import os
import platform
import subprocess
import sys
import threading
import time
import wave
from typing import List, Optional

# Make sibling modules (e.g. speech_recognizer) importable when run as a script.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

# Optional dependencies are imported softly so that a missing package degrades
# into an "initialisation failed" message instead of an import-time crash.
try:
    import pyaudio
    PYAUDIO_AVAILABLE = True
except ImportError:
    pyaudio = None
    PYAUDIO_AVAILABLE = False
    print("⚠️ PyAudio 未安装,请运行: pip install pyaudio")

try:
    from vosk import Model, KaldiRecognizer
    VOSK_AVAILABLE = True
except ImportError:
    VOSK_AVAILABLE = False
    print("⚠️ Vosk 未安装,请运行: pip install vosk")

try:
    from speech_recognizer import SpeechRecognizer, RecognitionResult
    SPEECH_RECOGNIZER_AVAILABLE = True
except ImportError:
    SpeechRecognizer = None
    RecognitionResult = None
    SPEECH_RECOGNIZER_AVAILABLE = False
    print("⚠️ 未找到 speech_recognizer 模块,语音识别已禁用")

# Wake words used when the caller does not supply any.
DEFAULT_WAKE_WORDS = ("你好", "助手")

# Bytes per sample of 16-bit PCM; used only when PyAudio cannot be queried.
_DEFAULT_SAMPLE_WIDTH = 2

# RMS level above which the idle "listening" status line is refreshed.
_ENERGY_DISPLAY_THRESHOLD = 50


class EnhancedWakeAndRecord:
    """Wake-word triggered recorder with optional speech recognition.

    Args:
        model_path: Directory of the Vosk model.
        wake_words: Phrases that start a recording when they appear in a
            final Vosk result. Defaults to ``DEFAULT_WAKE_WORDS``.
        enable_speech_recognition: Whether finished recordings are passed to
            ``SpeechRecognizer``.
        app_key: Forwarded to ``SpeechRecognizer``.
        access_key: Forwarded to ``SpeechRecognizer``.
    """

    def __init__(self, model_path="model", wake_words=None,
                 enable_speech_recognition=True, app_key=None, access_key=None):
        self.model_path = model_path
        # A fresh list per instance: a list literal as the default argument
        # would be shared (and mutated) across all instances.
        self.wake_words = (
            list(DEFAULT_WAKE_WORDS) if wake_words is None else wake_words
        )
        self.enable_speech_recognition = enable_speech_recognition

        self.model = None
        self.recognizer = None
        self.audio = None
        self.stream = None
        self.running = False

        # Audio parameters
        self.FORMAT = pyaudio.paInt16 if PYAUDIO_AVAILABLE else None
        self.CHANNELS = 1
        self.RATE = 16000
        self.CHUNK_SIZE = 1024

        # Recording state
        self.recording = False
        self.recorded_frames = []
        self.last_text_time = None
        self.recording_start_time = None
        self.recording_recognizer = None

        # Thresholds (seconds)
        self.text_silence_threshold = 3.0  # silence after speech that ends a recording
        self.min_recording_time = 2.0      # never end on silence before this
        self.max_recording_time = 30.0     # hard upper bound
        self.no_speech_timeout = 5.0       # give up if nothing was ever recognised

        # Speech recognition state
        self.speech_recognizer = None
        self.last_recognition_result = None
        self.recognition_thread = None

        # Callback invoked with the final RecognitionResult
        self.on_recognition_result = None

        self._setup_model()
        self._setup_audio()
        self._setup_speech_recognition(app_key, access_key)

    def _setup_model(self):
        """Load the Vosk model and create the wake-word recognizer."""
        if not VOSK_AVAILABLE:
            return

        try:
            if not os.path.exists(self.model_path):
                print(f"模型路径不存在: {self.model_path}")
                return

            self.model = Model(self.model_path)
            self.recognizer = KaldiRecognizer(self.model, self.RATE)
            self.recognizer.SetWords(True)
            print("✅ Vosk 模型加载成功")
        except Exception as e:
            print(f"模型初始化失败: {e}")

    def _setup_audio(self):
        """Open the PyAudio instance and the microphone input stream."""
        if not PYAUDIO_AVAILABLE:
            print("音频设备初始化失败: PyAudio 未安装")
            return

        try:
            if self.audio is None:
                self.audio = pyaudio.PyAudio()

            if self.stream is None:
                self.stream = self.audio.open(
                    format=self.FORMAT,
                    channels=self.CHANNELS,
                    rate=self.RATE,
                    input=True,
                    frames_per_buffer=self.CHUNK_SIZE,
                )
            print("✅ 音频设备初始化成功")
        except Exception as e:
            print(f"音频设备初始化失败: {e}")

    def _setup_speech_recognition(self, app_key=None, access_key=None):
        """Create the SpeechRecognizer; disable recognition if that fails."""
        if not self.enable_speech_recognition:
            return

        if not SPEECH_RECOGNIZER_AVAILABLE:
            print("语音识别器初始化失败: speech_recognizer 模块不可用")
            self.enable_speech_recognition = False
            return

        try:
            self.speech_recognizer = SpeechRecognizer(
                app_key=app_key,
                access_key=access_key,
            )
            print("✅ 语音识别器初始化成功")
        except Exception as e:
            print(f"语音识别器初始化失败: {e}")
            self.enable_speech_recognition = False

    def _sample_width(self) -> int:
        """Return the number of bytes per sample for ``self.FORMAT``."""
        if PYAUDIO_AVAILABLE and self.FORMAT is not None:
            return pyaudio.get_sample_size(self.FORMAT)
        return _DEFAULT_SAMPLE_WIDTH

    def _calculate_energy(self, audio_data):
        """Return the RMS level of a buffer of 16-bit PCM samples.

        Args:
            audio_data: Raw little-endian int16 sample bytes.

        Returns:
            The root-mean-square amplitude, or 0 for an empty buffer.
        """
        if len(audio_data) == 0:
            return 0

        import numpy as np  # local import: numpy is only needed for this meter

        # Promote to float before squaring: int16 ** 2 is computed in int16
        # and wraps around for any sample with |x| > 181.
        samples = np.frombuffer(audio_data, dtype=np.int16).astype(np.float64)
        return float(np.sqrt(np.mean(samples ** 2)))

    def _check_wake_word(self, text):
        """Check whether ``text`` contains one of the wake words.

        The comparison is a case-insensitive substring match.

        Returns:
            ``(True, wake_word)`` for the first configured wake word found,
            otherwise ``(False, None)``.
        """
        if not text or not self.wake_words:
            return False, None

        text_lower = text.lower()
        for wake_word in self.wake_words:
            if wake_word.lower() in text_lower:
                return True, wake_word

        return False, None

    def _save_recording(self, audio_data):
        """Write ``audio_data`` to ``recording_<timestamp>.wav``.

        Returns:
            ``(True, filename)`` on success, ``(False, None)`` on failure.
        """
        timestamp = time.strftime("%Y%m%d_%H%M%S")
        filename = f"recording_{timestamp}.wav"

        try:
            with wave.open(filename, 'wb') as wf:
                wf.setnchannels(self.CHANNELS)
                wf.setsampwidth(self._sample_width())
                wf.setframerate(self.RATE)
                wf.writeframes(audio_data)

            print(f"✅ 录音已保存: {filename}")
            return True, filename
        except Exception as e:
            print(f"保存录音失败: {e}")
            return False, None

    def _play_audio(self, filename):
        """Play a WAV file through PyAudio, falling back to a system player."""
        try:
            with wave.open(filename, 'rb') as wf:
                playback_stream = self.audio.open(
                    format=self.audio.get_format_from_width(wf.getsampwidth()),
                    channels=wf.getnchannels(),
                    rate=wf.getframerate(),
                    output=True,
                )
                try:
                    print(f"🔊 开始播放: {filename}")
                    # Stream chunk by chunk so the file is never held in
                    # memory as a whole; readframes() returns b'' at EOF.
                    for chunk in iter(lambda: wf.readframes(self.CHUNK_SIZE), b''):
                        playback_stream.write(chunk)
                    playback_stream.stop_stream()
                finally:
                    # Always release the output stream, even if write() failed.
                    playback_stream.close()

            print("✅ 播放完成")
        except Exception as e:
            print(f"❌ 播放失败: {e}")
            self._play_with_system_player(filename)

    def _play_with_system_player(self, filename):
        """Play a file with the platform's command-line player."""
        try:
            system = platform.system()
            if system == 'Darwin':  # macOS
                cmd = ['afplay', filename]
            elif system == 'Windows':
                # `start` is a cmd.exe builtin, not an executable, so it must
                # be run through cmd. The empty string is the window title
                # argument that `start` expects before a quoted path.
                cmd = ['cmd', '/c', 'start', '/min', '', filename]
            else:  # Linux
                cmd = ['aplay', filename]

            print(f"🔊 使用系统播放器: {' '.join(cmd)}")
            subprocess.run(cmd, check=True)
            print("✅ 播放完成")
        except Exception as e:
            print(f"❌ 系统播放器也失败: {e}")
            print(f"💡 文件已保存,请手动播放: {filename}")

    def _start_recognition_thread(self, filename):
        """Recognise ``filename`` on a daemon thread and publish the result.

        The result is stored in ``last_recognition_result`` and passed to the
        ``on_recognition_result`` callback if one is set.
        """
        if not self.enable_speech_recognition or not self.speech_recognizer:
            return

        def recognize_task():
            loop = None
            try:
                # recognize_file is a coroutine; this thread has no event
                # loop of its own, so create a private one.
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)

                print(f"🧠 开始识别录音文件: {filename}")
                result = loop.run_until_complete(
                    self.speech_recognizer.recognize_file(filename)
                )

                if result:
                    # Merge all partial recognition results into one text.
                    full_text = " ".join(r.text for r in result)
                    # NOTE: confidence is a fixed placeholder value; it is not
                    # derived from the individual results.
                    final_result = RecognitionResult(
                        text=full_text,
                        confidence=0.9,
                        is_final=True,
                    )

                    self.last_recognition_result = final_result
                    print(f"\n🧠 语音识别结果: {full_text}")

                    if self.on_recognition_result:
                        self.on_recognition_result(final_result)
                else:
                    print("\n🧠 语音识别失败或未识别到内容")
            except Exception as e:
                print(f"❌ 语音识别线程异常: {e}")
            finally:
                # Close the loop on every path so it is not leaked on error.
                if loop is not None:
                    loop.close()

        self.recognition_thread = threading.Thread(
            target=recognize_task, daemon=True
        )
        self.recognition_thread.start()

    def _start_recording(self):
        """Switch to recording mode and reset the recording state."""
        print("🎙️ 开始录音,请说话...")
        self.recording = True
        self.recorded_frames = []
        self.last_text_time = None
        self.recording_start_time = time.time()

        # A dedicated recognizer keeps the live transcript of the recording
        # separate from the wake-word recognizer's state.
        if self.model:
            self.recording_recognizer = KaldiRecognizer(self.model, self.RATE)
            self.recording_recognizer.SetWords(True)

    def _stop_recording(self):
        """Leave recording mode, then save, play back and recognise the audio."""
        frames = self.recorded_frames

        # Reset state first: if playback below is interrupted, stop() must not
        # find `recording` still set and process the same audio a second time.
        self.recording = False
        self.recorded_frames = []
        self.last_text_time = None
        self.recording_start_time = None
        self.recording_recognizer = None

        if not frames:
            return

        audio_data = b''.join(frames)
        bytes_per_second = self.RATE * self.CHANNELS * self._sample_width()
        duration = len(audio_data) / bytes_per_second
        print(f"📝 录音完成,时长: {duration:.2f}秒")

        success, filename = self._save_recording(audio_data)
        if not (success and filename):
            return

        print("=" * 50)
        print("🔊 播放刚才录制的音频...")
        self._play_audio(filename)
        print("=" * 50)

        if self.enable_speech_recognition:
            print("🧠 准备进行语音识别...")
            self._start_recognition_thread(filename)

    def set_recognition_callback(self, callback):
        """Set the function called with each final ``RecognitionResult``."""
        self.on_recognition_result = callback

    def get_last_recognition_result(self) -> "Optional[RecognitionResult]":
        """Return the most recent recognition result, or ``None``."""
        return self.last_recognition_result

    def _recording_stop_reason(self, recording_duration, current_time):
        """Decide whether the current recording should end.

        Args:
            recording_duration: Seconds since the recording started.
            current_time: Current ``time.time()`` value.

        Returns:
            The message describing why the recording ends, or ``None`` if it
            should continue.
        """
        if self.last_text_time is None:
            # Nothing has been recognised since the recording started.
            if recording_duration > self.no_speech_timeout:
                return f"{self.no_speech_timeout:g}秒没有识别到文字,结束录音"
        else:
            silence = current_time - self.last_text_time
            if (silence > self.text_silence_threshold
                    and recording_duration >= self.min_recording_time):
                return f"{self.text_silence_threshold:g}秒没有识别到文字,结束录音"

        if recording_duration > self.max_recording_time:
            return f"达到最大录音时间 {self.max_recording_time}s"

        return None

    def _process_recording_chunk(self, data):
        """Handle one audio chunk while in recording mode."""
        self.recorded_frames.append(data)
        recording_duration = time.time() - self.recording_start_time

        # Live transcript: any recognised text (final or partial) counts as
        # speech activity and refreshes last_text_time.
        if self.recording_recognizer:
            if self.recording_recognizer.AcceptWaveform(data):
                result = json.loads(self.recording_recognizer.Result())
                text = result.get('text', '').strip()
                if text:
                    self.last_text_time = time.time()
                    print(f"\n📝 实时识别: {text}")
            else:
                partial_result = json.loads(self.recording_recognizer.PartialResult())
                partial_text = partial_result.get('partial', '').strip()
                if partial_text:
                    self.last_text_time = time.time()
                    status = f"录音中... {recording_duration:.1f}s | {partial_text}"
                    print(f"\r{status}", end='', flush=True)

        stop_reason = self._recording_stop_reason(recording_duration, time.time())
        if stop_reason is not None:
            print(f"\n\n{stop_reason}")
            self._stop_recording()
            # Recording is over: skip the "waiting" status line below.
            return

        if self.last_text_time is None:
            status = f"等待语音输入... {recording_duration:.1f}s"
            print(f"\r{status}", end='', flush=True)

    def _process_wake_chunk(self, data):
        """Handle one audio chunk while listening for a wake word."""
        if self.recognizer.AcceptWaveform(data):
            result = json.loads(self.recognizer.Result())
            text = result.get('text', '').strip()
            if not text:
                return

            print(f"识别: {text}")
            is_wake_word, detected_word = self._check_wake_word(text)
            if is_wake_word:
                print(f"🎯 检测到唤醒词: {detected_word}")
                self._start_recording()
            return

        # No final result yet: show the input level while there is sound.
        energy = self._calculate_energy(data)
        if energy > _ENERGY_DISPLAY_THRESHOLD:
            partial_result = json.loads(self.recognizer.PartialResult())
            partial_text = partial_result.get('partial', '')
            if partial_text:
                status = f"监听中... 能量: {energy:.0f} | {partial_text}"
            else:
                status = f"监听中... 能量: {energy:.0f}"
            print(status, end='\r')

    def start(self):
        """Run the wake-word / recording loop until stopped or interrupted."""
        if not self.stream:
            print("❌ 音频设备未初始化")
            return

        self.running = True
        print("🎤 开始监听...")
        print(f"唤醒词: {', '.join(self.wake_words)}")
        if self.enable_speech_recognition:
            print("🧠 语音识别: 已启用")
        else:
            print("🧠 语音识别: 已禁用")

        try:
            while self.running:
                data = self.stream.read(self.CHUNK_SIZE, exception_on_overflow=False)
                if not data:
                    continue

                if self.recording:
                    self._process_recording_chunk(data)
                elif self.model and self.recognizer:
                    self._process_wake_chunk(data)

                time.sleep(0.01)

        except KeyboardInterrupt:
            print("\n👋 退出")
        except Exception as e:
            print(f"错误: {e}")
        finally:
            self.stop()

    def stop(self):
        """Stop the loop, finish any recording and release audio resources."""
        self.running = False

        # Finish an in-progress recording while the audio device is still open
        # (playback needs it).
        if self.recording:
            self._stop_recording()

        stream, self.stream = self.stream, None
        audio, self.audio = self.audio, None
        try:
            if stream is not None:
                stream.stop_stream()
                stream.close()
        finally:
            # Terminate PyAudio even if closing the stream failed.
            if audio is not None:
                audio.terminate()

            # Wait for a running recognition to finish. Skip the join when
            # stop() is called from the recognition thread itself (e.g. from
            # the callback), since a thread cannot join itself.
            thread = self.recognition_thread
            if (thread is not None and thread.is_alive()
                    and thread is not threading.current_thread()):
                thread.join(timeout=5.0)


def main():
    """Interactive demo: listen for a wake word, record, play back, recognise."""
    print("🚀 增强版唤醒+录音+语音识别测试")
    print("=" * 50)

    # The Vosk model must be present before anything else is set up.
    model_dir = "model"
    if not os.path.exists(model_dir):
        print("⚠️ 未找到模型目录")
        print("请下载 Vosk 模型到 model 目录")
        return

    # Pass app_key=... and access_key=... here to supply real credentials.
    system = EnhancedWakeAndRecord(
        model_path=model_dir,
        wake_words=["你好", "助手", "小爱"],
        enable_speech_recognition=True,
    )

    if not system.model:
        print("❌ 模型加载失败")
        # The constructor may already have opened the audio device.
        system.stop()
        return

    def on_recognition_result(result):
        print(f"\n🎯 识别完成!结果: {result.text}")
        print(f"   置信度: {result.confidence}")
        print(f"   是否最终结果: {result.is_final}")

    system.set_recognition_callback(on_recognition_result)

    print("✅ 系统初始化成功")
    print("📖 使用说明:")
    print("1. 说唤醒词开始录音")
    print("2. 基于语音识别判断,3秒没有识别到文字就结束")
    print("3. 最少录音2秒,最多30秒")
    print("4. 录音时实时显示识别结果")
    print("5. 录音文件自动保存")
    print("6. 录音完成后自动播放刚才录制的内容")
    print("7. 启动语音识别对录音文件进行识别")
    print("8. 按 Ctrl+C 退出")
    print("=" * 50)

    system.start()


if __name__ == "__main__":
    main()