Local-Voice/recorder.py
2025-09-20 18:01:49 +08:00

1387 lines
55 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
基于能量检测的极简录音系统
专门针对树莓派3B优化完全移除Vosk识别依赖
"""
import asyncio
import base64
import gzip
import hmac
import json
import os
import sys
import threading
import time
import uuid
import wave
import argparse
from io import BytesIO
from urllib.parse import urlparse
import numpy as np
import pyaudio
import requests
try:
import websockets
except ImportError:
print("⚠️ websockets 未安装,语音识别功能将不可用")
websockets = None
class EnergyBasedRecorder:
"""基于能量检测的录音系统"""
def __init__(self, energy_threshold=500, silence_threshold=1.5, min_recording_time=2.0, max_recording_time=30.0, enable_asr=True, enable_llm=True, enable_tts=True, character="libai"):
# 音频参数 - 极简优化
self.FORMAT = pyaudio.paInt16
self.CHANNELS = 1
self.RATE = 16000 # 16kHz采样率
self.CHUNK_SIZE = 1024 # 适中块大小
# 语音识别配置
self.enable_asr = enable_asr
self.asr_appid = "8718217928"
self.asr_token = "ynJMX-5ix1FsJvswC9KTNlGUdubcchqc"
self.asr_cluster = "volcengine_input_common"
self.asr_ws_url = "wss://openspeech.bytedance.com/api/v2/asr"
# 大语言模型配置
self.enable_llm = enable_llm
self.llm_api_url = "https://ark.cn-beijing.volces.com/api/v3/chat/completions"
self.llm_model = "doubao-seed-1-6-flash-250828"
self.llm_api_key = os.environ.get("ARK_API_KEY", "")
# 检查API密钥
if self.enable_llm and not self.llm_api_key:
print("⚠️ 未设置 ARK_API_KEY 环境变量,大语言模型功能将被禁用")
self.enable_llm = False
# 文本转语音配置
self.enable_tts = enable_tts
self.tts_url = "https://openspeech.bytedance.com/api/v3/tts/unidirectional"
self.tts_app_id = "8718217928"
self.tts_access_key = "ynJMX-5ix1FsJvswC9KTNlGUdubcchqc"
self.tts_resource_id = "volc.service_type.10029"
self.tts_app_key = "aGjiRDfUWi"
self.tts_speaker = "zh_female_wanqudashu_moon_bigtts"
# 角色配置
self.current_character = character
self.characters_dir = os.path.join(os.path.dirname(__file__), "characters")
self.available_characters = self._load_available_characters()
self.character_config = self._load_character_config(character)
# 如果加载了角色配置更新TTS音色
if self.character_config and "voice" in self.character_config:
self.tts_speaker = self.character_config["voice"]
# 聊天历史记录
self.chat_history = []
self.max_history_length = 10 # 最多保存10轮对话
# 检查音频播放能力
if self.enable_tts:
self.audio_player_available = self._check_audio_player()
if not self.audio_player_available:
print("⚠️ 未找到音频播放器TTS音频播放功能可能不可用")
print(" 建议安装: sudo apt-get install alsa-utils")
# 不禁用TTS功能因为仍然可以生成文件
# 能量检测参数
self.energy_threshold = energy_threshold # 能量阈值,高于此值认为有声音
self.silence_threshold = silence_threshold # 静音阈值,低于此值持续多久认为结束
self.min_recording_time = min_recording_time # 最小录音时间
self.max_recording_time = max_recording_time # 最大录音时间
self.pre_record_duration = 2.0 # 预录音时长(秒)
# 状态变量
self.audio = None
self.stream = None
self.running = False
self.recording = False
self.recorded_frames = []
self.recording_start_time = None
self.last_sound_time = None
self.energy_history = []
self.zcr_history = [] # ZCR历史
self.max_energy_history = 50 # 最大能量历史记录
# 预录音缓冲区
self.pre_record_buffer = [] # 预录音缓冲区
self.pre_record_max_frames = int(self.pre_record_duration * self.RATE / self.CHUNK_SIZE) # 最大预录音帧数
# 播放状态
self.is_playing = False # 是否正在播放
# 智能静音检测
self.voice_activity_history = [] # 语音活动历史
self.max_voice_history = 20 # 最大语音活动历史记录
self.consecutive_silence_count = 0 # 连续静音计数
self.silence_threshold_count = 15 # 连续静音次数阈值约1.5秒)
# 智能ZCR静音检测
self.max_zcr_history = 30 # 最大ZCR历史记录
self.consecutive_low_zcr_count = 0 # 连续低ZCR计数
self.low_zcr_threshold_count = 20 # 连续低ZCR次数阈值约2秒
# 性能监控
self.frame_count = 0
self.start_time = time.time()
self._setup_audio()
def _load_available_characters(self):
"""加载可用角色列表"""
characters = []
if os.path.exists(self.characters_dir):
for file in os.listdir(self.characters_dir):
if file.endswith('.json'):
characters.append(file[:-5]) # 去掉.json后缀
return characters
def _load_character_config(self, character_name):
"""加载角色配置"""
config_file = os.path.join(self.characters_dir, f"{character_name}.json")
if not os.path.exists(config_file):
print(f"⚠️ 角色配置文件不存在: {config_file}")
return None
try:
with open(config_file, 'r', encoding='utf-8') as f:
config = json.load(f)
print(f"✅ 加载角色: {config.get('name', character_name)}")
print(f"📝 描述: {config.get('description', '无描述')}")
# 如果切换了角色,清空聊天历史
if hasattr(self, 'current_character') and self.current_character != character_name:
self.clear_chat_history()
print(f"🔄 角色已切换,聊天历史已清空")
return config
except Exception as e:
print(f"❌ 加载角色配置失败: {e}")
return None
def _setup_audio(self):
"""设置音频设备"""
try:
self.audio = pyaudio.PyAudio()
self.stream = self.audio.open(
format=self.FORMAT,
channels=self.CHANNELS,
rate=self.RATE,
input=True,
frames_per_buffer=self.CHUNK_SIZE
)
print("✅ 音频设备初始化成功")
except Exception as e:
print(f"❌ 音频设备初始化失败: {e}")
def calculate_energy(self, audio_data):
"""计算音频能量"""
if len(audio_data) == 0:
return 0
# 将字节数据转换为numpy数组
audio_array = np.frombuffer(audio_data, dtype=np.int16)
# 计算RMS能量处理可能的无效值
try:
rms = np.sqrt(np.mean(audio_array ** 2))
# 检查是否为有效值
if np.isnan(rms) or np.isinf(rms):
return 0
# 更新能量历史(只在非录音状态下更新,避免语音影响背景噪音计算)
if not self.recording:
self.energy_history.append(rms)
if len(self.energy_history) > self.max_energy_history:
self.energy_history.pop(0)
return rms
except:
return 0
def calculate_peak_energy(self, audio_data):
"""计算峰值能量(辅助判断)"""
if len(audio_data) == 0:
return 0
audio_array = np.frombuffer(audio_data, dtype=np.int16)
peak_energy = np.max(np.abs(audio_array))
return peak_energy
def calculate_zero_crossing_rate(self, audio_data):
"""计算零交叉率(主要语音检测方法)"""
if len(audio_data) == 0:
return 0
audio_array = np.frombuffer(audio_data, dtype=np.int16)
# 计算零交叉次数
zero_crossings = np.sum(np.diff(np.sign(audio_array)) != 0)
# 归一化到采样率
zcr = zero_crossings / len(audio_array) * self.RATE
# 更新ZCR历史
self.zcr_history.append(zcr)
if len(self.zcr_history) > self.max_zcr_history:
self.zcr_history.pop(0)
return zcr
def is_voice_active_advanced(self, energy, zcr):
"""仅使用ZCR进行语音活动检测"""
# ZCR语音检测调整到2400-12000 Hz之间适应16000Hz采样率
# 16000Hz采样率时正常语音的ZCR范围会翻倍
zcr_condition = 2400 < zcr < 12000
# 添加一些容错避免短暂的ZCR波动导致误判
return zcr_condition
def is_voice_active(self, energy):
"""已弃用 - 仅用于兼容性"""
# 现在主要使用ZCR检测这个方法保留但不再使用
return False
def save_recording(self, audio_data, filename=None):
"""保存录音"""
if filename is None:
timestamp = time.strftime("%Y%m%d_%H%M%S")
filename = f"recording_{timestamp}.wav"
try:
with wave.open(filename, 'wb') as wf:
wf.setnchannels(self.CHANNELS)
wf.setsampwidth(self.audio.get_sample_size(self.FORMAT))
wf.setframerate(self.RATE)
wf.writeframes(audio_data)
print(f"✅ 录音已保存: {filename}")
print(f"📊 音频格式参数:")
print(f" - 采样率: {self.RATE} Hz")
print(f" - 声道数: {self.CHANNELS}")
print(f" - 位深度: {self.audio.get_sample_size(self.FORMAT) * 8} bits")
print(f" - 格式: PCM int16 小端序")
return True, filename
except Exception as e:
print(f"❌ 保存录音失败: {e}")
return False, None
def play_audio(self, filename):
"""播放音频文件"""
try:
print("🔇 准备播放,完全停止音频输入")
# 立即停止当前录音并清空所有缓冲区
if self.recording:
self.recording = False
self.recorded_frames = []
self.recording_start_time = None
self.last_sound_time = None
# 清空所有缓冲区
self.pre_record_buffer = []
self.energy_history = []
self.zcr_history = []
# 完全关闭输入流
if self.stream:
self.stream.stop_stream()
self.stream.close()
self.stream = None
# 设置播放状态
self.is_playing = True
# 等待一小段时间确保音频设备完全停止输入
time.sleep(0.5)
with wave.open(filename, 'rb') as wf:
channels = wf.getnchannels()
width = wf.getsampwidth()
rate = wf.getframerate()
total_frames = wf.getnframes()
# 分块读取音频数据
chunk_size = 1024
frames = []
for _ in range(0, total_frames, chunk_size):
chunk = wf.readframes(chunk_size)
if chunk:
frames.append(chunk)
else:
break
# 创建播放流
playback_stream = self.audio.open(
format=self.audio.get_format_from_width(width),
channels=channels,
rate=rate,
output=True
)
print(f"🔊 开始播放: {filename}")
print("🚫 音频输入已完全关闭")
# 分块播放音频
for chunk in frames:
playback_stream.write(chunk)
playback_stream.stop_stream()
playback_stream.close()
print("✅ 播放完成")
print("🔄 重新开启音频输入")
except Exception as e:
print(f"❌ 播放失败: {e}")
self.play_with_system_player(filename)
finally:
# 恢复播放状态
self.is_playing = False
# 等待播放完全结束
time.sleep(0.3)
# 重新开启输入流
self._setup_audio()
# 重置所有状态
self.energy_history = []
self.zcr_history = []
print("📡 音频输入已重新开启")
def play_with_system_player(self, filename):
"""使用系统播放器播放音频"""
try:
import subprocess
import platform
# 获取文件扩展名
file_ext = filename.lower().split('.')[-1] if '.' in filename else ''
# 根据文件类型和平台选择播放命令
if file_ext == 'mp3':
# MP3文件播放
system = platform.system().lower()
if system == 'linux':
# Linux系统 - 尝试多个MP3播放器
mp3_players = [
['mpg123', filename], # 最常用的MP3播放器
['mpg321', filename], # 另一个MP3播放器
['mplayer', filename], # 通用媒体播放器
['cvlc', '--play-and-exit', filename], # VLC命令行版本
['ffplay', '-nodisp', '-autoexit', filename] # FFmpeg播放器
]
cmd = None
for player in mp3_players:
try:
subprocess.run(['which', player[0]], capture_output=True, check=True)
cmd = player
break
except:
continue
if cmd is None:
raise Exception("未找到可用的MP3播放器请安装 mpg123 或 mplayer")
elif system == 'darwin': # macOS
cmd = ['afplay', filename]
elif system == 'windows':
cmd = ['cmd', '/c', 'start', '/min', filename]
else:
cmd = ['aplay', filename] # 默认,可能会失败
elif file_ext == 'pcm':
# PCM文件播放 - 需要指定格式
cmd = ['aplay', '-f', 'S16_LE', '-r', '16000', '-c', '1', filename]
else:
# WAV文件或其他格式
cmd = ['aplay', filename] # Linux系统
print(f"🔊 使用系统播放器: {' '.join(cmd)}")
print("🚫 系统播放器播放中,音频输入保持关闭")
subprocess.run(cmd, check=True)
print("✅ 播放完成")
print("📡 音频输入已保持关闭状态")
except Exception as e:
print(f"❌ 系统播放器失败: {e}")
# 尝试使用pygame作为备选方案
try:
self._play_with_pygame(filename)
except Exception as pygame_error:
print(f"❌ pygame播放也失败: {pygame_error}")
raise e
def _check_audio_player(self):
"""检查系统是否支持音频播放"""
try:
import subprocess
import platform
system = platform.system().lower()
if system == 'linux':
# 检查aplay用于PCM和WAV播放
try:
subprocess.run(['which', 'aplay'], capture_output=True, check=True)
print("✅ 找到音频播放器: aplay")
return True
except:
pass
# 检查pygame作为备选方案
try:
import pygame
print("✅ 找到pygame作为音频播放备选方案")
return True
except ImportError:
pass
return False
elif system == 'darwin': # macOS
# 检查afplay
try:
subprocess.run(['which', 'afplay'], capture_output=True, check=True)
print("✅ 找到音频播放器: afplay")
return True
except:
return False
elif system == 'windows':
# Windows通常支持音频播放
return True
else:
return False
except Exception as e:
print(f"❌ 检查音频播放器时出错: {e}")
return False
def _play_with_pygame(self, filename):
"""使用pygame播放音频作为备选方案"""
try:
import pygame
pygame.mixer.init()
print(f"🔊 尝试使用pygame播放: {filename}")
# 加载并播放音频
sound = pygame.mixer.Sound(filename)
sound.play()
# 等待播放完成
while pygame.mixer.get_busy():
pygame.time.Clock().tick(10)
print("✅ pygame播放完成")
except ImportError:
raise Exception("pygame未安装")
except Exception as e:
raise Exception(f"pygame播放失败: {e}")
finally:
try:
pygame.mixer.quit()
except:
pass
def play_audio_safe(self, filename, reopen_input=False):
"""安全的播放方式 - 使用系统播放器"""
try:
print("🔇 准备播放,完全停止音频输入")
# 立即停止当前录音并清空所有缓冲区
if self.recording:
self.recording = False
self.recorded_frames = []
self.recording_start_time = None
self.last_sound_time = None
# 清空所有缓冲区
self.pre_record_buffer = []
self.energy_history = []
self.zcr_history = []
# 完全关闭输入流
if self.stream:
self.stream.stop_stream()
self.stream.close()
self.stream = None
# 设置播放状态
self.is_playing = True
# 等待确保音频设备完全停止
time.sleep(0.5)
print(f"🔊 开始播放: {filename}")
print("🚫 使用系统播放器,音频输入已完全关闭")
# 使用系统播放器
self.play_with_system_player(filename)
if reopen_input:
print("🔄 准备重新开启音频输入")
except Exception as e:
print(f"❌ 播放失败: {e}")
finally:
# 恢复播放状态
self.is_playing = False
# 等待播放完全结束
time.sleep(0.5)
# 只在需要时重新开启输入流
if reopen_input:
# 重新开启输入流
self._setup_audio()
# 重置所有状态
self.energy_history = []
self.zcr_history = []
print("📡 音频输入已重新开启")
def update_pre_record_buffer(self, audio_data):
"""更新预录音缓冲区"""
self.pre_record_buffer.append(audio_data)
# 保持缓冲区大小
if len(self.pre_record_buffer) > self.pre_record_max_frames:
self.pre_record_buffer.pop(0)
def start_recording(self):
"""开始录音"""
print("🎙️ 检测到声音,开始录音...")
self.recording = True
self.recorded_frames = []
# 将预录音缓冲区的内容添加到录音中
self.recorded_frames.extend(self.pre_record_buffer)
# 清空预录音缓冲区
self.pre_record_buffer = []
self.recording_start_time = time.time()
self.last_sound_time = time.time()
self.energy_history = []
self.zcr_history = [] # 重置ZCR历史
# 重置ZCR相关计数器
self.consecutive_low_zcr_count = 0
self.consecutive_silence_count = 0
self.voice_activity_history = []
def stop_recording(self):
"""停止录音"""
if len(self.recorded_frames) > 0:
audio_data = b''.join(self.recorded_frames)
duration = len(audio_data) / (self.RATE * 2) # 16位音频每样本2字节
# 计算实际录音时长和预录音时长
actual_duration = duration
pre_record_duration = min(duration, self.pre_record_duration)
print(f"📝 录音完成,时长: {actual_duration:.2f}秒 (包含预录音 {pre_record_duration:.1f}秒)")
# 保存录音
success, filename = self.save_recording(audio_data)
# 如果保存成功,进行后续处理
if success and filename:
print("=" * 50)
print("📡 音频输入已保持关闭状态")
print("🔄 开始处理音频...")
# 语音识别和LLM调用
if self.enable_asr and websockets is not None:
print("🤖 开始语音识别...")
asr_result = self.recognize_audio_sync(filename)
if asr_result and 'payload_msg' in asr_result:
result_text = asr_result['payload_msg'].get('result', [])
if result_text:
text = result_text[0].get('text', '识别失败')
print(f"📝 识别结果: {text}")
# 调用大语言模型
if self.enable_llm and text != '识别失败':
print("-" * 50)
llm_response = self.call_llm(text)
if llm_response:
print(f"💬 AI助手回复: {llm_response}")
# 调用文本转语音
if self.enable_tts:
print("-" * 50)
tts_file = self.text_to_speech(llm_response)
if tts_file:
print("✅ AI语音回复完成")
# 删除录音文件
self._safe_delete_file(filename, "录音文件")
else:
print("❌ 文本转语音失败")
else:
print(" 文本转语音功能已禁用")
# 如果不启用TTS直接删除录音文件
self._safe_delete_file(filename, "录音文件")
else:
print("❌ 大语言模型调用失败")
else:
if not self.enable_llm:
print(" 大语言模型功能已禁用")
elif not self.llm_api_key:
print(" 请设置 ARK_API_KEY 环境变量以启用大语言模型功能")
else:
print("❌ 语音识别失败: 无结果")
else:
print("❌ 语音识别失败")
else:
if not self.enable_asr:
print(" 语音识别功能已禁用")
elif websockets is None:
print(" 请安装 websockets 库以启用语音识别功能")
print("🔄 准备重新开启音频输入")
self.recording = False
self.recorded_frames = []
self.recording_start_time = None
self.last_sound_time = None
self.energy_history = []
self.zcr_history = []
def get_average_energy(self):
"""计算平均能量"""
if len(self.energy_history) == 0:
return 0
return np.mean(self.energy_history)
def monitor_performance(self):
"""性能监控"""
self.frame_count += 1
if self.frame_count % 1000 == 0: # 每1000帧显示一次
elapsed = time.time() - self.start_time
fps = self.frame_count / elapsed
avg_energy = self.get_average_energy()
print(f"📊 性能: {fps:.1f} FPS | 平均能量: {avg_energy:.1f} | 阈值: {self.energy_threshold}")
def auto_adjust_threshold(self):
"""自动调整能量阈值"""
if len(self.energy_history) >= 20:
# 基于历史能量的中位数和标准差调整阈值
median_energy = np.median(self.energy_history)
std_energy = np.std(self.energy_history)
# 设置阈值为中位数 + 2倍标准差
new_threshold = max(300, median_energy + 2 * std_energy)
# 平滑调整阈值
self.energy_threshold = 0.9 * self.energy_threshold + 0.1 * new_threshold
def run(self):
"""运行录音系统"""
if not self.stream:
print("❌ 音频设备未初始化")
return
self.running = True
print("🎤 开始监听...")
print(f"能量阈值: {self.energy_threshold} (已弃用)")
print(f"静音阈值: {self.silence_threshold}")
print("📖 使用说明:")
print("- 检测到声音自动开始录音")
print("- 持续静音3秒自动结束录音")
print("- 最少录音2秒最多30秒")
print("- 录音完成后自动播放")
print("- 按 Ctrl+C 退出")
print("🎯 新增功能:")
print("- 纯ZCR语音检测移除能量检测")
print("- 零交叉率检测(区分语音和噪音)")
print("- 实时显示ZCR状态")
print("- 预录音功能包含声音开始前2秒")
print("- 环形缓冲区防止丢失开头音频")
print("🤖 纯ZCR静音检测:")
print("- 连续低ZCR计数20次=2秒")
print("- ZCR活动历史追踪")
print("- 基于ZCR模式的静音验证")
print("- 语音范围: 2400-12000 Hz (适应16kHz采样率)")
print("=" * 50)
try:
while self.running:
# 检查音频流是否可用
if self.stream is None:
print("\n❌ 音频流已断开,尝试重新连接...")
self._setup_audio()
if self.stream is None:
print("❌ 音频流重连失败,等待...")
time.sleep(1)
continue
# 读取音频数据
try:
data = self.stream.read(self.CHUNK_SIZE, exception_on_overflow=False)
except Exception as e:
print(f"\n❌ 读取音频数据失败: {e}")
self.stream = None
continue
if len(data) == 0:
continue
# 如果正在播放,完全跳过音频处理
if self.is_playing:
# 显示播放状态
status = "🔊 播放中... 跳过录音处理"
print(f"\r{status}", end='', flush=True)
time.sleep(0.1) # 播放时增加延迟减少CPU使用
continue
# 计算能量和零交叉率
energy = self.calculate_energy(data)
zcr = self.calculate_zero_crossing_rate(data)
peak_energy = self.calculate_peak_energy(data)
# 性能监控
self.monitor_performance()
if self.recording:
# 录音模式
self.recorded_frames.append(data)
recording_duration = time.time() - self.recording_start_time
# 基于ZCR的智能静音检测
if self.is_voice_active_advanced(energy, zcr):
self.last_sound_time = time.time()
self.consecutive_low_zcr_count = 0 # 重置低ZCR计数
self.consecutive_silence_count = 0 # 重置静音计数
else:
self.consecutive_low_zcr_count += 1 # 增加低ZCR计数
self.consecutive_silence_count += 1 # 增加静音计数
# 更新ZCR活动历史基于ZCR是否在语音范围内
self.voice_activity_history.append(2400 < zcr < 12000)
if len(self.voice_activity_history) > self.max_voice_history:
self.voice_activity_history.pop(0)
# 检查是否应该结束录音
current_time = time.time()
# 纯ZCR静音检测
should_stop = False
stop_reason = ""
# 主要检测连续低ZCR计数
if self.consecutive_low_zcr_count >= self.low_zcr_threshold_count:
# 进一步验证检查最近的ZCR活动历史
if len(self.voice_activity_history) >= 15:
recent_voice_activity = sum(self.voice_activity_history[-15:])
if recent_voice_activity <= 3: # 最近15个样本中最多3个有语音活动
should_stop = True
stop_reason = f"ZCR静音检测 ({self.consecutive_low_zcr_count}次连续低ZCR)"
else:
# 如果历史数据不足,使用基础检测
should_stop = True
stop_reason = f"基础ZCR静音检测 ({self.consecutive_low_zcr_count}次)"
# 备用检测:基于时间的静音检测
if not should_stop and current_time - self.last_sound_time > self.silence_threshold:
should_stop = True
stop_reason = f"时间静音检测 ({self.silence_threshold}秒)"
# 执行停止录音
if should_stop and recording_duration >= self.min_recording_time:
print(f"\n🔇 {stop_reason},结束录音")
self.stop_recording()
# 检查最大录音时间
if recording_duration > self.max_recording_time:
print(f"\n⏰ 达到最大录音时间 {self.max_recording_time}")
self.stop_recording()
# 显示录音状态仅ZCR相关信息
is_voice = self.is_voice_active_advanced(energy, zcr)
zcr_progress = f"{self.consecutive_low_zcr_count}/{self.low_zcr_threshold_count}"
recent_activity = sum(self.voice_activity_history[-5:]) if len(self.voice_activity_history) >= 5 else 0
status = f"录音中... {recording_duration:.1f}s | ZCR: {zcr:.0f} | 语音: {is_voice} | 低ZCR计数: {zcr_progress} | 活动: {recent_activity}"
print(f"\r{status}", end='', flush=True)
else:
# 监听模式 - 更新预录音缓冲区
self.update_pre_record_buffer(data)
# 使用高级检测
if self.is_voice_active_advanced(energy, zcr):
# 检测到声音,开始录音
self.start_recording()
else:
# 显示监听状态仅ZCR相关信息
is_voice = self.is_voice_active_advanced(energy, zcr)
buffer_usage = len(self.pre_record_buffer) / self.pre_record_max_frames * 100
status = f"监听中... ZCR: {zcr:.0f} | 语音: {is_voice} | 缓冲: {buffer_usage:.0f}%"
print(f"\r{status}", end='', flush=True)
# 减少CPU使用
time.sleep(0.01)
except KeyboardInterrupt:
print("\n👋 退出")
except Exception as e:
print(f"❌ 错误: {e}")
finally:
self.stop()
def stop(self):
"""停止系统"""
self.running = False
if self.recording:
self.stop_recording()
if self.stream:
self.stream.stop_stream()
self.stream.close()
if self.audio:
self.audio.terminate()
def generate_asr_header(self, message_type=1, message_type_specific_flags=0):
"""生成ASR请求头部"""
PROTOCOL_VERSION = 0b0001
DEFAULT_HEADER_SIZE = 0b0001
JSON = 0b0001
GZIP = 0b0001
header = bytearray()
header.append((PROTOCOL_VERSION << 4) | DEFAULT_HEADER_SIZE)
header.append((message_type << 4) | message_type_specific_flags)
header.append((JSON << 4) | GZIP)
header.append(0x00) # reserved
return header
def parse_asr_response(self, res):
"""解析ASR响应"""
PROTOCOL_VERSION = res[0] >> 4
header_size = res[0] & 0x0f
message_type = res[1] >> 4
message_type_specific_flags = res[1] & 0x0f
serialization_method = res[2] >> 4
message_compression = res[2] & 0x0f
reserved = res[3]
header_extensions = res[4:header_size * 4]
payload = res[header_size * 4:]
result = {}
payload_msg = None
payload_size = 0
if message_type == 0b1001: # SERVER_FULL_RESPONSE
payload_size = int.from_bytes(payload[:4], "big", signed=True)
payload_msg = payload[4:]
elif message_type == 0b1011: # SERVER_ACK
seq = int.from_bytes(payload[:4], "big", signed=True)
result['seq'] = seq
if len(payload) >= 8:
payload_size = int.from_bytes(payload[4:8], "big", signed=False)
payload_msg = payload[8:]
elif message_type == 0b1111: # SERVER_ERROR_RESPONSE
code = int.from_bytes(payload[:4], "big", signed=False)
result['code'] = code
payload_size = int.from_bytes(payload[4:8], "big", signed=False)
payload_msg = payload[8:]
if payload_msg is None:
return result
if message_compression == 0b0001: # GZIP
payload_msg = gzip.decompress(payload_msg)
if serialization_method == 0b0001: # JSON
payload_msg = json.loads(str(payload_msg, "utf-8"))
result['payload_msg'] = payload_msg
result['payload_size'] = payload_size
return result
async def recognize_audio(self, audio_path):
"""识别音频文件"""
if not self.enable_asr or websockets is None:
return None
try:
print("🤖 开始语音识别...")
# 读取音频文件
with open(audio_path, mode="rb") as f:
audio_data = f.read()
# 构建请求
reqid = str(uuid.uuid4())
request_params = {
'app': {
'appid': self.asr_appid,
'cluster': self.asr_cluster,
'token': self.asr_token,
},
'user': {
'uid': 'recorder_asr'
},
'request': {
'reqid': reqid,
'nbest': 1,
'workflow': 'audio_in,resample,partition,vad,fe,decode,itn,nlu_punctuate',
'show_language': False,
'show_utterances': False,
'result_type': 'full',
"sequence": 1
},
'audio': {
'format': 'wav',
'rate': self.RATE,
'language': 'zh-CN',
'bits': 16,
'channel': self.CHANNELS,
'codec': 'raw'
}
}
# 构建头部
payload_bytes = str.encode(json.dumps(request_params))
payload_bytes = gzip.compress(payload_bytes)
full_client_request = bytearray(self.generate_asr_header())
full_client_request.extend((len(payload_bytes)).to_bytes(4, 'big'))
full_client_request.extend(payload_bytes)
# 设置认证头
additional_headers = {'Authorization': 'Bearer; {}'.format(self.asr_token)}
# 连接WebSocket并发送请求
async with websockets.connect(self.asr_ws_url, additional_headers=additional_headers, max_size=1000000000) as ws:
# 发送完整请求
await ws.send(full_client_request)
res = await ws.recv()
result = self.parse_asr_response(res)
if 'payload_msg' in result and result['payload_msg'].get('code') != 1000:
print(f"❌ ASR请求失败: {result['payload_msg']}")
return None
# 分块发送音频数据
chunk_size = int(self.CHANNELS * 2 * self.RATE * 15000 / 1000) # 15ms chunks
for offset in range(0, len(audio_data), chunk_size):
chunk = audio_data[offset:offset + chunk_size]
last = (offset + chunk_size) >= len(audio_data)
payload_bytes = gzip.compress(chunk)
audio_only_request = bytearray(self.generate_asr_header(message_type=0b0010, message_type_specific_flags=0b0010 if last else 0))
audio_only_request.extend((len(payload_bytes)).to_bytes(4, 'big'))
audio_only_request.extend(payload_bytes)
await ws.send(audio_only_request)
res = await ws.recv()
result = self.parse_asr_response(res)
return result
except Exception as e:
print(f"❌ 语音识别失败: {e}")
return None
def recognize_audio_sync(self, audio_path):
"""同步版本的语音识别"""
if not self.enable_asr or websockets is None:
return None
try:
return asyncio.run(self.recognize_audio(audio_path))
except Exception as e:
print(f"❌ 语音识别失败: {e}")
return None
def call_llm(self, user_message):
"""调用大语言模型API"""
if not self.enable_llm:
return None
try:
print("🤖 调用大语言模型...")
# 获取角色配置中的系统提示词
if self.character_config and "system_prompt" in self.character_config:
system_prompt = self.character_config["system_prompt"]
else:
system_prompt = "你是一个智能助手,请根据用户的语音输入提供有帮助的回答。保持回答简洁明了。"
# 获取角色配置中的最大token数
max_tokens = 50
if self.character_config and "max_tokens" in self.character_config:
max_tokens = self.character_config["max_tokens"]
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.llm_api_key}"
}
# 构建消息列表,包含历史记录
messages = [
{
"role": "system",
"content": system_prompt
}
]
# 添加历史对话记录
for history_item in self.chat_history:
messages.extend([
{"role": "user", "content": history_item["user"]},
{"role": "assistant", "content": history_item["assistant"]}
])
# 添加当前用户消息
messages.append({
"role": "user",
"content": user_message
})
data = {
"model": self.llm_model,
"messages": messages,
"max_tokens": max_tokens
}
response = requests.post(self.llm_api_url, headers=headers, json=data, timeout=30)
if response.status_code == 200:
result = response.json()
if "choices" in result and len(result["choices"]) > 0:
llm_response = result["choices"][0]["message"]["content"]
# 过滤括号中的旁白内容
filtered_response = self._filter_parentheses_content(llm_response.strip())
# 保存到历史记录
self._add_to_chat_history(user_message, filtered_response)
return filtered_response
else:
print("❌ LLM API响应格式错误")
return None
else:
print(f"❌ LLM API调用失败: {response.status_code}")
print(f"响应内容: {response.text}")
return None
except requests.exceptions.RequestException as e:
print(f"❌ 网络请求失败: {e}")
return None
except Exception as e:
print(f"❌ LLM调用失败: {e}")
return None
def _filter_parentheses_content(self, text):
"""过滤文本中的括号内容(包括中文和英文括号)"""
import re
# 移除中文括号内容:(内容)
filtered_text = re.sub(r'[^]*', '', text)
# 移除英文括号内容:(content)
filtered_text = re.sub(r'\([^)]*\)', '', filtered_text)
# 移除方括号内容:【内容】
filtered_text = re.sub(r'【[^】]*】', '', filtered_text)
# 移除方括号内容:[content]
filtered_text = re.sub(r'\[[^\]]*\]', '', filtered_text)
# 移除书名号内容:「内容」
filtered_text = re.sub(r'「[^」]*」', '', filtered_text)
# 清理多余空格
filtered_text = re.sub(r'\s+', ' ', filtered_text).strip()
return filtered_text
def _add_to_chat_history(self, user_message, assistant_response):
"""添加对话到历史记录"""
# 如果历史记录超过最大长度,移除最早的记录
if len(self.chat_history) >= self.max_history_length:
self.chat_history.pop(0)
# 添加新的对话记录
self.chat_history.append({
"user": user_message,
"assistant": assistant_response,
"timestamp": time.time()
})
def clear_chat_history(self):
"""清空聊天历史"""
self.chat_history = []
print("💬 聊天历史已清空")
def get_chat_history_summary(self):
"""获取聊天历史摘要"""
if not self.chat_history:
return "暂无聊天历史"
return f"当前有 {len(self.chat_history)} 轮对话记录"
def _safe_delete_file(self, filepath, description="文件"):
"""安全删除文件"""
try:
if filepath and os.path.exists(filepath):
os.remove(filepath)
print(f"🗑️ 已删除{description}: {filepath}")
return True
except Exception as e:
print(f"⚠️ 删除{description}失败: {e}")
return False
def generate_tts_filename(self):
"""生成TTS文件名"""
timestamp = time.strftime("%Y%m%d_%H%M%S")
return f"tts_response_{timestamp}.pcm"
def text_to_speech(self, text):
"""文本转语音"""
if not self.enable_tts:
return None
try:
print("🔊 开始文本转语音...")
# 生成输出文件名
output_file = self.generate_tts_filename()
# 构建请求头
headers = {
"X-Api-App-Id": self.tts_app_id,
"X-Api-Access-Key": self.tts_access_key,
"X-Api-Resource-Id": self.tts_resource_id,
"X-Api-App-Key": self.tts_app_key,
"Content-Type": "application/json",
"Connection": "keep-alive"
}
# 构建请求参数
payload = {
"user": {
"uid": "recorder_tts"
},
"req_params": {
"text": text,
"speaker": self.tts_speaker,
"audio_params": {
"format": "pcm",
"sample_rate": 16000,
"enable_timestamp": True
},
"additions": "{\"explicit_language\":\"zh\",\"disable_markdown_filter\":true, \"enable_timestamp\":true}\"}"
}
}
# 发送请求
session = requests.Session()
try:
response = session.post(self.tts_url, headers=headers, json=payload, stream=True)
if response.status_code != 200:
print(f"❌ TTS请求失败: {response.status_code}")
print(f"响应内容: {response.text}")
return None
# 处理流式响应
audio_data = bytearray()
total_audio_size = 0
for chunk in response.iter_lines(decode_unicode=True):
if not chunk:
continue
try:
data = json.loads(chunk)
if data.get("code", 0) == 0 and "data" in data and data["data"]:
chunk_audio = base64.b64decode(data["data"])
audio_size = len(chunk_audio)
total_audio_size += audio_size
audio_data.extend(chunk_audio)
continue
if data.get("code", 0) == 0 and "sentence" in data and data["sentence"]:
print("TTS句子信息:", data["sentence"])
continue
if data.get("code", 0) == 20000000:
break
if data.get("code", 0) > 0:
print(f"❌ TTS错误响应: {data}")
break
except json.JSONDecodeError:
print(f"❌ 解析TTS响应失败: {chunk}")
continue
# 保存音频文件
if audio_data:
with open(output_file, "wb") as f:
f.write(audio_data)
print(f"✅ TTS音频已保存: {output_file}")
print(f"📁 文件大小: {len(audio_data) / 1024:.2f} KB")
# 确保文件有正确的访问权限
os.chmod(output_file, 0o644)
# 播放生成的音频
if hasattr(self, 'audio_player_available') and self.audio_player_available:
print("🔊 播放AI语音回复...")
self.play_audio_safe(output_file, reopen_input=False)
else:
print(" 跳过播放TTS音频无可用播放器")
print(f"📁 TTS音频已保存到: {output_file}")
# 播放完成后删除PCM文件
self._safe_delete_file(output_file, "TTS音频文件")
return output_file
else:
print("❌ 未接收到TTS音频数据")
# 尝试删除可能存在的空文件
self._safe_delete_file(output_file, "空的TTS音频文件")
return None
finally:
response.close()
session.close()
except Exception as e:
print(f"❌ TTS转换失败: {e}")
return None
def parse_arguments():
"""解析命令行参数"""
parser = argparse.ArgumentParser(description='基于能量检测的极简录音系统')
parser.add_argument('--character', '-c', type=str, default='libai',
help='选择角色 (默认: libai)')
parser.add_argument('--list-characters', '-l', action='store_true',
help='列出所有可用角色')
return parser.parse_args()
def list_characters(characters_dir):
"""列出所有可用角色"""
characters = []
if os.path.exists(characters_dir):
for file in os.listdir(characters_dir):
if file.endswith('.json'):
character_name = file[:-5]
config_file = os.path.join(characters_dir, file)
try:
with open(config_file, 'r', encoding='utf-8') as f:
config = json.load(f)
name = config.get('name', character_name)
desc = config.get('description', '无描述')
characters.append(f"{character_name}: {name} - {desc}")
except:
characters.append(f"{character_name}: 配置文件读取失败")
if characters:
print("🎭 可用角色列表:")
for char in characters:
print(f" - {char}")
else:
print("❌ 未找到任何角色配置文件")
def main():
"""主函数"""
args = parse_arguments()
characters_dir = os.path.join(os.path.dirname(__file__), "characters")
# 如果要求列出角色,显示后退出
if args.list_characters:
list_characters(characters_dir)
return
print("🚀 基于能量检测的极简录音系统")
print("🤖 集成语音识别功能")
print(f"🎭 当前角色: {args.character}")
print("=" * 50)
# 创建录音系统
recorder = EnergyBasedRecorder(
energy_threshold=200, # 能量阈值(降低以提高灵敏度)
silence_threshold=3.0, # 静音阈值(秒)- 改为3秒
min_recording_time=2.0, # 最小录音时间
max_recording_time=30.0, # 最大录音时间
enable_asr=True, # 启用语音识别功能
enable_llm=True, # 启用大语言模型功能
enable_tts=True, # 启用文本转语音功能
character=args.character # 指定角色
)
print("✅ 系统初始化成功")
print("🎯 功能特点:")
print(" - 完全移除Vosk识别依赖")
print(" - 基于ZCR语音检测精确识别")
print(" - 集成在线语音识别字节跳动ASR")
print(" - 集成大语言模型(豆包大模型)")
print(" - 集成文本转语音字节跳动TTS")
print(" - 录音完成后自动语音识别")
print(" - 语音识别后自动调用AI助手")
print(" - AI回复后自动转换为语音")
print(" - 多角色支持 (李白、猪八戒等)")
print(" - 每个角色独特音色和性格")
print(" - 预录音功能包含声音开始前2秒")
print(" - 环形缓冲区防止丢失开头音频")
print(" - 自动调整能量阈值")
print(" - 实时性能监控")
print(" - 预期延迟: <0.1秒")
print("=" * 50)
# 显示API密钥状态
if not recorder.enable_llm:
print("🔑 提示: 如需启用大语言模型功能,请设置环境变量:")
print(" export ARK_API_KEY='your_api_key_here'")
print("=" * 50)
# 显示角色信息
if recorder.character_config:
print(f"🎭 当前角色: {recorder.character_config.get('name', '未知')}")
print(f"📝 描述: {recorder.character_config.get('description', '无描述')}")
print(f"🎤 音色: {recorder.tts_speaker}")
print("=" * 50)
# 显示使用说明
print("📖 使用说明:")
print("- 检测到声音自动开始录音")
print("- 持续静音3秒自动结束录音")
print("- 最少录音2秒最多30秒")
print("- 录音完成后自动播放")
print("- 按 Ctrl+C 退出")
print("🎭 角色切换:")
print("- 使用 --character 或 -c 参数选择角色")
print("- 使用 --list-characters 或 -l 查看所有角色")
print("- 示例: python recorder.py --character zhubajie")
print("=" * 50)
# 开始运行
recorder.run()
if __name__ == "__main__":
main()