Local-Voice/enhanced_wake_and_record.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Wake-word + recording system with integrated speech recognition.
Based on simple_wake_and_record.py, with speech recognition added.
"""
import sys
import os
import time
import threading
import pyaudio
import json
import asyncio
from typing import Optional, List

# Add the current directory to the module search path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

try:
    from vosk import Model, KaldiRecognizer
    VOSK_AVAILABLE = True
except ImportError:
    VOSK_AVAILABLE = False
    print("⚠️ Vosk is not installed. Run: pip install vosk")

from speech_recognizer import SpeechRecognizer, RecognitionResult


class EnhancedWakeAndRecord:
    """Enhanced wake-word + recording system with integrated speech recognition."""

    def __init__(self, model_path="model", wake_words=["你好", "助手"],
                 enable_speech_recognition=True, app_key=None, access_key=None):
        self.model_path = model_path
        self.wake_words = wake_words
        self.enable_speech_recognition = enable_speech_recognition
        self.model = None
        self.recognizer = None
        self.audio = None
        self.stream = None
        self.running = False
        # Audio parameters
        self.FORMAT = pyaudio.paInt16
        self.CHANNELS = 1
        self.RATE = 16000
        self.CHUNK_SIZE = 1024
        # Recording state
        self.recording = False
        self.recorded_frames = []
        self.last_text_time = None
        self.recording_start_time = None
        self.recording_recognizer = None
        # Thresholds (seconds)
        self.text_silence_threshold = 3.0
        self.min_recording_time = 2.0
        self.max_recording_time = 30.0
        # Speech recognition state
        self.speech_recognizer = None
        self.last_recognition_result = None
        self.recognition_thread = None
        # Callback invoked with the final RecognitionResult
        self.on_recognition_result = None

        self._setup_model()
        self._setup_audio()
        self._setup_speech_recognition(app_key, access_key)

    def _setup_model(self):
        """Set up the Vosk model."""
        if not VOSK_AVAILABLE:
            return
        try:
            if not os.path.exists(self.model_path):
                print(f"Model path does not exist: {self.model_path}")
                return
            self.model = Model(self.model_path)
            self.recognizer = KaldiRecognizer(self.model, self.RATE)
            self.recognizer.SetWords(True)
            print("✅ Vosk model loaded successfully")
        except Exception as e:
            print(f"Model initialization failed: {e}")

    def _setup_audio(self):
        """Set up the audio device."""
        try:
            if self.audio is None:
                self.audio = pyaudio.PyAudio()
            if self.stream is None:
                self.stream = self.audio.open(
                    format=self.FORMAT,
                    channels=self.CHANNELS,
                    rate=self.RATE,
                    input=True,
                    frames_per_buffer=self.CHUNK_SIZE
                )
            print("✅ Audio device initialized")
        except Exception as e:
            print(f"Audio device initialization failed: {e}")

    def _setup_speech_recognition(self, app_key=None, access_key=None):
        """Set up the speech recognizer."""
        if not self.enable_speech_recognition:
            return
        try:
            self.speech_recognizer = SpeechRecognizer(
                app_key=app_key,
                access_key=access_key
            )
            print("✅ Speech recognizer initialized")
        except Exception as e:
            print(f"Speech recognizer initialization failed: {e}")
            self.enable_speech_recognition = False

    def _calculate_energy(self, audio_data):
        """Compute the RMS energy of an audio chunk."""
        if len(audio_data) == 0:
            return 0
        import numpy as np
        # Cast to float before squaring to avoid int16 overflow
        audio_array = np.frombuffer(audio_data, dtype=np.int16).astype(np.float64)
        rms = np.sqrt(np.mean(audio_array ** 2))
        return rms
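
    # Note (added for clarity): RMS energy here is sqrt(mean(x**2)) over raw
    # int16 samples, so it ranges from ~0 for silence up to ~32767 at full
    # scale (a sine wave of amplitude A has RMS A / sqrt(2)). The "energy > 50"
    # check in start() below is treated as an empirical "some sound present"
    # floor before partial results are displayed.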

    def _check_wake_word(self, text):
        """Check whether the text contains a wake word."""
        if not text or not self.wake_words:
            return False, None
        text_lower = text.lower()
        for wake_word in self.wake_words:
            if wake_word.lower() in text_lower:
                return True, wake_word
        return False, None
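
    # Note (added for clarity): wake-word matching is a case-insensitive
    # substring test, so e.g. "你好,请问..." triggers the wake word "你好";
    # lower() only matters for Latin-script wake words.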

    def _save_recording(self, audio_data):
        """Save the recording to a WAV file."""
        timestamp = time.strftime("%Y%m%d_%H%M%S")
        filename = f"recording_{timestamp}.wav"
        try:
            import wave
            with wave.open(filename, 'wb') as wf:
                wf.setnchannels(self.CHANNELS)
                wf.setsampwidth(self.audio.get_sample_size(self.FORMAT))
                wf.setframerate(self.RATE)
                wf.writeframes(audio_data)
            print(f"✅ Recording saved: {filename}")
            return True, filename
        except Exception as e:
            print(f"Failed to save recording: {e}")
            return False, None

    def _play_audio(self, filename):
        """Play back an audio file through PyAudio."""
        try:
            import wave
            # Open the audio file
            with wave.open(filename, 'rb') as wf:
                # Read the audio parameters
                channels = wf.getnchannels()
                width = wf.getsampwidth()
                rate = wf.getframerate()
                total_frames = wf.getnframes()
                # Read the audio in chunks to avoid large single allocations
                chunk_size = 1024
                frames = []
                for _ in range(0, total_frames, chunk_size):
                    chunk = wf.readframes(chunk_size)
                    if chunk:
                        frames.append(chunk)
                    else:
                        break
                # Create a playback stream
                playback_stream = self.audio.open(
                    format=self.audio.get_format_from_width(width),
                    channels=channels,
                    rate=rate,
                    output=True
                )
                print(f"🔊 Playing back: {filename}")
                # Write the audio to the output stream chunk by chunk
                for chunk in frames:
                    playback_stream.write(chunk)
                # Wait for playback to finish, then clean up
                playback_stream.stop_stream()
                playback_stream.close()
                print("✅ Playback finished")
        except Exception as e:
            print(f"❌ Playback failed: {e}")
            self._play_with_system_player(filename)

    def _play_with_system_player(self, filename):
        """Play the audio file with the system player as a fallback."""
        try:
            import platform
            import subprocess
            system = platform.system()
            if system == 'Darwin':  # macOS
                cmd = ['afplay', filename]
            elif system == 'Windows':
                # 'start' is a cmd.exe built-in, so it must be run through the shell
                cmd = ['cmd', '/c', 'start', '/min', '', filename]
            else:  # Linux
                cmd = ['aplay', filename]
            print(f"🔊 Using system player: {' '.join(cmd)}")
            subprocess.run(cmd, check=True)
            print("✅ Playback finished")
        except Exception as e:
            print(f"❌ System player also failed: {e}")
            print(f"💡 The file was saved; please play it manually: {filename}")

    def _start_recognition_thread(self, filename):
        """Start the speech recognition worker thread."""
        if not self.enable_speech_recognition or not self.speech_recognizer:
            return

        def recognize_task():
            try:
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
                print(f"🧠 Recognizing recorded file: {filename}")
                result = loop.run_until_complete(
                    self.speech_recognizer.recognize_file(filename)
                )
                if result:
                    # Join all recognized segments into one string
                    full_text = " ".join([r.text for r in result])
                    final_result = RecognitionResult(
                        text=full_text,
                        confidence=0.9,
                        is_final=True
                    )
                    self.last_recognition_result = final_result
                    print(f"\n🧠 Speech recognition result: {full_text}")
                    # Invoke the user-supplied callback, if any
                    if self.on_recognition_result:
                        self.on_recognition_result(final_result)
                else:
                    print("\n🧠 Speech recognition failed or returned no text")
                loop.close()
            except Exception as e:
                print(f"❌ Speech recognition thread error: {e}")

        self.recognition_thread = threading.Thread(target=recognize_task)
        self.recognition_thread.daemon = True
        self.recognition_thread.start()
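
    # Note (added for clarity): this assumes speech_recognizer.recognize_file()
    # is a coroutine that returns a list of RecognitionResult segments, as used
    # above; a fresh event loop is created because the coroutine runs on a
    # worker thread rather than the main thread. The confidence of 0.9 is a
    # fixed placeholder, not a value reported by the recognizer.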

    def _start_recording(self):
        """Start recording."""
        print("🎙️ Recording started, please speak...")
        self.recording = True
        self.recorded_frames = []
        self.last_text_time = None
        self.recording_start_time = time.time()
        # Create a dedicated recognizer for this recording
        if self.model:
            self.recording_recognizer = KaldiRecognizer(self.model, self.RATE)
            self.recording_recognizer.SetWords(True)

    def _stop_recording(self):
        """Stop recording, save the audio, play it back, and start recognition."""
        if len(self.recorded_frames) > 0:
            audio_data = b''.join(self.recorded_frames)
            duration = len(audio_data) / (self.RATE * 2)
            print(f"📝 Recording finished, duration: {duration:.2f}s")
            # Save the recording to a WAV file
            success, filename = self._save_recording(audio_data)
            # If saving succeeded, play it back and run speech recognition
            if success and filename:
                print("=" * 50)
                print("🔊 Playing back the audio that was just recorded...")
                self._play_audio(filename)
                print("=" * 50)
                # Kick off speech recognition on the saved file
                if self.enable_speech_recognition:
                    print("🧠 Starting speech recognition...")
                    self._start_recognition_thread(filename)
        # Reset recording state
        self.recording = False
        self.recorded_frames = []
        self.last_text_time = None
        self.recording_start_time = None
        self.recording_recognizer = None
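
    # Note (added for clarity): the duration estimate above assumes 16-bit
    # (2 bytes per sample) mono audio at self.RATE, i.e.
    #     duration_seconds = len(audio_data) / (RATE * 2)
    # which matches the paInt16 / 1-channel / 16 kHz stream opened in
    # _setup_audio().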

    def set_recognition_callback(self, callback):
        """Set the callback invoked with the final recognition result."""
        self.on_recognition_result = callback

    def get_last_recognition_result(self) -> Optional[RecognitionResult]:
        """Return the most recent recognition result, if any."""
        return self.last_recognition_result
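
    # Note (added for clarity): recognition runs on a daemon thread (see
    # _start_recognition_thread), so get_last_recognition_result() may still
    # return None right after a recording finishes; the callback set via
    # set_recognition_callback() is the more reliable way to consume results.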

    def start(self):
        """Run wake-word detection and recording."""
        if not self.stream:
            print("❌ Audio device is not initialized")
            return
        self.running = True
        print("🎤 Listening...")
        print(f"Wake words: {', '.join(self.wake_words)}")
        if self.enable_speech_recognition:
            print("🧠 Speech recognition: enabled")
        else:
            print("🧠 Speech recognition: disabled")
        try:
            while self.running:
                # Read one chunk of audio
                data = self.stream.read(self.CHUNK_SIZE, exception_on_overflow=False)
                if len(data) == 0:
                    continue
                if self.recording:
                    # Recording mode
                    self.recorded_frames.append(data)
                    recording_duration = time.time() - self.recording_start_time
                    # Run live recognition with the recording-specific recognizer
                    if self.recording_recognizer:
                        if self.recording_recognizer.AcceptWaveform(data):
                            result = json.loads(self.recording_recognizer.Result())
                            text = result.get('text', '').strip()
                            if text:
                                self.last_text_time = time.time()
                                print(f"\n📝 Live recognition: {text}")
                        else:
                            partial_result = json.loads(self.recording_recognizer.PartialResult())
                            partial_text = partial_result.get('partial', '').strip()
                            if partial_text:
                                self.last_text_time = time.time()
                                status = f"Recording... {recording_duration:.1f}s | {partial_text}"
                                print(f"\r{status}", end='', flush=True)
                    # Decide whether to stop recording
                    current_time = time.time()
                    if self.last_text_time is not None:
                        text_silence_duration = current_time - self.last_text_time
                        if text_silence_duration > self.text_silence_threshold and recording_duration >= self.min_recording_time:
                            print(f"\n\nNo text recognized for {self.text_silence_threshold:.0f}s, stopping recording")
                            self._stop_recording()
                    else:
                        if recording_duration > 5.0:
                            print("\n\nNo text recognized within 5s, stopping recording")
                            self._stop_recording()
                    # Enforce the maximum recording time
                    if recording_duration > self.max_recording_time:
                        print(f"\n\nMaximum recording time of {self.max_recording_time}s reached")
                        self._stop_recording()
                    # Show recording status while waiting for speech
                    if self.last_text_time is None:
                        status = f"Waiting for speech... {recording_duration:.1f}s"
                        print(f"\r{status}", end='', flush=True)
                elif self.model and self.recognizer:
                    # Wake-word detection mode
                    if self.recognizer.AcceptWaveform(data):
                        result = json.loads(self.recognizer.Result())
                        text = result.get('text', '').strip()
                        if text:
                            print(f"Recognized: {text}")
                            # Check for a wake word
                            is_wake_word, detected_word = self._check_wake_word(text)
                            if is_wake_word:
                                print(f"🎯 Wake word detected: {detected_word}")
                                self._start_recording()
                    else:
                        # Show the live audio level
                        energy = self._calculate_energy(data)
                        if energy > 50:
                            partial_result = json.loads(self.recognizer.PartialResult())
                            partial_text = partial_result.get('partial', '')
                            if partial_text:
                                status = f"Listening... energy: {energy:.0f} | {partial_text}"
                            else:
                                status = f"Listening... energy: {energy:.0f}"
                            print(status, end='\r')
                time.sleep(0.01)
        except KeyboardInterrupt:
            print("\n👋 Exiting")
        except Exception as e:
            print(f"Error: {e}")
        finally:
            self.stop()
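
    # Note (added for clarity): the loop above alternates between two modes:
    # wake-word detection (feed chunks to self.recognizer and look for a wake
    # word) and recording (buffer chunks, run live recognition, and stop after
    # text_silence_threshold seconds of no recognized text once
    # min_recording_time has passed, after 5s with no recognized text at all,
    # or when max_recording_time is reached).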

    def stop(self):
        """Stop the system and release resources."""
        self.running = False
        if self.recording:
            self._stop_recording()
        if self.stream:
            self.stream.stop_stream()
            self.stream.close()
            self.stream = None
        if self.audio:
            self.audio.terminate()
            self.audio = None
        # Wait for the recognition thread to finish
        if self.recognition_thread and self.recognition_thread.is_alive():
            self.recognition_thread.join(timeout=5.0)


def main():
    """Entry point."""
    print("🚀 Enhanced wake-word + recording + speech recognition test")
    print("=" * 50)
    # Check for the model
    model_dir = "model"
    if not os.path.exists(model_dir):
        print("⚠️ Model directory not found")
        print("Please download a Vosk model into the 'model' directory")
        return
    # Create the system
    system = EnhancedWakeAndRecord(
        model_path=model_dir,
        wake_words=["你好", "助手", "小爱"],
        enable_speech_recognition=True,
        # app_key="your_app_key",        # fill in your actual app_key
        # access_key="your_access_key"   # fill in your actual access_key
    )
    if not system.model:
        print("❌ Model failed to load")
        return

    # Set the recognition result callback
    def on_recognition_result(result):
        print(f"\n🎯 Recognition finished! Result: {result.text}")
        print(f"   Confidence: {result.confidence}")
        print(f"   Final result: {result.is_final}")

    system.set_recognition_callback(on_recognition_result)
    print("✅ System initialized")
    print("📖 Usage:")
    print("1. Say a wake word to start recording")
    print("2. Recording stops when no text is recognized for 3 seconds")
    print("3. Minimum recording time is 2s, maximum is 30s")
    print("4. Live recognition results are shown while recording")
    print("5. The recording is saved to a WAV file automatically")
    print("6. The recording is played back automatically when it finishes")
    print("7. Speech recognition is then run on the recorded file")
    print("8. Press Ctrl+C to exit")
    print("=" * 50)
    # Run
    system.start()


if __name__ == "__main__":
    main()
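
# Programmatic usage sketch (added for illustration; mirrors main() above and
# assumes a Vosk model directory exists at ./model):
#
#     system = EnhancedWakeAndRecord(model_path="model", wake_words=["你好"])
#     system.set_recognition_callback(lambda result: print(result.text))
#     try:
#         system.start()   # blocks until Ctrl+C or stop() from another thread
#     finally:
#         system.stop()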