#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Minimal voice recorder driven by zero-crossing-rate voice detection.

Tuned for the Raspberry Pi 3B and free of any Vosk dependency.  Each finished
recording is sent through an online pipeline: speech recognition (ASR), a
large language model (LLM) and text-to-speech (TTS) playback.
"""

import asyncio
import base64
import gzip
import hmac
import json
import os
import platform
import shutil
import subprocess
import sys
import threading
import time
import uuid
import wave
from io import BytesIO
from urllib.parse import urlparse

import numpy as np

# Optional third-party packages: a missing one disables the matching feature
# instead of making the whole module unimportable.
try:
    import pyaudio
except ImportError:
    print("⚠️ pyaudio 未安装,录音与播放功能将不可用")
    pyaudio = None

try:
    import requests
except ImportError:
    print("⚠️ requests 未安装,大语言模型和文本转语音功能将不可用")
    requests = None

try:
    import websockets
except ImportError:
    print("⚠️ websockets 未安装,语音识别功能将不可用")
    websockets = None

# Numeric value of pyaudio.paInt16, used only when pyaudio is unavailable.
_PA_INT16_FALLBACK = 8


class EnergyBasedRecorder:
    """Voice-activated recorder with an ASR -> LLM -> TTS reply pipeline.

    Despite the name, voice activity is decided from the zero-crossing rate
    (ZCR) of each audio chunk; the energy helpers are kept for monitoring and
    backward compatibility.
    """

    # Audio capture parameters.
    FORMAT = pyaudio.paInt16 if pyaudio is not None else _PA_INT16_FALLBACK
    CHANNELS = 1
    RATE = 16000          # capture sample rate in Hz
    CHUNK_SIZE = 1024     # samples per read (64 ms at 16 kHz)
    SAMPLE_WIDTH = 2      # bytes per sample for 16-bit PCM

    # A chunk counts as speech when its ZCR (crossings per second) lies
    # strictly inside this range.
    ZCR_VOICE_MIN = 2400
    ZCR_VOICE_MAX = 12000

    # TTS audio is requested as raw PCM at this rate and played back with it.
    TTS_SAMPLE_RATE = 16000

    # Timeout in seconds applied to the LLM and TTS HTTP requests.
    HTTP_TIMEOUT = 30

    def __init__(self, energy_threshold=500, silence_threshold=1.5,
                 min_recording_time=2.0, max_recording_time=30.0,
                 enable_asr=True, enable_llm=True, enable_tts=True):
        """Configure the recorder and open the audio input stream.

        Args:
            energy_threshold: Energy level treated as "sound" (kept for
                compatibility; detection itself is ZCR based).
            silence_threshold: Seconds without detected voice after which a
                recording may end.
            min_recording_time: Minimum recording length in seconds.
            max_recording_time: Maximum recording length in seconds.
            enable_asr: Enable online speech recognition.
            enable_llm: Enable the LLM reply (needs ARK_API_KEY).
            enable_tts: Enable text-to-speech for the LLM reply.
        """
        # NOTE(review): the ASR/TTS credentials below are hard-coded in source;
        # they should be moved to environment variables or a config file.

        # Speech recognition settings.
        self.enable_asr = enable_asr
        self.asr_appid = "8718217928"
        self.asr_token = "ynJMX-5ix1FsJvswC9KTNlGUdubcchqc"
        self.asr_cluster = "volcengine_input_common"
        self.asr_ws_url = "wss://openspeech.bytedance.com/api/v2/asr"

        # Large language model settings.
        self.enable_llm = enable_llm
        self.llm_api_url = "https://ark.cn-beijing.volces.com/api/v3/chat/completions"
        self.llm_model = "doubao-seed-1-6-flash-250828"
        self.llm_api_key = os.environ.get("ARK_API_KEY", "")

        # Text-to-speech settings.
        self.enable_tts = enable_tts
        self.tts_url = "https://openspeech.bytedance.com/api/v3/tts/unidirectional"
        self.tts_app_id = "8718217928"
        self.tts_access_key = "ynJMX-5ix1FsJvswC9KTNlGUdubcchqc"
        self.tts_resource_id = "volc.service_type.10029"
        self.tts_app_key = "aGjiRDfUWi"
        self.tts_speaker = "zh_female_wanqudashu_moon_bigtts"

        # Both HTTP features depend on the requests package.
        if requests is None and (self.enable_llm or self.enable_tts):
            print("⚠️ requests 未安装,大语言模型和文本转语音功能将被禁用")
            self.enable_llm = False
            self.enable_tts = False

        # The LLM cannot be used without an API key.
        if self.enable_llm and not self.llm_api_key:
            print("⚠️ 未设置 ARK_API_KEY 环境变量,大语言模型功能将被禁用")
            self.enable_llm = False

        # Check playback capability.  TTS stays enabled even without a player
        # because the audio file can still be generated.
        self.audio_player_available = False
        if self.enable_tts:
            self.audio_player_available = self._check_audio_player()
            if not self.audio_player_available:
                print("⚠️ 未找到音频播放器,TTS音频播放功能可能不可用")
                print(" 建议安装: sudo apt-get install alsa-utils")

        # Detection parameters.
        self.energy_threshold = energy_threshold
        self.silence_threshold = silence_threshold
        self.min_recording_time = min_recording_time
        self.max_recording_time = max_recording_time
        self.pre_record_duration = 2.0  # seconds of audio kept before speech starts

        # Runtime state.
        self.audio = None
        self.stream = None
        self.running = False
        self.recording = False
        self.recorded_frames = []
        self.recording_start_time = None
        self.last_sound_time = None
        self.energy_history = []
        self.zcr_history = []
        self.max_energy_history = 50

        # Pre-record buffer holding the most recent chunks while listening.
        self.pre_record_buffer = []
        self.pre_record_max_frames = int(
            self.pre_record_duration * self.RATE / self.CHUNK_SIZE
        )

        # Playback flag.
        self.is_playing = False

        # Silence tracking.  One chunk is CHUNK_SIZE / RATE = 64 ms.
        self.voice_activity_history = []
        self.max_voice_history = 20
        self.consecutive_silence_count = 0
        self.silence_threshold_count = 15   # about 0.96 s of chunks
        self.max_zcr_history = 30
        self.consecutive_low_zcr_count = 0
        self.low_zcr_threshold_count = 20   # about 1.28 s of chunks

        # Performance monitoring.
        self.frame_count = 0
        self.start_time = time.time()

        self._setup_audio()

    # ------------------------------------------------------------------
    # Audio device handling
    # ------------------------------------------------------------------
    def _setup_audio(self):
        """Open the input stream, reusing the existing PyAudio instance.

        On failure ``self.stream`` is left as ``None`` and the PyAudio
        instance is released so the next attempt starts from scratch.
        """
        if pyaudio is None:
            print("❌ 音频设备初始化失败: pyaudio 未安装")
            self.stream = None
            return

        # Never leave a previous stream open behind the new one.
        self._close_input_stream()
        try:
            if self.audio is None:
                self.audio = pyaudio.PyAudio()
            self.stream = self.audio.open(
                format=self.FORMAT,
                channels=self.CHANNELS,
                rate=self.RATE,
                input=True,
                frames_per_buffer=self.CHUNK_SIZE,
            )
            print("✅ 音频设备初始化成功")
        except Exception as e:
            print(f"❌ 音频设备初始化失败: {e}")
            self.stream = None
            if self.audio is not None:
                try:
                    self.audio.terminate()
                except Exception:
                    pass
                self.audio = None

    def _close_input_stream(self):
        """Stop and close the input stream if one is open (best effort)."""
        stream, self.stream = self.stream, None
        if stream is None:
            return
        try:
            stream.stop_stream()
            stream.close()
        except Exception as e:
            print(f"⚠️ 关闭音频输入流时出错: {e}")

    # ------------------------------------------------------------------
    # Signal analysis
    # ------------------------------------------------------------------
    @staticmethod
    def _to_samples(audio_data):
        """Decode raw bytes to an int16 array, dropping a trailing odd byte."""
        usable = len(audio_data) - (len(audio_data) % 2)
        return np.frombuffer(audio_data[:usable], dtype=np.int16)

    def calculate_energy(self, audio_data):
        """Return the RMS energy of a chunk of 16-bit PCM audio.

        While not recording, the value is also appended to
        ``energy_history`` so the history reflects background noise only.
        Returns 0 for empty or unusable input.
        """
        if len(audio_data) == 0:
            return 0
        samples = self._to_samples(audio_data)
        if samples.size == 0:
            return 0

        # Square in float64: squaring int16 in place overflows above 181.
        rms = float(np.sqrt(np.mean(np.square(samples.astype(np.float64)))))
        if not np.isfinite(rms):
            return 0

        if not self.recording:
            self.energy_history.append(rms)
            if len(self.energy_history) > self.max_energy_history:
                self.energy_history.pop(0)
        return rms

    def calculate_peak_energy(self, audio_data):
        """Return the largest absolute sample value in the chunk."""
        if len(audio_data) == 0:
            return 0
        samples = self._to_samples(audio_data)
        if samples.size == 0:
            return 0
        # Widen first: abs(-32768) does not fit in int16.
        return int(np.max(np.abs(samples.astype(np.int32))))

    def calculate_zero_crossing_rate(self, audio_data):
        """Return sign changes per second for the chunk and record it.

        The value is appended to ``zcr_history`` (bounded by
        ``max_zcr_history``).
        """
        if len(audio_data) == 0:
            return 0
        samples = self._to_samples(audio_data)
        if samples.size == 0:
            return 0

        crossings = np.count_nonzero(np.diff(np.sign(samples)))
        # Scale the per-sample rate to crossings per second.
        zcr = float(crossings) / samples.size * self.RATE

        self.zcr_history.append(zcr)
        if len(self.zcr_history) > self.max_zcr_history:
            self.zcr_history.pop(0)
        return zcr

    def is_voice_active_advanced(self, energy, zcr):
        """Return True when ``zcr`` lies inside the speech range.

        ``energy`` is accepted for interface compatibility and is not used.
        """
        return bool(self.ZCR_VOICE_MIN < zcr < self.ZCR_VOICE_MAX)

    def is_voice_active(self, energy):
        """Deprecated energy-based detector; always returns False."""
        return False

    def get_average_energy(self):
        """Return the mean of the recorded background energy values."""
        if len(self.energy_history) == 0:
            return 0
        return np.mean(self.energy_history)

    def monitor_performance(self):
        """Count a processed chunk and print throughput every 1000 chunks."""
        self.frame_count += 1
        if self.frame_count % 1000 != 0:
            return
        elapsed = time.time() - self.start_time
        fps = self.frame_count / elapsed if elapsed > 0 else 0.0
        avg_energy = self.get_average_energy()
        print(f"📊 性能: {fps:.1f} FPS | 平均能量: {avg_energy:.1f} | 阈值: {self.energy_threshold}")

    def auto_adjust_threshold(self):
        """Nudge the energy threshold toward median + 2 * std of the history.

        Does nothing until at least 20 energy values were collected.
        """
        if len(self.energy_history) < 20:
            return
        median_energy = np.median(self.energy_history)
        std_energy = np.std(self.energy_history)
        new_threshold = max(300, median_energy + 2 * std_energy)
        # Exponential smoothing keeps the threshold from jumping.
        self.energy_threshold = 0.9 * self.energy_threshold + 0.1 * new_threshold

    # ------------------------------------------------------------------
    # Saving and playback
    # ------------------------------------------------------------------
    def save_recording(self, audio_data, filename=None):
        """Write raw PCM bytes to a WAV file.

        Args:
            audio_data: Raw 16-bit mono PCM bytes.
            filename: Target path; a timestamped name is used when omitted.

        Returns:
            ``(True, filename)`` on success, ``(False, None)`` on failure.
        """
        if filename is None:
            timestamp = time.strftime("%Y%m%d_%H%M%S")
            filename = f"recording_{timestamp}.wav"

        try:
            with wave.open(filename, 'wb') as wf:
                wf.setnchannels(self.CHANNELS)
                wf.setsampwidth(self.SAMPLE_WIDTH)
                wf.setframerate(self.RATE)
                wf.writeframes(audio_data)
        except Exception as e:
            print(f"❌ 保存录音失败: {e}")
            return False, None

        print(f"✅ 录音已保存: {filename}")
        print(f"📊 音频格式参数:")
        print(f" - 采样率: {self.RATE} Hz")
        print(f" - 声道数: {self.CHANNELS}")
        print(f" - 位深度: {self.SAMPLE_WIDTH * 8} bits")
        print(f" - 格式: PCM int16 小端序")
        return True, filename

    def _prepare_for_playback(self):
        """Abort any recording, clear buffers and close the input stream."""
        print("🔇 准备播放,完全停止音频输入")

        if self.recording:
            self.recording = False
            self.recorded_frames = []
            self.recording_start_time = None
            self.last_sound_time = None

        self.pre_record_buffer = []
        self.energy_history = []
        self.zcr_history = []

        self._close_input_stream()
        self.is_playing = True
        # Give the audio device a moment to fully release the input.
        time.sleep(0.5)

    def play_audio(self, filename):
        """Play a WAV file through PyAudio, then reopen the input stream.

        Falls back to :meth:`play_with_system_player` when PyAudio playback
        fails; an exception from that fallback propagates to the caller.
        """
        try:
            self._prepare_for_playback()

            with wave.open(filename, 'rb') as wf:
                playback_stream = self.audio.open(
                    format=self.audio.get_format_from_width(wf.getsampwidth()),
                    channels=wf.getnchannels(),
                    rate=wf.getframerate(),
                    output=True,
                )
                try:
                    print(f"🔊 开始播放: {filename}")
                    print("🚫 音频输入已完全关闭")
                    # Stream the file in 1024-frame pieces until it is exhausted.
                    for chunk in iter(lambda: wf.readframes(1024), b""):
                        playback_stream.write(chunk)
                    playback_stream.stop_stream()
                finally:
                    playback_stream.close()

            print("✅ 播放完成")
            print("🔄 重新开启音频输入")
        except Exception as e:
            print(f"❌ 播放失败: {e}")
            self.play_with_system_player(filename)
        finally:
            self.is_playing = False
            # Let the output drain before capturing again.
            time.sleep(0.3)
            self._setup_audio()
            self.energy_history = []
            self.zcr_history = []
            print("📡 音频输入已重新开启")

    def _select_player_command(self, filename):
        """Return the command line used to play ``filename``.

        Raises:
            Exception: on Linux when no MP3 player is installed.
        """
        file_ext = filename.lower().split('.')[-1] if '.' in filename else ''

        if file_ext == 'pcm':
            # Raw PCM carries no header, so the format must be spelled out.
            return ['aplay', '-f', 'S16_LE', '-r', str(self.TTS_SAMPLE_RATE),
                    '-c', '1', filename]
        if file_ext != 'mp3':
            return ['aplay', filename]

        system = platform.system().lower()
        if system == 'linux':
            mp3_players = [
                ['mpg123', filename],
                ['mpg321', filename],
                ['mplayer', filename],
                ['cvlc', '--play-and-exit', filename],
                ['ffplay', '-nodisp', '-autoexit', filename],
            ]
            for player in mp3_players:
                if shutil.which(player[0]):
                    return player
            raise Exception("未找到可用的MP3播放器,请安装 mpg123 或 mplayer")
        if system == 'darwin':
            return ['afplay', filename]
        if system == 'windows':
            return ['cmd', '/c', 'start', '/min', filename]
        return ['aplay', filename]  # unknown platform: may fail

    def play_with_system_player(self, filename):
        """Play a file with an external player, falling back to pygame.

        Raises:
            Exception: the external-player error when pygame fails as well.
        """
        try:
            cmd = self._select_player_command(filename)
            print(f"🔊 使用系统播放器: {' '.join(cmd)}")
            print("🚫 系统播放器播放中,音频输入保持关闭")
            subprocess.run(cmd, check=True)
            print("✅ 播放完成")
            print("📡 音频输入已保持关闭状态")
        except Exception as e:
            print(f"❌ 系统播放器失败: {e}")
            try:
                self._play_with_pygame(filename)
            except Exception as pygame_error:
                print(f"❌ pygame播放也失败: {pygame_error}")
                raise e

    def _check_audio_player(self):
        """Return True when some way of playing audio appears available."""
        try:
            system = platform.system().lower()

            if system == 'linux':
                if shutil.which('aplay'):
                    print("✅ 找到音频播放器: aplay")
                    return True
                try:
                    import pygame  # noqa: F401 - presence check only
                except ImportError:
                    return False
                print("✅ 找到pygame作为音频播放备选方案")
                return True

            if system == 'darwin':
                if shutil.which('afplay'):
                    print("✅ 找到音频播放器: afplay")
                    return True
                return False

            # Windows is assumed to be able to play audio.
            return system == 'windows'
        except Exception as e:
            print(f"❌ 检查音频播放器时出错: {e}")
            return False

    def _play_with_pygame(self, filename):
        """Play a file with pygame and block until playback ends.

        Raises:
            Exception: when pygame is missing or playback fails.
        """
        try:
            import pygame
        except ImportError as e:
            raise Exception("pygame未安装") from e

        try:
            pygame.mixer.init()
            print(f"🔊 尝试使用pygame播放: {filename}")
            sound = pygame.mixer.Sound(filename)
            sound.play()

            clock = pygame.time.Clock()
            while pygame.mixer.get_busy():
                clock.tick(10)
            print("✅ pygame播放完成")
        except Exception as e:
            raise Exception(f"pygame播放失败: {e}") from e
        finally:
            try:
                pygame.mixer.quit()
            except Exception:
                pass

    def play_audio_safe(self, filename, reopen_input=False):
        """Play a file with the system player while the input is closed.

        Args:
            filename: File to play.
            reopen_input: Reopen the input stream afterwards when True;
                otherwise the caller (the run loop) is expected to do so.
        """
        try:
            self._prepare_for_playback()
            print(f"🔊 开始播放: {filename}")
            print("🚫 使用系统播放器,音频输入已完全关闭")
            self.play_with_system_player(filename)
            if reopen_input:
                print("🔄 准备重新开启音频输入")
        except Exception as e:
            print(f"❌ 播放失败: {e}")
        finally:
            self.is_playing = False
            # Let the output drain before capturing again.
            time.sleep(0.5)
            if reopen_input:
                self._setup_audio()
                self.energy_history = []
                self.zcr_history = []
                print("📡 音频输入已重新开启")

    # ------------------------------------------------------------------
    # Recording lifecycle
    # ------------------------------------------------------------------
    def update_pre_record_buffer(self, audio_data):
        """Append a chunk to the pre-record buffer, dropping the oldest."""
        self.pre_record_buffer.append(audio_data)
        excess = len(self.pre_record_buffer) - self.pre_record_max_frames
        if excess > 0:
            del self.pre_record_buffer[:excess]

    def start_recording(self):
        """Begin a recording seeded with the pre-record buffer."""
        print("🎙️ 检测到声音,开始录音...")
        self.recording = True
        # The buffered chunks keep the audio from just before speech started.
        self.recorded_frames = list(self.pre_record_buffer)
        self.pre_record_buffer = []
        self.recording_start_time = time.time()
        self.last_sound_time = time.time()
        self.energy_history = []
        self.zcr_history = []
        self.consecutive_low_zcr_count = 0
        self.consecutive_silence_count = 0
        self.voice_activity_history = []

    def stop_recording(self):
        """Finish the recording, save it and run the reply pipeline.

        Recording state is always reset, even when saving or processing
        fails, so the run loop can keep listening.
        """
        try:
            if len(self.recorded_frames) > 0:
                audio_data = b''.join(self.recorded_frames)
                duration = len(audio_data) / (self.RATE * self.SAMPLE_WIDTH)
                pre_record_duration = min(duration, self.pre_record_duration)
                print(f"📝 录音完成,时长: {duration:.2f}秒 (包含预录音 {pre_record_duration:.1f}秒)")

                success, filename = self.save_recording(audio_data)
                if success and filename:
                    print("=" * 50)
                    print("📡 音频输入已保持关闭状态")
                    print("🔄 开始处理音频...")
                    self._process_recording(filename)
                    print("🔄 准备重新开启音频输入")
        except Exception as e:
            print(f"❌ 错误: {e}")
        finally:
            self.recording = False
            self.recorded_frames = []
            self.recording_start_time = None
            self.last_sound_time = None
            self.energy_history = []
            self.zcr_history = []

    def _process_recording(self, filename):
        """Run ASR, then the LLM, then TTS for a saved recording."""
        if not self.enable_asr:
            print("ℹ️ 语音识别功能已禁用")
            return
        if websockets is None:
            print("ℹ️ 请安装 websockets 库以启用语音识别功能")
            return

        print("🤖 开始语音识别...")
        asr_result = self.recognize_audio_sync(filename)
        payload = asr_result.get('payload_msg') if asr_result else None
        if not isinstance(payload, dict):
            print("❌ 语音识别失败")
            return

        result_text = payload.get('result', [])
        if not result_text:
            print("❌ 语音识别失败: 无结果")
            return

        first = result_text[0]
        text = first.get('text', '识别失败') if isinstance(first, dict) else '识别失败'
        print(f"📝 识别结果: {text}")

        if not self.enable_llm or text == '识别失败':
            if not self.enable_llm:
                print("ℹ️ 大语言模型功能已禁用")
            elif not self.llm_api_key:
                print("ℹ️ 请设置 ARK_API_KEY 环境变量以启用大语言模型功能")
            return

        print("-" * 50)
        llm_response = self.call_llm(text)
        if not llm_response:
            print("❌ 大语言模型调用失败")
            return
        print(f"💬 AI助手回复: {llm_response}")

        if not self.enable_tts:
            print("ℹ️ 文本转语音功能已禁用")
            return

        print("-" * 50)
        if self.text_to_speech(llm_response):
            print("✅ AI语音回复完成")
        else:
            print("❌ 文本转语音失败")

    # ------------------------------------------------------------------
    # Main loop
    # ------------------------------------------------------------------
    def _print_banner(self):
        """Print the start-up usage information."""
        print("🎤 开始监听...")
        print(f"能量阈值: {self.energy_threshold} (已弃用)")
        print(f"静音阈值: {self.silence_threshold}秒")
        print("📖 使用说明:")
        print("- 检测到声音自动开始录音")
        print("- 持续静音3秒自动结束录音")
        print("- 最少录音2秒,最多30秒")
        print("- 录音完成后自动播放")
        print("- 按 Ctrl+C 退出")
        print("🎯 新增功能:")
        print("- 纯ZCR语音检测(移除能量检测)")
        print("- 零交叉率检测(区分语音和噪音)")
        print("- 实时显示ZCR状态")
        print("- 预录音功能(包含声音开始前2秒)")
        print("- 环形缓冲区防止丢失开头音频")
        print("🤖 纯ZCR静音检测:")
        print("- 连续低ZCR计数(20次=2秒)")
        print("- ZCR活动历史追踪")
        print("- 基于ZCR模式的静音验证")
        print("- 语音范围: 2400-12000 Hz (适应16kHz采样率)")
        print("=" * 50)

    def _handle_recording_frame(self, data, energy, zcr):
        """Store a chunk while recording and stop when silence is detected."""
        self.recorded_frames.append(data)
        recording_duration = time.time() - self.recording_start_time

        is_voice = self.is_voice_active_advanced(energy, zcr)
        if is_voice:
            self.last_sound_time = time.time()
            self.consecutive_low_zcr_count = 0
            self.consecutive_silence_count = 0
        else:
            self.consecutive_low_zcr_count += 1
            self.consecutive_silence_count += 1

        self.voice_activity_history.append(is_voice)
        if len(self.voice_activity_history) > self.max_voice_history:
            self.voice_activity_history.pop(0)

        should_stop = False
        stop_reason = ""

        # Primary check: a long run of chunks outside the speech ZCR range,
        # confirmed against recent history once enough of it exists.
        if self.consecutive_low_zcr_count >= self.low_zcr_threshold_count:
            if len(self.voice_activity_history) >= 15:
                recent_voice_activity = sum(self.voice_activity_history[-15:])
                if recent_voice_activity <= 3:
                    should_stop = True
                    stop_reason = f"ZCR静音检测 ({self.consecutive_low_zcr_count}次连续低ZCR)"
            else:
                should_stop = True
                stop_reason = f"基础ZCR静音检测 ({self.consecutive_low_zcr_count}次)"

        # Fallback check: wall-clock time since voice was last detected.
        if not should_stop and time.time() - self.last_sound_time > self.silence_threshold:
            should_stop = True
            stop_reason = f"时间静音检测 ({self.silence_threshold}秒)"

        if should_stop and recording_duration >= self.min_recording_time:
            print(f"\n🔇 {stop_reason},结束录音")
            self.stop_recording()
            return
        if recording_duration > self.max_recording_time:
            print(f"\n⏰ 达到最大录音时间 {self.max_recording_time}秒")
            self.stop_recording()
            return

        zcr_progress = f"{self.consecutive_low_zcr_count}/{self.low_zcr_threshold_count}"
        recent_activity = (
            sum(self.voice_activity_history[-5:])
            if len(self.voice_activity_history) >= 5 else 0
        )
        status = f"录音中... {recording_duration:.1f}s | ZCR: {zcr:.0f} | 语音: {is_voice} | 低ZCR计数: {zcr_progress} | 活动: {recent_activity}"
        print(f"\r{status}", end='', flush=True)

    def _handle_listening_frame(self, data, energy, zcr):
        """Buffer a chunk while idle and start recording on detected voice."""
        self.update_pre_record_buffer(data)

        is_voice = self.is_voice_active_advanced(energy, zcr)
        if is_voice:
            self.start_recording()
            return

        capacity = max(1, self.pre_record_max_frames)
        buffer_usage = len(self.pre_record_buffer) / capacity * 100
        status = f"监听中... ZCR: {zcr:.0f} | 语音: {is_voice} | 缓冲: {buffer_usage:.0f}%"
        print(f"\r{status}", end='', flush=True)

    def run(self):
        """Listen on the input stream until interrupted.

        Returns immediately when no input stream could be opened.  The
        system is always shut down through :meth:`stop` on exit.
        """
        if not self.stream:
            print("❌ 音频设备未初始化")
            return

        self.running = True
        self._print_banner()

        try:
            while self.running:
                # The stream is closed for playback and after read errors;
                # reopen it here.
                if self.stream is None:
                    print("\n❌ 音频流已断开,尝试重新连接...")
                    self._setup_audio()
                    if self.stream is None:
                        print("❌ 音频流重连失败,等待...")
                        time.sleep(1)
                        continue

                try:
                    data = self.stream.read(self.CHUNK_SIZE, exception_on_overflow=False)
                except Exception as e:
                    print(f"\n❌ 读取音频数据失败: {e}")
                    self._close_input_stream()
                    continue

                if len(data) == 0:
                    continue

                # Skip all processing while audio is being played.
                if self.is_playing:
                    status = "🔊 播放中... 跳过录音处理"
                    print(f"\r{status}", end='', flush=True)
                    time.sleep(0.1)
                    continue

                energy = self.calculate_energy(data)
                zcr = self.calculate_zero_crossing_rate(data)
                self.monitor_performance()

                if self.recording:
                    self._handle_recording_frame(data, energy, zcr)
                else:
                    self._handle_listening_frame(data, energy, zcr)

                # Yield briefly to reduce CPU usage.
                time.sleep(0.01)
        except KeyboardInterrupt:
            print("\n👋 退出")
        except Exception as e:
            print(f"❌ 错误: {e}")
        finally:
            self.stop()

    def stop(self):
        """Stop the loop, finish any recording and release audio resources.

        Safe to call more than once.
        """
        self.running = False
        if self.recording:
            self.stop_recording()
        self._close_input_stream()
        if self.audio:
            try:
                self.audio.terminate()
            except Exception as e:
                print(f"⚠️ 释放音频设备时出错: {e}")
            self.audio = None

    # ------------------------------------------------------------------
    # Speech recognition (binary WebSocket protocol)
    # ------------------------------------------------------------------
    def generate_asr_header(self, message_type=1, message_type_specific_flags=0):
        """Build the 4-byte ASR frame header.

        Byte 0 packs protocol version and header size, byte 1 the message
        type and its flags, byte 2 serialization (JSON) and compression
        (gzip); byte 3 is reserved.
        """
        PROTOCOL_VERSION = 0b0001
        DEFAULT_HEADER_SIZE = 0b0001
        JSON = 0b0001
        GZIP = 0b0001

        header = bytearray()
        header.append((PROTOCOL_VERSION << 4) | DEFAULT_HEADER_SIZE)
        header.append((message_type << 4) | message_type_specific_flags)
        header.append((JSON << 4) | GZIP)
        header.append(0x00)  # reserved
        return header

    def parse_asr_response(self, res):
        """Decode a binary ASR server frame.

        Returns:
            A dict that may contain ``seq``, ``code``, ``payload_msg`` and
            ``payload_size``.  It is empty for frames shorter than the
            4-byte header or with an unknown message type.
        """
        result = {}
        if len(res) < 4:
            return result

        header_size = res[0] & 0x0f           # in 4-byte units
        message_type = res[1] >> 4
        serialization_method = res[2] >> 4
        message_compression = res[2] & 0x0f
        payload = res[header_size * 4:]

        payload_msg = None
        payload_size = 0

        if message_type == 0b1001:    # SERVER_FULL_RESPONSE
            payload_size = int.from_bytes(payload[:4], "big", signed=True)
            payload_msg = payload[4:]
        elif message_type == 0b1011:  # SERVER_ACK
            result['seq'] = int.from_bytes(payload[:4], "big", signed=True)
            if len(payload) >= 8:
                payload_size = int.from_bytes(payload[4:8], "big", signed=False)
                payload_msg = payload[8:]
        elif message_type == 0b1111:  # SERVER_ERROR_RESPONSE
            result['code'] = int.from_bytes(payload[:4], "big", signed=False)
            payload_size = int.from_bytes(payload[4:8], "big", signed=False)
            payload_msg = payload[8:]

        if payload_msg is None:
            return result

        if message_compression == 0b0001:   # gzip
            payload_msg = gzip.decompress(payload_msg)
        if serialization_method == 0b0001:  # JSON
            payload_msg = json.loads(str(payload_msg, "utf-8"))

        result['payload_msg'] = payload_msg
        result['payload_size'] = payload_size
        return result

    async def recognize_audio(self, audio_path):
        """Send an audio file to the ASR service and return the last reply.

        Returns:
            The parsed response of the final frame, or ``None`` when ASR is
            unavailable, the request is rejected or an error occurs.
        """
        if not self.enable_asr or websockets is None:
            return None

        try:
            print("🤖 开始语音识别...")
            with open(audio_path, mode="rb") as f:
                audio_data = f.read()

            request_params = {
                'app': {
                    'appid': self.asr_appid,
                    'cluster': self.asr_cluster,
                    'token': self.asr_token,
                },
                'user': {
                    'uid': 'recorder_asr'
                },
                'request': {
                    'reqid': str(uuid.uuid4()),
                    'nbest': 1,
                    'workflow': 'audio_in,resample,partition,vad,fe,decode,itn,nlu_punctuate',
                    'show_language': False,
                    'show_utterances': False,
                    'result_type': 'full',
                    "sequence": 1
                },
                'audio': {
                    'format': 'wav',
                    'rate': self.RATE,
                    'language': 'zh-CN',
                    'bits': 16,
                    'channel': self.CHANNELS,
                    'codec': 'raw'
                }
            }

            # Full client request: header + 4-byte length + gzipped JSON.
            payload_bytes = gzip.compress(str.encode(json.dumps(request_params)))
            full_client_request = bytearray(self.generate_asr_header())
            full_client_request.extend(len(payload_bytes).to_bytes(4, 'big'))
            full_client_request.extend(payload_bytes)

            additional_headers = {'Authorization': 'Bearer; {}'.format(self.asr_token)}

            # NOTE(review): older websockets releases name this keyword
            # `extra_headers` - confirm against the installed version.
            async with websockets.connect(self.asr_ws_url,
                                          additional_headers=additional_headers,
                                          max_size=1000000000) as ws:
                await ws.send(full_client_request)
                result = self.parse_asr_response(await ws.recv())
                payload_msg = result.get('payload_msg')
                if isinstance(payload_msg, dict) and payload_msg.get('code') != 1000:
                    print(f"❌ ASR请求失败: {result['payload_msg']}")
                    return None

                # Audio is sent in pieces of 15000 ms (15 s) each.
                chunk_size = int(self.CHANNELS * 2 * self.RATE * 15000 / 1000)
                for offset in range(0, len(audio_data), chunk_size):
                    chunk = audio_data[offset:offset + chunk_size]
                    last = (offset + chunk_size) >= len(audio_data)

                    payload_bytes = gzip.compress(chunk)
                    audio_only_request = bytearray(self.generate_asr_header(
                        message_type=0b0010,
                        message_type_specific_flags=0b0010 if last else 0,
                    ))
                    audio_only_request.extend(len(payload_bytes).to_bytes(4, 'big'))
                    audio_only_request.extend(payload_bytes)

                    await ws.send(audio_only_request)
                    result = self.parse_asr_response(await ws.recv())

                return result
        except Exception as e:
            print(f"❌ 语音识别失败: {e}")
            return None

    def recognize_audio_sync(self, audio_path):
        """Blocking wrapper around :meth:`recognize_audio`."""
        if not self.enable_asr or websockets is None:
            return None
        try:
            return asyncio.run(self.recognize_audio(audio_path))
        except Exception as e:
            print(f"❌ 语音识别失败: {e}")
            return None

    # ------------------------------------------------------------------
    # Large language model
    # ------------------------------------------------------------------
    def call_llm(self, user_message):
        """Send the recognised text to the chat API and return the reply.

        Returns:
            The stripped reply text, or ``None`` when the LLM is disabled or
            the request fails.
        """
        if not self.enable_llm or requests is None:
            return None

        try:
            print("🤖 调用大语言模型...")
            headers = {
                "Content-Type": "application/json",
                "Authorization": f"Bearer {self.llm_api_key}"
            }
            data = {
                "model": self.llm_model,
                "messages": [
                    {
                        "role": "system",
                        "content": "你是唐朝大诗人李白,用简短诗词和小朋友对话,每次回答不超过50字。"
                    },
                    {
                        "role": "user",
                        "content": user_message
                    }
                ],
                "max_tokens": 50
            }

            response = requests.post(self.llm_api_url, headers=headers, json=data,
                                     timeout=self.HTTP_TIMEOUT)
            if response.status_code != 200:
                print(f"❌ LLM API调用失败: {response.status_code}")
                print(f"响应内容: {response.text}")
                return None

            result = response.json()
            if "choices" in result and len(result["choices"]) > 0:
                return result["choices"][0]["message"]["content"].strip()
            print("❌ LLM API响应格式错误")
            return None
        except requests.exceptions.RequestException as e:
            print(f"❌ 网络请求失败: {e}")
            return None
        except Exception as e:
            print(f"❌ LLM调用失败: {e}")
            return None

    # ------------------------------------------------------------------
    # Text to speech
    # ------------------------------------------------------------------
    def generate_tts_filename(self):
        """Return a timestamped file name for a TTS result."""
        timestamp = time.strftime("%Y%m%d_%H%M%S")
        return f"tts_response_{timestamp}.pcm"

    def _build_tts_payload(self, text):
        """Build the JSON body of a TTS request for ``text``."""
        additions = {
            "explicit_language": "zh",
            "disable_markdown_filter": True,
            "enable_timestamp": True,
        }
        return {
            "user": {
                "uid": "recorder_tts"
            },
            "req_params": {
                "text": text,
                "speaker": self.tts_speaker,
                "audio_params": {
                    "format": "pcm",
                    "sample_rate": self.TTS_SAMPLE_RATE,
                    "enable_timestamp": True
                },
                # The API expects this field as a JSON-encoded string.
                "additions": json.dumps(additions)
            }
        }

    @staticmethod
    def _collect_tts_audio(response):
        """Read a streamed TTS response and return the decoded audio bytes."""
        audio_data = bytearray()
        for line in response.iter_lines(decode_unicode=True):
            if not line:
                continue
            try:
                data = json.loads(line)
            except json.JSONDecodeError:
                print(f"❌ 解析TTS响应失败: {line}")
                continue

            code = data.get("code", 0)
            if code == 0 and data.get("data"):
                audio_data.extend(base64.b64decode(data["data"]))
                continue
            if code == 0 and data.get("sentence"):
                print("TTS句子信息:", data["sentence"])
                continue
            if code == 20000000:  # the loop ends on this code
                break
            if code > 0:
                print(f"❌ TTS错误响应: {data}")
                break
        return audio_data

    def text_to_speech(self, text):
        """Convert text to speech, save it as PCM and play it.

        Returns:
            The path of the saved audio file, or ``None`` on failure or when
            TTS is disabled.
        """
        if not self.enable_tts or requests is None:
            return None

        try:
            print("🔊 开始文本转语音...")
            output_file = self.generate_tts_filename()

            headers = {
                "X-Api-App-Id": self.tts_app_id,
                "X-Api-Access-Key": self.tts_access_key,
                "X-Api-Resource-Id": self.tts_resource_id,
                "X-Api-App-Key": self.tts_app_key,
                "Content-Type": "application/json",
                "Connection": "keep-alive"
            }
            payload = self._build_tts_payload(text)

            with requests.Session() as session:
                response = session.post(self.tts_url, headers=headers, json=payload,
                                        stream=True, timeout=self.HTTP_TIMEOUT)
                try:
                    if response.status_code != 200:
                        print(f"❌ TTS请求失败: {response.status_code}")
                        print(f"响应内容: {response.text}")
                        return None
                    audio_data = self._collect_tts_audio(response)
                finally:
                    response.close()

            if not audio_data:
                print("❌ 未接收到TTS音频数据")
                return None

            with open(output_file, "wb") as f:
                f.write(audio_data)
            print(f"✅ TTS音频已保存: {output_file}")
            print(f"📁 文件大小: {len(audio_data) / 1024:.2f} KB")

            # Make sure the external player can read the file.
            os.chmod(output_file, 0o644)

            if getattr(self, 'audio_player_available', False):
                print("🔊 播放AI语音回复...")
                self.play_audio_safe(output_file, reopen_input=False)
            else:
                print("ℹ️ 跳过播放TTS音频(无可用播放器)")
                print(f"📁 TTS音频已保存到: {output_file}")

            return output_file
        except Exception as e:
            print(f"❌ TTS转换失败: {e}")
            return None


def main():
    """Create the recorder, print the feature summary and start listening."""
    print("🚀 基于能量检测的极简录音系统")
    print("🤖 集成语音识别功能")
    print("=" * 50)

    recorder = EnergyBasedRecorder(
        energy_threshold=200,      # lowered for higher sensitivity
        silence_threshold=3.0,     # seconds of silence that end a recording
        min_recording_time=2.0,
        max_recording_time=30.0,
        enable_asr=True,
        enable_llm=True,
        enable_tts=True
    )

    print("✅ 系统初始化成功")
    print("🎯 功能特点:")
    print(" - 完全移除Vosk识别依赖")
    print(" - 基于ZCR语音检测,精确识别")
    print(" - 集成在线语音识别(字节跳动ASR)")
    print(" - 集成大语言模型(豆包大模型)")
    print(" - 集成文本转语音(字节跳动TTS)")
    print(" - 录音完成后自动语音识别")
    print(" - 语音识别后自动调用AI助手")
    print(" - AI回复后自动转换为语音")
    print(" - 预录音功能(包含声音开始前2秒)")
    print(" - 环形缓冲区防止丢失开头音频")
    print(" - 自动调整能量阈值")
    print(" - 实时性能监控")
    print(" - 预期延迟: <0.1秒")
    print("=" * 50)

    # Tell the user how to enable the LLM when it is switched off.
    if not recorder.enable_llm:
        print("🔑 提示: 如需启用大语言模型功能,请设置环境变量:")
        print(" export ARK_API_KEY='your_api_key_here'")
        print("=" * 50)

    recorder.run()


if __name__ == "__main__":
    main()