Local-Voice/simple_wake_and_record.py
2025-09-20 10:53:56 +08:00

481 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
简化的唤醒+录音测试
专注于解决音频冲突问题
"""
import sys
import os
import time
import threading
import pyaudio
import json
# 添加当前目录到路径
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
try:
from vosk import Model, KaldiRecognizer
VOSK_AVAILABLE = True
except ImportError:
VOSK_AVAILABLE = False
print("⚠️ Vosk 未安装,请运行: pip install vosk")
class SimpleWakeAndRecord:
"""简化的唤醒+录音系统"""
def __init__(self, model_path="model", wake_words=["你好", "助手"]):
self.model_path = model_path
self.wake_words = wake_words
self.model = None
self.recognizer = None
self.audio = None
self.stream = None
self.running = False
# 音频参数 - 优化为树莓派3B
self.FORMAT = pyaudio.paInt16
self.CHANNELS = 1
self.RATE = 8000 # 从16kHz降至8kHz减少50%数据处理量
self.CHUNK_SIZE = 2048 # 增大块大小,减少处理次数
# 录音相关
self.recording = False
self.recorded_frames = []
self.last_text_time = None # 最后一次识别到文字的时间
self.recording_start_time = None
self.recording_recognizer = None # 录音时专用的识别器
# 性能优化相关
self.audio_buffer = [] # 音频缓冲区
self.buffer_size = 10 # 缓冲区大小(块数)
self.last_process_time = time.time() # 上次处理时间
self.process_interval = 0.5 # 处理间隔(秒)
self.batch_process_size = 5 # 批处理大小
# 性能监控
self.process_count = 0
self.avg_process_time = 0
self.last_monitor_time = time.time()
self.monitor_interval = 5.0 # 监控间隔(秒)
# 阈值
self.text_silence_threshold = 3.0 # 3秒没有识别到文字就结束
self.min_recording_time = 2.0 # 最小录音时间
self.max_recording_time = 30.0 # 最大录音时间
self._setup_model()
self._setup_audio()
def _setup_model(self):
"""设置 Vosk 模型"""
if not VOSK_AVAILABLE:
return
try:
if not os.path.exists(self.model_path):
print(f"模型路径不存在: {self.model_path}")
return
self.model = Model(self.model_path)
self.recognizer = KaldiRecognizer(self.model, self.RATE)
self.recognizer.SetWords(True)
print(f"✅ Vosk 模型加载成功")
except Exception as e:
print(f"模型初始化失败: {e}")
def _setup_audio(self):
"""设置音频设备"""
try:
if self.audio is None:
self.audio = pyaudio.PyAudio()
if self.stream is None:
self.stream = self.audio.open(
format=self.FORMAT,
channels=self.CHANNELS,
rate=self.RATE,
input=True,
frames_per_buffer=self.CHUNK_SIZE
)
print("✅ 音频设备初始化成功")
except Exception as e:
print(f"音频设备初始化失败: {e}")
def _calculate_energy(self, audio_data):
"""计算音频能量"""
if len(audio_data) == 0:
return 0
import numpy as np
audio_array = np.frombuffer(audio_data, dtype=np.int16)
rms = np.sqrt(np.mean(audio_array ** 2))
return rms
def _check_wake_word(self, text):
"""检查是否包含唤醒词"""
if not text or not self.wake_words:
return False, None
text_lower = text.lower()
for wake_word in self.wake_words:
if wake_word.lower() in text_lower:
return True, wake_word
return False, None
def _should_process_audio(self):
"""判断是否应该处理音频"""
current_time = time.time()
return (current_time - self.last_process_time >= self.process_interval and
len(self.audio_buffer) >= self.batch_process_size)
def _process_audio_batch(self):
"""批量处理音频数据"""
if len(self.audio_buffer) < self.batch_process_size:
return
# 记录处理开始时间
start_time = time.time()
# 取出批处理数据
batch_data = self.audio_buffer[:self.batch_process_size]
self.audio_buffer = self.audio_buffer[self.batch_process_size:]
# 合并音频数据
combined_data = b''.join(batch_data)
# 更新处理时间
self.last_process_time = time.time()
# 更新性能统计
process_time = time.time() - start_time
self.process_count += 1
self.avg_process_time = (self.avg_process_time * (self.process_count - 1) + process_time) / self.process_count
# 性能监控
self._monitor_performance()
return combined_data
def _monitor_performance(self):
"""性能监控"""
current_time = time.time()
if current_time - self.last_monitor_time >= self.monitor_interval:
buffer_usage = len(self.audio_buffer) / self.buffer_size * 100
print(f"\n📊 性能监控 | 处理次数: {self.process_count} | 平均处理时间: {self.avg_process_time:.3f}s | 缓冲区使用: {buffer_usage:.1f}%")
self.last_monitor_time = current_time
def _save_recording(self, audio_data):
"""保存录音"""
timestamp = time.strftime("%Y%m%d_%H%M%S")
filename = f"recording_{timestamp}.wav"
try:
import wave
with wave.open(filename, 'wb') as wf:
wf.setnchannels(self.CHANNELS)
wf.setsampwidth(self.audio.get_sample_size(self.FORMAT))
wf.setframerate(self.RATE)
wf.writeframes(audio_data)
print(f"✅ 录音已保存: {filename}")
return True, filename
except Exception as e:
print(f"保存录音失败: {e}")
return False, None
def _play_audio(self, filename):
"""播放音频文件"""
try:
import wave
# 打开音频文件
with wave.open(filename, 'rb') as wf:
# 获取音频参数
channels = wf.getnchannels()
width = wf.getsampwidth()
rate = wf.getframerate()
total_frames = wf.getnframes()
# 分块读取音频数据,避免内存问题
chunk_size = 1024
frames = []
for _ in range(0, total_frames, chunk_size):
chunk = wf.readframes(chunk_size)
if chunk:
frames.append(chunk)
else:
break
# 创建播放流
playback_stream = self.audio.open(
format=self.audio.get_format_from_width(width),
channels=channels,
rate=rate,
output=True
)
print(f"🔊 开始播放: {filename}")
# 分块播放音频
for chunk in frames:
playback_stream.write(chunk)
# 等待播放完成
playback_stream.stop_stream()
playback_stream.close()
print("✅ 播放完成")
except Exception as e:
print(f"❌ 播放失败: {e}")
# 如果pyaudio播放失败尝试用系统命令播放
self._play_with_system_player(filename)
def _play_with_system_player(self, filename):
"""使用系统播放器播放音频"""
try:
import platform
import subprocess
system = platform.system()
if system == 'Darwin': # macOS
cmd = ['afplay', filename]
elif system == 'Windows':
cmd = ['start', '/min', filename]
else: # Linux
cmd = ['aplay', filename]
print(f"🔊 使用系统播放器: {' '.join(cmd)}")
subprocess.run(cmd, check=True)
print("✅ 播放完成")
except Exception as e:
print(f"❌ 系统播放器也失败: {e}")
print(f"💡 文件已保存,请手动播放: {filename}")
def _start_recording(self):
"""开始录音"""
print("🎙️ 开始录音,请说话...")
self.recording = True
self.recorded_frames = []
self.last_text_time = None
self.recording_start_time = time.time()
# 为录音创建一个新的识别器
if self.model:
self.recording_recognizer = KaldiRecognizer(self.model, self.RATE)
self.recording_recognizer.SetWords(True)
def _stop_recording(self):
"""停止录音"""
if len(self.recorded_frames) > 0:
audio_data = b''.join(self.recorded_frames)
duration = len(audio_data) / (self.RATE * 2)
print(f"📝 录音完成,时长: {duration:.2f}")
# 保存录音
success, filename = self._save_recording(audio_data)
# 如果保存成功,播放录音
if success and filename:
print("=" * 50)
print("🔊 播放刚才录制的音频...")
self._play_audio(filename)
print("=" * 50)
self.recording = False
self.recorded_frames = []
self.last_text_time = None
self.recording_start_time = None
self.recording_recognizer = None
def start(self):
"""开始唤醒词检测和录音"""
if not self.stream:
print("❌ 音频设备未初始化")
return
self.running = True
print("🎤 开始监听...")
print(f"唤醒词: {', '.join(self.wake_words)}")
try:
while self.running:
# 读取音频数据
data = self.stream.read(self.CHUNK_SIZE, exception_on_overflow=False)
if len(data) == 0:
continue
if self.recording:
# 录音模式 - 直接处理
self.recorded_frames.append(data)
recording_duration = time.time() - self.recording_start_time
# 录音时使用批处理进行识别
self.audio_buffer.append(data)
# 限制缓冲区大小
if len(self.audio_buffer) > self.buffer_size:
self.audio_buffer.pop(0)
# 批处理识别
if self._should_process_audio() and self.recording_recognizer:
combined_data = self._process_audio_batch()
if combined_data and self.recording_recognizer.AcceptWaveform(combined_data):
# 获取最终识别结果
result = json.loads(self.recording_recognizer.Result())
text = result.get('text', '').strip()
if text:
# 识别到文字,更新时间戳
self.last_text_time = time.time()
print(f"\n📝 识别: {text}")
elif combined_data:
# 获取部分识别结果
partial_result = json.loads(self.recording_recognizer.PartialResult())
partial_text = partial_result.get('partial', '').strip()
if partial_text:
# 更新时间戳(部分识别也算有声音)
self.last_text_time = time.time()
status = f"录音中... {recording_duration:.1f}s | {partial_text}"
print(f"\r{status}", end='', flush=True)
# 检查是否需要结束录音
current_time = time.time()
# 检查是否有文字识别超时
if self.last_text_time is not None:
text_silence_duration = current_time - self.last_text_time
if text_silence_duration > self.text_silence_threshold and recording_duration >= self.min_recording_time:
print(f"\n\n3秒没有识别到文字结束录音")
self._stop_recording()
else:
# 还没有识别到任何文字,检查是否超时
if recording_duration > 5.0: # 如果5秒还没识别到任何文字也结束
print(f"\n\n5秒没有识别到文字结束录音")
self._stop_recording()
# 检查最大录音时间
if recording_duration > self.max_recording_time:
print(f"\n\n达到最大录音时间 {self.max_recording_time}s")
self._stop_recording()
# 显示录音状态
if self.last_text_time is None:
status = f"等待语音输入... {recording_duration:.1f}s"
print(f"\r{status}", end='', flush=True)
elif self.model and self.recognizer:
# 唤醒词检测模式 - 使用批处理
self.audio_buffer.append(data)
# 限制缓冲区大小
if len(self.audio_buffer) > self.buffer_size:
self.audio_buffer.pop(0)
# 批处理识别
if self._should_process_audio():
combined_data = self._process_audio_batch()
if combined_data and self.recognizer.AcceptWaveform(combined_data):
result = json.loads(self.recognizer.Result())
text = result.get('text', '').strip()
if text:
print(f"识别: {text}")
# 检查唤醒词
is_wake_word, detected_word = self._check_wake_word(text)
if is_wake_word:
print(f"🎯 检测到唤醒词: {detected_word}")
self._start_recording()
else:
# 显示实时音频级别
energy = self._calculate_energy(data)
if energy > 50: # 只显示有意义的音频级别
partial_result = json.loads(self.recognizer.PartialResult())
partial_text = partial_result.get('partial', '')
if partial_text:
status = f"监听中... 能量: {energy:.0f} | {partial_text}"
else:
status = f"监听中... 能量: {energy:.0f}"
print(status, end='\r')
time.sleep(0.05) # 增加延迟减少CPU使用
except KeyboardInterrupt:
print("\n👋 退出")
except Exception as e:
print(f"错误: {e}")
finally:
self.stop()
def stop(self):
"""停止"""
self.running = False
if self.recording:
self._stop_recording()
if self.stream:
self.stream.stop_stream()
self.stream.close()
self.stream = None
if self.audio:
self.audio.terminate()
self.audio = None
def main():
"""主函数"""
print("🚀 简化唤醒+录音测试")
print("=" * 50)
# 检查模型
model_dir = "model"
if not os.path.exists(model_dir):
print("⚠️ 未找到模型目录")
print("请下载 Vosk 模型到 model 目录")
return
# 创建系统
system = SimpleWakeAndRecord(
model_path=model_dir,
wake_words=["你好", "助手", "小爱"]
)
if not system.model:
print("❌ 模型加载失败")
return
print("✅ 系统初始化成功")
print("📖 使用说明:")
print("1. 说唤醒词开始录音")
print("2. 基于语音识别判断3秒没有识别到文字就结束")
print("3. 最少录音2秒最多30秒")
print("4. 录音时实时显示识别结果")
print("5. 录音文件自动保存")
print("6. 录音完成后自动播放刚才录制的内容")
print("7. 按 Ctrl+C 退出")
print("🚀 性能优化已启用:")
print(" - 采样率: 8kHz (降低50%数据量)")
print(" - 批处理: 5个音频块/次")
print(" - 处理间隔: 0.5秒")
print(" - 缓冲区: 10个音频块")
print(" - 性能监控: 每5秒显示")
print("=" * 50)
# 开始运行
system.start()
if __name__ == "__main__":
main()