2201 lines
91 KiB
Python
2201 lines
91 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
"""
|
||
基于能量检测的极简录音系统
|
||
专门针对树莓派3B优化,完全移除Vosk识别依赖
|
||
"""
|
||
|
||
import asyncio
|
||
import base64
|
||
import gzip
|
||
import hmac
|
||
import json
|
||
import os
|
||
import sys
|
||
import threading
|
||
import time
|
||
import uuid
|
||
import wave
|
||
import argparse
|
||
import queue
|
||
from io import BytesIO
|
||
from urllib.parse import urlparse
|
||
|
||
import numpy as np
|
||
import pyaudio
|
||
import requests
|
||
|
||
try:
|
||
import websockets
|
||
except ImportError:
|
||
print("⚠️ websockets 未安装,语音识别功能将不可用")
|
||
websockets = None
|
||
|
||
class EnergyBasedRecorder:
|
||
"""基于能量检测的录音系统"""
|
||
|
||
def __init__(self, energy_threshold=500, silence_threshold=1.5, min_recording_time=2.0, max_recording_time=30.0, enable_asr=True, enable_llm=True, enable_tts=True, character="libai"):
|
||
# 音频参数 - 极简优化
|
||
self.FORMAT = pyaudio.paInt16
|
||
self.CHANNELS = 1
|
||
self.RATE = 16000 # 16kHz采样率
|
||
self.CHUNK_SIZE = 1024 # 适中块大小
|
||
|
||
# 语音识别配置
|
||
self.enable_asr = enable_asr
|
||
self.asr_appid = "8718217928"
|
||
self.asr_token = "ynJMX-5ix1FsJvswC9KTNlGUdubcchqc"
|
||
self.asr_cluster = "volcengine_input_common"
|
||
self.asr_ws_url = "wss://openspeech.bytedance.com/api/v2/asr"
|
||
|
||
# 大语言模型配置
|
||
self.enable_llm = enable_llm
|
||
self.llm_api_url = "https://ark.cn-beijing.volces.com/api/v3/chat/completions"
|
||
self.llm_model = "doubao-seed-1-6-flash-250828"
|
||
self.llm_api_key = os.environ.get("ARK_API_KEY", "")
|
||
|
||
# 检查API密钥
|
||
if self.enable_llm and not self.llm_api_key:
|
||
print("⚠️ 未设置 ARK_API_KEY 环境变量,大语言模型功能将被禁用")
|
||
self.enable_llm = False
|
||
|
||
# 文本转语音配置
|
||
self.enable_tts = enable_tts
|
||
self.tts_url = "https://openspeech.bytedance.com/api/v3/tts/unidirectional"
|
||
self.tts_app_id = "8718217928"
|
||
self.tts_access_key = "ynJMX-5ix1FsJvswC9KTNlGUdubcchqc"
|
||
self.tts_resource_id = "volc.service_type.10029"
|
||
self.tts_app_key = "aGjiRDfUWi"
|
||
self.tts_speaker = "zh_female_wanqudashu_moon_bigtts"
|
||
|
||
# 角色配置
|
||
self.current_character = character
|
||
self.characters_dir = os.path.join(os.path.dirname(__file__), "characters")
|
||
self.available_characters = self._load_available_characters()
|
||
self.character_config = self._load_character_config(character)
|
||
|
||
# 如果加载了角色配置,更新TTS音色
|
||
if self.character_config and "voice" in self.character_config:
|
||
self.tts_speaker = self.character_config["voice"]
|
||
|
||
# 聊天历史记录
|
||
self.chat_history = []
|
||
self.max_history_length = 10 # 最多保存10轮对话
|
||
|
||
# 检查音频播放能力
|
||
if self.enable_tts:
|
||
self.audio_player_available = self._check_audio_player()
|
||
if not self.audio_player_available:
|
||
print("⚠️ 未找到音频播放器,TTS音频播放功能可能不可用")
|
||
print(" 建议安装: sudo apt-get install alsa-utils")
|
||
# 不禁用TTS功能,因为仍然可以生成文件
|
||
|
||
# 能量检测参数
|
||
self.energy_threshold = energy_threshold # 能量阈值,高于此值认为有声音
|
||
self.silence_threshold = silence_threshold # 静音阈值,低于此值持续多久认为结束
|
||
self.min_recording_time = min_recording_time # 最小录音时间
|
||
self.max_recording_time = max_recording_time # 最大录音时间
|
||
self.pre_record_duration = 2.0 # 预录音时长(秒)
|
||
|
||
# 状态变量
|
||
self.audio = None
|
||
self.input_stream = None # 输入流(录音)
|
||
self.output_stream = None # 输出流(播放)
|
||
self.running = False
|
||
self.recording = False
|
||
self.recorded_frames = []
|
||
self.recording_start_time = None
|
||
self.last_sound_time = None
|
||
self.energy_history = []
|
||
self.zcr_history = [] # ZCR历史
|
||
self.max_energy_history = 50 # 最大能量历史记录
|
||
|
||
# 预录音缓冲区
|
||
self.pre_record_buffer = [] # 预录音缓冲区
|
||
self.pre_record_max_frames = int(self.pre_record_duration * self.RATE / self.CHUNK_SIZE) # 最大预录音帧数
|
||
|
||
# 播放状态
|
||
self.is_playing = False # 是否正在播放
|
||
|
||
# 智能静音检测
|
||
self.voice_activity_history = [] # 语音活动历史
|
||
self.max_voice_history = 20 # 最大语音活动历史记录
|
||
self.consecutive_silence_count = 0 # 连续静音计数
|
||
self.silence_threshold_count = 15 # 连续静音次数阈值(约1.5秒)
|
||
|
||
# 智能ZCR静音检测
|
||
self.max_zcr_history = 30 # 最大ZCR历史记录
|
||
self.consecutive_low_zcr_count = 0 # 连续低ZCR计数
|
||
self.low_zcr_threshold_count = 20 # 连续低ZCR次数阈值(约2秒)
|
||
|
||
# 性能监控
|
||
self.frame_count = 0
|
||
self.start_time = time.time()
|
||
|
||
# 流式TTS优化系统
|
||
self.tts_task_queue = queue.Queue(maxsize=10) # TTS任务队列
|
||
self.tts_buffer = [] # 智能缓冲区
|
||
self.tts_buffer_max_size = 3 # 最多缓冲3个句子
|
||
self.tts_buffer_min_size = 1 # 最少1个句子
|
||
self.tts_accumulation_time = 0.2 # 200ms积累窗口
|
||
self.tts_last_trigger_time = 0
|
||
self.tts_worker_running = True
|
||
self.tts_worker_thread = None
|
||
self.tts_pending_sentences = [] # 待处理的句子
|
||
|
||
# 统一音频播放系统
|
||
self.audio_playback_queue = queue.Queue(maxsize=1000) # 统一音频播放队列,大队列确保不会截断
|
||
self.playback_worker_running = True
|
||
self.playback_worker_thread = None
|
||
self.currently_playing = False # 当前是否正在播放
|
||
self.last_playback_time = 0 # 最后一次播放时间戳
|
||
self.playback_cooldown_period = 1.5 # 播放冷却时间(秒)
|
||
self.audio_device_healthy = True # 音频设备健康状态
|
||
|
||
# 启动工作线程
|
||
self._start_tts_worker()
|
||
self._start_playback_worker()
|
||
|
||
self._setup_audio()
|
||
|
||
def _start_tts_worker(self):
|
||
"""启动TTS工作线程"""
|
||
self.tts_worker_thread = threading.Thread(target=self._tts_worker, daemon=True)
|
||
self.tts_worker_thread.start()
|
||
print("🎵 TTS工作线程已启动")
|
||
|
||
def _start_playback_worker(self):
|
||
"""启动音频播放工作线程"""
|
||
self.playback_worker_thread = threading.Thread(target=self._playback_worker, daemon=True)
|
||
self.playback_worker_thread.start()
|
||
print("🔊 音频播放工作线程已启动")
|
||
|
||
def _tts_worker(self):
|
||
"""TTS工作线程 - 处理TTS任务队列"""
|
||
while self.tts_worker_running:
|
||
try:
|
||
# 从队列获取任务
|
||
task = self.tts_task_queue.get(timeout=1.0)
|
||
if task is None: # 结束信号
|
||
break
|
||
|
||
task_type, content = task
|
||
if task_type == "tts_sentence":
|
||
# 生成音频数据并发送到统一播放队列
|
||
self._generate_tts_audio(content)
|
||
|
||
self.tts_task_queue.task_done()
|
||
|
||
except queue.Empty:
|
||
continue
|
||
except Exception as e:
|
||
print(f"❌ TTS工作线程错误: {e}")
|
||
time.sleep(0.1)
|
||
|
||
def _playback_worker(self):
|
||
"""音频播放工作线程 - 处理统一音频播放队列"""
|
||
print("🔊 播放工作线程开始运行...")
|
||
|
||
# 等待音频设备就绪
|
||
time.sleep(0.5)
|
||
|
||
# 创建音频播放流
|
||
playback_stream = None
|
||
try:
|
||
playback_stream = self.audio.open(
|
||
format=self.FORMAT,
|
||
channels=self.CHANNELS,
|
||
rate=self.RATE,
|
||
output=True,
|
||
frames_per_buffer=512
|
||
)
|
||
print("🔊 音频播放流已创建")
|
||
except Exception as e:
|
||
print(f"❌ 创建音频播放流失败: {e}")
|
||
return
|
||
|
||
chunks_played = 0
|
||
total_size = 0
|
||
|
||
try:
|
||
while self.playback_worker_running:
|
||
try:
|
||
# 从播放队列获取音频数据
|
||
audio_chunk = self.audio_playback_queue.get(timeout=1.0)
|
||
|
||
if audio_chunk is None: # 结束信号
|
||
print("🔊 收到播放结束信号")
|
||
break
|
||
|
||
if isinstance(audio_chunk, str) and audio_chunk.startswith("METADATA:"):
|
||
# 处理元数据(如文本信息)
|
||
metadata = audio_chunk[9:] # 移除 "METADATA:" 前缀
|
||
print(f"📝 播放元数据: {metadata}")
|
||
self.audio_playback_queue.task_done()
|
||
continue
|
||
|
||
# 播放音频块
|
||
if audio_chunk and len(audio_chunk) > 0:
|
||
self.currently_playing = True
|
||
self.last_playback_time = time.time() # 更新最后播放时间
|
||
playback_stream.write(audio_chunk)
|
||
chunks_played += 1
|
||
total_size += len(audio_chunk)
|
||
|
||
# 减少进度显示频率
|
||
if chunks_played % 10 == 0:
|
||
print(f"\r🔊 统一播放: {chunks_played} 块 | {total_size / 1024:.1f} KB", end='', flush=True)
|
||
|
||
self.audio_playback_queue.task_done()
|
||
|
||
except queue.Empty:
|
||
# 队列空时,检查是否正在播放
|
||
self.currently_playing = False
|
||
continue
|
||
except Exception as e:
|
||
print(f"❌ 播放工作线程错误: {e}")
|
||
self.currently_playing = False
|
||
time.sleep(0.1)
|
||
|
||
print(f"\n✅ 播放工作线程结束: 总计 {chunks_played} 块, {total_size / 1024:.1f} KB")
|
||
|
||
finally:
|
||
self.currently_playing = False
|
||
if playback_stream:
|
||
playback_stream.stop_stream()
|
||
playback_stream.close()
|
||
print("🔊 音频播放流已关闭")
|
||
|
||
def _add_tts_task(self, content):
|
||
"""添加TTS任务到队列"""
|
||
try:
|
||
self.tts_task_queue.put_nowait(("tts_sentence", content))
|
||
return True
|
||
except queue.Full:
|
||
print("⚠️ TTS任务队列已满,丢弃任务")
|
||
return False
|
||
|
||
def _generate_tts_audio(self, text):
|
||
"""生成TTS音频数据并发送到统一播放队列"""
|
||
if not self.enable_tts:
|
||
return False
|
||
|
||
try:
|
||
print(f"🎵 生成TTS音频: {text[:50]}...")
|
||
|
||
# 添加元数据到播放队列
|
||
try:
|
||
self.audio_playback_queue.put_nowait(f"METADATA:{text[:30]}...")
|
||
except queue.Full:
|
||
print("⚠️ 播放队列满,跳过元数据")
|
||
|
||
# 构建请求头
|
||
headers = {
|
||
"X-Api-App-Id": self.tts_app_id,
|
||
"X-Api-Access-Key": self.tts_access_key,
|
||
"X-Api-Resource-Id": self.tts_resource_id,
|
||
"X-Api-App-Key": self.tts_app_key,
|
||
"Content-Type": "application/json",
|
||
"Connection": "keep-alive"
|
||
}
|
||
|
||
# 构建请求参数
|
||
payload = {
|
||
"user": {
|
||
"uid": "recorder_tts_generator"
|
||
},
|
||
"req_params": {
|
||
"text": text,
|
||
"speaker": self.tts_speaker,
|
||
"audio_params": {
|
||
"format": "pcm",
|
||
"sample_rate": 16000,
|
||
"enable_timestamp": True
|
||
},
|
||
"additions": "{\"explicit_language\":\"zh\",\"disable_markdown_filter\":true, \"enable_timestamp\":true}\"}"
|
||
}
|
||
}
|
||
|
||
# 发送请求
|
||
session = requests.Session()
|
||
try:
|
||
response = session.post(self.tts_url, headers=headers, json=payload, stream=True)
|
||
|
||
if response.status_code != 200:
|
||
print(f"❌ TTS请求失败: {response.status_code}")
|
||
return False
|
||
|
||
# 处理流式响应
|
||
total_audio_size = 0
|
||
chunk_count = 0
|
||
success_count = 0
|
||
|
||
print("🔄 开始接收TTS音频流...")
|
||
|
||
for chunk in response.iter_lines(decode_unicode=True):
|
||
if not chunk:
|
||
continue
|
||
|
||
try:
|
||
data = json.loads(chunk)
|
||
|
||
if data.get("code", 0) == 0 and "data" in data and data["data"]:
|
||
chunk_audio = base64.b64decode(data["data"])
|
||
audio_size = len(chunk_audio)
|
||
total_audio_size += audio_size
|
||
chunk_count += 1
|
||
|
||
# 将音频块发送到统一播放队列
|
||
try:
|
||
self.audio_playback_queue.put_nowait(chunk_audio)
|
||
success_count += 1
|
||
except queue.Full:
|
||
# 队列满时稍微等待
|
||
time.sleep(0.01)
|
||
try:
|
||
self.audio_playback_queue.put_nowait(chunk_audio)
|
||
success_count += 1
|
||
except queue.Full:
|
||
# 如果还是满的,跳过这个块
|
||
continue
|
||
|
||
# 减少进度显示频率
|
||
if chunk_count % 5 == 0:
|
||
print(f"\r📥 生成音频: {chunk_count} 块 | 成功: {success_count} | {total_audio_size / 1024:.1f} KB", end='', flush=True)
|
||
continue
|
||
|
||
if data.get("code", 0) == 20000000:
|
||
break
|
||
|
||
if data.get("code", 0) > 0:
|
||
print(f"\n❌ TTS错误响应: {data}")
|
||
break
|
||
|
||
except json.JSONDecodeError:
|
||
continue
|
||
|
||
success_rate = (success_count / chunk_count * 100) if chunk_count > 0 else 0
|
||
print(f"\n✅ TTS音频生成完成: {chunk_count} 块, 成功率 {success_rate:.1f}% | 总大小: {total_audio_size / 1024:.1f} KB")
|
||
|
||
return success_count > 0
|
||
|
||
finally:
|
||
response.close()
|
||
session.close()
|
||
|
||
except Exception as e:
|
||
print(f"❌ TTS音频生成失败: {e}")
|
||
return False
|
||
|
||
def _should_trigger_tts(self, sentence):
|
||
"""智能判断是否应该触发TTS"""
|
||
current_time = time.time()
|
||
|
||
# 检查缓冲区大小
|
||
if len(self.tts_buffer) >= self.tts_buffer_max_size:
|
||
return True
|
||
|
||
# 检查时间窗口
|
||
time_since_last = current_time - self.tts_last_trigger_time
|
||
if time_since_last >= self.tts_accumulation_time and len(self.tts_buffer) >= self.tts_buffer_min_size:
|
||
return True
|
||
|
||
# 检查是否为完整句子(使用新的严格检测)
|
||
if self._is_complete_sentence(sentence):
|
||
return True
|
||
|
||
# 检查句子特征 - 长句子优先(50字符以上)
|
||
if len(sentence) > 50: # 超过50字符的句子立即触发
|
||
return True
|
||
|
||
# 中等长度句子(30-50字符)如果有结束标点也触发
|
||
if len(sentence) > 30:
|
||
end_punctuations = ['。', '!', '?', '.', '!', '?']
|
||
if any(sentence.strip().endswith(p) for p in end_punctuations):
|
||
return True
|
||
|
||
# 短句子只在缓冲区较多或时间窗口到期时触发
|
||
return False
|
||
|
||
def _process_tts_buffer(self):
|
||
"""处理TTS缓冲区"""
|
||
if not self.tts_buffer:
|
||
return
|
||
|
||
# 合并缓冲区的句子
|
||
combined_text = ''.join(self.tts_buffer)
|
||
|
||
# 添加到TTS任务队列
|
||
if self._add_tts_task(combined_text):
|
||
print(f"🎵 触发TTS: {combined_text[:50]}...")
|
||
self.tts_last_trigger_time = time.time()
|
||
|
||
# 清空缓冲区
|
||
self.tts_buffer.clear()
|
||
|
||
def _add_sentence_to_buffer(self, sentence):
|
||
"""添加句子到智能缓冲区"""
|
||
if not sentence.strip():
|
||
return
|
||
|
||
self.tts_buffer.append(sentence)
|
||
|
||
# 检查是否应该触发TTS
|
||
if self._should_trigger_tts(sentence):
|
||
self._process_tts_buffer()
|
||
|
||
def _flush_tts_buffer(self):
|
||
"""强制刷新TTS缓冲区"""
|
||
if self.tts_buffer:
|
||
self._process_tts_buffer()
|
||
|
||
def _load_available_characters(self):
|
||
"""加载可用角色列表"""
|
||
characters = []
|
||
if os.path.exists(self.characters_dir):
|
||
for file in os.listdir(self.characters_dir):
|
||
if file.endswith('.json'):
|
||
characters.append(file[:-5]) # 去掉.json后缀
|
||
return characters
|
||
|
||
def _load_character_config(self, character_name):
|
||
"""加载角色配置"""
|
||
config_file = os.path.join(self.characters_dir, f"{character_name}.json")
|
||
if not os.path.exists(config_file):
|
||
print(f"⚠️ 角色配置文件不存在: {config_file}")
|
||
return None
|
||
|
||
try:
|
||
with open(config_file, 'r', encoding='utf-8') as f:
|
||
config = json.load(f)
|
||
|
||
print(f"✅ 加载角色: {config.get('name', character_name)}")
|
||
print(f"📝 描述: {config.get('description', '无描述')}")
|
||
|
||
# 如果切换了角色,清空聊天历史
|
||
if hasattr(self, 'current_character') and self.current_character != character_name:
|
||
self.clear_chat_history()
|
||
print(f"🔄 角色已切换,聊天历史已清空")
|
||
|
||
return config
|
||
except Exception as e:
|
||
print(f"❌ 加载角色配置失败: {e}")
|
||
return None
|
||
|
||
def _setup_audio(self, force_reset=False):
|
||
"""设置音频设备"""
|
||
try:
|
||
# 如果强制重置,先完全释放资源
|
||
if force_reset:
|
||
self._force_close_audio_stream()
|
||
# 不终止整个PyAudio实例,只重置输入流
|
||
|
||
# 创建新的PyAudio实例(如果不存在)
|
||
if not self.audio:
|
||
self.audio = pyaudio.PyAudio()
|
||
|
||
# 创建音频输入流
|
||
self.input_stream = self.audio.open(
|
||
format=self.FORMAT,
|
||
channels=self.CHANNELS,
|
||
rate=self.RATE,
|
||
input=True,
|
||
frames_per_buffer=self.CHUNK_SIZE
|
||
)
|
||
print("✅ 音频设备初始化成功")
|
||
self.audio_device_healthy = True # 恢复设备健康状态
|
||
except Exception as e:
|
||
print(f"❌ 音频设备初始化失败: {e}")
|
||
self.audio_device_healthy = False
|
||
|
||
def _force_close_audio_stream(self):
|
||
"""强制关闭音频输入流,避免阻塞"""
|
||
if self.input_stream:
|
||
try:
|
||
# 尝试优雅关闭,设置超时
|
||
import signal
|
||
|
||
def timeout_handler(signum, frame):
|
||
raise TimeoutError("音频流关闭超时")
|
||
|
||
# 设置5秒超时
|
||
signal.signal(signal.SIGALRM, timeout_handler)
|
||
signal.alarm(5)
|
||
|
||
try:
|
||
self.input_stream.stop_stream()
|
||
self.input_stream.close()
|
||
finally:
|
||
signal.alarm(0) # 取消超时
|
||
|
||
except TimeoutError:
|
||
print("⚠️ 音频输入流关闭超时,强制终止")
|
||
self.input_stream = None
|
||
except Exception as e:
|
||
print(f"⚠️ 优雅关闭音频输入流失败: {e}")
|
||
try:
|
||
# 强制关闭
|
||
self.input_stream = None
|
||
except:
|
||
pass
|
||
finally:
|
||
self.input_stream = None
|
||
self.audio_device_healthy = False # 标记设备需要重置
|
||
|
||
def calculate_energy(self, audio_data):
|
||
"""计算音频能量"""
|
||
if len(audio_data) == 0:
|
||
return 0
|
||
|
||
# 将字节数据转换为numpy数组
|
||
audio_array = np.frombuffer(audio_data, dtype=np.int16)
|
||
|
||
# 计算RMS能量,处理可能的无效值
|
||
try:
|
||
rms = np.sqrt(np.mean(audio_array ** 2))
|
||
# 检查是否为有效值
|
||
if np.isnan(rms) or np.isinf(rms):
|
||
return 0
|
||
|
||
# 更新能量历史(只在非录音状态下更新,避免语音影响背景噪音计算)
|
||
if not self.recording:
|
||
self.energy_history.append(rms)
|
||
if len(self.energy_history) > self.max_energy_history:
|
||
self.energy_history.pop(0)
|
||
|
||
return rms
|
||
except:
|
||
return 0
|
||
|
||
def calculate_peak_energy(self, audio_data):
|
||
"""计算峰值能量(辅助判断)"""
|
||
if len(audio_data) == 0:
|
||
return 0
|
||
|
||
audio_array = np.frombuffer(audio_data, dtype=np.int16)
|
||
peak_energy = np.max(np.abs(audio_array))
|
||
|
||
return peak_energy
|
||
|
||
def calculate_zero_crossing_rate(self, audio_data):
|
||
"""计算零交叉率(主要语音检测方法)"""
|
||
if len(audio_data) == 0:
|
||
return 0
|
||
|
||
audio_array = np.frombuffer(audio_data, dtype=np.int16)
|
||
|
||
# 计算零交叉次数
|
||
zero_crossings = np.sum(np.diff(np.sign(audio_array)) != 0)
|
||
|
||
# 归一化到采样率
|
||
zcr = zero_crossings / len(audio_array) * self.RATE
|
||
|
||
# 更新ZCR历史
|
||
self.zcr_history.append(zcr)
|
||
if len(self.zcr_history) > self.max_zcr_history:
|
||
self.zcr_history.pop(0)
|
||
|
||
return zcr
|
||
|
||
def is_voice_active_advanced(self, energy, zcr):
|
||
"""仅使用ZCR进行语音活动检测"""
|
||
# ZCR语音检测:调整到2400-12000 Hz之间,适应16000Hz采样率
|
||
# 16000Hz采样率时,正常语音的ZCR范围会翻倍
|
||
zcr_condition = 2400 < zcr < 12000
|
||
|
||
# 添加一些容错,避免短暂的ZCR波动导致误判
|
||
return zcr_condition
|
||
|
||
|
||
def is_voice_active(self, energy):
|
||
"""已弃用 - 仅用于兼容性"""
|
||
# 现在主要使用ZCR检测,这个方法保留但不再使用
|
||
return False
|
||
|
||
def save_recording(self, audio_data, filename=None):
|
||
"""保存录音"""
|
||
if filename is None:
|
||
timestamp = time.strftime("%Y%m%d_%H%M%S")
|
||
filename = f"recording_{timestamp}.wav"
|
||
|
||
try:
|
||
with wave.open(filename, 'wb') as wf:
|
||
wf.setnchannels(self.CHANNELS)
|
||
wf.setsampwidth(self.audio.get_sample_size(self.FORMAT))
|
||
wf.setframerate(self.RATE)
|
||
wf.writeframes(audio_data)
|
||
|
||
print(f"✅ 录音已保存: {filename}")
|
||
print(f"📊 音频格式参数:")
|
||
print(f" - 采样率: {self.RATE} Hz")
|
||
print(f" - 声道数: {self.CHANNELS}")
|
||
print(f" - 位深度: {self.audio.get_sample_size(self.FORMAT) * 8} bits")
|
||
print(f" - 格式: PCM int16 小端序")
|
||
return True, filename
|
||
except Exception as e:
|
||
print(f"❌ 保存录音失败: {e}")
|
||
return False, None
|
||
|
||
def play_audio(self, filename):
|
||
"""播放音频文件"""
|
||
try:
|
||
print("🔇 准备播放,强制停止音频输入")
|
||
|
||
# 立即停止当前录音并清空所有缓冲区
|
||
if self.recording:
|
||
self.recording = False
|
||
self.recorded_frames = []
|
||
self.recording_start_time = None
|
||
self.last_sound_time = None
|
||
|
||
# 清空所有缓冲区
|
||
self.pre_record_buffer = []
|
||
self.energy_history = []
|
||
self.zcr_history = []
|
||
|
||
# 强制关闭输入流,避免阻塞
|
||
self._force_close_audio_stream()
|
||
|
||
# 设置播放状态
|
||
self.is_playing = True
|
||
|
||
# 等待一小段时间确保音频设备完全停止输入
|
||
time.sleep(0.5)
|
||
|
||
with wave.open(filename, 'rb') as wf:
|
||
channels = wf.getnchannels()
|
||
width = wf.getsampwidth()
|
||
rate = wf.getframerate()
|
||
total_frames = wf.getnframes()
|
||
|
||
# 分块读取音频数据
|
||
chunk_size = 1024
|
||
frames = []
|
||
|
||
for _ in range(0, total_frames, chunk_size):
|
||
chunk = wf.readframes(chunk_size)
|
||
if chunk:
|
||
frames.append(chunk)
|
||
else:
|
||
break
|
||
|
||
# 创建播放流
|
||
playback_stream = self.audio.open(
|
||
format=self.audio.get_format_from_width(width),
|
||
channels=channels,
|
||
rate=rate,
|
||
output=True
|
||
)
|
||
|
||
print(f"🔊 开始播放: {filename}")
|
||
print("🚫 音频输入已完全关闭")
|
||
|
||
# 分块播放音频
|
||
for chunk in frames:
|
||
playback_stream.write(chunk)
|
||
|
||
playback_stream.stop_stream()
|
||
playback_stream.close()
|
||
|
||
print("✅ 播放完成")
|
||
print("🔄 重新开启音频输入")
|
||
|
||
except Exception as e:
|
||
print(f"❌ 播放失败: {e}")
|
||
self.play_with_system_player(filename)
|
||
finally:
|
||
# 恢复播放状态
|
||
self.is_playing = False
|
||
|
||
# 等待播放完全结束
|
||
time.sleep(0.3)
|
||
|
||
# 重新开启输入流(强制重置)
|
||
self._setup_audio(force_reset=True)
|
||
|
||
# 重置所有状态
|
||
self.energy_history = []
|
||
self.zcr_history = []
|
||
print("📡 音频输入已重新开启(强制重置)")
|
||
|
||
def play_with_system_player(self, filename):
|
||
"""使用系统播放器播放音频"""
|
||
try:
|
||
import subprocess
|
||
import platform
|
||
|
||
# 获取文件扩展名
|
||
file_ext = filename.lower().split('.')[-1] if '.' in filename else ''
|
||
|
||
# 根据文件类型和平台选择播放命令
|
||
if file_ext == 'mp3':
|
||
# MP3文件播放
|
||
system = platform.system().lower()
|
||
|
||
if system == 'linux':
|
||
# Linux系统 - 尝试多个MP3播放器
|
||
mp3_players = [
|
||
['mpg123', filename], # 最常用的MP3播放器
|
||
['mpg321', filename], # 另一个MP3播放器
|
||
['mplayer', filename], # 通用媒体播放器
|
||
['cvlc', '--play-and-exit', filename], # VLC命令行版本
|
||
['ffplay', '-nodisp', '-autoexit', filename] # FFmpeg播放器
|
||
]
|
||
|
||
cmd = None
|
||
for player in mp3_players:
|
||
try:
|
||
subprocess.run(['which', player[0]], capture_output=True, check=True)
|
||
cmd = player
|
||
break
|
||
except:
|
||
continue
|
||
|
||
if cmd is None:
|
||
raise Exception("未找到可用的MP3播放器,请安装 mpg123 或 mplayer")
|
||
|
||
elif system == 'darwin': # macOS
|
||
cmd = ['afplay', filename]
|
||
|
||
elif system == 'windows':
|
||
cmd = ['cmd', '/c', 'start', '/min', filename]
|
||
|
||
else:
|
||
cmd = ['aplay', filename] # 默认,可能会失败
|
||
|
||
elif file_ext == 'pcm':
|
||
# PCM文件播放 - 需要指定格式
|
||
cmd = ['aplay', '-f', 'S16_LE', '-r', '16000', '-c', '1', filename]
|
||
|
||
else:
|
||
# WAV文件或其他格式
|
||
cmd = ['aplay', filename] # Linux系统
|
||
|
||
print(f"🔊 使用系统播放器: {' '.join(cmd)}")
|
||
print("🚫 系统播放器播放中,音频输入保持关闭")
|
||
subprocess.run(cmd, check=True)
|
||
print("✅ 播放完成")
|
||
print("📡 音频输入已保持关闭状态")
|
||
|
||
except Exception as e:
|
||
print(f"❌ 系统播放器失败: {e}")
|
||
|
||
# 尝试使用pygame作为备选方案
|
||
try:
|
||
self._play_with_pygame(filename)
|
||
except Exception as pygame_error:
|
||
print(f"❌ pygame播放也失败: {pygame_error}")
|
||
raise e
|
||
|
||
def _check_audio_player(self):
|
||
"""检查系统是否支持音频播放"""
|
||
try:
|
||
import subprocess
|
||
import platform
|
||
|
||
system = platform.system().lower()
|
||
|
||
if system == 'linux':
|
||
# 检查aplay(用于PCM和WAV播放)
|
||
try:
|
||
subprocess.run(['which', 'aplay'], capture_output=True, check=True)
|
||
print("✅ 找到音频播放器: aplay")
|
||
return True
|
||
except:
|
||
pass
|
||
|
||
# 检查pygame作为备选方案
|
||
try:
|
||
import pygame
|
||
print("✅ 找到pygame作为音频播放备选方案")
|
||
return True
|
||
except ImportError:
|
||
pass
|
||
|
||
return False
|
||
|
||
elif system == 'darwin': # macOS
|
||
# 检查afplay
|
||
try:
|
||
subprocess.run(['which', 'afplay'], capture_output=True, check=True)
|
||
print("✅ 找到音频播放器: afplay")
|
||
return True
|
||
except:
|
||
return False
|
||
|
||
elif system == 'windows':
|
||
# Windows通常支持音频播放
|
||
return True
|
||
|
||
else:
|
||
return False
|
||
|
||
except Exception as e:
|
||
print(f"❌ 检查音频播放器时出错: {e}")
|
||
return False
|
||
|
||
def _play_with_pygame(self, filename):
|
||
"""使用pygame播放音频作为备选方案"""
|
||
try:
|
||
import pygame
|
||
pygame.mixer.init()
|
||
|
||
print(f"🔊 尝试使用pygame播放: {filename}")
|
||
|
||
# 加载并播放音频
|
||
sound = pygame.mixer.Sound(filename)
|
||
sound.play()
|
||
|
||
# 等待播放完成
|
||
while pygame.mixer.get_busy():
|
||
pygame.time.Clock().tick(10)
|
||
|
||
print("✅ pygame播放完成")
|
||
|
||
except ImportError:
|
||
raise Exception("pygame未安装")
|
||
except Exception as e:
|
||
raise Exception(f"pygame播放失败: {e}")
|
||
finally:
|
||
try:
|
||
pygame.mixer.quit()
|
||
except:
|
||
pass
|
||
|
||
def play_audio_streaming(self, audio_chunks):
|
||
"""智能流式播放音频数据"""
|
||
try:
|
||
if not audio_chunks:
|
||
return False
|
||
|
||
print("🔊 开始智能流式播放音频...")
|
||
|
||
# 确保音频输入已停止
|
||
if self.recording:
|
||
self.recording = False
|
||
self.recorded_frames = []
|
||
self.recording_start_time = None
|
||
self.last_sound_time = None
|
||
|
||
# 清空缓冲区
|
||
self.pre_record_buffer = []
|
||
self.energy_history = []
|
||
self.zcr_history = []
|
||
|
||
# 强制关闭输入流
|
||
self._force_close_audio_stream()
|
||
|
||
self.is_playing = True
|
||
time.sleep(0.3) # 等待音频设备切换
|
||
|
||
# 创建播放流,设置更大的缓冲区
|
||
playback_stream = self.audio.open(
|
||
format=self.FORMAT,
|
||
channels=self.CHANNELS,
|
||
rate=self.RATE,
|
||
output=True,
|
||
frames_per_buffer=2048 # 增加缓冲区大小
|
||
)
|
||
|
||
print("🚫 音频输入已关闭,开始智能流式播放")
|
||
|
||
# 预加载前几个音频块以确保流畅播放
|
||
preload_chunks = 2
|
||
buffer_data = b''
|
||
|
||
# 预加载阶段
|
||
for i in range(min(preload_chunks, len(audio_chunks))):
|
||
if audio_chunks[i]:
|
||
buffer_data += audio_chunks[i]
|
||
progress = (i + 1) / len(audio_chunks) * 100
|
||
print(f"\r📥 预加载音频: {progress:.1f}%", end='', flush=True)
|
||
|
||
# 播放预加载的音频
|
||
if buffer_data:
|
||
playback_stream.write(buffer_data)
|
||
|
||
# 继续播放剩余音频块
|
||
start_idx = preload_chunks
|
||
for i in range(start_idx, len(audio_chunks)):
|
||
if audio_chunks[i]:
|
||
playback_stream.write(audio_chunks[i])
|
||
progress = (i + 1) / len(audio_chunks) * 100
|
||
print(f"\r🔊 流式播放进度: {progress:.1f}%", end='', flush=True)
|
||
|
||
# 确保所有数据都被播放
|
||
playback_stream.stop_stream()
|
||
playback_stream.close()
|
||
|
||
print("\n✅ 智能流式播放完成")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"\n❌ 智能流式播放失败: {e}")
|
||
return False
|
||
finally:
|
||
self.is_playing = False
|
||
time.sleep(0.3)
|
||
|
||
def play_audio_realtime(self, audio_queue):
|
||
"""音频队列转发器:将音频数据转发到统一播放队列"""
|
||
try:
|
||
print("🔊 启动音频队列转发器...")
|
||
|
||
chunks_forwarded = 0
|
||
total_size = 0
|
||
|
||
# 持续从队列中获取音频数据并转发到统一播放队列
|
||
while True:
|
||
try:
|
||
# 设置超时以避免无限等待
|
||
chunk = audio_queue.get(timeout=0.5)
|
||
|
||
if chunk is None: # 结束信号
|
||
print("\n📥 收到转发结束信号")
|
||
break
|
||
|
||
if chunk: # 确保chunk不为空
|
||
# 转发到统一播放队列
|
||
try:
|
||
self.audio_playback_queue.put_nowait(chunk)
|
||
chunks_forwarded += 1
|
||
total_size += len(chunk)
|
||
|
||
# 减少进度显示频率以提高性能
|
||
if chunks_forwarded % 10 == 0:
|
||
print(f"\r📤 转发音频: {chunks_forwarded} 块 | {total_size / 1024:.1f} KB", end='', flush=True)
|
||
except queue.Full:
|
||
print("⚠️ 统一播放队列满,丢弃音频块")
|
||
continue
|
||
|
||
audio_queue.task_done()
|
||
|
||
except queue.Empty:
|
||
# 队列为空,检查是否还在接收数据
|
||
if not hasattr(self, '_receiving_audio') or not self._receiving_audio:
|
||
print("\n📡 音频接收完成,转发器结束")
|
||
break
|
||
# 继续等待,不要显示太多调试信息
|
||
continue
|
||
except Exception as e:
|
||
print(f"\n❌ 转发过程中出错: {e}")
|
||
break
|
||
|
||
print(f"\n✅ 音频转发完成: {chunks_forwarded} 块, {total_size / 1024:.1f} KB")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"\n❌ 音频转发失败: {e}")
|
||
return False
|
||
|
||
def play_audio_hybrid(self, audio_chunks):
|
||
"""混合模式播放:智能选择流式或传统播放"""
|
||
try:
|
||
if not audio_chunks:
|
||
return False
|
||
|
||
# 根据音频块数量和大小决定播放策略
|
||
total_size = sum(len(chunk) for chunk in audio_chunks)
|
||
chunk_count = len(audio_chunks)
|
||
|
||
print(f"📊 音频分析: {chunk_count} 块, 总大小: {total_size / 1024:.1f} KB")
|
||
|
||
# 决策策略:
|
||
# 1. 如果音频块很少或总大小很小,使用传统播放(音质更好)
|
||
# 2. 如果音频块很多或总大小很大,使用流式播放(响应更快)
|
||
if chunk_count <= 3 or total_size < 50 * 1024: # 小于50KB或少于3块
|
||
print("🎵 选择传统播放模式(保证音质)")
|
||
# 合并所有音频块
|
||
full_audio = b''.join(audio_chunks)
|
||
|
||
# 临时保存到文件
|
||
temp_file = self.generate_tts_filename()
|
||
with open(temp_file, "wb") as f:
|
||
f.write(full_audio)
|
||
|
||
# 使用传统方式播放
|
||
success = self.play_audio_safe(temp_file, reopen_input=False)
|
||
|
||
# 删除临时文件
|
||
self._safe_delete_file(temp_file, "临时音频文件")
|
||
|
||
return success
|
||
else:
|
||
print("⚡ 选择智能流式播放模式(快速响应)")
|
||
return self.play_audio_streaming(audio_chunks)
|
||
|
||
except Exception as e:
|
||
print(f"❌ 混合播放失败: {e}")
|
||
return False
|
||
|
||
def play_audio_safe(self, filename, reopen_input=False):
|
||
"""安全的播放方式 - 使用系统播放器"""
|
||
try:
|
||
print("🔇 准备播放,强制停止音频输入")
|
||
|
||
# 立即停止当前录音并清空所有缓冲区
|
||
if self.recording:
|
||
self.recording = False
|
||
self.recorded_frames = []
|
||
self.recording_start_time = None
|
||
self.last_sound_time = None
|
||
|
||
# 清空所有缓冲区
|
||
self.pre_record_buffer = []
|
||
self.energy_history = []
|
||
self.zcr_history = []
|
||
|
||
# 强制关闭输入流
|
||
self._force_close_audio_stream()
|
||
|
||
# 设置播放状态
|
||
self.is_playing = True
|
||
|
||
# 等待确保音频设备完全停止
|
||
time.sleep(0.5)
|
||
|
||
print(f"🔊 开始播放: {filename}")
|
||
print("🚫 使用系统播放器,音频输入已完全关闭")
|
||
|
||
# 使用系统播放器
|
||
self.play_with_system_player(filename)
|
||
|
||
if reopen_input:
|
||
print("🔄 准备重新开启音频输入")
|
||
|
||
except Exception as e:
|
||
print(f"❌ 播放失败: {e}")
|
||
finally:
|
||
# 恢复播放状态
|
||
self.is_playing = False
|
||
|
||
# 等待播放完全结束
|
||
time.sleep(0.5)
|
||
|
||
# 只在需要时重新开启输入流
|
||
if reopen_input:
|
||
# 重新开启输入流(强制重置)
|
||
self._setup_audio(force_reset=True)
|
||
|
||
# 重置所有状态
|
||
self.energy_history = []
|
||
self.zcr_history = []
|
||
print("📡 音频输入已重新开启(强制重置)")
|
||
|
||
def update_pre_record_buffer(self, audio_data):
|
||
"""更新预录音缓冲区"""
|
||
self.pre_record_buffer.append(audio_data)
|
||
|
||
# 保持缓冲区大小
|
||
if len(self.pre_record_buffer) > self.pre_record_max_frames:
|
||
self.pre_record_buffer.pop(0)
|
||
|
||
def start_recording(self):
|
||
"""开始录音"""
|
||
print("🎙️ 检测到声音,开始录音...")
|
||
self.recording = True
|
||
self.recorded_frames = []
|
||
|
||
# 将预录音缓冲区的内容添加到录音中
|
||
self.recorded_frames.extend(self.pre_record_buffer)
|
||
|
||
# 清空预录音缓冲区
|
||
self.pre_record_buffer = []
|
||
|
||
self.recording_start_time = time.time()
|
||
self.last_sound_time = time.time()
|
||
self.energy_history = []
|
||
self.zcr_history = [] # 重置ZCR历史
|
||
|
||
# 重置ZCR相关计数器
|
||
self.consecutive_low_zcr_count = 0
|
||
self.consecutive_silence_count = 0
|
||
self.voice_activity_history = []
|
||
|
||
def stop_recording(self):
|
||
"""停止录音"""
|
||
if len(self.recorded_frames) > 0:
|
||
audio_data = b''.join(self.recorded_frames)
|
||
duration = len(audio_data) / (self.RATE * 2) # 16位音频,每样本2字节
|
||
|
||
# 计算实际录音时长和预录音时长
|
||
actual_duration = duration
|
||
pre_record_duration = min(duration, self.pre_record_duration)
|
||
|
||
print(f"📝 录音完成,时长: {actual_duration:.2f}秒 (包含预录音 {pre_record_duration:.1f}秒)")
|
||
|
||
# 保存录音
|
||
success, filename = self.save_recording(audio_data)
|
||
|
||
# 如果保存成功,进行后续处理
|
||
if success and filename:
|
||
print("=" * 50)
|
||
print("📡 音频输入已保持关闭状态")
|
||
print("🔄 开始处理音频...")
|
||
|
||
# 语音识别和LLM调用
|
||
if self.enable_asr and websockets is not None:
|
||
print("🤖 开始语音识别...")
|
||
asr_result = self.recognize_audio_sync(filename)
|
||
if asr_result and 'payload_msg' in asr_result:
|
||
result_text = asr_result['payload_msg'].get('result', [])
|
||
if result_text:
|
||
text = result_text[0].get('text', '识别失败')
|
||
print(f"📝 识别结果: {text}")
|
||
|
||
# 调用大语言模型
|
||
if self.enable_llm and text != '识别失败':
|
||
print("-" * 50)
|
||
llm_response = self.call_llm(text)
|
||
if llm_response:
|
||
print(f"💬 AI助手回复: {llm_response}")
|
||
|
||
# 调用文本转语音
|
||
if self.enable_tts:
|
||
print("-" * 50)
|
||
# 检查是否已经在流式处理中播放了语音
|
||
if not hasattr(self, '_streaming_tts_completed') or not self._streaming_tts_completed:
|
||
tts_file = self.text_to_speech(llm_response)
|
||
if tts_file:
|
||
print("✅ AI语音回复完成")
|
||
# 删除录音文件
|
||
self._safe_delete_file(filename, "录音文件")
|
||
else:
|
||
print("❌ 文本转语音失败")
|
||
else:
|
||
print("✅ AI语音回复已完成(流式播放)")
|
||
# 删除录音文件
|
||
self._safe_delete_file(filename, "录音文件")
|
||
# 重置流式标记
|
||
self._streaming_tts_completed = False
|
||
else:
|
||
print("ℹ️ 文本转语音功能已禁用")
|
||
# 如果不启用TTS,直接删除录音文件
|
||
self._safe_delete_file(filename, "录音文件")
|
||
else:
|
||
print("❌ 大语言模型调用失败")
|
||
else:
|
||
if not self.enable_llm:
|
||
print("ℹ️ 大语言模型功能已禁用")
|
||
elif not self.llm_api_key:
|
||
print("ℹ️ 请设置 ARK_API_KEY 环境变量以启用大语言模型功能")
|
||
else:
|
||
print("❌ 语音识别失败: 无结果")
|
||
else:
|
||
print("❌ 语音识别失败")
|
||
else:
|
||
if not self.enable_asr:
|
||
print("ℹ️ 语音识别功能已禁用")
|
||
elif websockets is None:
|
||
print("ℹ️ 请安装 websockets 库以启用语音识别功能")
|
||
|
||
# 等待音频播放完成后再重新开启音频输入
|
||
print("⏳ 等待音频播放完成...")
|
||
|
||
# 检查多个队列的状态
|
||
tts_queue_size = self.tts_task_queue.qsize()
|
||
playback_queue_size = self.audio_playback_queue.qsize()
|
||
|
||
while tts_queue_size > 0 or playback_queue_size > 0 or self.currently_playing:
|
||
tts_queue_size = self.tts_task_queue.qsize()
|
||
playback_queue_size = self.audio_playback_queue.qsize()
|
||
|
||
if tts_queue_size > 0:
|
||
status = f"🎵 TTS生成中... TTS队列: {tts_queue_size} 播放队列: {playback_queue_size}"
|
||
elif self.currently_playing:
|
||
status = f"🔊 正在播放... 播放队列: {playback_queue_size}"
|
||
else:
|
||
status = f"⏳ 等待播放... 播放队列: {playback_queue_size}"
|
||
|
||
print(f"\r{status}", end='', flush=True)
|
||
time.sleep(0.1)
|
||
|
||
# 额外等待1秒,确保音频设备完全停止
|
||
print("\n⏳ 等待音频设备完全停止...")
|
||
time.sleep(1.0)
|
||
|
||
print("🔄 音频播放完成,准备重新开启音频输入")
|
||
|
||
# 强制重置音频设备,确保完全关闭
|
||
print("🔄 强制重置音频设备...")
|
||
self._force_close_audio_stream()
|
||
|
||
self.recording = False
|
||
self.recorded_frames = []
|
||
self.recording_start_time = None
|
||
self.last_sound_time = None
|
||
self.energy_history = []
|
||
self.zcr_history = []
|
||
|
||
def get_average_energy(self):
|
||
"""计算平均能量"""
|
||
if len(self.energy_history) == 0:
|
||
return 0
|
||
return np.mean(self.energy_history)
|
||
|
||
def monitor_performance(self):
|
||
"""性能监控"""
|
||
self.frame_count += 1
|
||
if self.frame_count % 1000 == 0: # 每1000帧显示一次
|
||
elapsed = time.time() - self.start_time
|
||
fps = self.frame_count / elapsed
|
||
avg_energy = self.get_average_energy()
|
||
print(f"📊 性能: {fps:.1f} FPS | 平均能量: {avg_energy:.1f} | 阈值: {self.energy_threshold}")
|
||
|
||
def auto_adjust_threshold(self):
|
||
"""自动调整能量阈值"""
|
||
if len(self.energy_history) >= 20:
|
||
# 基于历史能量的中位数和标准差调整阈值
|
||
median_energy = np.median(self.energy_history)
|
||
std_energy = np.std(self.energy_history)
|
||
|
||
# 设置阈值为中位数 + 2倍标准差
|
||
new_threshold = max(300, median_energy + 2 * std_energy)
|
||
|
||
# 平滑调整阈值
|
||
self.energy_threshold = 0.9 * self.energy_threshold + 0.1 * new_threshold
|
||
|
||
def run(self):
|
||
"""运行录音系统"""
|
||
if not self.input_stream:
|
||
print("❌ 音频设备未初始化")
|
||
return
|
||
|
||
self.running = True
|
||
print("🎤 开始监听...")
|
||
print(f"能量阈值: {self.energy_threshold} (已弃用)")
|
||
print(f"静音阈值: {self.silence_threshold}秒")
|
||
print("📖 使用说明:")
|
||
print("- 检测到声音自动开始录音")
|
||
print("- 持续静音3秒自动结束录音")
|
||
print("- 最少录音2秒,最多30秒")
|
||
print("- 录音完成后自动播放")
|
||
print("- 按 Ctrl+C 退出")
|
||
print("🎯 新增功能:")
|
||
print("- 纯ZCR语音检测(移除能量检测)")
|
||
print("- 零交叉率检测(区分语音和噪音)")
|
||
print("- 实时显示ZCR状态")
|
||
print("- 预录音功能(包含声音开始前2秒)")
|
||
print("- 环形缓冲区防止丢失开头音频")
|
||
print("🤖 纯ZCR静音检测:")
|
||
print("- 连续低ZCR计数(20次=2秒)")
|
||
print("- ZCR活动历史追踪")
|
||
print("- 基于ZCR模式的静音验证")
|
||
print("- 语音范围: 2400-12000 Hz (适应16kHz采样率)")
|
||
print("=" * 50)
|
||
|
||
try:
|
||
while self.running:
|
||
# 检查音频流是否可用
|
||
if self.input_stream is None:
|
||
print("\n❌ 音频流已断开,尝试重新连接...")
|
||
self._setup_audio(force_reset=True)
|
||
if self.input_stream is None:
|
||
print("❌ 音频流重连失败,等待...")
|
||
time.sleep(1)
|
||
continue
|
||
|
||
# 读取音频数据
|
||
try:
|
||
data = self.input_stream.read(self.CHUNK_SIZE, exception_on_overflow=False)
|
||
except Exception as e:
|
||
print(f"\n❌ 读取音频数据失败: {e}")
|
||
self.input_stream = None
|
||
continue
|
||
|
||
if len(data) == 0:
|
||
continue
|
||
|
||
# 检查设备健康状态、TTS状态和播放冷却期 - 防止回声
|
||
current_time = time.time()
|
||
time_since_last_play = current_time - self.last_playback_time
|
||
in_cooldown = time_since_last_play < self.playback_cooldown_period
|
||
|
||
# 检查TTS和播放队列状态
|
||
tts_active = not self.tts_task_queue.empty()
|
||
playback_active = not self.audio_playback_queue.empty() or self.currently_playing
|
||
|
||
# 如果设备不健康、正在播放、TTS生成中、播放队列不为空、或在冷却期内,完全跳过音频处理
|
||
if not self.audio_device_healthy or self.is_playing or tts_active or playback_active or in_cooldown:
|
||
# 显示播放状态
|
||
tts_queue_size = self.tts_task_queue.qsize()
|
||
queue_size = self.audio_playback_queue.qsize()
|
||
|
||
if not self.audio_device_healthy:
|
||
status = f"🔧 设备重置中... TTS: {tts_queue_size} 播放: {queue_size}"
|
||
elif tts_active:
|
||
status = f"🎵 TTS生成中... TTS: {tts_queue_size} 播放: {queue_size}"
|
||
elif in_cooldown:
|
||
cooldown_time = self.playback_cooldown_period - time_since_last_play
|
||
status = f"🔊 播放冷却中... {cooldown_time:.1f}s TTS: {tts_queue_size} 播放: {queue_size}"
|
||
else:
|
||
playing_status = "播放中" if self.currently_playing else "等待播放"
|
||
status = f"🔊 {playing_status}... TTS: {tts_queue_size} 播放: {queue_size}"
|
||
print(f"\r{status}", end='', flush=True)
|
||
time.sleep(0.1) # 播放时增加延迟减少CPU使用
|
||
continue
|
||
|
||
# 计算能量和零交叉率
|
||
energy = self.calculate_energy(data)
|
||
zcr = self.calculate_zero_crossing_rate(data)
|
||
peak_energy = self.calculate_peak_energy(data)
|
||
|
||
# 性能监控
|
||
self.monitor_performance()
|
||
|
||
if self.recording:
|
||
# 录音模式
|
||
self.recorded_frames.append(data)
|
||
recording_duration = time.time() - self.recording_start_time
|
||
|
||
# 基于ZCR的智能静音检测
|
||
if self.is_voice_active_advanced(energy, zcr):
|
||
self.last_sound_time = time.time()
|
||
self.consecutive_low_zcr_count = 0 # 重置低ZCR计数
|
||
self.consecutive_silence_count = 0 # 重置静音计数
|
||
else:
|
||
self.consecutive_low_zcr_count += 1 # 增加低ZCR计数
|
||
self.consecutive_silence_count += 1 # 增加静音计数
|
||
|
||
# 更新ZCR活动历史(基于ZCR是否在语音范围内)
|
||
self.voice_activity_history.append(2400 < zcr < 12000)
|
||
if len(self.voice_activity_history) > self.max_voice_history:
|
||
self.voice_activity_history.pop(0)
|
||
|
||
# 检查是否应该结束录音
|
||
current_time = time.time()
|
||
|
||
# 纯ZCR静音检测
|
||
should_stop = False
|
||
stop_reason = ""
|
||
|
||
# 主要检测:连续低ZCR计数
|
||
if self.consecutive_low_zcr_count >= self.low_zcr_threshold_count:
|
||
# 进一步验证:检查最近的ZCR活动历史
|
||
if len(self.voice_activity_history) >= 15:
|
||
recent_voice_activity = sum(self.voice_activity_history[-15:])
|
||
if recent_voice_activity <= 3: # 最近15个样本中最多3个有语音活动
|
||
should_stop = True
|
||
stop_reason = f"ZCR静音检测 ({self.consecutive_low_zcr_count}次连续低ZCR)"
|
||
else:
|
||
# 如果历史数据不足,使用基础检测
|
||
should_stop = True
|
||
stop_reason = f"基础ZCR静音检测 ({self.consecutive_low_zcr_count}次)"
|
||
|
||
# 备用检测:基于时间的静音检测
|
||
if not should_stop and current_time - self.last_sound_time > self.silence_threshold:
|
||
should_stop = True
|
||
stop_reason = f"时间静音检测 ({self.silence_threshold}秒)"
|
||
|
||
# 执行停止录音
|
||
if should_stop and recording_duration >= self.min_recording_time:
|
||
print(f"\n🔇 {stop_reason},结束录音")
|
||
self.stop_recording()
|
||
|
||
# 检查最大录音时间
|
||
if recording_duration > self.max_recording_time:
|
||
print(f"\n⏰ 达到最大录音时间 {self.max_recording_time}秒")
|
||
self.stop_recording()
|
||
|
||
# 显示录音状态(仅ZCR相关信息)
|
||
is_voice = self.is_voice_active_advanced(energy, zcr)
|
||
zcr_progress = f"{self.consecutive_low_zcr_count}/{self.low_zcr_threshold_count}"
|
||
recent_activity = sum(self.voice_activity_history[-5:]) if len(self.voice_activity_history) >= 5 else 0
|
||
status = f"录音中... {recording_duration:.1f}s | ZCR: {zcr:.0f} | 语音: {is_voice} | 低ZCR计数: {zcr_progress} | 活动: {recent_activity}"
|
||
print(f"\r{status}", end='', flush=True)
|
||
|
||
else:
|
||
# 监听模式 - 更新预录音缓冲区
|
||
self.update_pre_record_buffer(data)
|
||
|
||
# 使用高级检测
|
||
if self.is_voice_active_advanced(energy, zcr):
|
||
# 检测到声音,开始录音
|
||
self.start_recording()
|
||
else:
|
||
# 显示监听状态(仅ZCR相关信息)
|
||
is_voice = self.is_voice_active_advanced(energy, zcr)
|
||
buffer_usage = len(self.pre_record_buffer) / self.pre_record_max_frames * 100
|
||
status = f"监听中... ZCR: {zcr:.0f} | 语音: {is_voice} | 缓冲: {buffer_usage:.0f}%"
|
||
print(f"\r{status}", end='', flush=True)
|
||
|
||
# 减少CPU使用
|
||
time.sleep(0.01)
|
||
|
||
except KeyboardInterrupt:
|
||
print("\n👋 退出")
|
||
except Exception as e:
|
||
print(f"❌ 错误: {e}")
|
||
finally:
|
||
self.stop()
|
||
|
||
def stop(self):
|
||
"""停止系统"""
|
||
self.running = False
|
||
if self.recording:
|
||
self.stop_recording()
|
||
|
||
# 停止TTS工作线程
|
||
self.tts_worker_running = False
|
||
if self.tts_worker_thread:
|
||
self.tts_task_queue.put(None) # 发送结束信号
|
||
self.tts_worker_thread.join(timeout=2.0)
|
||
|
||
# 停止播放工作线程
|
||
self.playback_worker_running = False
|
||
if self.playback_worker_thread:
|
||
self.audio_playback_queue.put(None) # 发送结束信号
|
||
self.playback_worker_thread.join(timeout=3.0)
|
||
|
||
if self.input_stream:
|
||
self.input_stream.stop_stream()
|
||
self.input_stream.close()
|
||
|
||
if self.audio:
|
||
self.audio.terminate()
|
||
|
||
def generate_asr_header(self, message_type=1, message_type_specific_flags=0):
|
||
"""生成ASR请求头部"""
|
||
PROTOCOL_VERSION = 0b0001
|
||
DEFAULT_HEADER_SIZE = 0b0001
|
||
JSON = 0b0001
|
||
GZIP = 0b0001
|
||
|
||
header = bytearray()
|
||
header.append((PROTOCOL_VERSION << 4) | DEFAULT_HEADER_SIZE)
|
||
header.append((message_type << 4) | message_type_specific_flags)
|
||
header.append((JSON << 4) | GZIP)
|
||
header.append(0x00) # reserved
|
||
return header
|
||
|
||
def parse_asr_response(self, res):
|
||
"""解析ASR响应"""
|
||
PROTOCOL_VERSION = res[0] >> 4
|
||
header_size = res[0] & 0x0f
|
||
message_type = res[1] >> 4
|
||
message_type_specific_flags = res[1] & 0x0f
|
||
serialization_method = res[2] >> 4
|
||
message_compression = res[2] & 0x0f
|
||
reserved = res[3]
|
||
header_extensions = res[4:header_size * 4]
|
||
payload = res[header_size * 4:]
|
||
result = {}
|
||
payload_msg = None
|
||
payload_size = 0
|
||
|
||
if message_type == 0b1001: # SERVER_FULL_RESPONSE
|
||
payload_size = int.from_bytes(payload[:4], "big", signed=True)
|
||
payload_msg = payload[4:]
|
||
elif message_type == 0b1011: # SERVER_ACK
|
||
seq = int.from_bytes(payload[:4], "big", signed=True)
|
||
result['seq'] = seq
|
||
if len(payload) >= 8:
|
||
payload_size = int.from_bytes(payload[4:8], "big", signed=False)
|
||
payload_msg = payload[8:]
|
||
elif message_type == 0b1111: # SERVER_ERROR_RESPONSE
|
||
code = int.from_bytes(payload[:4], "big", signed=False)
|
||
result['code'] = code
|
||
payload_size = int.from_bytes(payload[4:8], "big", signed=False)
|
||
payload_msg = payload[8:]
|
||
|
||
if payload_msg is None:
|
||
return result
|
||
|
||
if message_compression == 0b0001: # GZIP
|
||
payload_msg = gzip.decompress(payload_msg)
|
||
|
||
if serialization_method == 0b0001: # JSON
|
||
payload_msg = json.loads(str(payload_msg, "utf-8"))
|
||
|
||
result['payload_msg'] = payload_msg
|
||
result['payload_size'] = payload_size
|
||
return result
|
||
|
||
async def recognize_audio(self, audio_path):
|
||
"""识别音频文件"""
|
||
if not self.enable_asr or websockets is None:
|
||
return None
|
||
|
||
try:
|
||
print("🤖 开始语音识别...")
|
||
|
||
# 读取音频文件
|
||
with open(audio_path, mode="rb") as f:
|
||
audio_data = f.read()
|
||
|
||
# 构建请求
|
||
reqid = str(uuid.uuid4())
|
||
request_params = {
|
||
'app': {
|
||
'appid': self.asr_appid,
|
||
'cluster': self.asr_cluster,
|
||
'token': self.asr_token,
|
||
},
|
||
'user': {
|
||
'uid': 'recorder_asr'
|
||
},
|
||
'request': {
|
||
'reqid': reqid,
|
||
'nbest': 1,
|
||
'workflow': 'audio_in,resample,partition,vad,fe,decode,itn,nlu_punctuate',
|
||
'show_language': False,
|
||
'show_utterances': False,
|
||
'result_type': 'full',
|
||
"sequence": 1
|
||
},
|
||
'audio': {
|
||
'format': 'wav',
|
||
'rate': self.RATE,
|
||
'language': 'zh-CN',
|
||
'bits': 16,
|
||
'channel': self.CHANNELS,
|
||
'codec': 'raw'
|
||
}
|
||
}
|
||
|
||
# 构建头部
|
||
payload_bytes = str.encode(json.dumps(request_params))
|
||
payload_bytes = gzip.compress(payload_bytes)
|
||
full_client_request = bytearray(self.generate_asr_header())
|
||
full_client_request.extend((len(payload_bytes)).to_bytes(4, 'big'))
|
||
full_client_request.extend(payload_bytes)
|
||
|
||
# 设置认证头
|
||
additional_headers = {'Authorization': 'Bearer; {}'.format(self.asr_token)}
|
||
|
||
# 连接WebSocket并发送请求
|
||
async with websockets.connect(self.asr_ws_url, additional_headers=additional_headers, max_size=1000000000) as ws:
|
||
# 发送完整请求
|
||
await ws.send(full_client_request)
|
||
res = await ws.recv()
|
||
result = self.parse_asr_response(res)
|
||
|
||
if 'payload_msg' in result and result['payload_msg'].get('code') != 1000:
|
||
print(f"❌ ASR请求失败: {result['payload_msg']}")
|
||
return None
|
||
|
||
# 分块发送音频数据
|
||
chunk_size = int(self.CHANNELS * 2 * self.RATE * 15000 / 1000) # 15ms chunks
|
||
|
||
for offset in range(0, len(audio_data), chunk_size):
|
||
chunk = audio_data[offset:offset + chunk_size]
|
||
last = (offset + chunk_size) >= len(audio_data)
|
||
|
||
payload_bytes = gzip.compress(chunk)
|
||
audio_only_request = bytearray(self.generate_asr_header(message_type=0b0010, message_type_specific_flags=0b0010 if last else 0))
|
||
audio_only_request.extend((len(payload_bytes)).to_bytes(4, 'big'))
|
||
audio_only_request.extend(payload_bytes)
|
||
|
||
await ws.send(audio_only_request)
|
||
res = await ws.recv()
|
||
result = self.parse_asr_response(res)
|
||
|
||
return result
|
||
|
||
except Exception as e:
|
||
print(f"❌ 语音识别失败: {e}")
|
||
return None
|
||
|
||
def recognize_audio_sync(self, audio_path):
|
||
"""同步版本的语音识别"""
|
||
if not self.enable_asr or websockets is None:
|
||
return None
|
||
|
||
try:
|
||
return asyncio.run(self.recognize_audio(audio_path))
|
||
except Exception as e:
|
||
print(f"❌ 语音识别失败: {e}")
|
||
return None
|
||
|
||
def call_llm(self, user_message):
|
||
"""调用大语言模型API - 支持流式输出"""
|
||
if not self.enable_llm:
|
||
return None
|
||
|
||
try:
|
||
print("🤖 调用大语言模型(流式输出)...")
|
||
|
||
# 获取角色配置中的系统提示词
|
||
if self.character_config and "system_prompt" in self.character_config:
|
||
system_prompt = self.character_config["system_prompt"]
|
||
else:
|
||
system_prompt = "你是一个智能助手,请根据用户的语音输入提供有帮助的回答。保持回答简洁明了。"
|
||
|
||
# 获取角色配置中的最大token数
|
||
max_tokens = 50
|
||
if self.character_config and "max_tokens" in self.character_config:
|
||
max_tokens = self.character_config["max_tokens"]
|
||
|
||
headers = {
|
||
"Content-Type": "application/json",
|
||
"Authorization": f"Bearer {self.llm_api_key}"
|
||
}
|
||
|
||
# 构建消息列表,包含历史记录
|
||
messages = [
|
||
{
|
||
"role": "system",
|
||
"content": system_prompt
|
||
}
|
||
]
|
||
|
||
# 添加历史对话记录
|
||
for history_item in self.chat_history:
|
||
messages.extend([
|
||
{"role": "user", "content": history_item["user"]},
|
||
{"role": "assistant", "content": history_item["assistant"]}
|
||
])
|
||
|
||
# 添加当前用户消息
|
||
messages.append({
|
||
"role": "user",
|
||
"content": user_message
|
||
})
|
||
|
||
data = {
|
||
"model": self.llm_model,
|
||
"messages": messages,
|
||
"max_tokens": max_tokens,
|
||
"stream": True # 启用流式输出
|
||
}
|
||
|
||
# 使用流式请求
|
||
response = requests.post(self.llm_api_url, headers=headers, json=data, stream=True, timeout=30)
|
||
|
||
if response.status_code == 200:
|
||
print("🔄 开始接收流式响应...")
|
||
|
||
# 处理流式响应
|
||
accumulated_text = ""
|
||
sentence_buffer = ""
|
||
|
||
print("🔍 开始接收SSE流...")
|
||
|
||
# 使用行缓冲区处理多字节字符
|
||
buffer = ""
|
||
for line_bytes in response.iter_lines():
|
||
if not line_bytes:
|
||
continue
|
||
|
||
# 解码为字符串,处理可能的编码问题
|
||
try:
|
||
line = line_bytes.decode('utf-8')
|
||
except UnicodeDecodeError:
|
||
try:
|
||
line = line_bytes.decode('latin-1')
|
||
except Exception:
|
||
continue
|
||
|
||
# 添加到缓冲区
|
||
buffer += line
|
||
|
||
# 检查是否包含完整的数据行
|
||
while buffer:
|
||
# 查找完整的数据行
|
||
if '\n' in buffer:
|
||
line, buffer = buffer.split('\n', 1)
|
||
else:
|
||
line = buffer
|
||
buffer = ""
|
||
|
||
line = line.strip()
|
||
if not line:
|
||
continue
|
||
|
||
try:
|
||
# 解析SSE格式的响应
|
||
if line.startswith("data: "):
|
||
data_str = line[6:] # 移除 "data: " 前缀
|
||
|
||
if data_str == "[DONE]":
|
||
break
|
||
|
||
# 尝试解析JSON,增加错误处理
|
||
try:
|
||
chunk_data = json.loads(data_str)
|
||
except json.JSONDecodeError as e:
|
||
# 尝试修复JSON:检查是否是不完整的字符串
|
||
if not data_str.endswith('}'):
|
||
# 这可能是一个不完整的JSON,等待下一行
|
||
buffer = data_str + buffer
|
||
continue
|
||
|
||
print(f"\n🔍 调试 - 原始行: {repr(line)}")
|
||
print(f"🔍 调试 - 数据字符串: {repr(data_str)}")
|
||
print(f"🔍 调试 - JSON错误: {e}")
|
||
continue
|
||
|
||
# 处理解析后的数据
|
||
if "choices" in chunk_data and len(chunk_data["choices"]) > 0:
|
||
delta = chunk_data["choices"][0].get("delta", {})
|
||
content = delta.get("content", "")
|
||
|
||
if content:
|
||
accumulated_text += content
|
||
sentence_buffer += content
|
||
|
||
# 检查是否有完整句子
|
||
if self._is_complete_sentence(sentence_buffer):
|
||
print(f"📝 检测到完整句子: {sentence_buffer}")
|
||
|
||
# 过滤括号内容
|
||
filtered_sentence = self._filter_parentheses_content(sentence_buffer.strip())
|
||
|
||
if filtered_sentence:
|
||
print(f"🎵 添加到智能缓冲区: {filtered_sentence}")
|
||
# 使用智能缓冲系统替代立即调用TTS
|
||
self._add_sentence_to_buffer(filtered_sentence)
|
||
|
||
# 标记已经进行了流式TTS播放
|
||
self._streaming_tts_completed = True
|
||
|
||
# 重置句子缓冲区
|
||
sentence_buffer = ""
|
||
|
||
# 显示进度
|
||
print(f"\r💬 已生成: {accumulated_text}", end='', flush=True)
|
||
|
||
except Exception as e:
|
||
print(f"\n⚠️ 处理流式响应时出错: {e}")
|
||
print(f"🔍 调试 - 问题行: {repr(line)}")
|
||
continue
|
||
|
||
print(f"\n✅ 流式响应完成: {accumulated_text}")
|
||
|
||
# 强制刷新TTS缓冲区,确保所有内容都被处理
|
||
self._flush_tts_buffer()
|
||
|
||
# 保存完整回复到历史记录
|
||
if accumulated_text:
|
||
filtered_response = self._filter_parentheses_content(accumulated_text.strip())
|
||
self._add_to_chat_history(user_message, filtered_response)
|
||
|
||
return accumulated_text
|
||
|
||
else:
|
||
print(f"❌ LLM API调用失败: {response.status_code}")
|
||
print(f"响应内容: {response.text}")
|
||
return None
|
||
|
||
except requests.exceptions.RequestException as e:
|
||
print(f"❌ 网络请求失败: {e}")
|
||
return None
|
||
except Exception as e:
|
||
print(f"❌ LLM调用失败: {e}")
|
||
return None
|
||
|
||
def _filter_parentheses_content(self, text):
|
||
"""过滤文本中的括号内容(包括中文和英文括号)"""
|
||
import re
|
||
|
||
# 移除中文括号内容:(内容)
|
||
filtered_text = re.sub(r'([^)]*)', '', text)
|
||
# 移除英文括号内容:(content)
|
||
filtered_text = re.sub(r'\([^)]*\)', '', filtered_text)
|
||
# 移除方括号内容:【内容】
|
||
filtered_text = re.sub(r'【[^】]*】', '', filtered_text)
|
||
# 移除方括号内容:[content]
|
||
filtered_text = re.sub(r'\[[^\]]*\]', '', filtered_text)
|
||
# 移除书名号内容:「内容」
|
||
filtered_text = re.sub(r'「[^」]*」', '', filtered_text)
|
||
|
||
# 清理多余空格
|
||
filtered_text = re.sub(r'\s+', ' ', filtered_text).strip()
|
||
|
||
return filtered_text
|
||
|
||
def _is_complete_sentence(self, text):
|
||
"""检测是否为完整句子"""
|
||
import re
|
||
|
||
if not text or len(text.strip()) == 0:
|
||
return False
|
||
|
||
# 增加最小长度要求 - 至少10个字符才考虑作为完整句子
|
||
if len(text.strip()) < 10:
|
||
return False
|
||
|
||
# 句子结束标点符号
|
||
sentence_endings = r'[。!?.!?]'
|
||
|
||
# 检查是否以句子结束符结尾
|
||
if re.search(sentence_endings + r'\s*$', text):
|
||
# 对于以结束符结尾的句子,要求至少15个字符
|
||
if len(text.strip()) >= 15:
|
||
return True
|
||
|
||
# 检查是否包含句子结束符(可能在句子中间)
|
||
if re.search(sentence_endings, text):
|
||
# 如果后面有其他标点或字符,可能不是完整句子
|
||
remaining_text = re.split(sentence_endings, text, 1)[-1]
|
||
if len(remaining_text.strip()) > 0:
|
||
return False
|
||
# 对于包含结束符的句子,要求至少20个字符
|
||
if len(text.strip()) >= 20:
|
||
return True
|
||
|
||
# 对于较长的文本(超过50字符),即使没有结束符也可以考虑
|
||
if len(text.strip()) >= 50:
|
||
return True
|
||
|
||
# 对于中等长度的文本,如果包含常见完整句式模式
|
||
if len(text.strip()) >= 20:
|
||
common_patterns = [
|
||
r'^[是的有没有来去在把被让叫请使].*[的得了吗呢吧啊呀]',
|
||
r'^(你好|谢谢|再见|是的|不是|好的|没问题)',
|
||
r'^[\u4e00-\u9fff]{4,8}[的得了]$' # 4-8个中文字+的/了/得
|
||
]
|
||
|
||
for pattern in common_patterns:
|
||
if re.match(pattern, text):
|
||
return True
|
||
|
||
return False
|
||
|
||
def _filter_parentheses_content(self, text):
|
||
"""过滤文本中的括号内容(包括中文和英文括号)"""
|
||
import re
|
||
|
||
# 移除中文括号内容:(内容)
|
||
filtered_text = re.sub(r'([^)]*)', '', text)
|
||
# 移除英文括号内容:(content)
|
||
filtered_text = re.sub(r'\([^)]*\)', '', filtered_text)
|
||
# 移除方括号内容:【内容】
|
||
filtered_text = re.sub(r'【[^】]*】', '', filtered_text)
|
||
# 移除方括号内容:[content]
|
||
filtered_text = re.sub(r'\[[^\]]*\]', '', filtered_text)
|
||
# 移除书名号内容:「内容」
|
||
filtered_text = re.sub(r'「[^」]*」', '', filtered_text)
|
||
|
||
# 清理多余空格
|
||
filtered_text = re.sub(r'\s+', ' ', filtered_text).strip()
|
||
|
||
return filtered_text
|
||
|
||
def _add_to_chat_history(self, user_message, assistant_response):
|
||
"""添加对话到历史记录"""
|
||
# 如果历史记录超过最大长度,移除最早的记录
|
||
if len(self.chat_history) >= self.max_history_length:
|
||
self.chat_history.pop(0)
|
||
|
||
# 添加新的对话记录
|
||
self.chat_history.append({
|
||
"user": user_message,
|
||
"assistant": assistant_response,
|
||
"timestamp": time.time()
|
||
})
|
||
|
||
def clear_chat_history(self):
|
||
"""清空聊天历史"""
|
||
self.chat_history = []
|
||
print("💬 聊天历史已清空")
|
||
|
||
def get_chat_history_summary(self):
|
||
"""获取聊天历史摘要"""
|
||
if not self.chat_history:
|
||
return "暂无聊天历史"
|
||
|
||
return f"当前有 {len(self.chat_history)} 轮对话记录"
|
||
|
||
def _add_to_chat_history(self, user_message, assistant_response):
|
||
"""添加对话到历史记录"""
|
||
# 如果历史记录超过最大长度,移除最早的记录
|
||
if len(self.chat_history) >= self.max_history_length:
|
||
self.chat_history.pop(0)
|
||
|
||
# 添加新的对话记录
|
||
self.chat_history.append({
|
||
"user": user_message,
|
||
"assistant": assistant_response,
|
||
"timestamp": time.time()
|
||
})
|
||
|
||
def clear_chat_history(self):
|
||
"""清空聊天历史"""
|
||
self.chat_history = []
|
||
print("💬 聊天历史已清空")
|
||
|
||
def get_chat_history_summary(self):
|
||
"""获取聊天历史摘要"""
|
||
if not self.chat_history:
|
||
return "暂无聊天历史"
|
||
|
||
return f"当前有 {len(self.chat_history)} 轮对话记录"
|
||
|
||
def _safe_delete_file(self, filepath, description="文件"):
|
||
"""安全删除文件"""
|
||
try:
|
||
if filepath and os.path.exists(filepath):
|
||
os.remove(filepath)
|
||
print(f"🗑️ 已删除{description}: {filepath}")
|
||
return True
|
||
except Exception as e:
|
||
print(f"⚠️ 删除{description}失败: {e}")
|
||
return False
|
||
|
||
def generate_tts_filename(self):
|
||
"""生成TTS文件名"""
|
||
timestamp = time.strftime("%Y%m%d_%H%M%S")
|
||
return f"tts_response_{timestamp}.pcm"
|
||
|
||
def text_to_speech(self, text, mode='normal', use_worker=True):
|
||
"""文本转语音 - 真正实时流式播放
|
||
mode: 'normal' 普通模式, 'buffered' 智能缓冲模式
|
||
use_worker: 是否使用工作线程模式
|
||
"""
|
||
if not self.enable_tts:
|
||
return None
|
||
|
||
try:
|
||
if mode == 'buffered':
|
||
print("🎵 开始智能缓冲TTS处理...")
|
||
else:
|
||
print("🔊 开始文本转语音(实时流式播放)...")
|
||
|
||
# 如果是在工作线程中,不启动播放器
|
||
if use_worker and hasattr(self, 'is_playing') and self.is_playing:
|
||
print("⚠️ 工作线程模式,跳过播放器启动")
|
||
return None
|
||
|
||
# 构建请求头
|
||
headers = {
|
||
"X-Api-App-Id": self.tts_app_id,
|
||
"X-Api-Access-Key": self.tts_access_key,
|
||
"X-Api-Resource-Id": self.tts_resource_id,
|
||
"X-Api-App-Key": self.tts_app_key,
|
||
"Content-Type": "application/json",
|
||
"Connection": "keep-alive"
|
||
}
|
||
|
||
# 构建请求参数
|
||
payload = {
|
||
"user": {
|
||
"uid": "recorder_tts"
|
||
},
|
||
"req_params": {
|
||
"text": text,
|
||
"speaker": self.tts_speaker,
|
||
"audio_params": {
|
||
"format": "pcm",
|
||
"sample_rate": 16000,
|
||
"enable_timestamp": True
|
||
},
|
||
"additions": "{\"explicit_language\":\"zh\",\"disable_markdown_filter\":true, \"enable_timestamp\":true}\"}"
|
||
}
|
||
}
|
||
|
||
# 创建音频队列 - 设置更大的队列大小以减少中断
|
||
audio_queue = queue.Queue(maxsize=50) # 增加队列大小
|
||
|
||
# 启动实时播放线程
|
||
self._receiving_audio = True
|
||
player_thread = threading.Thread(target=self.play_audio_realtime, args=(audio_queue,))
|
||
player_thread.daemon = True
|
||
player_thread.start()
|
||
print("🎵 实时播放器已启动")
|
||
|
||
# 给播放器一点时间启动
|
||
time.sleep(0.1)
|
||
|
||
# 发送请求
|
||
session = requests.Session()
|
||
try:
|
||
response = session.post(self.tts_url, headers=headers, json=payload, stream=True)
|
||
|
||
if response.status_code != 200:
|
||
print(f"❌ TTS请求失败: {response.status_code}")
|
||
print(f"响应内容: {response.text}")
|
||
# 向队列发送结束信号
|
||
audio_queue.put(None)
|
||
return None
|
||
|
||
# 处理流式响应 - 根据模式优化
|
||
total_audio_size = 0
|
||
chunk_count = 0
|
||
|
||
if mode == 'buffered':
|
||
print("🔄 开始接收TTS音频流(智能缓冲模式)...")
|
||
else:
|
||
print("🔄 开始接收TTS音频流(实时播放)...")
|
||
|
||
for chunk in response.iter_lines(decode_unicode=True):
|
||
if not chunk:
|
||
continue
|
||
|
||
try:
|
||
data = json.loads(chunk)
|
||
|
||
if data.get("code", 0) == 0 and "data" in data and data["data"]:
|
||
chunk_audio = base64.b64decode(data["data"])
|
||
audio_size = len(chunk_audio)
|
||
total_audio_size += audio_size
|
||
chunk_count += 1
|
||
|
||
# 将音频块放入队列进行实时播放 - 使用非阻塞方式
|
||
try:
|
||
audio_queue.put_nowait(chunk_audio)
|
||
except queue.Full:
|
||
# 如果队列满了,稍微等待一下
|
||
time.sleep(0.01)
|
||
try:
|
||
audio_queue.put_nowait(chunk_audio)
|
||
except queue.Full:
|
||
# 如果还是满的,跳过这个块以避免阻塞
|
||
print(f"\n⚠️ 音频队列已满,跳过块 {chunk_count}")
|
||
continue
|
||
|
||
# 减少进度显示频率以提高性能
|
||
if chunk_count % 5 == 0:
|
||
print(f"\r📥 接收并播放: {chunk_count} 块 | {total_audio_size / 1024:.1f} KB", end='', flush=True)
|
||
continue
|
||
|
||
if data.get("code", 0) == 0 and "sentence" in data and data["sentence"]:
|
||
print(f"\n📝 TTS句子信息: {data['sentence']}")
|
||
continue
|
||
|
||
if data.get("code", 0) == 20000000:
|
||
break
|
||
|
||
if data.get("code", 0) > 0:
|
||
print(f"\n❌ TTS错误响应: {data}")
|
||
break
|
||
|
||
except json.JSONDecodeError:
|
||
print(f"\n❌ 解析TTS响应失败: {chunk}")
|
||
continue
|
||
|
||
print(f"\n✅ TTS音频接收完成: {chunk_count} 个音频块, 总大小: {total_audio_size / 1024:.1f} KB")
|
||
|
||
# 等待播放完成
|
||
print("⏳ 等待音频播放完成...")
|
||
player_thread.join(timeout=5.0)
|
||
|
||
# 生成临时文件名用于返回
|
||
temp_file = self.generate_tts_filename()
|
||
|
||
return temp_file
|
||
|
||
finally:
|
||
response.close()
|
||
session.close()
|
||
# 确保播放线程结束
|
||
self._receiving_audio = False
|
||
audio_queue.put(None)
|
||
|
||
except Exception as e:
|
||
print(f"❌ TTS转换失败: {e}")
|
||
return None
|
||
|
||
def parse_arguments():
|
||
"""解析命令行参数"""
|
||
parser = argparse.ArgumentParser(description='基于能量检测的极简录音系统')
|
||
parser.add_argument('--character', '-c', type=str, default='libai',
|
||
help='选择角色 (默认: libai)')
|
||
parser.add_argument('--list-characters', '-l', action='store_true',
|
||
help='列出所有可用角色')
|
||
return parser.parse_args()
|
||
|
||
def list_characters(characters_dir):
|
||
"""列出所有可用角色"""
|
||
characters = []
|
||
if os.path.exists(characters_dir):
|
||
for file in os.listdir(characters_dir):
|
||
if file.endswith('.json'):
|
||
character_name = file[:-5]
|
||
config_file = os.path.join(characters_dir, file)
|
||
try:
|
||
with open(config_file, 'r', encoding='utf-8') as f:
|
||
config = json.load(f)
|
||
name = config.get('name', character_name)
|
||
desc = config.get('description', '无描述')
|
||
characters.append(f"{character_name}: {name} - {desc}")
|
||
except:
|
||
characters.append(f"{character_name}: 配置文件读取失败")
|
||
|
||
if characters:
|
||
print("🎭 可用角色列表:")
|
||
for char in characters:
|
||
print(f" - {char}")
|
||
else:
|
||
print("❌ 未找到任何角色配置文件")
|
||
|
||
def main():
|
||
"""主函数"""
|
||
args = parse_arguments()
|
||
|
||
characters_dir = os.path.join(os.path.dirname(__file__), "characters")
|
||
|
||
# 如果要求列出角色,显示后退出
|
||
if args.list_characters:
|
||
list_characters(characters_dir)
|
||
return
|
||
|
||
print("🚀 基于能量检测的极简录音系统")
|
||
print("🤖 集成语音识别功能")
|
||
print(f"🎭 当前角色: {args.character}")
|
||
print("=" * 50)
|
||
|
||
# 创建录音系统
|
||
recorder = EnergyBasedRecorder(
|
||
energy_threshold=200, # 能量阈值(降低以提高灵敏度)
|
||
silence_threshold=3.0, # 静音阈值(秒)- 改为3秒
|
||
min_recording_time=2.0, # 最小录音时间
|
||
max_recording_time=30.0, # 最大录音时间
|
||
enable_asr=True, # 启用语音识别功能
|
||
enable_llm=True, # 启用大语言模型功能
|
||
enable_tts=True, # 启用文本转语音功能
|
||
character=args.character # 指定角色
|
||
)
|
||
|
||
print("✅ 系统初始化成功")
|
||
print("🎯 功能特点:")
|
||
print(" - 完全移除Vosk识别依赖")
|
||
print(" - 基于ZCR语音检测,精确识别")
|
||
print(" - 集成在线语音识别(字节跳动ASR)")
|
||
print(" - 集成大语言模型(豆包大模型)")
|
||
print(" - 集成文本转语音(字节跳动TTS)")
|
||
print(" - 录音完成后自动语音识别")
|
||
print(" - 语音识别后自动调用AI助手")
|
||
print(" - AI回复后自动转换为语音")
|
||
print(" - 多角色支持 (李白、猪八戒等)")
|
||
print(" - 每个角色独特音色和性格")
|
||
print(" - 预录音功能(包含声音开始前2秒)")
|
||
print(" - 环形缓冲区防止丢失开头音频")
|
||
print(" - 自动调整能量阈值")
|
||
print(" - 实时性能监控")
|
||
print(" - 预期延迟: <0.1秒")
|
||
print("=" * 50)
|
||
|
||
# 显示API密钥状态
|
||
if not recorder.enable_llm:
|
||
print("🔑 提示: 如需启用大语言模型功能,请设置环境变量:")
|
||
print(" export ARK_API_KEY='your_api_key_here'")
|
||
print("=" * 50)
|
||
|
||
# 显示角色信息
|
||
if recorder.character_config:
|
||
print(f"🎭 当前角色: {recorder.character_config.get('name', '未知')}")
|
||
print(f"📝 描述: {recorder.character_config.get('description', '无描述')}")
|
||
print(f"🎤 音色: {recorder.tts_speaker}")
|
||
print("=" * 50)
|
||
|
||
# 显示使用说明
|
||
print("📖 使用说明:")
|
||
print("- 检测到声音自动开始录音")
|
||
print("- 持续静音3秒自动结束录音")
|
||
print("- 最少录音2秒,最多30秒")
|
||
print("- 录音完成后自动播放")
|
||
print("- 按 Ctrl+C 退出")
|
||
print("🎭 角色切换:")
|
||
print("- 使用 --character 或 -c 参数选择角色")
|
||
print("- 使用 --list-characters 或 -l 查看所有角色")
|
||
print("- 示例: python recorder.py --character zhubajie")
|
||
print("=" * 50)
|
||
|
||
# 开始运行
|
||
recorder.run()
|
||
|
||
if __name__ == "__main__":
|
||
main()
|