#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
基于能量检测的极简录音系统
专门针对树莓派3B优化,完全移除Vosk识别依赖
"""
import asyncio
import base64
import gzip
import hmac
import json
import os
import sys
import threading
import time
import uuid
import wave
import argparse
import queue
from io import BytesIO
from urllib.parse import urlparse
import numpy as np
import pyaudio
import requests
try:
import websockets
except ImportError:
print("⚠️ websockets 未安装,语音识别功能将不可用")
websockets = None
class EnergyBasedRecorder:
"""基于能量检测的录音系统"""
def __init__(self, energy_threshold=500, silence_threshold=1.5, min_recording_time=2.0, max_recording_time=30.0, enable_asr=True, enable_llm=True, enable_tts=True, character="libai"):
# 音频参数 - 极简优化
self.FORMAT = pyaudio.paInt16
self.CHANNELS = 1
self.RATE = 16000 # 16kHz采样率
self.CHUNK_SIZE = 1024 # 适中块大小
# 语音识别配置
self.enable_asr = enable_asr
self.asr_appid = "8718217928"
self.asr_token = "ynJMX-5ix1FsJvswC9KTNlGUdubcchqc"
self.asr_cluster = "volcengine_input_common"
self.asr_ws_url = "wss://openspeech.bytedance.com/api/v2/asr"
# 大语言模型配置
self.enable_llm = enable_llm
self.llm_api_url = "https://ark.cn-beijing.volces.com/api/v3/chat/completions"
self.llm_model = "doubao-seed-1-6-flash-250828"
self.llm_api_key = os.environ.get("ARK_API_KEY", "")
# 检查API密钥
if self.enable_llm and not self.llm_api_key:
print("⚠️ 未设置 ARK_API_KEY 环境变量,大语言模型功能将被禁用")
self.enable_llm = False
# 文本转语音配置
self.enable_tts = enable_tts
self.tts_url = "https://openspeech.bytedance.com/api/v3/tts/unidirectional"
self.tts_app_id = "8718217928"
self.tts_access_key = "ynJMX-5ix1FsJvswC9KTNlGUdubcchqc"
self.tts_resource_id = "volc.service_type.10029"
self.tts_app_key = "aGjiRDfUWi"
self.tts_speaker = "zh_female_wanqudashu_moon_bigtts"
# 角色配置
self.current_character = character
self.characters_dir = os.path.join(os.path.dirname(__file__), "characters")
self.available_characters = self._load_available_characters()
self.character_config = self._load_character_config(character)
# 如果加载了角色配置更新TTS音色
if self.character_config and "voice" in self.character_config:
self.tts_speaker = self.character_config["voice"]
# 聊天历史记录
self.chat_history = []
self.max_history_length = 10 # 最多保存10轮对话
# 检查音频播放能力
if self.enable_tts:
self.audio_player_available = self._check_audio_player()
if not self.audio_player_available:
print("⚠️ 未找到音频播放器TTS音频播放功能可能不可用")
print(" 建议安装: sudo apt-get install alsa-utils")
# 不禁用TTS功能因为仍然可以生成文件
# 能量检测参数
self.energy_threshold = energy_threshold # 能量阈值,高于此值认为有声音
self.silence_threshold = silence_threshold # 静音阈值,低于此值持续多久认为结束
self.min_recording_time = min_recording_time # 最小录音时间
self.max_recording_time = max_recording_time # 最大录音时间
self.pre_record_duration = 2.0 # 预录音时长(秒)
# 状态变量
self.audio = None
self.input_stream = None # 输入流(录音)
self.output_stream = None # 输出流(播放)
self.running = False
self.recording = False
self.recorded_frames = []
self.recording_start_time = None
self.last_sound_time = None
self.energy_history = []
self.zcr_history = [] # ZCR历史
self.max_energy_history = 50 # 最大能量历史记录
# 预录音缓冲区
self.pre_record_buffer = [] # 预录音缓冲区
self.pre_record_max_frames = int(self.pre_record_duration * self.RATE / self.CHUNK_SIZE) # 最大预录音帧数
# 播放状态
self.is_playing = False # 是否正在播放
# 智能静音检测
self.voice_activity_history = [] # 语音活动历史
self.max_voice_history = 20 # 最大语音活动历史记录
self.consecutive_silence_count = 0 # 连续静音计数
self.silence_threshold_count = 15 # 连续静音次数阈值(约1.5秒)
# 智能ZCR静音检测
self.max_zcr_history = 30 # 最大ZCR历史记录
self.consecutive_low_zcr_count = 0 # 连续低ZCR计数
self.low_zcr_threshold_count = 20 # 连续低ZCR次数阈值(约2秒)
# 性能监控
self.frame_count = 0
self.start_time = time.time()
# 流式TTS优化系统
self.tts_task_queue = queue.Queue(maxsize=10) # TTS任务队列
self.tts_buffer = [] # 智能缓冲区
self.tts_buffer_max_size = 3 # 最多缓冲3个句子
self.tts_buffer_min_size = 1 # 最少1个句子
self.tts_accumulation_time = 0.2 # 200ms积累窗口
self.tts_last_trigger_time = 0
self.tts_worker_running = True
self.tts_worker_thread = None
self.tts_pending_sentences = [] # 待处理的句子
# 统一音频播放系统
self.audio_playback_queue = queue.Queue(maxsize=1000) # 统一音频播放队列,大队列确保不会截断
self.playback_worker_running = True
self.playback_worker_thread = None
self.currently_playing = False # 当前是否正在播放
self.last_playback_time = 0 # 最后一次播放时间戳
self.playback_cooldown_period = 1.5 # 播放冷却时间(秒)
self.audio_device_healthy = True # 音频设备健康状态
# 启动工作线程
self._start_tts_worker()
self._start_playback_worker()
self._setup_audio()
def _start_tts_worker(self):
"""启动TTS工作线程"""
self.tts_worker_thread = threading.Thread(target=self._tts_worker, daemon=True)
self.tts_worker_thread.start()
print("🎵 TTS工作线程已启动")
def _start_playback_worker(self):
"""启动音频播放工作线程"""
self.playback_worker_thread = threading.Thread(target=self._playback_worker, daemon=True)
self.playback_worker_thread.start()
print("🔊 音频播放工作线程已启动")
def _tts_worker(self):
"""TTS工作线程 - 处理TTS任务队列"""
while self.tts_worker_running:
try:
# 从队列获取任务
task = self.tts_task_queue.get(timeout=1.0)
if task is None: # 结束信号
break
task_type, content = task
if task_type == "tts_sentence":
# 生成音频数据并发送到统一播放队列
self._generate_tts_audio(content)
self.tts_task_queue.task_done()
except queue.Empty:
continue
except Exception as e:
print(f"❌ TTS工作线程错误: {e}")
time.sleep(0.1)
def _playback_worker(self):
"""音频播放工作线程 - 处理统一音频播放队列"""
print("🔊 播放工作线程开始运行...")
# 等待音频设备就绪
time.sleep(0.5)
# 创建音频播放流
playback_stream = None
try:
playback_stream = self.audio.open(
format=self.FORMAT,
channels=self.CHANNELS,
rate=self.RATE,
output=True,
frames_per_buffer=512
)
print("🔊 音频播放流已创建")
except Exception as e:
print(f"❌ 创建音频播放流失败: {e}")
return
chunks_played = 0
total_size = 0
try:
while self.playback_worker_running:
try:
# 从播放队列获取音频数据
audio_chunk = self.audio_playback_queue.get(timeout=1.0)
if audio_chunk is None: # 结束信号
print("🔊 收到播放结束信号")
break
if isinstance(audio_chunk, str) and audio_chunk.startswith("METADATA:"):
# 处理元数据(如文本信息)
metadata = audio_chunk[9:] # 移除 "METADATA:" 前缀
print(f"📝 播放元数据: {metadata}")
self.audio_playback_queue.task_done()
continue
# 播放音频块
if audio_chunk and len(audio_chunk) > 0:
self.currently_playing = True
self.last_playback_time = time.time() # 更新最后播放时间
playback_stream.write(audio_chunk)
chunks_played += 1
total_size += len(audio_chunk)
# 减少进度显示频率
if chunks_played % 10 == 0:
print(f"\r🔊 统一播放: {chunks_played} 块 | {total_size / 1024:.1f} KB", end='', flush=True)
self.audio_playback_queue.task_done()
except queue.Empty:
# 队列空时,检查是否正在播放
self.currently_playing = False
continue
except Exception as e:
print(f"❌ 播放工作线程错误: {e}")
self.currently_playing = False
time.sleep(0.1)
print(f"\n✅ 播放工作线程结束: 总计 {chunks_played} 块, {total_size / 1024:.1f} KB")
finally:
self.currently_playing = False
if playback_stream:
playback_stream.stop_stream()
playback_stream.close()
print("🔊 音频播放流已关闭")
def _add_tts_task(self, content):
"""添加TTS任务到队列"""
try:
self.tts_task_queue.put_nowait(("tts_sentence", content))
return True
except queue.Full:
print("⚠️ TTS任务队列已满丢弃任务")
return False
def _generate_tts_audio(self, text):
"""生成TTS音频数据并发送到统一播放队列"""
if not self.enable_tts:
return False
try:
print(f"🎵 生成TTS音频: {text[:50]}...")
# 添加元数据到播放队列
try:
self.audio_playback_queue.put_nowait(f"METADATA:{text[:30]}...")
except queue.Full:
print("⚠️ 播放队列满,跳过元数据")
# 构建请求头
headers = {
"X-Api-App-Id": self.tts_app_id,
"X-Api-Access-Key": self.tts_access_key,
"X-Api-Resource-Id": self.tts_resource_id,
"X-Api-App-Key": self.tts_app_key,
"Content-Type": "application/json",
"Connection": "keep-alive"
}
# 构建请求参数
payload = {
"user": {
"uid": "recorder_tts_generator"
},
"req_params": {
"text": text,
"speaker": self.tts_speaker,
"audio_params": {
"format": "pcm",
"sample_rate": 16000,
"enable_timestamp": True
},
"additions": "{\"explicit_language\":\"zh\",\"disable_markdown_filter\":true, \"enable_timestamp\":true}\"}"
}
}
# 发送请求
session = requests.Session()
try:
response = session.post(self.tts_url, headers=headers, json=payload, stream=True)
if response.status_code != 200:
print(f"❌ TTS请求失败: {response.status_code}")
return False
# 处理流式响应
total_audio_size = 0
chunk_count = 0
success_count = 0
print("🔄 开始接收TTS音频流...")
for chunk in response.iter_lines(decode_unicode=True):
if not chunk:
continue
try:
data = json.loads(chunk)
if data.get("code", 0) == 0 and "data" in data and data["data"]:
chunk_audio = base64.b64decode(data["data"])
audio_size = len(chunk_audio)
total_audio_size += audio_size
chunk_count += 1
# 将音频块发送到统一播放队列
try:
self.audio_playback_queue.put_nowait(chunk_audio)
success_count += 1
except queue.Full:
# 队列满时稍微等待
time.sleep(0.01)
try:
self.audio_playback_queue.put_nowait(chunk_audio)
success_count += 1
except queue.Full:
# 如果还是满的,跳过这个块
continue
# 减少进度显示频率
if chunk_count % 5 == 0:
print(f"\r📥 生成音频: {chunk_count} 块 | 成功: {success_count} | {total_audio_size / 1024:.1f} KB", end='', flush=True)
continue
if data.get("code", 0) == 20000000:
break
if data.get("code", 0) > 0:
print(f"\n❌ TTS错误响应: {data}")
break
except json.JSONDecodeError:
continue
success_rate = (success_count / chunk_count * 100) if chunk_count > 0 else 0
print(f"\n✅ TTS音频生成完成: {chunk_count} 块, 成功率 {success_rate:.1f}% | 总大小: {total_audio_size / 1024:.1f} KB")
return success_count > 0
finally:
response.close()
session.close()
except Exception as e:
print(f"❌ TTS音频生成失败: {e}")
return False
def _should_trigger_tts(self, sentence):
"""智能判断是否应该触发TTS"""
current_time = time.time()
# 检查缓冲区大小
if len(self.tts_buffer) >= self.tts_buffer_max_size:
return True
# 检查时间窗口
time_since_last = current_time - self.tts_last_trigger_time
if time_since_last >= self.tts_accumulation_time and len(self.tts_buffer) >= self.tts_buffer_min_size:
return True
# 检查是否为完整句子(使用新的严格检测)
if self._is_complete_sentence(sentence):
return True
# 检查句子特征 - 长句子优先50字符以上
if len(sentence) > 50: # 超过50字符的句子立即触发
return True
# 中等长度句子30-50字符如果有结束标点也触发
if len(sentence) > 30:
end_punctuations = ['。', '!', '?', '.', '!', '?']
if any(sentence.strip().endswith(p) for p in end_punctuations):
return True
# 短句子只在缓冲区较多或时间窗口到期时触发
return False
def _process_tts_buffer(self):
"""处理TTS缓冲区"""
if not self.tts_buffer:
return
# 合并缓冲区的句子
combined_text = ''.join(self.tts_buffer)
# 添加到TTS任务队列
if self._add_tts_task(combined_text):
print(f"🎵 触发TTS: {combined_text[:50]}...")
self.tts_last_trigger_time = time.time()
# 清空缓冲区
self.tts_buffer.clear()
def _add_sentence_to_buffer(self, sentence):
"""添加句子到智能缓冲区"""
if not sentence.strip():
return
self.tts_buffer.append(sentence)
# 检查是否应该触发TTS
if self._should_trigger_tts(sentence):
self._process_tts_buffer()
def _flush_tts_buffer(self):
"""强制刷新TTS缓冲区"""
if self.tts_buffer:
self._process_tts_buffer()
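# Overview (editorial sketch, not part of the original code): how streamed LLM text
# reaches the speaker in this module. The numbers in parentheses are the defaults
# configured in __init__ above.
#
#   call_llm() emits text incrementally
#     -> _add_sentence_to_buffer()      (smart buffer: max 3 sentences / 200 ms window)
#     -> _should_trigger_tts()          (decides when to flush)
#     -> _process_tts_buffer()          (joins sentences, _add_tts_task() -> tts_task_queue)
#     -> _tts_worker() thread           (calls _generate_tts_audio())
#     -> audio_playback_queue           (raw PCM chunks)
#     -> _playback_worker() thread      (writes to the PyAudio output stream)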
def _load_available_characters(self):
"""加载可用角色列表"""
characters = []
if os.path.exists(self.characters_dir):
for file in os.listdir(self.characters_dir):
if file.endswith('.json'):
characters.append(file[:-5]) # 去掉.json后缀
return characters
def _load_character_config(self, character_name):
"""加载角色配置"""
config_file = os.path.join(self.characters_dir, f"{character_name}.json")
if not os.path.exists(config_file):
print(f"⚠️ 角色配置文件不存在: {config_file}")
return None
try:
with open(config_file, 'r', encoding='utf-8') as f:
config = json.load(f)
print(f"✅ 加载角色: {config.get('name', character_name)}")
print(f"📝 描述: {config.get('description', '无描述')}")
# 如果切换了角色,清空聊天历史
if hasattr(self, 'current_character') and self.current_character != character_name:
self.clear_chat_history()
print(f"🔄 角色已切换,聊天历史已清空")
return config
except Exception as e:
print(f"❌ 加载角色配置失败: {e}")
return None
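# Example (illustrative only, not shipped with this module): a minimal
# characters/<name>.json that _load_character_config() can read. The field names
# are the ones this file actually consumes (name, description, voice, system_prompt,
# max_tokens); the values below are made-up placeholders.
#
#   {
#       "name": "李白",
#       "description": "唐代诗人,语气豪放",
#       "voice": "zh_female_wanqudashu_moon_bigtts",
#       "system_prompt": "你是李白,请用简洁而富有诗意的语言回答。",
#       "max_tokens": 50
#   }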
def _setup_audio(self, force_reset=False):
"""设置音频设备"""
try:
# 如果强制重置,先完全释放资源
if force_reset:
self._force_close_audio_stream()
# 不终止整个PyAudio实例只重置输入流
# 创建新的PyAudio实例如果不存在
if not self.audio:
self.audio = pyaudio.PyAudio()
# 创建音频输入流
self.input_stream = self.audio.open(
format=self.FORMAT,
channels=self.CHANNELS,
rate=self.RATE,
input=True,
frames_per_buffer=self.CHUNK_SIZE
)
print("✅ 音频设备初始化成功")
self.audio_device_healthy = True # 恢复设备健康状态
except Exception as e:
print(f"❌ 音频设备初始化失败: {e}")
self.audio_device_healthy = False
def _force_close_audio_stream(self):
"""强制关闭音频输入流,避免阻塞"""
if self.input_stream:
try:
# 尝试优雅关闭,设置超时
import signal
def timeout_handler(signum, frame):
raise TimeoutError("音频流关闭超时")
# 设置5秒超时
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(5)
try:
self.input_stream.stop_stream()
self.input_stream.close()
finally:
signal.alarm(0) # 取消超时
except TimeoutError:
print("⚠️ 音频输入流关闭超时,强制终止")
self.input_stream = None
except Exception as e:
print(f"⚠️ 优雅关闭音频输入流失败: {e}")
try:
# 强制关闭
self.input_stream = None
except:
pass
finally:
self.input_stream = None
self.audio_device_healthy = False # 标记设备需要重置
def calculate_energy(self, audio_data):
"""计算音频能量"""
if len(audio_data) == 0:
return 0
# 将字节数据转换为numpy数组
audio_array = np.frombuffer(audio_data, dtype=np.int16)
# 计算RMS能量处理可能的无效值
try:
rms = np.sqrt(np.mean(audio_array ** 2))
# 检查是否为有效值
if np.isnan(rms) or np.isinf(rms):
return 0
# 更新能量历史(只在非录音状态下更新,避免语音影响背景噪音计算)
if not self.recording:
self.energy_history.append(rms)
if len(self.energy_history) > self.max_energy_history:
self.energy_history.pop(0)
return rms
except:
return 0
def calculate_peak_energy(self, audio_data):
"""计算峰值能量(辅助判断)"""
if len(audio_data) == 0:
return 0
audio_array = np.frombuffer(audio_data, dtype=np.int16)
peak_energy = np.max(np.abs(audio_array))
return peak_energy
def calculate_zero_crossing_rate(self, audio_data):
"""计算零交叉率(主要语音检测方法)"""
if len(audio_data) == 0:
return 0
audio_array = np.frombuffer(audio_data, dtype=np.int16)
# 计算零交叉次数
zero_crossings = np.sum(np.diff(np.sign(audio_array)) != 0)
# 归一化到采样率
zcr = zero_crossings / len(audio_array) * self.RATE
# 更新ZCR历史
self.zcr_history.append(zcr)
if len(self.zcr_history) > self.max_zcr_history:
self.zcr_history.pop(0)
return zcr
def is_voice_active_advanced(self, energy, zcr):
"""仅使用ZCR进行语音活动检测"""
# ZCR语音检测调整到2400-12000 Hz之间适应16000Hz采样率
# 16000Hz采样率时正常语音的ZCR范围会翻倍
zcr_condition = 2400 < zcr < 12000
# 添加一些容错避免短暂的ZCR波动导致误判
return zcr_condition
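# Worked example (editorial, assumes only numpy and the 16 kHz rate used here):
# a pure tone at f Hz crosses zero roughly 2*f times per second, so the
# 2400-12000 Hz gate above rejects low-frequency hum and accepts mid-frequency content.
#
#   import numpy as np
#   rate = 16000
#   t = np.arange(1024) / rate
#   def zcr(sig):
#       return np.sum(np.diff(np.sign(sig)) != 0) / len(sig) * rate
#   tone_1k = (10000 * np.sin(2 * np.pi * 1000 * t)).astype(np.int16)
#   tone_3k = (10000 * np.sin(2 * np.pi * 3000 * t)).astype(np.int16)
#   zcr(tone_1k)   # ~2000  -> outside 2400..12000, treated as non-speech
#   zcr(tone_3k)   # ~6000  -> inside the gate, treated as speech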
def is_voice_active(self, energy):
"""已弃用 - 仅用于兼容性"""
# 现在主要使用ZCR检测这个方法保留但不再使用
return False
def save_recording(self, audio_data, filename=None):
"""保存录音"""
if filename is None:
timestamp = time.strftime("%Y%m%d_%H%M%S")
filename = f"recording_{timestamp}.wav"
try:
with wave.open(filename, 'wb') as wf:
wf.setnchannels(self.CHANNELS)
wf.setsampwidth(self.audio.get_sample_size(self.FORMAT))
wf.setframerate(self.RATE)
wf.writeframes(audio_data)
print(f"✅ 录音已保存: {filename}")
print(f"📊 音频格式参数:")
print(f" - 采样率: {self.RATE} Hz")
print(f" - 声道数: {self.CHANNELS}")
print(f" - 位深度: {self.audio.get_sample_size(self.FORMAT) * 8} bits")
print(f" - 格式: PCM int16 小端序")
return True, filename
except Exception as e:
print(f"❌ 保存录音失败: {e}")
return False, None
def play_audio(self, filename):
"""播放音频文件"""
try:
print("🔇 准备播放,强制停止音频输入")
# 立即停止当前录音并清空所有缓冲区
if self.recording:
self.recording = False
self.recorded_frames = []
self.recording_start_time = None
self.last_sound_time = None
# 清空所有缓冲区
self.pre_record_buffer = []
self.energy_history = []
self.zcr_history = []
# 强制关闭输入流,避免阻塞
self._force_close_audio_stream()
# 设置播放状态
self.is_playing = True
# 等待一小段时间确保音频设备完全停止输入
time.sleep(0.5)
with wave.open(filename, 'rb') as wf:
channels = wf.getnchannels()
width = wf.getsampwidth()
rate = wf.getframerate()
total_frames = wf.getnframes()
# 分块读取音频数据
chunk_size = 1024
frames = []
for _ in range(0, total_frames, chunk_size):
chunk = wf.readframes(chunk_size)
if chunk:
frames.append(chunk)
else:
break
# 创建播放流
playback_stream = self.audio.open(
format=self.audio.get_format_from_width(width),
channels=channels,
rate=rate,
output=True
)
print(f"🔊 开始播放: {filename}")
print("🚫 音频输入已完全关闭")
# 分块播放音频
for chunk in frames:
playback_stream.write(chunk)
playback_stream.stop_stream()
playback_stream.close()
print("✅ 播放完成")
print("🔄 重新开启音频输入")
except Exception as e:
print(f"❌ 播放失败: {e}")
self.play_with_system_player(filename)
finally:
# 恢复播放状态
self.is_playing = False
# 等待播放完全结束
time.sleep(0.3)
# 重新开启输入流(强制重置)
self._setup_audio(force_reset=True)
# 重置所有状态
self.energy_history = []
self.zcr_history = []
print("📡 音频输入已重新开启(强制重置)")
def play_with_system_player(self, filename):
"""使用系统播放器播放音频"""
try:
import subprocess
import platform
# 获取文件扩展名
file_ext = filename.lower().split('.')[-1] if '.' in filename else ''
# 根据文件类型和平台选择播放命令
if file_ext == 'mp3':
# MP3文件播放
system = platform.system().lower()
if system == 'linux':
# Linux系统 - 尝试多个MP3播放器
mp3_players = [
['mpg123', filename], # 最常用的MP3播放器
['mpg321', filename], # 另一个MP3播放器
['mplayer', filename], # 通用媒体播放器
['cvlc', '--play-and-exit', filename], # VLC命令行版本
['ffplay', '-nodisp', '-autoexit', filename] # FFmpeg播放器
]
cmd = None
for player in mp3_players:
try:
subprocess.run(['which', player[0]], capture_output=True, check=True)
cmd = player
break
except:
continue
if cmd is None:
raise Exception("未找到可用的MP3播放器请安装 mpg123 或 mplayer")
elif system == 'darwin': # macOS
cmd = ['afplay', filename]
elif system == 'windows':
cmd = ['cmd', '/c', 'start', '/min', filename]
else:
cmd = ['aplay', filename] # 默认,可能会失败
elif file_ext == 'pcm':
# PCM文件播放 - 需要指定格式
cmd = ['aplay', '-f', 'S16_LE', '-r', '16000', '-c', '1', filename]
else:
# WAV文件或其他格式
cmd = ['aplay', filename] # Linux系统
print(f"🔊 使用系统播放器: {' '.join(cmd)}")
print("🚫 系统播放器播放中,音频输入保持关闭")
subprocess.run(cmd, check=True)
print("✅ 播放完成")
print("📡 音频输入已保持关闭状态")
except Exception as e:
print(f"❌ 系统播放器失败: {e}")
# 尝试使用pygame作为备选方案
try:
self._play_with_pygame(filename)
except Exception as pygame_error:
print(f"❌ pygame播放也失败: {pygame_error}")
raise e
def _check_audio_player(self):
"""检查系统是否支持音频播放"""
try:
import subprocess
import platform
system = platform.system().lower()
if system == 'linux':
# 检查aplay用于PCM和WAV播放
try:
subprocess.run(['which', 'aplay'], capture_output=True, check=True)
print("✅ 找到音频播放器: aplay")
return True
except:
pass
# 检查pygame作为备选方案
try:
import pygame
print("✅ 找到pygame作为音频播放备选方案")
return True
except ImportError:
pass
return False
elif system == 'darwin': # macOS
# 检查afplay
try:
subprocess.run(['which', 'afplay'], capture_output=True, check=True)
print("✅ 找到音频播放器: afplay")
return True
except:
return False
elif system == 'windows':
# Windows通常支持音频播放
return True
else:
return False
except Exception as e:
print(f"❌ 检查音频播放器时出错: {e}")
return False
def _play_with_pygame(self, filename):
"""使用pygame播放音频作为备选方案"""
try:
import pygame
pygame.mixer.init()
print(f"🔊 尝试使用pygame播放: {filename}")
# 加载并播放音频
sound = pygame.mixer.Sound(filename)
sound.play()
# 等待播放完成
while pygame.mixer.get_busy():
pygame.time.Clock().tick(10)
print("✅ pygame播放完成")
except ImportError:
raise Exception("pygame未安装")
except Exception as e:
raise Exception(f"pygame播放失败: {e}")
finally:
try:
pygame.mixer.quit()
except:
pass
def play_audio_streaming(self, audio_chunks):
"""智能流式播放音频数据"""
try:
if not audio_chunks:
return False
print("🔊 开始智能流式播放音频...")
# 确保音频输入已停止
if self.recording:
self.recording = False
self.recorded_frames = []
self.recording_start_time = None
self.last_sound_time = None
# 清空缓冲区
self.pre_record_buffer = []
self.energy_history = []
self.zcr_history = []
# 强制关闭输入流
self._force_close_audio_stream()
self.is_playing = True
time.sleep(0.3) # 等待音频设备切换
# 创建播放流,设置更大的缓冲区
playback_stream = self.audio.open(
format=self.FORMAT,
channels=self.CHANNELS,
rate=self.RATE,
output=True,
frames_per_buffer=2048 # 增加缓冲区大小
)
print("🚫 音频输入已关闭,开始智能流式播放")
# 预加载前几个音频块以确保流畅播放
preload_chunks = 2
buffer_data = b''
# 预加载阶段
for i in range(min(preload_chunks, len(audio_chunks))):
if audio_chunks[i]:
buffer_data += audio_chunks[i]
progress = (i + 1) / len(audio_chunks) * 100
print(f"\r📥 预加载音频: {progress:.1f}%", end='', flush=True)
# 播放预加载的音频
if buffer_data:
playback_stream.write(buffer_data)
# 继续播放剩余音频块
start_idx = preload_chunks
for i in range(start_idx, len(audio_chunks)):
if audio_chunks[i]:
playback_stream.write(audio_chunks[i])
progress = (i + 1) / len(audio_chunks) * 100
print(f"\r🔊 流式播放进度: {progress:.1f}%", end='', flush=True)
# 确保所有数据都被播放
playback_stream.stop_stream()
playback_stream.close()
print("\n✅ 智能流式播放完成")
return True
except Exception as e:
print(f"\n❌ 智能流式播放失败: {e}")
return False
finally:
self.is_playing = False
time.sleep(0.3)
def play_audio_realtime(self, audio_queue):
"""音频队列转发器:将音频数据转发到统一播放队列"""
try:
print("🔊 启动音频队列转发器...")
chunks_forwarded = 0
total_size = 0
# 持续从队列中获取音频数据并转发到统一播放队列
while True:
try:
# 设置超时以避免无限等待
chunk = audio_queue.get(timeout=0.5)
if chunk is None: # 结束信号
print("\n📥 收到转发结束信号")
break
if chunk: # 确保chunk不为空
# 转发到统一播放队列
try:
self.audio_playback_queue.put_nowait(chunk)
chunks_forwarded += 1
total_size += len(chunk)
# 减少进度显示频率以提高性能
if chunks_forwarded % 10 == 0:
print(f"\r📤 转发音频: {chunks_forwarded} 块 | {total_size / 1024:.1f} KB", end='', flush=True)
except queue.Full:
print("⚠️ 统一播放队列满,丢弃音频块")
continue
audio_queue.task_done()
except queue.Empty:
# 队列为空,检查是否还在接收数据
if not hasattr(self, '_receiving_audio') or not self._receiving_audio:
print("\n📡 音频接收完成,转发器结束")
break
# 继续等待,不要显示太多调试信息
continue
except Exception as e:
print(f"\n❌ 转发过程中出错: {e}")
break
print(f"\n✅ 音频转发完成: {chunks_forwarded} 块, {total_size / 1024:.1f} KB")
return True
except Exception as e:
print(f"\n❌ 音频转发失败: {e}")
return False
def play_audio_hybrid(self, audio_chunks):
"""混合模式播放:智能选择流式或传统播放"""
try:
if not audio_chunks:
return False
# 根据音频块数量和大小决定播放策略
total_size = sum(len(chunk) for chunk in audio_chunks)
chunk_count = len(audio_chunks)
print(f"📊 音频分析: {chunk_count} 块, 总大小: {total_size / 1024:.1f} KB")
# 决策策略:
# 1. 如果音频块很少或总大小很小,使用传统播放(音质更好)
# 2. 如果音频块很多或总大小很大,使用流式播放(响应更快)
if chunk_count <= 3 or total_size < 50 * 1024: # 小于50KB或少于3块
print("🎵 选择传统播放模式(保证音质)")
# 合并所有音频块
full_audio = b''.join(audio_chunks)
# 临时保存到文件
temp_file = self.generate_tts_filename()
with open(temp_file, "wb") as f:
f.write(full_audio)
# 使用传统方式播放
success = self.play_audio_safe(temp_file, reopen_input=False)
# 删除临时文件
self._safe_delete_file(temp_file, "临时音频文件")
return success
else:
print("⚡ 选择智能流式播放模式(快速响应)")
return self.play_audio_streaming(audio_chunks)
except Exception as e:
print(f"❌ 混合播放失败: {e}")
return False
def play_audio_safe(self, filename, reopen_input=False):
"""安全的播放方式 - 使用系统播放器"""
try:
print("🔇 准备播放,强制停止音频输入")
# 立即停止当前录音并清空所有缓冲区
if self.recording:
self.recording = False
self.recorded_frames = []
self.recording_start_time = None
self.last_sound_time = None
# 清空所有缓冲区
self.pre_record_buffer = []
self.energy_history = []
self.zcr_history = []
# 强制关闭输入流
self._force_close_audio_stream()
# 设置播放状态
self.is_playing = True
# 等待确保音频设备完全停止
time.sleep(0.5)
print(f"🔊 开始播放: {filename}")
print("🚫 使用系统播放器,音频输入已完全关闭")
# 使用系统播放器
self.play_with_system_player(filename)
if reopen_input:
print("🔄 准备重新开启音频输入")
except Exception as e:
print(f"❌ 播放失败: {e}")
finally:
# 恢复播放状态
self.is_playing = False
# 等待播放完全结束
time.sleep(0.5)
# 只在需要时重新开启输入流
if reopen_input:
# 重新开启输入流(强制重置)
self._setup_audio(force_reset=True)
# 重置所有状态
self.energy_history = []
self.zcr_history = []
print("📡 音频输入已重新开启(强制重置)")
def update_pre_record_buffer(self, audio_data):
"""更新预录音缓冲区"""
self.pre_record_buffer.append(audio_data)
# 保持缓冲区大小
if len(self.pre_record_buffer) > self.pre_record_max_frames:
self.pre_record_buffer.pop(0)
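# Sizing note (editorial): with the defaults above, the pre-record ring buffer holds
#   pre_record_max_frames = int(2.0 s * 16000 Hz / 1024 samples) = 31 chunks
# i.e. about 31 * 1024 / 16000 ≈ 1.98 s of audio, or 31 * 2048 bytes ≈ 62 KiB of
# int16 mono PCM, which start_recording() prepends to each new recording.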
def start_recording(self):
"""开始录音"""
print("🎙️ 检测到声音,开始录音...")
self.recording = True
self.recorded_frames = []
# 将预录音缓冲区的内容添加到录音中
self.recorded_frames.extend(self.pre_record_buffer)
# 清空预录音缓冲区
self.pre_record_buffer = []
self.recording_start_time = time.time()
self.last_sound_time = time.time()
self.energy_history = []
self.zcr_history = [] # 重置ZCR历史
# 重置ZCR相关计数器
self.consecutive_low_zcr_count = 0
self.consecutive_silence_count = 0
self.voice_activity_history = []
def stop_recording(self):
"""停止录音"""
if len(self.recorded_frames) > 0:
audio_data = b''.join(self.recorded_frames)
duration = len(audio_data) / (self.RATE * 2) # 16位音频每样本2字节
# 计算实际录音时长和预录音时长
actual_duration = duration
pre_record_duration = min(duration, self.pre_record_duration)
print(f"📝 录音完成,时长: {actual_duration:.2f}秒 (包含预录音 {pre_record_duration:.1f}秒)")
# 保存录音
success, filename = self.save_recording(audio_data)
# 如果保存成功,进行后续处理
if success and filename:
print("=" * 50)
print("📡 音频输入已保持关闭状态")
print("🔄 开始处理音频...")
# 语音识别和LLM调用
if self.enable_asr and websockets is not None:
print("🤖 开始语音识别...")
asr_result = self.recognize_audio_sync(filename)
if asr_result and 'payload_msg' in asr_result:
result_text = asr_result['payload_msg'].get('result', [])
if result_text:
text = result_text[0].get('text', '识别失败')
print(f"📝 识别结果: {text}")
# 调用大语言模型
if self.enable_llm and text != '识别失败':
print("-" * 50)
llm_response = self.call_llm(text)
if llm_response:
print(f"💬 AI助手回复: {llm_response}")
# 调用文本转语音
if self.enable_tts:
print("-" * 50)
# 检查是否已经在流式处理中播放了语音
if not hasattr(self, '_streaming_tts_completed') or not self._streaming_tts_completed:
tts_file = self.text_to_speech(llm_response)
if tts_file:
print("✅ AI语音回复完成")
# 删除录音文件
self._safe_delete_file(filename, "录音文件")
else:
print("❌ 文本转语音失败")
else:
print("✅ AI语音回复已完成流式播放")
# 删除录音文件
self._safe_delete_file(filename, "录音文件")
# 重置流式标记
self._streaming_tts_completed = False
else:
print(" 文本转语音功能已禁用")
# 如果不启用TTS直接删除录音文件
self._safe_delete_file(filename, "录音文件")
else:
print("❌ 大语言模型调用失败")
else:
if not self.enable_llm:
print(" 大语言模型功能已禁用")
elif not self.llm_api_key:
print(" 请设置 ARK_API_KEY 环境变量以启用大语言模型功能")
else:
print("❌ 语音识别失败: 无结果")
else:
print("❌ 语音识别失败")
else:
if not self.enable_asr:
print(" 语音识别功能已禁用")
elif websockets is None:
print(" 请安装 websockets 库以启用语音识别功能")
# 等待音频播放完成后再重新开启音频输入
print("⏳ 等待音频播放完成...")
# 检查多个队列的状态
tts_queue_size = self.tts_task_queue.qsize()
playback_queue_size = self.audio_playback_queue.qsize()
while tts_queue_size > 0 or playback_queue_size > 0 or self.currently_playing:
tts_queue_size = self.tts_task_queue.qsize()
playback_queue_size = self.audio_playback_queue.qsize()
if tts_queue_size > 0:
status = f"🎵 TTS生成中... TTS队列: {tts_queue_size} 播放队列: {playback_queue_size}"
elif self.currently_playing:
status = f"🔊 正在播放... 播放队列: {playback_queue_size}"
else:
status = f"⏳ 等待播放... 播放队列: {playback_queue_size}"
print(f"\r{status}", end='', flush=True)
time.sleep(0.1)
# 额外等待1秒确保音频设备完全停止
print("\n⏳ 等待音频设备完全停止...")
time.sleep(1.0)
print("🔄 音频播放完成,准备重新开启音频输入")
# 强制重置音频设备,确保完全关闭
print("🔄 强制重置音频设备...")
self._force_close_audio_stream()
self.recording = False
self.recorded_frames = []
self.recording_start_time = None
self.last_sound_time = None
self.energy_history = []
self.zcr_history = []
def get_average_energy(self):
"""计算平均能量"""
if len(self.energy_history) == 0:
return 0
return np.mean(self.energy_history)
def monitor_performance(self):
"""性能监控"""
self.frame_count += 1
if self.frame_count % 1000 == 0: # 每1000帧显示一次
elapsed = time.time() - self.start_time
fps = self.frame_count / elapsed
avg_energy = self.get_average_energy()
print(f"📊 性能: {fps:.1f} FPS | 平均能量: {avg_energy:.1f} | 阈值: {self.energy_threshold}")
def auto_adjust_threshold(self):
"""自动调整能量阈值"""
if len(self.energy_history) >= 20:
# 基于历史能量的中位数和标准差调整阈值
median_energy = np.median(self.energy_history)
std_energy = np.std(self.energy_history)
# 设置阈值为中位数 + 2倍标准差
new_threshold = max(300, median_energy + 2 * std_energy)
# 平滑调整阈值
self.energy_threshold = 0.9 * self.energy_threshold + 0.1 * new_threshold
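# Worked example (editorial): with an energy_history median of 120 and std of 40,
#   candidate = max(300, 120 + 2 * 40) = 300
# and starting from energy_threshold = 500 the smoothed update is
#   0.9 * 500 + 0.1 * 300 = 480
# so the threshold drifts toward the background-noise estimate instead of jumping.
# Note that run() gates voice activity on ZCR only, so this threshold is effectively
# informational in the current flow.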
def run(self):
"""运行录音系统"""
if not self.input_stream:
print("❌ 音频设备未初始化")
return
self.running = True
print("🎤 开始监听...")
print(f"能量阈值: {self.energy_threshold} (已弃用)")
print(f"静音阈值: {self.silence_threshold}")
print("📖 使用说明:")
print("- 检测到声音自动开始录音")
print("- 持续静音3秒自动结束录音")
print("- 最少录音2秒最多30秒")
print("- 录音完成后自动播放")
print("- 按 Ctrl+C 退出")
print("🎯 新增功能:")
print("- 纯ZCR语音检测移除能量检测")
print("- 零交叉率检测(区分语音和噪音)")
print("- 实时显示ZCR状态")
print("- 预录音功能包含声音开始前2秒")
print("- 环形缓冲区防止丢失开头音频")
print("🤖 纯ZCR静音检测:")
print("- 连续低ZCR计数20次=2秒")
print("- ZCR活动历史追踪")
print("- 基于ZCR模式的静音验证")
print("- 语音范围: 2400-12000 Hz (适应16kHz采样率)")
print("=" * 50)
try:
while self.running:
# 检查音频流是否可用
if self.input_stream is None:
print("\n❌ 音频流已断开,尝试重新连接...")
self._setup_audio(force_reset=True)
if self.input_stream is None:
print("❌ 音频流重连失败,等待...")
time.sleep(1)
continue
# 读取音频数据
try:
data = self.input_stream.read(self.CHUNK_SIZE, exception_on_overflow=False)
except Exception as e:
print(f"\n❌ 读取音频数据失败: {e}")
self.input_stream = None
continue
if len(data) == 0:
continue
# 检查设备健康状态、TTS状态和播放冷却期 - 防止回声
current_time = time.time()
time_since_last_play = current_time - self.last_playback_time
in_cooldown = time_since_last_play < self.playback_cooldown_period
# 检查TTS和播放队列状态
tts_active = not self.tts_task_queue.empty()
playback_active = not self.audio_playback_queue.empty() or self.currently_playing
# 如果设备不健康、正在播放、TTS生成中、播放队列不为空、或在冷却期内完全跳过音频处理
if not self.audio_device_healthy or self.is_playing or tts_active or playback_active or in_cooldown:
# 显示播放状态
tts_queue_size = self.tts_task_queue.qsize()
queue_size = self.audio_playback_queue.qsize()
if not self.audio_device_healthy:
status = f"🔧 设备重置中... TTS: {tts_queue_size} 播放: {queue_size}"
elif tts_active:
status = f"🎵 TTS生成中... TTS: {tts_queue_size} 播放: {queue_size}"
elif in_cooldown:
cooldown_time = self.playback_cooldown_period - time_since_last_play
status = f"🔊 播放冷却中... {cooldown_time:.1f}s TTS: {tts_queue_size} 播放: {queue_size}"
else:
playing_status = "播放中" if self.currently_playing else "等待播放"
status = f"🔊 {playing_status}... TTS: {tts_queue_size} 播放: {queue_size}"
print(f"\r{status}", end='', flush=True)
time.sleep(0.1) # 播放时增加延迟减少CPU使用
continue
# 计算能量和零交叉率
energy = self.calculate_energy(data)
zcr = self.calculate_zero_crossing_rate(data)
peak_energy = self.calculate_peak_energy(data)
# 性能监控
self.monitor_performance()
if self.recording:
# 录音模式
self.recorded_frames.append(data)
recording_duration = time.time() - self.recording_start_time
# 基于ZCR的智能静音检测
if self.is_voice_active_advanced(energy, zcr):
self.last_sound_time = time.time()
self.consecutive_low_zcr_count = 0 # 重置低ZCR计数
self.consecutive_silence_count = 0 # 重置静音计数
else:
self.consecutive_low_zcr_count += 1 # 增加低ZCR计数
self.consecutive_silence_count += 1 # 增加静音计数
# 更新ZCR活动历史基于ZCR是否在语音范围内
self.voice_activity_history.append(2400 < zcr < 12000)
if len(self.voice_activity_history) > self.max_voice_history:
self.voice_activity_history.pop(0)
# 检查是否应该结束录音
current_time = time.time()
# 纯ZCR静音检测
should_stop = False
stop_reason = ""
# 主要检测连续低ZCR计数
if self.consecutive_low_zcr_count >= self.low_zcr_threshold_count:
# 进一步验证检查最近的ZCR活动历史
if len(self.voice_activity_history) >= 15:
recent_voice_activity = sum(self.voice_activity_history[-15:])
if recent_voice_activity <= 3: # 最近15个样本中最多3个有语音活动
should_stop = True
stop_reason = f"ZCR静音检测 ({self.consecutive_low_zcr_count}次连续低ZCR)"
else:
# 如果历史数据不足,使用基础检测
should_stop = True
stop_reason = f"基础ZCR静音检测 ({self.consecutive_low_zcr_count}次)"
# 备用检测:基于时间的静音检测
if not should_stop and current_time - self.last_sound_time > self.silence_threshold:
should_stop = True
stop_reason = f"时间静音检测 ({self.silence_threshold}秒)"
# 执行停止录音
if should_stop and recording_duration >= self.min_recording_time:
print(f"\n🔇 {stop_reason},结束录音")
self.stop_recording()
# 检查最大录音时间
if recording_duration > self.max_recording_time:
print(f"\n⏰ 达到最大录音时间 {self.max_recording_time}")
self.stop_recording()
# 显示录音状态仅ZCR相关信息
is_voice = self.is_voice_active_advanced(energy, zcr)
zcr_progress = f"{self.consecutive_low_zcr_count}/{self.low_zcr_threshold_count}"
recent_activity = sum(self.voice_activity_history[-5:]) if len(self.voice_activity_history) >= 5 else 0
status = f"录音中... {recording_duration:.1f}s | ZCR: {zcr:.0f} | 语音: {is_voice} | 低ZCR计数: {zcr_progress} | 活动: {recent_activity}"
print(f"\r{status}", end='', flush=True)
else:
# 监听模式 - 更新预录音缓冲区
self.update_pre_record_buffer(data)
# 使用高级检测
if self.is_voice_active_advanced(energy, zcr):
# 检测到声音,开始录音
self.start_recording()
else:
# 显示监听状态仅ZCR相关信息
is_voice = self.is_voice_active_advanced(energy, zcr)
buffer_usage = len(self.pre_record_buffer) / self.pre_record_max_frames * 100
status = f"监听中... ZCR: {zcr:.0f} | 语音: {is_voice} | 缓冲: {buffer_usage:.0f}%"
print(f"\r{status}", end='', flush=True)
# 减少CPU使用
time.sleep(0.01)
except KeyboardInterrupt:
print("\n👋 退出")
except Exception as e:
print(f"❌ 错误: {e}")
finally:
self.stop()
def stop(self):
"""停止系统"""
self.running = False
if self.recording:
self.stop_recording()
# 停止TTS工作线程
self.tts_worker_running = False
if self.tts_worker_thread:
self.tts_task_queue.put(None) # 发送结束信号
self.tts_worker_thread.join(timeout=2.0)
# 停止播放工作线程
self.playback_worker_running = False
if self.playback_worker_thread:
self.audio_playback_queue.put(None) # 发送结束信号
self.playback_worker_thread.join(timeout=3.0)
if self.input_stream:
self.input_stream.stop_stream()
self.input_stream.close()
if self.audio:
self.audio.terminate()
def generate_asr_header(self, message_type=1, message_type_specific_flags=0):
"""生成ASR请求头部"""
PROTOCOL_VERSION = 0b0001
DEFAULT_HEADER_SIZE = 0b0001
JSON = 0b0001
GZIP = 0b0001
header = bytearray()
header.append((PROTOCOL_VERSION << 4) | DEFAULT_HEADER_SIZE)
header.append((message_type << 4) | message_type_specific_flags)
header.append((JSON << 4) | GZIP)
header.append(0x00) # reserved
return header
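# Frame layout (editorial sketch, matching generate_asr_header() above and its use in
# recognize_audio() below): each client message sent over the WebSocket is
#
#   byte 0: (protocol_version << 4) | header_size      -> 0x11 with the defaults
#   byte 1: (message_type << 4) | type_specific_flags  -> 0x10 for the full request,
#                                                          0x20 for an audio chunk,
#                                                          0x22 for the last audio chunk
#   byte 2: (serialization << 4) | compression         -> 0x11 (JSON + GZIP)
#   byte 3: reserved                                    -> 0x00
#   bytes 4-7: big-endian payload length
#   bytes 8+ : gzip-compressed payload (JSON params or raw audio)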
def parse_asr_response(self, res):
"""解析ASR响应"""
PROTOCOL_VERSION = res[0] >> 4
header_size = res[0] & 0x0f
message_type = res[1] >> 4
message_type_specific_flags = res[1] & 0x0f
serialization_method = res[2] >> 4
message_compression = res[2] & 0x0f
reserved = res[3]
header_extensions = res[4:header_size * 4]
payload = res[header_size * 4:]
result = {}
payload_msg = None
payload_size = 0
if message_type == 0b1001: # SERVER_FULL_RESPONSE
payload_size = int.from_bytes(payload[:4], "big", signed=True)
payload_msg = payload[4:]
elif message_type == 0b1011: # SERVER_ACK
seq = int.from_bytes(payload[:4], "big", signed=True)
result['seq'] = seq
if len(payload) >= 8:
payload_size = int.from_bytes(payload[4:8], "big", signed=False)
payload_msg = payload[8:]
elif message_type == 0b1111: # SERVER_ERROR_RESPONSE
code = int.from_bytes(payload[:4], "big", signed=False)
result['code'] = code
payload_size = int.from_bytes(payload[4:8], "big", signed=False)
payload_msg = payload[8:]
if payload_msg is None:
return result
if message_compression == 0b0001: # GZIP
payload_msg = gzip.decompress(payload_msg)
if serialization_method == 0b0001: # JSON
payload_msg = json.loads(str(payload_msg, "utf-8"))
result['payload_msg'] = payload_msg
result['payload_size'] = payload_size
return result
async def recognize_audio(self, audio_path):
"""识别音频文件"""
if not self.enable_asr or websockets is None:
return None
try:
print("🤖 开始语音识别...")
# 读取音频文件
with open(audio_path, mode="rb") as f:
audio_data = f.read()
# 构建请求
reqid = str(uuid.uuid4())
request_params = {
'app': {
'appid': self.asr_appid,
'cluster': self.asr_cluster,
'token': self.asr_token,
},
'user': {
'uid': 'recorder_asr'
},
'request': {
'reqid': reqid,
'nbest': 1,
'workflow': 'audio_in,resample,partition,vad,fe,decode,itn,nlu_punctuate',
'show_language': False,
'show_utterances': False,
'result_type': 'full',
"sequence": 1
},
'audio': {
'format': 'wav',
'rate': self.RATE,
'language': 'zh-CN',
'bits': 16,
'channel': self.CHANNELS,
'codec': 'raw'
}
}
# 构建头部
payload_bytes = str.encode(json.dumps(request_params))
payload_bytes = gzip.compress(payload_bytes)
full_client_request = bytearray(self.generate_asr_header())
full_client_request.extend((len(payload_bytes)).to_bytes(4, 'big'))
full_client_request.extend(payload_bytes)
# 设置认证头
additional_headers = {'Authorization': 'Bearer; {}'.format(self.asr_token)}
# 连接WebSocket并发送请求
async with websockets.connect(self.asr_ws_url, additional_headers=additional_headers, max_size=1000000000) as ws:
# 发送完整请求
await ws.send(full_client_request)
res = await ws.recv()
result = self.parse_asr_response(res)
if 'payload_msg' in result and result['payload_msg'].get('code') != 1000:
print(f"❌ ASR请求失败: {result['payload_msg']}")
return None
# 分块发送音频数据
chunk_size = int(self.CHANNELS * 2 * self.RATE * 15000 / 1000) # ~15-second segments (15000 ms of 16-bit PCM)
for offset in range(0, len(audio_data), chunk_size):
chunk = audio_data[offset:offset + chunk_size]
last = (offset + chunk_size) >= len(audio_data)
payload_bytes = gzip.compress(chunk)
audio_only_request = bytearray(self.generate_asr_header(message_type=0b0010, message_type_specific_flags=0b0010 if last else 0))
audio_only_request.extend((len(payload_bytes)).to_bytes(4, 'big'))
audio_only_request.extend(payload_bytes)
await ws.send(audio_only_request)
res = await ws.recv()
result = self.parse_asr_response(res)
return result
except Exception as e:
print(f"❌ 语音识别失败: {e}")
return None
def recognize_audio_sync(self, audio_path):
"""同步版本的语音识别"""
if not self.enable_asr or websockets is None:
return None
try:
return asyncio.run(self.recognize_audio(audio_path))
except Exception as e:
print(f"❌ 语音识别失败: {e}")
return None
def call_llm(self, user_message):
"""调用大语言模型API - 支持流式输出"""
if not self.enable_llm:
return None
try:
print("🤖 调用大语言模型(流式输出)...")
# 获取角色配置中的系统提示词
if self.character_config and "system_prompt" in self.character_config:
system_prompt = self.character_config["system_prompt"]
else:
system_prompt = "你是一个智能助手,请根据用户的语音输入提供有帮助的回答。保持回答简洁明了。"
# 获取角色配置中的最大token数
max_tokens = 50
if self.character_config and "max_tokens" in self.character_config:
max_tokens = self.character_config["max_tokens"]
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.llm_api_key}"
}
# 构建消息列表,包含历史记录
messages = [
{
"role": "system",
"content": system_prompt
}
]
# 添加历史对话记录
for history_item in self.chat_history:
messages.extend([
{"role": "user", "content": history_item["user"]},
{"role": "assistant", "content": history_item["assistant"]}
])
# 添加当前用户消息
messages.append({
"role": "user",
"content": user_message
})
data = {
"model": self.llm_model,
"messages": messages,
"max_tokens": max_tokens,
"stream": True # 启用流式输出
}
# 使用流式请求
response = requests.post(self.llm_api_url, headers=headers, json=data, stream=True, timeout=30)
if response.status_code == 200:
print("🔄 开始接收流式响应...")
# 处理流式响应
accumulated_text = ""
sentence_buffer = ""
print("🔍 开始接收SSE流...")
# 使用行缓冲区处理多字节字符
buffer = ""
for line_bytes in response.iter_lines():
if not line_bytes:
continue
# 解码为字符串,处理可能的编码问题
try:
line = line_bytes.decode('utf-8')
except UnicodeDecodeError:
try:
line = line_bytes.decode('latin-1')
except Exception:
continue
# 添加到缓冲区
buffer += line
# 检查是否包含完整的数据行
while buffer:
# 查找完整的数据行
if '\n' in buffer:
line, buffer = buffer.split('\n', 1)
else:
line = buffer
buffer = ""
line = line.strip()
if not line:
continue
try:
# 解析SSE格式的响应
if line.startswith("data: "):
data_str = line[6:] # 移除 "data: " 前缀
if data_str == "[DONE]":
break
# 尝试解析JSON增加错误处理
try:
chunk_data = json.loads(data_str)
except json.JSONDecodeError as e:
# 尝试修复JSON检查是否是不完整的字符串
if not data_str.endswith('}'):
# 这可能是一个不完整的JSON等待下一行
buffer = data_str + buffer
continue
print(f"\n🔍 调试 - 原始行: {repr(line)}")
print(f"🔍 调试 - 数据字符串: {repr(data_str)}")
print(f"🔍 调试 - JSON错误: {e}")
continue
# 处理解析后的数据
if "choices" in chunk_data and len(chunk_data["choices"]) > 0:
delta = chunk_data["choices"][0].get("delta", {})
content = delta.get("content", "")
if content:
accumulated_text += content
sentence_buffer += content
# 检查是否有完整句子
if self._is_complete_sentence(sentence_buffer):
print(f"📝 检测到完整句子: {sentence_buffer}")
# 过滤括号内容
filtered_sentence = self._filter_parentheses_content(sentence_buffer.strip())
if filtered_sentence:
print(f"🎵 添加到智能缓冲区: {filtered_sentence}")
# 使用智能缓冲系统替代立即调用TTS
self._add_sentence_to_buffer(filtered_sentence)
# 标记已经进行了流式TTS播放
self._streaming_tts_completed = True
# 重置句子缓冲区
sentence_buffer = ""
# 显示进度
print(f"\r💬 已生成: {accumulated_text}", end='', flush=True)
except Exception as e:
print(f"\n⚠️ 处理流式响应时出错: {e}")
print(f"🔍 调试 - 问题行: {repr(line)}")
continue
print(f"\n✅ 流式响应完成: {accumulated_text}")
# 强制刷新TTS缓冲区确保所有内容都被处理
self._flush_tts_buffer()
# 保存完整回复到历史记录
if accumulated_text:
filtered_response = self._filter_parentheses_content(accumulated_text.strip())
self._add_to_chat_history(user_message, filtered_response)
return accumulated_text
else:
print(f"❌ LLM API调用失败: {response.status_code}")
print(f"响应内容: {response.text}")
return None
except requests.exceptions.RequestException as e:
print(f"❌ 网络请求失败: {e}")
return None
except Exception as e:
print(f"❌ LLM调用失败: {e}")
return None
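# For reference (editorial note): the streaming branch above expects OpenAI-style SSE
# lines from the Ark endpoint. A typical chunk and the terminator look roughly like
# this (shape inferred from the parsing code, values illustrative):
#
#   data: {"choices":[{"delta":{"content":"你好"},"index":0}]}
#   data: [DONE]
#
# Only choices[0].delta.content is consumed; everything else in the chunk is ignored.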
def _filter_parentheses_content(self, text):
"""过滤文本中的括号内容(包括中文和英文括号)"""
import re
# 移除中文括号内容:(内容)
filtered_text = re.sub(r'([^)]*)', '', text)
# 移除英文括号内容:(content)
filtered_text = re.sub(r'\([^)]*\)', '', filtered_text)
# 移除方括号内容:【内容】
filtered_text = re.sub(r'【[^】]*】', '', filtered_text)
# 移除方括号内容:[content]
filtered_text = re.sub(r'\[[^\]]*\]', '', filtered_text)
# 移除书名号内容:「内容」
filtered_text = re.sub(r'「[^」]*」', '', filtered_text)
# 清理多余空格
filtered_text = re.sub(r'\s+', ' ', filtered_text).strip()
return filtered_text
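# Behaviour examples for _filter_parentheses_content (editorial, based on the regexes above):
#
#   "好的(低声笑)我们出发吧"        -> "好的我们出发吧"
#   "Hello (aside) world [note]"      -> "Hello world"
#   "他说「马上到」了"                -> "他说了"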
def _is_complete_sentence(self, text):
"""检测是否为完整句子"""
import re
if not text or len(text.strip()) == 0:
return False
# 增加最小长度要求 - 至少10个字符才考虑作为完整句子
if len(text.strip()) < 10:
return False
# 句子结束标点符号
sentence_endings = r'[。!?.!?]'
# 检查是否以句子结束符结尾
if re.search(sentence_endings + r'\s*$', text):
# 对于以结束符结尾的句子要求至少15个字符
if len(text.strip()) >= 15:
return True
# 检查是否包含句子结束符(可能在句子中间)
if re.search(sentence_endings, text):
# 如果后面有其他标点或字符,可能不是完整句子
remaining_text = re.split(sentence_endings, text, 1)[-1]
if len(remaining_text.strip()) > 0:
return False
# 对于包含结束符的句子要求至少20个字符
if len(text.strip()) >= 20:
return True
# 对于较长的文本超过50字符即使没有结束符也可以考虑
if len(text.strip()) >= 50:
return True
# 对于中等长度的文本,如果包含常见完整句式模式
if len(text.strip()) >= 20:
common_patterns = [
r'^[是的有没有来去在把被让叫请使].*[的得了吗呢吧啊呀]',
r'^(你好|谢谢|再见|是的|不是|好的|没问题)',
r'^[\u4e00-\u9fff]{4,8}[的得了]$' # 4-8个中文字+的/了/得
]
for pattern in common_patterns:
if re.match(pattern, text):
return True
return False
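# Behaviour examples for _is_complete_sentence (editorial, based on the rules above):
#
#   "好的。"                                   -> False (shorter than 10 chars)
#   "今天天气真的非常不错,我们出去走走吧。"   -> True  (>= 15 chars and ends with 。)
#   "我觉得这个方案可能需要再"                 -> False (no terminator, under 20 chars)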
def _add_to_chat_history(self, user_message, assistant_response):
"""添加对话到历史记录"""
# 如果历史记录超过最大长度,移除最早的记录
if len(self.chat_history) >= self.max_history_length:
self.chat_history.pop(0)
# 添加新的对话记录
self.chat_history.append({
"user": user_message,
"assistant": assistant_response,
"timestamp": time.time()
})
def clear_chat_history(self):
"""清空聊天历史"""
self.chat_history = []
print("💬 聊天历史已清空")
def get_chat_history_summary(self):
"""获取聊天历史摘要"""
if not self.chat_history:
return "暂无聊天历史"
return f"当前有 {len(self.chat_history)} 轮对话记录"
def _safe_delete_file(self, filepath, description="文件"):
"""安全删除文件"""
try:
if filepath and os.path.exists(filepath):
os.remove(filepath)
print(f"🗑️ 已删除{description}: {filepath}")
return True
except Exception as e:
print(f"⚠️ 删除{description}失败: {e}")
return False
def generate_tts_filename(self):
"""生成TTS文件名"""
timestamp = time.strftime("%Y%m%d_%H%M%S")
return f"tts_response_{timestamp}.pcm"
def text_to_speech(self, text, mode='normal', use_worker=True):
"""文本转语音 - 真正实时流式播放
mode: 'normal' 普通模式, 'buffered' 智能缓冲模式
use_worker: 是否使用工作线程模式
"""
if not self.enable_tts:
return None
try:
if mode == 'buffered':
print("🎵 开始智能缓冲TTS处理...")
else:
print("🔊 开始文本转语音(实时流式播放)...")
# 如果是在工作线程中,不启动播放器
if use_worker and hasattr(self, 'is_playing') and self.is_playing:
print("⚠️ 工作线程模式,跳过播放器启动")
return None
# 构建请求头
headers = {
"X-Api-App-Id": self.tts_app_id,
"X-Api-Access-Key": self.tts_access_key,
"X-Api-Resource-Id": self.tts_resource_id,
"X-Api-App-Key": self.tts_app_key,
"Content-Type": "application/json",
"Connection": "keep-alive"
}
# 构建请求参数
payload = {
"user": {
"uid": "recorder_tts"
},
"req_params": {
"text": text,
"speaker": self.tts_speaker,
"audio_params": {
"format": "pcm",
"sample_rate": 16000,
"enable_timestamp": True
},
"additions": "{\"explicit_language\":\"zh\",\"disable_markdown_filter\":true, \"enable_timestamp\":true}\"}"
}
}
# 创建音频队列 - 设置更大的队列大小以减少中断
audio_queue = queue.Queue(maxsize=50) # 增加队列大小
# 启动实时播放线程
self._receiving_audio = True
player_thread = threading.Thread(target=self.play_audio_realtime, args=(audio_queue,))
player_thread.daemon = True
player_thread.start()
print("🎵 实时播放器已启动")
# 给播放器一点时间启动
time.sleep(0.1)
# 发送请求
session = requests.Session()
try:
response = session.post(self.tts_url, headers=headers, json=payload, stream=True)
if response.status_code != 200:
print(f"❌ TTS请求失败: {response.status_code}")
print(f"响应内容: {response.text}")
# 向队列发送结束信号
audio_queue.put(None)
return None
# 处理流式响应 - 根据模式优化
total_audio_size = 0
chunk_count = 0
if mode == 'buffered':
print("🔄 开始接收TTS音频流智能缓冲模式...")
else:
print("🔄 开始接收TTS音频流实时播放...")
for chunk in response.iter_lines(decode_unicode=True):
if not chunk:
continue
try:
data = json.loads(chunk)
if data.get("code", 0) == 0 and "data" in data and data["data"]:
chunk_audio = base64.b64decode(data["data"])
audio_size = len(chunk_audio)
total_audio_size += audio_size
chunk_count += 1
# 将音频块放入队列进行实时播放 - 使用非阻塞方式
try:
audio_queue.put_nowait(chunk_audio)
except queue.Full:
# 如果队列满了,稍微等待一下
time.sleep(0.01)
try:
audio_queue.put_nowait(chunk_audio)
except queue.Full:
# 如果还是满的,跳过这个块以避免阻塞
print(f"\n⚠️ 音频队列已满,跳过块 {chunk_count}")
continue
# 减少进度显示频率以提高性能
if chunk_count % 5 == 0:
print(f"\r📥 接收并播放: {chunk_count} 块 | {total_audio_size / 1024:.1f} KB", end='', flush=True)
continue
if data.get("code", 0) == 0 and "sentence" in data and data["sentence"]:
print(f"\n📝 TTS句子信息: {data['sentence']}")
continue
if data.get("code", 0) == 20000000:
break
if data.get("code", 0) > 0:
print(f"\n❌ TTS错误响应: {data}")
break
except json.JSONDecodeError:
print(f"\n❌ 解析TTS响应失败: {chunk}")
continue
print(f"\n✅ TTS音频接收完成: {chunk_count} 个音频块, 总大小: {total_audio_size / 1024:.1f} KB")
# 等待播放完成
print("⏳ 等待音频播放完成...")
player_thread.join(timeout=5.0)
# 生成临时文件名用于返回
temp_file = self.generate_tts_filename()
return temp_file
finally:
response.close()
session.close()
# 确保播放线程结束
self._receiving_audio = False
audio_queue.put(None)
except Exception as e:
print(f"❌ TTS转换失败: {e}")
return None
def parse_arguments():
"""解析命令行参数"""
parser = argparse.ArgumentParser(description='基于能量检测的极简录音系统')
parser.add_argument('--character', '-c', type=str, default='libai',
help='选择角色 (默认: libai)')
parser.add_argument('--list-characters', '-l', action='store_true',
help='列出所有可用角色')
return parser.parse_args()
def list_characters(characters_dir):
"""列出所有可用角色"""
characters = []
if os.path.exists(characters_dir):
for file in os.listdir(characters_dir):
if file.endswith('.json'):
character_name = file[:-5]
config_file = os.path.join(characters_dir, file)
try:
with open(config_file, 'r', encoding='utf-8') as f:
config = json.load(f)
name = config.get('name', character_name)
desc = config.get('description', '无描述')
characters.append(f"{character_name}: {name} - {desc}")
except:
characters.append(f"{character_name}: 配置文件读取失败")
if characters:
print("🎭 可用角色列表:")
for char in characters:
print(f" - {char}")
else:
print("❌ 未找到任何角色配置文件")
def main():
"""主函数"""
args = parse_arguments()
characters_dir = os.path.join(os.path.dirname(__file__), "characters")
# 如果要求列出角色,显示后退出
if args.list_characters:
list_characters(characters_dir)
return
print("🚀 基于能量检测的极简录音系统")
print("🤖 集成语音识别功能")
print(f"🎭 当前角色: {args.character}")
print("=" * 50)
# 创建录音系统
recorder = EnergyBasedRecorder(
energy_threshold=200, # 能量阈值(降低以提高灵敏度)
silence_threshold=3.0, # 静音阈值(秒)- 改为3秒
min_recording_time=2.0, # 最小录音时间
max_recording_time=30.0, # 最大录音时间
enable_asr=True, # 启用语音识别功能
enable_llm=True, # 启用大语言模型功能
enable_tts=True, # 启用文本转语音功能
character=args.character # 指定角色
)
print("✅ 系统初始化成功")
print("🎯 功能特点:")
print(" - 完全移除Vosk识别依赖")
print(" - 基于ZCR语音检测精确识别")
print(" - 集成在线语音识别字节跳动ASR")
print(" - 集成大语言模型(豆包大模型)")
print(" - 集成文本转语音字节跳动TTS")
print(" - 录音完成后自动语音识别")
print(" - 语音识别后自动调用AI助手")
print(" - AI回复后自动转换为语音")
print(" - 多角色支持 (李白、猪八戒等)")
print(" - 每个角色独特音色和性格")
print(" - 预录音功能包含声音开始前2秒")
print(" - 环形缓冲区防止丢失开头音频")
print(" - 自动调整能量阈值")
print(" - 实时性能监控")
print(" - 预期延迟: <0.1秒")
print("=" * 50)
# 显示API密钥状态
if not recorder.enable_llm:
print("🔑 提示: 如需启用大语言模型功能,请设置环境变量:")
print(" export ARK_API_KEY='your_api_key_here'")
print("=" * 50)
# 显示角色信息
if recorder.character_config:
print(f"🎭 当前角色: {recorder.character_config.get('name', '未知')}")
print(f"📝 描述: {recorder.character_config.get('description', '无描述')}")
print(f"🎤 音色: {recorder.tts_speaker}")
print("=" * 50)
# 显示使用说明
print("📖 使用说明:")
print("- 检测到声音自动开始录音")
print("- 持续静音3秒自动结束录音")
print("- 最少录音2秒最多30秒")
print("- 录音完成后自动播放")
print("- 按 Ctrl+C 退出")
print("🎭 角色切换:")
print("- 使用 --character 或 -c 参数选择角色")
print("- 使用 --list-characters 或 -l 查看所有角色")
print("- 示例: python recorder.py --character zhubajie")
print("=" * 50)
# 开始运行
recorder.run()
if __name__ == "__main__":
main()