diff --git a/.DS_Store b/.DS_Store index 2f240b4..866472b 100644 Binary files a/.DS_Store and b/.DS_Store differ diff --git a/enhanced_wake_and_record.py b/enhanced_wake_and_record.py deleted file mode 100644 index e55fc08..0000000 --- a/enhanced_wake_and_record.py +++ /dev/null @@ -1,501 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -""" -集成语音识别的唤醒+录音系统 -基于 simple_wake_and_record.py,添加语音识别功能 -""" - -import sys -import os -import time -import threading -import pyaudio -import json -import asyncio -from typing import Optional, List - -# 添加当前目录到路径 -sys.path.append(os.path.dirname(os.path.abspath(__file__))) - -try: - from vosk import Model, KaldiRecognizer - VOSK_AVAILABLE = True -except ImportError: - VOSK_AVAILABLE = False - print("⚠️ Vosk 未安装,请运行: pip install vosk") - -from speech_recognizer import SpeechRecognizer, RecognitionResult - -class EnhancedWakeAndRecord: - """增强的唤醒+录音系统,集成语音识别""" - - def __init__(self, model_path="model", wake_words=["你好", "助手"], - enable_speech_recognition=True, app_key=None, access_key=None): - self.model_path = model_path - self.wake_words = wake_words - self.enable_speech_recognition = enable_speech_recognition - self.model = None - self.recognizer = None - self.audio = None - self.stream = None - self.running = False - - # 音频参数 - self.FORMAT = pyaudio.paInt16 - self.CHANNELS = 1 - self.RATE = 16000 - self.CHUNK_SIZE = 1024 - - # 录音相关 - self.recording = False - self.recorded_frames = [] - self.last_text_time = None - self.recording_start_time = None - self.recording_recognizer = None - - # 阈值 - self.text_silence_threshold = 3.0 - self.min_recording_time = 2.0 - self.max_recording_time = 30.0 - - # 语音识别相关 - self.speech_recognizer = None - self.last_recognition_result = None - self.recognition_thread = None - - # 回调函数 - self.on_recognition_result = None - - self._setup_model() - self._setup_audio() - self._setup_speech_recognition(app_key, access_key) - - def _setup_model(self): - """设置 Vosk 模型""" - if not VOSK_AVAILABLE: - return - - try: - if not os.path.exists(self.model_path): - print(f"模型路径不存在: {self.model_path}") - return - - self.model = Model(self.model_path) - self.recognizer = KaldiRecognizer(self.model, self.RATE) - self.recognizer.SetWords(True) - - print(f"✅ Vosk 模型加载成功") - - except Exception as e: - print(f"模型初始化失败: {e}") - - def _setup_audio(self): - """设置音频设备""" - try: - if self.audio is None: - self.audio = pyaudio.PyAudio() - - if self.stream is None: - self.stream = self.audio.open( - format=self.FORMAT, - channels=self.CHANNELS, - rate=self.RATE, - input=True, - frames_per_buffer=self.CHUNK_SIZE - ) - - print("✅ 音频设备初始化成功") - - except Exception as e: - print(f"音频设备初始化失败: {e}") - - def _setup_speech_recognition(self, app_key=None, access_key=None): - """设置语音识别""" - if not self.enable_speech_recognition: - return - - try: - self.speech_recognizer = SpeechRecognizer( - app_key=app_key, - access_key=access_key - ) - print("✅ 语音识别器初始化成功") - except Exception as e: - print(f"语音识别器初始化失败: {e}") - self.enable_speech_recognition = False - - def _calculate_energy(self, audio_data): - """计算音频能量""" - if len(audio_data) == 0: - return 0 - - import numpy as np - audio_array = np.frombuffer(audio_data, dtype=np.int16) - rms = np.sqrt(np.mean(audio_array ** 2)) - return rms - - def _check_wake_word(self, text): - """检查是否包含唤醒词""" - if not text or not self.wake_words: - return False, None - - text_lower = text.lower() - for wake_word in self.wake_words: - if wake_word.lower() in text_lower: - return True, wake_word - return False, None - - def _save_recording(self, audio_data): - """保存录音""" - timestamp = time.strftime("%Y%m%d_%H%M%S") - filename = f"recording_{timestamp}.wav" - - try: - import wave - with wave.open(filename, 'wb') as wf: - wf.setnchannels(self.CHANNELS) - wf.setsampwidth(self.audio.get_sample_size(self.FORMAT)) - wf.setframerate(self.RATE) - wf.writeframes(audio_data) - - print(f"✅ 录音已保存: {filename}") - return True, filename - except Exception as e: - print(f"保存录音失败: {e}") - return False, None - - def _play_audio(self, filename): - """播放音频文件""" - try: - import wave - - # 打开音频文件 - with wave.open(filename, 'rb') as wf: - # 获取音频参数 - channels = wf.getnchannels() - width = wf.getsampwidth() - rate = wf.getframerate() - total_frames = wf.getnframes() - - # 分块读取音频数据,避免内存问题 - chunk_size = 1024 - frames = [] - - for _ in range(0, total_frames, chunk_size): - chunk = wf.readframes(chunk_size) - if chunk: - frames.append(chunk) - else: - break - - # 创建播放流 - playback_stream = self.audio.open( - format=self.audio.get_format_from_width(width), - channels=channels, - rate=rate, - output=True - ) - - print(f"🔊 开始播放: {filename}") - - # 分块播放音频 - for chunk in frames: - playback_stream.write(chunk) - - # 等待播放完成 - playback_stream.stop_stream() - playback_stream.close() - - print("✅ 播放完成") - - except Exception as e: - print(f"❌ 播放失败: {e}") - self._play_with_system_player(filename) - - def _play_with_system_player(self, filename): - """使用系统播放器播放音频""" - try: - import platform - import subprocess - - system = platform.system() - - if system == 'Darwin': # macOS - cmd = ['afplay', filename] - elif system == 'Windows': - cmd = ['start', '/min', filename] - else: # Linux - cmd = ['aplay', filename] - - print(f"🔊 使用系统播放器: {' '.join(cmd)}") - subprocess.run(cmd, check=True) - print("✅ 播放完成") - - except Exception as e: - print(f"❌ 系统播放器也失败: {e}") - print(f"💡 文件已保存,请手动播放: {filename}") - - def _start_recognition_thread(self, filename): - """启动语音识别线程""" - if not self.enable_speech_recognition or not self.speech_recognizer: - return - - def recognize_task(): - try: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - print(f"🧠 开始识别录音文件: {filename}") - result = loop.run_until_complete( - self.speech_recognizer.recognize_file(filename) - ) - - if result: - # 合并所有识别结果 - full_text = " ".join([r.text for r in result]) - final_result = RecognitionResult( - text=full_text, - confidence=0.9, - is_final=True - ) - - self.last_recognition_result = final_result - print(f"\n🧠 语音识别结果: {full_text}") - - # 调用回调函数 - if self.on_recognition_result: - self.on_recognition_result(final_result) - else: - print(f"\n🧠 语音识别失败或未识别到内容") - - loop.close() - - except Exception as e: - print(f"❌ 语音识别线程异常: {e}") - - self.recognition_thread = threading.Thread(target=recognize_task) - self.recognition_thread.daemon = True - self.recognition_thread.start() - - def _start_recording(self): - """开始录音""" - print("🎙️ 开始录音,请说话...") - self.recording = True - self.recorded_frames = [] - self.last_text_time = None - self.recording_start_time = time.time() - - # 为录音创建一个新的识别器 - if self.model: - self.recording_recognizer = KaldiRecognizer(self.model, self.RATE) - self.recording_recognizer.SetWords(True) - - def _stop_recording(self): - """停止录音""" - if len(self.recorded_frames) > 0: - audio_data = b''.join(self.recorded_frames) - duration = len(audio_data) / (self.RATE * 2) - print(f"📝 录音完成,时长: {duration:.2f}秒") - - # 保存录音 - success, filename = self._save_recording(audio_data) - - # 如果保存成功,播放录音并进行语音识别 - if success and filename: - print("=" * 50) - print("🔊 播放刚才录制的音频...") - self._play_audio(filename) - print("=" * 50) - - # 启动语音识别 - if self.enable_speech_recognition: - print("🧠 准备进行语音识别...") - self._start_recognition_thread(filename) - - self.recording = False - self.recorded_frames = [] - self.last_text_time = None - self.recording_start_time = None - self.recording_recognizer = None - - def set_recognition_callback(self, callback): - """设置识别结果回调函数""" - self.on_recognition_result = callback - - def get_last_recognition_result(self) -> Optional[RecognitionResult]: - """获取最后一次识别结果""" - return self.last_recognition_result - - def start(self): - """开始唤醒词检测和录音""" - if not self.stream: - print("❌ 音频设备未初始化") - return - - self.running = True - print("🎤 开始监听...") - print(f"唤醒词: {', '.join(self.wake_words)}") - if self.enable_speech_recognition: - print("🧠 语音识别: 已启用") - else: - print("🧠 语音识别: 已禁用") - - try: - while self.running: - # 读取音频数据 - data = self.stream.read(self.CHUNK_SIZE, exception_on_overflow=False) - - if len(data) == 0: - continue - - if self.recording: - # 录音模式 - self.recorded_frames.append(data) - recording_duration = time.time() - self.recording_start_time - - # 使用录音专用的识别器进行实时识别 - if self.recording_recognizer: - if self.recording_recognizer.AcceptWaveform(data): - result = json.loads(self.recording_recognizer.Result()) - text = result.get('text', '').strip() - - if text: - self.last_text_time = time.time() - print(f"\n📝 实时识别: {text}") - else: - partial_result = json.loads(self.recording_recognizer.PartialResult()) - partial_text = partial_result.get('partial', '').strip() - - if partial_text: - self.last_text_time = time.time() - status = f"录音中... {recording_duration:.1f}s | {partial_text}" - print(f"\r{status}", end='', flush=True) - - # 检查是否需要结束录音 - current_time = time.time() - - if self.last_text_time is not None: - text_silence_duration = current_time - self.last_text_time - if text_silence_duration > self.text_silence_threshold and recording_duration >= self.min_recording_time: - print(f"\n\n3秒没有识别到文字,结束录音") - self._stop_recording() - else: - if recording_duration > 5.0: - print(f"\n\n5秒没有识别到文字,结束录音") - self._stop_recording() - - # 检查最大录音时间 - if recording_duration > self.max_recording_time: - print(f"\n\n达到最大录音时间 {self.max_recording_time}s") - self._stop_recording() - - # 显示录音状态 - if self.last_text_time is None: - status = f"等待语音输入... {recording_duration:.1f}s" - print(f"\r{status}", end='', flush=True) - - elif self.model and self.recognizer: - # 唤醒词检测模式 - if self.recognizer.AcceptWaveform(data): - result = json.loads(self.recognizer.Result()) - text = result.get('text', '').strip() - - if text: - print(f"识别: {text}") - - # 检查唤醒词 - is_wake_word, detected_word = self._check_wake_word(text) - if is_wake_word: - print(f"🎯 检测到唤醒词: {detected_word}") - self._start_recording() - else: - # 显示实时音频级别 - energy = self._calculate_energy(data) - if energy > 50: - partial_result = json.loads(self.recognizer.PartialResult()) - partial_text = partial_result.get('partial', '') - if partial_text: - status = f"监听中... 能量: {energy:.0f} | {partial_text}" - else: - status = f"监听中... 能量: {energy:.0f}" - print(status, end='\r') - - time.sleep(0.01) - - except KeyboardInterrupt: - print("\n👋 退出") - except Exception as e: - print(f"错误: {e}") - finally: - self.stop() - - def stop(self): - """停止""" - self.running = False - if self.recording: - self._stop_recording() - - if self.stream: - self.stream.stop_stream() - self.stream.close() - self.stream = None - - if self.audio: - self.audio.terminate() - self.audio = None - - # 等待识别线程结束 - if self.recognition_thread and self.recognition_thread.is_alive(): - self.recognition_thread.join(timeout=5.0) - -def main(): - """主函数""" - print("🚀 增强版唤醒+录音+语音识别测试") - print("=" * 50) - - # 检查模型 - model_dir = "model" - if not os.path.exists(model_dir): - print("⚠️ 未找到模型目录") - print("请下载 Vosk 模型到 model 目录") - return - - # 创建系统 - system = EnhancedWakeAndRecord( - model_path=model_dir, - wake_words=["你好", "助手", "小爱"], - enable_speech_recognition=True, - # app_key="your_app_key", # 请填入实际的app_key - # access_key="your_access_key" # 请填入实际的access_key - ) - - if not system.model: - print("❌ 模型加载失败") - return - - # 设置识别结果回调 - def on_recognition_result(result): - print(f"\n🎯 识别完成!结果: {result.text}") - print(f" 置信度: {result.confidence}") - print(f" 是否最终结果: {result.is_final}") - - system.set_recognition_callback(on_recognition_result) - - print("✅ 系统初始化成功") - print("📖 使用说明:") - print("1. 说唤醒词开始录音") - print("2. 基于语音识别判断,3秒没有识别到文字就结束") - print("3. 最少录音2秒,最多30秒") - print("4. 录音时实时显示识别结果") - print("5. 录音文件自动保存") - print("6. 录音完成后自动播放刚才录制的内容") - print("7. 启动语音识别对录音文件进行识别") - print("8. 按 Ctrl+C 退出") - print("=" * 50) - - # 开始运行 - system.start() - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/energy_based_recorder.py b/recorder.py similarity index 77% rename from energy_based_recorder.py rename to recorder.py index 27bf6fc..ded2d44 100644 --- a/energy_based_recorder.py +++ b/recorder.py @@ -39,7 +39,8 @@ class EnergyBasedRecorder: self.recorded_frames = [] self.recording_start_time = None self.last_sound_time = None - self.energy_history = [] # 能量历史 + self.energy_history = [] + self.zcr_history = [] # ZCR历史 self.max_energy_history = 50 # 最大能量历史记录 # 预录音缓冲区 @@ -49,6 +50,17 @@ class EnergyBasedRecorder: # 播放状态 self.is_playing = False # 是否正在播放 + # 智能静音检测 + self.voice_activity_history = [] # 语音活动历史 + self.max_voice_history = 20 # 最大语音活动历史记录 + self.consecutive_silence_count = 0 # 连续静音计数 + self.silence_threshold_count = 15 # 连续静音次数阈值(约1.5秒) + + # 智能ZCR静音检测 + self.max_zcr_history = 30 # 最大ZCR历史记录 + self.consecutive_low_zcr_count = 0 # 连续低ZCR计数 + self.low_zcr_threshold_count = 20 # 连续低ZCR次数阈值(约2秒) + # 性能监控 self.frame_count = 0 self.start_time = time.time() @@ -100,7 +112,7 @@ class EnergyBasedRecorder: return peak_energy def calculate_zero_crossing_rate(self, audio_data): - """计算零交叉率(辅助判断语音)""" + """计算零交叉率(主要语音检测方法)""" if len(audio_data) == 0: return 0 @@ -112,40 +124,27 @@ class EnergyBasedRecorder: # 归一化到采样率 zcr = zero_crossings / len(audio_array) * self.RATE + # 更新ZCR历史 + self.zcr_history.append(zcr) + if len(self.zcr_history) > self.max_zcr_history: + self.zcr_history.pop(0) + return zcr def is_voice_active_advanced(self, energy, zcr): - """高级语音活动检测""" - # 动态阈值:基于背景噪音 - if len(self.energy_history) >= 10: - # 使用最近10个样本的中位数作为背景噪音 - background_energy = np.median(self.energy_history[-10:]) - - # 动态阈值:背景噪音 + 50%(提高敏感性) - dynamic_threshold = max(50, background_energy * 1.5) - - # 能量条件 - energy_condition = energy > dynamic_threshold - - # 零交叉率条件:语音通常在1000-5000 Hz之间 - # 对于8kHz采样率,ZCR通常在1000-4000之间 - zcr_condition = 1000 < zcr < 4000 - - # 同时满足能量和ZCR条件才认为是语音 - return energy_condition and zcr_condition - else: - # 初始阶段使用固定阈值 - return energy > 60 and zcr > 1000 # 更严格的初始条件 - - def get_average_energy(self): - """获取平均能量水平""" - if not self.energy_history: - return 0 - return np.mean(self.energy_history) + """仅使用ZCR进行语音活动检测""" + # ZCR语音检测:提高到1200-6000 Hz之间,更好地区分语音和环境噪音 + # 说话时ZCR会比较稳定在这个范围内 + zcr_condition = 1200 < zcr < 6000 + + # 添加一些容错,避免短暂的ZCR波动导致误判 + return zcr_condition + def is_voice_active(self, energy): - """判断是否有人声""" - return energy > self.energy_threshold + """已弃用 - 仅用于兼容性""" + # 现在主要使用ZCR检测,这个方法保留但不再使用 + return False def save_recording(self, audio_data, filename=None): """保存录音""" @@ -181,6 +180,7 @@ class EnergyBasedRecorder: # 清空所有缓冲区 self.pre_record_buffer = [] self.energy_history = [] + self.zcr_history = [] # 完全关闭输入流 if self.stream: @@ -247,6 +247,7 @@ class EnergyBasedRecorder: # 重置所有状态 self.energy_history = [] + self.zcr_history = [] print("📡 音频输入已重新开启") def play_with_system_player(self, filename): @@ -277,6 +278,7 @@ class EnergyBasedRecorder: # 清空所有缓冲区 self.pre_record_buffer = [] self.energy_history = [] + self.zcr_history = [] # 完全关闭输入流 if self.stream: @@ -312,6 +314,7 @@ class EnergyBasedRecorder: # 重置所有状态 self.energy_history = [] + self.zcr_history = [] print("📡 音频输入已重新开启") def update_pre_record_buffer(self, audio_data): @@ -336,7 +339,13 @@ class EnergyBasedRecorder: self.recording_start_time = time.time() self.last_sound_time = time.time() - self.energy_history = [] # 重置能量历史 + self.energy_history = [] + self.zcr_history = [] # 重置ZCR历史 + + # 重置ZCR相关计数器 + self.consecutive_low_zcr_count = 0 + self.consecutive_silence_count = 0 + self.voice_activity_history = [] def stop_recording(self): """停止录音""" @@ -366,6 +375,7 @@ class EnergyBasedRecorder: self.recording_start_time = None self.last_sound_time = None self.energy_history = [] + self.zcr_history = [] def monitor_performance(self): """性能监控""" @@ -397,7 +407,7 @@ class EnergyBasedRecorder: self.running = True print("🎤 开始监听...") - print(f"能量阈值: {self.energy_threshold}") + print(f"能量阈值: {self.energy_threshold} (已弃用)") print(f"静音阈值: {self.silence_threshold}秒") print("📖 使用说明:") print("- 检测到声音自动开始录音") @@ -406,11 +416,16 @@ class EnergyBasedRecorder: print("- 录音完成后自动播放") print("- 按 Ctrl+C 退出") print("🎯 新增功能:") - print("- 动态阈值调整(基于背景噪音)") + print("- 纯ZCR语音检测(移除能量检测)") print("- 零交叉率检测(区分语音和噪音)") - print("- 实时显示ZCR和背景能量") + print("- 实时显示ZCR状态") print("- 预录音功能(包含声音开始前2秒)") print("- 环形缓冲区防止丢失开头音频") + print("🤖 纯ZCR静音检测:") + print("- 连续低ZCR计数(20次=2秒)") + print("- ZCR活动历史追踪") + print("- 基于ZCR模式的静音验证") + print("- 语音范围: 1200-6000 Hz (提高阈值)") print("=" * 50) try: @@ -442,28 +457,60 @@ class EnergyBasedRecorder: self.recorded_frames.append(data) recording_duration = time.time() - self.recording_start_time - # 更新最后声音时间 + # 基于ZCR的智能静音检测 if self.is_voice_active_advanced(energy, zcr): self.last_sound_time = time.time() + self.consecutive_low_zcr_count = 0 # 重置低ZCR计数 + self.consecutive_silence_count = 0 # 重置静音计数 + else: + self.consecutive_low_zcr_count += 1 # 增加低ZCR计数 + self.consecutive_silence_count += 1 # 增加静音计数 + + # 更新ZCR活动历史(基于ZCR是否在语音范围内) + self.voice_activity_history.append(1200 < zcr < 6000) + if len(self.voice_activity_history) > self.max_voice_history: + self.voice_activity_history.pop(0) # 检查是否应该结束录音 current_time = time.time() - # 检查静音超时 - if current_time - self.last_sound_time > self.silence_threshold: - if recording_duration >= self.min_recording_time: - print(f"\n🔇 检测到持续静音 {self.silence_threshold}秒,结束录音") - self.stop_recording() + # 纯ZCR静音检测 + should_stop = False + stop_reason = "" + + # 主要检测:连续低ZCR计数 + if self.consecutive_low_zcr_count >= self.low_zcr_threshold_count: + # 进一步验证:检查最近的ZCR活动历史 + if len(self.voice_activity_history) >= 15: + recent_voice_activity = sum(self.voice_activity_history[-15:]) + if recent_voice_activity <= 3: # 最近15个样本中最多3个有语音活动 + should_stop = True + stop_reason = f"ZCR静音检测 ({self.consecutive_low_zcr_count}次连续低ZCR)" + else: + # 如果历史数据不足,使用基础检测 + should_stop = True + stop_reason = f"基础ZCR静音检测 ({self.consecutive_low_zcr_count}次)" + + # 备用检测:基于时间的静音检测 + if not should_stop and current_time - self.last_sound_time > self.silence_threshold: + should_stop = True + stop_reason = f"时间静音检测 ({self.silence_threshold}秒)" + + # 执行停止录音 + if should_stop and recording_duration >= self.min_recording_time: + print(f"\n🔇 {stop_reason},结束录音") + self.stop_recording() # 检查最大录音时间 if recording_duration > self.max_recording_time: print(f"\n⏰ 达到最大录音时间 {self.max_recording_time}秒") self.stop_recording() - # 显示录音状态(包含调试信息) - bg_energy = np.median(self.energy_history[-10:]) if len(self.energy_history) >= 10 else 0 + # 显示录音状态(仅ZCR相关信息) is_voice = self.is_voice_active_advanced(energy, zcr) - status = f"录音中... {recording_duration:.1f}s | RMS: {energy:.0f} | 峰值: {peak_energy:.0f} | ZCR: {zcr:.0f} | 语音: {is_voice}" + zcr_progress = f"{self.consecutive_low_zcr_count}/{self.low_zcr_threshold_count}" + recent_activity = sum(self.voice_activity_history[-5:]) if len(self.voice_activity_history) >= 5 else 0 + status = f"录音中... {recording_duration:.1f}s | ZCR: {zcr:.0f} | 语音: {is_voice} | 低ZCR计数: {zcr_progress} | 活动: {recent_activity}" print(f"\r{status}", end='', flush=True) else: @@ -475,12 +522,10 @@ class EnergyBasedRecorder: # 检测到声音,开始录音 self.start_recording() else: - # 显示监听状态(包含调试信息) - avg_energy = self.get_average_energy() - bg_energy = np.median(self.energy_history[-10:]) if len(self.energy_history) >= 10 else 0 - buffer_usage = len(self.pre_record_buffer) / self.pre_record_max_frames * 100 + # 显示监听状态(仅ZCR相关信息) is_voice = self.is_voice_active_advanced(energy, zcr) - status = f"监听中... RMS: {energy:.0f} | 峰值: {peak_energy:.0f} | ZCR: {zcr:.0f} | 背景: {bg_energy:.0f} | 语音: {is_voice} | 缓冲: {buffer_usage:.0f}%" + buffer_usage = len(self.pre_record_buffer) / self.pre_record_max_frames * 100 + status = f"监听中... ZCR: {zcr:.0f} | 语音: {is_voice} | 缓冲: {buffer_usage:.0f}%" print(f"\r{status}", end='', flush=True) # 减少CPU使用 diff --git a/simple_wake_and_record.py b/simple_wake_and_record.py deleted file mode 100644 index 2e38396..0000000 --- a/simple_wake_and_record.py +++ /dev/null @@ -1,580 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -""" -简化的唤醒+录音测试 -专注于解决音频冲突问题 -""" - -import sys -import os -import time -import threading -import pyaudio -import json - -# 添加当前目录到路径 -sys.path.append(os.path.dirname(os.path.abspath(__file__))) - -try: - from vosk import Model, KaldiRecognizer - VOSK_AVAILABLE = True -except ImportError: - VOSK_AVAILABLE = False - print("⚠️ Vosk 未安装,请运行: pip install vosk") - -class SimpleWakeAndRecord: - """简化的唤醒+录音系统""" - - def __init__(self, model_path="model", wake_words=["你好", "助手"]): - self.model_path = model_path - self.wake_words = wake_words - self.model = None - self.recognizer = None - self.audio = None - self.stream = None - self.running = False - - # 音频参数 - 激进优化为树莓派3B - self.FORMAT = pyaudio.paInt16 - self.CHANNELS = 1 - self.RATE = 8000 # 从16kHz降至8kHz,减少50%数据处理量 - self.CHUNK_SIZE = 4096 # 进一步增大块大小到4KB,大幅减少处理次数 - self.AGGRESSIVE_MODE = True # 激进优化模式 - - # 录音相关 - self.recording = False - self.recorded_frames = [] - self.last_text_time = None # 最后一次识别到文字的时间 - self.recording_start_time = None - self.recording_recognizer = None # 录音时专用的识别器 - - # 性能优化相关 - 激进优化 - self.audio_buffer = [] # 音频缓冲区 - self.buffer_size = 5 # 减小缓冲区大小,减少内存使用 - self.last_process_time = time.time() # 上次处理时间 - self.process_interval = 0.2 # 缩短处理间隔,提高响应速度 - self.batch_process_size = 3 # 减少批处理大小,更快处理 - self.skip_partial_results = True # 跳过部分识别结果,只处理最终结果 - - # 性能监控 - self.process_count = 0 - self.avg_process_time = 0 - self.last_monitor_time = time.time() - self.monitor_interval = 3.0 # 缩短监控间隔 - - # 延迟监控 - self.audio_receive_times = [] # 音频接收时间戳 - self.process_start_times = [] # 处理开始时间 - self.latency_samples = [] # 延迟样本 - self.max_latency_samples = 10 # 最大延迟样本数 - - # 阈值 - self.text_silence_threshold = 3.0 # 3秒没有识别到文字就结束 - self.min_recording_time = 2.0 # 最小录音时间 - self.max_recording_time = 30.0 # 最大录音时间 - - self._setup_model() - self._setup_audio() - - def _setup_model(self): - """设置 Vosk 模型""" - if not VOSK_AVAILABLE: - return - - try: - if not os.path.exists(self.model_path): - print(f"模型路径不存在: {self.model_path}") - return - - print(f"🔄 正在加载模型,这可能需要一些时间...") - start_time = time.time() - - self.model = Model(self.model_path) - self.recognizer = KaldiRecognizer(self.model, self.RATE) - - # 激进模式:禁用词级识别以提高性能 - if self.AGGRESSIVE_MODE: - self.recognizer.SetWords(False) - print(f"📉 激进模式:已禁用词级识别以提高性能") - else: - self.recognizer.SetWords(True) - - load_time = time.time() - start_time - print(f"✅ Vosk 模型加载成功 (耗时: {load_time:.2f}s)") - - except Exception as e: - print(f"模型初始化失败: {e}") - - def _setup_audio(self): - """设置音频设备""" - try: - if self.audio is None: - self.audio = pyaudio.PyAudio() - - if self.stream is None: - self.stream = self.audio.open( - format=self.FORMAT, - channels=self.CHANNELS, - rate=self.RATE, - input=True, - frames_per_buffer=self.CHUNK_SIZE - ) - - print("✅ 音频设备初始化成功") - - except Exception as e: - print(f"音频设备初始化失败: {e}") - - def _calculate_energy(self, audio_data): - """计算音频能量""" - if len(audio_data) == 0: - return 0 - - import numpy as np - audio_array = np.frombuffer(audio_data, dtype=np.int16) - rms = np.sqrt(np.mean(audio_array ** 2)) - return rms - - def _check_wake_word(self, text): - """检查是否包含唤醒词""" - if not text or not self.wake_words: - return False, None - - text_lower = text.lower() - for wake_word in self.wake_words: - if wake_word.lower() in text_lower: - return True, wake_word - return False, None - - def _should_process_audio(self): - """判断是否应该处理音频""" - current_time = time.time() - return (current_time - self.last_process_time >= self.process_interval and - len(self.audio_buffer) >= self.batch_process_size) - - def _process_audio_batch(self): - """批量处理音频数据""" - if len(self.audio_buffer) < self.batch_process_size: - return - - # 记录处理开始时间 - start_time = time.time() - self.process_start_times.append(start_time) - - # 取出批处理数据 - batch_data = self.audio_buffer[:self.batch_process_size] - self.audio_buffer = self.audio_buffer[self.batch_process_size:] - - # 合并音频数据 - combined_data = b''.join(batch_data) - - # 更新处理时间 - self.last_process_time = time.time() - - # 更新性能统计 - process_time = time.time() - start_time - self.process_count += 1 - self.avg_process_time = (self.avg_process_time * (self.process_count - 1) + process_time) / self.process_count - - # 性能监控 - self._monitor_performance() - - return combined_data - - def _monitor_performance(self): - """性能监控""" - current_time = time.time() - if current_time - self.last_monitor_time >= self.monitor_interval: - buffer_usage = len(self.audio_buffer) / self.buffer_size * 100 - - # 计算平均延迟 - avg_latency = 0 - if self.latency_samples: - avg_latency = sum(self.latency_samples) / len(self.latency_samples) - - print(f"\n📊 性能监控 | 处理次数: {self.process_count} | 平均处理时间: {self.avg_process_time:.3f}s | 缓冲区使用: {buffer_usage:.1f}% | 平均延迟: {avg_latency:.2f}s") - self.last_monitor_time = current_time - - def _calculate_latency(self, audio_time): - """计算音频延迟""" - current_time = time.time() - latency = current_time - audio_time - - # 添加到延迟样本 - self.latency_samples.append(latency) - if len(self.latency_samples) > self.max_latency_samples: - self.latency_samples.pop(0) - - return latency - - def _lightweight_recognition(self, recognizer, audio_data): - """轻量级识别处理""" - if not recognizer: - return None - - # 激进模式:跳过部分识别结果,只处理最终结果 - if self.skip_partial_results: - if recognizer.AcceptWaveform(audio_data): - result = json.loads(recognizer.Result()) - return result.get('text', '').strip() - else: - # 标准模式:处理部分和最终结果 - if recognizer.AcceptWaveform(audio_data): - result = json.loads(recognizer.Result()) - return result.get('text', '').strip() - else: - partial_result = json.loads(recognizer.PartialResult()) - return partial_result.get('partial', '').strip() - - return None - - def _save_recording(self, audio_data): - """保存录音""" - timestamp = time.strftime("%Y%m%d_%H%M%S") - filename = f"recording_{timestamp}.wav" - - try: - import wave - with wave.open(filename, 'wb') as wf: - wf.setnchannels(self.CHANNELS) - wf.setsampwidth(self.audio.get_sample_size(self.FORMAT)) - wf.setframerate(self.RATE) - wf.writeframes(audio_data) - - print(f"✅ 录音已保存: {filename}") - return True, filename - except Exception as e: - print(f"保存录音失败: {e}") - return False, None - - def _play_audio(self, filename): - """播放音频文件""" - try: - import wave - - # 打开音频文件 - with wave.open(filename, 'rb') as wf: - # 获取音频参数 - channels = wf.getnchannels() - width = wf.getsampwidth() - rate = wf.getframerate() - total_frames = wf.getnframes() - - # 分块读取音频数据,避免内存问题 - chunk_size = 1024 - frames = [] - - for _ in range(0, total_frames, chunk_size): - chunk = wf.readframes(chunk_size) - if chunk: - frames.append(chunk) - else: - break - - # 创建播放流 - playback_stream = self.audio.open( - format=self.audio.get_format_from_width(width), - channels=channels, - rate=rate, - output=True - ) - - print(f"🔊 开始播放: {filename}") - - # 分块播放音频 - for chunk in frames: - playback_stream.write(chunk) - - # 等待播放完成 - playback_stream.stop_stream() - playback_stream.close() - - print("✅ 播放完成") - - except Exception as e: - print(f"❌ 播放失败: {e}") - # 如果pyaudio播放失败,尝试用系统命令播放 - self._play_with_system_player(filename) - - def _play_with_system_player(self, filename): - """使用系统播放器播放音频""" - try: - import platform - import subprocess - - system = platform.system() - - if system == 'Darwin': # macOS - cmd = ['afplay', filename] - elif system == 'Windows': - cmd = ['start', '/min', filename] - else: # Linux - cmd = ['aplay', filename] - - print(f"🔊 使用系统播放器: {' '.join(cmd)}") - subprocess.run(cmd, check=True) - print("✅ 播放完成") - - except Exception as e: - print(f"❌ 系统播放器也失败: {e}") - print(f"💡 文件已保存,请手动播放: {filename}") - - def _start_recording(self): - """开始录音""" - print("🎙️ 开始录音,请说话...") - self.recording = True - self.recorded_frames = [] - self.last_text_time = None - self.recording_start_time = time.time() - - # 为录音创建一个新的识别器 - if self.model: - self.recording_recognizer = KaldiRecognizer(self.model, self.RATE) - # 激进模式:禁用词级识别以提高性能 - if self.AGGRESSIVE_MODE: - self.recording_recognizer.SetWords(False) - else: - self.recording_recognizer.SetWords(True) - - def _stop_recording(self): - """停止录音""" - if len(self.recorded_frames) > 0: - audio_data = b''.join(self.recorded_frames) - duration = len(audio_data) / (self.RATE * 2) - print(f"📝 录音完成,时长: {duration:.2f}秒") - - # 保存录音 - success, filename = self._save_recording(audio_data) - - # 如果保存成功,播放录音 - if success and filename: - print("=" * 50) - print("🔊 播放刚才录制的音频...") - self._play_audio(filename) - print("=" * 50) - - self.recording = False - self.recorded_frames = [] - self.last_text_time = None - self.recording_start_time = None - self.recording_recognizer = None - - def start(self): - """开始唤醒词检测和录音""" - if not self.stream: - print("❌ 音频设备未初始化") - return - - self.running = True - print("🎤 开始监听...") - print(f"唤醒词: {', '.join(self.wake_words)}") - - try: - while self.running: - # 读取音频数据 - receive_time = time.time() - data = self.stream.read(self.CHUNK_SIZE, exception_on_overflow=False) - - if len(data) == 0: - continue - - # 记录音频接收时间 - self.audio_receive_times.append(receive_time) - if len(self.audio_receive_times) > self.max_latency_samples: - self.audio_receive_times.pop(0) - - if self.recording: - # 录音模式 - 激进优化处理 - self.recorded_frames.append(data) - recording_duration = time.time() - self.recording_start_time - - # 录音时使用批处理进行识别 - self.audio_buffer.append(data) - - # 限制缓冲区大小 - if len(self.audio_buffer) > self.buffer_size: - self.audio_buffer.pop(0) - - # 激进模式:直接处理,不等待批处理 - if self.AGGRESSIVE_MODE and self.recording_recognizer: - # 直接处理当前音频块 - text = self._lightweight_recognition(self.recording_recognizer, data) - if text: - # 计算延迟 - if self.audio_receive_times: - latency = self._calculate_latency(self.audio_receive_times[0]) - self.audio_receive_times.pop(0) - - # 识别到文字,更新时间戳 - self.last_text_time = time.time() - print(f"\n📝 识别: {text} (延迟: {latency:.2f}s)") - else: - # 标准批处理模式 - if self._should_process_audio() and self.recording_recognizer: - combined_data = self._process_audio_batch() - if combined_data: - text = self._lightweight_recognition(self.recording_recognizer, combined_data) - if text: - # 计算延迟 - if self.process_start_times: - process_start = self.process_start_times[0] - self.process_start_times.pop(0) - if self.audio_receive_times: - audio_time = self.audio_receive_times[0] - self.audio_receive_times.pop(0) - latency = process_start - audio_time - self._calculate_latency(audio_time) - - self.last_text_time = time.time() - print(f"\n📝 识别: {text}") - - # 检查是否需要结束录音 - current_time = time.time() - - # 激进模式:缩短超时时间 - timeout_duration = 2.0 if self.AGGRESSIVE_MODE else 5.0 - - if self.last_text_time is not None: - text_silence_duration = current_time - self.last_text_time - if text_silence_duration > self.text_silence_threshold and recording_duration >= self.min_recording_time: - print(f"\n\n3秒没有识别到文字,结束录音") - self._stop_recording() - else: - # 还没有识别到任何文字,检查是否超时 - if recording_duration > timeout_duration: - print(f"\n\n{timeout_duration}秒没有识别到文字,结束录音") - self._stop_recording() - - # 检查最大录音时间 - if recording_duration > self.max_recording_time: - print(f"\n\n达到最大录音时间 {self.max_recording_time}s") - self._stop_recording() - - # 显示录音状态 - if self.last_text_time is None: - status = f"等待语音输入... {recording_duration:.1f}s" - print(f"\r{status}", end='', flush=True) - - elif self.model and self.recognizer: - # 唤醒词检测模式 - 激进优化 - if self.AGGRESSIVE_MODE: - # 直接处理,不使用缓冲区 - text = self._lightweight_recognition(self.recognizer, data) - if text: - print(f"识别: {text}") - - # 检查唤醒词 - is_wake_word, detected_word = self._check_wake_word(text) - if is_wake_word: - print(f"🎯 检测到唤醒词: {detected_word}") - self._start_recording() - - # 显示实时音频级别(仅在高能量时) - energy = self._calculate_energy(data) - if energy > 100: # 提高阈值,减少显示频率 - status = f"监听中... 能量: {energy:.0f}" - print(status, end='\r') - else: - # 标准批处理模式 - self.audio_buffer.append(data) - - # 限制缓冲区大小 - if len(self.audio_buffer) > self.buffer_size: - self.audio_buffer.pop(0) - - # 批处理识别 - if self._should_process_audio(): - combined_data = self._process_audio_batch() - if combined_data: - text = self._lightweight_recognition(self.recognizer, combined_data) - if text: - print(f"识别: {text}") - - # 检查唤醒词 - is_wake_word, detected_word = self._check_wake_word(text) - if is_wake_word: - print(f"🎯 检测到唤醒词: {detected_word}") - self._start_recording() - - # 显示实时音频级别 - energy = self._calculate_energy(data) - if energy > 50: - status = f"监听中... 能量: {energy:.0f}" - print(status, end='\r') - - # 激进模式:更长的延迟以减少CPU使用 - sleep_time = 0.1 if self.AGGRESSIVE_MODE else 0.05 - time.sleep(sleep_time) - - except KeyboardInterrupt: - print("\n👋 退出") - except Exception as e: - print(f"错误: {e}") - finally: - self.stop() - - def stop(self): - """停止""" - self.running = False - if self.recording: - self._stop_recording() - - if self.stream: - self.stream.stop_stream() - self.stream.close() - self.stream = None - - if self.audio: - self.audio.terminate() - self.audio = None - -def main(): - """主函数""" - print("🚀 简化唤醒+录音测试") - print("=" * 50) - - # 检查模型 - model_dir = "model" - if not os.path.exists(model_dir): - print("⚠️ 未找到模型目录") - print("请下载 Vosk 模型到 model 目录") - return - - # 创建系统 - system = SimpleWakeAndRecord( - model_path=model_dir, - wake_words=["你好", "助手", "小爱"] - ) - - if not system.model: - print("❌ 模型加载失败") - return - - print("✅ 系统初始化成功") - print("📖 使用说明:") - print("1. 说唤醒词开始录音") - print("2. 基于语音识别判断,3秒没有识别到文字就结束") - print("3. 最少录音2秒,最多30秒") - print("4. 录音时实时显示识别结果") - print("5. 录音文件自动保存") - print("6. 录音完成后自动播放刚才录制的内容") - print("7. 按 Ctrl+C 退出") - print("🚀 激进性能优化已启用:") - print(" - 采样率: 8kHz (降低50%数据量)") - print(" - 块大小: 4096字节 (4倍于原始大小)") - print(" - 激进模式: 已启用 (直接处理,跳过部分结果)") - print(" - 批处理: 3个音频块/次") - print(" - 处理间隔: 0.2秒") - print(" - 缓冲区: 5个音频块") - print(" - 词级识别: 已禁用 (提高性能)") - print(" - 性能监控: 每3秒显示") - print(" - 延迟监控: 实时显示") - print(" - 预期延迟: <1秒 (原10秒)") - print("=" * 50) - - # 开始运行 - system.start() - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/voice_recorder.py b/voice_recorder.py deleted file mode 100644 index b5b943f..0000000 --- a/voice_recorder.py +++ /dev/null @@ -1,344 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -""" -语音录制模块 -基于pyaudio实现,支持语音活动检测(VAD)自动判断录音结束 -""" - -import pyaudio -import wave -import numpy as np -import time -import os -import threading -from collections import deque - -class VoiceRecorder: - """语音录制器,支持自动检测语音结束""" - - def __init__(self, - energy_threshold=500, - silence_threshold=1.0, - min_recording_time=0.5, - max_recording_time=10.0, - sample_rate=16000, - chunk_size=1024, - defer_audio_init=False): - """ - 初始化录音器 - - Args: - energy_threshold: 语音能量阈值 - silence_threshold: 静音持续时间阈值(秒) - min_recording_time: 最小录音时间(秒) - max_recording_time: 最大录音时间(秒) - sample_rate: 采样率 - chunk_size: 音频块大小 - defer_audio_init: 是否延迟音频初始化 - """ - self.energy_threshold = energy_threshold - self.silence_threshold = silence_threshold - self.min_recording_time = min_recording_time - self.max_recording_time = max_recording_time - self.sample_rate = sample_rate - self.chunk_size = chunk_size - self.defer_audio_init = defer_audio_init - - # 音频参数 - self.FORMAT = pyaudio.paInt16 - self.CHANNELS = 1 - - # 状态变量 - self.audio = None - self.stream = None - self.recording = False - self.recorded_frames = [] - - # 语音检测相关 - self.silence_start_time = None - self.recording_start_time = None - self.audio_buffer = deque(maxlen=int(sample_rate / chunk_size * 2)) # 2秒缓冲 - - # 回调函数 - self.on_recording_complete = None - self.on_speech_detected = None - - if not defer_audio_init: - self._setup_audio() - - def _setup_audio(self): - """设置音频设备""" - try: - self.audio = pyaudio.PyAudio() - - # 获取默认输入设备信息 - device_info = self.audio.get_default_input_device_info() - print(f"使用音频设备: {device_info['name']}") - - except Exception as e: - print(f"音频设备初始化失败: {e}") - raise - - def _calculate_energy(self, audio_data): - """计算音频能量""" - if len(audio_data) == 0: - return 0 - - # 转换为numpy数组 - audio_array = np.frombuffer(audio_data, dtype=np.int16) - - # 计算RMS能量 - rms = np.sqrt(np.mean(audio_array ** 2)) - return rms - - def _is_speech(self, audio_data): - """判断是否为语音""" - energy = self._calculate_energy(audio_data) - return energy > self.energy_threshold - - def _open_stream(self): - """打开音频流""" - if self.stream is not None: - return - - self.stream = self.audio.open( - format=self.FORMAT, - channels=self.CHANNELS, - rate=self.sample_rate, - input=True, - frames_per_buffer=self.chunk_size - ) - - def _close_stream(self): - """关闭音频流""" - if self.stream: - self.stream.stop_stream() - self.stream.close() - self.stream = None - - def start_listening(self): - """开始监听语音""" - if self.recording: - print("正在录音中...") - return - - self._open_stream() - self.recording = True - self.recorded_frames = [] - self.silence_start_time = None - self.recording_start_time = None - - print("开始监听语音...") - - # 在新线程中录音 - recording_thread = threading.Thread(target=self._record_loop) - recording_thread.daemon = True - recording_thread.start() - - def _record_loop(self): - """录音循环""" - try: - while self.recording: - # 读取音频数据 - data = self.stream.read(self.chunk_size, exception_on_overflow=False) - - if len(data) == 0: - continue - - # 计算能量 - energy = self._calculate_energy(data) - - # 添加到缓冲区 - self.audio_buffer.append(data) - - # 检测语音活动 - if energy > self.energy_threshold: - # 检测到语音 - if self.recording_start_time is None: - # 开始录音 - self.recording_start_time = time.time() - self.silence_start_time = None - self.recorded_frames = list(self.audio_buffer) # 包含之前的音频 - - print("🎤 检测到语音,开始录音...") - - if self.on_speech_detected: - self.on_speech_detected() - - # 重置静音计时 - self.silence_start_time = None - - # 录音 - self.recorded_frames.append(data) - - elif self.recording_start_time is not None: - # 之前有语音,现在检查是否静音 - if self.silence_start_time is None: - self.silence_start_time = time.time() - - # 继续录音 - self.recorded_frames.append(data) - - # 检查是否静音超时 - silence_duration = time.time() - self.silence_start_time - if silence_duration > self.silence_threshold: - recording_duration = time.time() - self.recording_start_time - - # 检查最小录音时间 - if recording_duration >= self.min_recording_time: - print(f"静音 {silence_duration:.1f}s,结束录音") - self.stop_recording() - break - else: - print(f"录音时间太短 ({recording_duration:.1f}s),继续等待...") - self.silence_start_time = time.time() - - # 检查最大录音时间 - if self.recording_start_time is not None: - recording_duration = time.time() - self.recording_start_time - if recording_duration > self.max_recording_time: - print(f"达到最大录音时间 {self.max_recording_time}s,结束录音") - self.stop_recording() - break - - # 短暂休眠 - time.sleep(0.01) - - except Exception as e: - print(f"录音过程中发生错误: {e}") - self.stop_recording() - - def stop_recording(self): - """停止录音""" - if not self.recording: - return - - self.recording = False - self._close_stream() - - if len(self.recorded_frames) > 0: - # 保存录音 - audio_data = b''.join(self.recorded_frames) - - print(f"录音完成,共 {len(self.recorded_frames)} 帧") - print(f"录音时长: {len(audio_data) / (self.sample_rate * 2):.2f} 秒") - - # 调用回调函数 - if self.on_recording_complete: - self.on_recording_complete(audio_data) - - # 重置状态 - self.recorded_frames = [] - self.silence_start_time = None - self.recording_start_time = None - - def save_audio(self, audio_data, filename): - """保存音频到文件""" - try: - with wave.open(filename, 'wb') as wf: - wf.setnchannels(self.CHANNELS) - wf.setsampwidth(self.audio.get_sample_size(self.FORMAT)) - wf.setframerate(self.sample_rate) - wf.writeframes(audio_data) - - print(f"音频已保存到: {filename}") - return True - except Exception as e: - print(f"保存音频失败: {e}") - return False - - def set_recording_complete_callback(self, callback): - """设置录音完成回调函数""" - self.on_recording_complete = callback - - def set_speech_detected_callback(self, callback): - """设置语音检测回调函数""" - self.on_speech_detected = callback - - def adjust_sensitivity(self, energy_threshold=None, silence_threshold=None): - """调整灵敏度""" - if energy_threshold is not None: - self.energy_threshold = energy_threshold - print(f"能量阈值调整为: {energy_threshold}") - - if silence_threshold is not None: - self.silence_threshold = silence_threshold - print(f"静音阈值调整为: {silence_threshold}秒") - - def get_audio_level(self): - """获取当前音频级别""" - if len(self.audio_buffer) > 0: - latest_data = self.audio_buffer[-1] - return self._calculate_energy(latest_data) - return 0 - - def cleanup(self): - """清理资源""" - self.stop_recording() - if self.audio: - self.audio.terminate() - self.audio = None - -def main(): - """测试录音功能""" - print("🎙️ 语音录制测试") - print("=" * 50) - print("配置:") - print("- 能量阈值: 500") - print("- 静音阈值: 1.0秒") - print("- 最小录音时间: 0.5秒") - print("- 最大录音时间: 10秒") - print("=" * 50) - print("请说话测试录音功能...") - print("按 Ctrl+C 退出") - - def on_recording_complete(audio_data): - """录音完成回调""" - # 保存录音文件 - timestamp = time.strftime("%Y%m%d_%H%M%S") - filename = f"recording_{timestamp}.wav" - - recorder.save_audio(audio_data, filename) - print(f"✅ 录音文件已保存: {filename}") - - # 显示录音信息 - duration = len(audio_data) / (recorder.sample_rate * 2) - print(f"录音时长: {duration:.2f} 秒") - - def on_speech_detected(): - """检测到语音回调""" - print("🔊 检测到语音活动...") - - # 创建录音器 - recorder = VoiceRecorder( - energy_threshold=500, - silence_threshold=1.0, - min_recording_time=0.5, - max_recording_time=10.0 - ) - - # 设置回调 - recorder.set_recording_complete_callback(on_recording_complete) - recorder.set_speech_detected_callback(on_speech_detected) - - try: - # 开始监听 - recorder.start_listening() - - # 保持程序运行 - while True: - time.sleep(0.1) - - # 显示当前音频级别(可选) - level = recorder.get_audio_level() - if level > 100: - print(f"当前音频级别: {level:.0f}", end='\r') - - except KeyboardInterrupt: - print("\n👋 退出录音测试") - finally: - recorder.cleanup() - -if __name__ == "__main__": - main() \ No newline at end of file