2831 lines
133 KiB
Python
2831 lines
133 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
"""
|
||
多进程音频处理模块
|
||
定义输入进程和输出进程的类
|
||
使用增强版语音检测器
|
||
"""
|
||
|
||
import base64
|
||
import glob
|
||
import gzip
|
||
import json
|
||
import multiprocessing as mp
|
||
import os
|
||
import queue
|
||
import threading
|
||
import time
|
||
import wave
|
||
from dataclasses import dataclass
|
||
from enum import Enum
|
||
from typing import Any, Dict, List, Optional
|
||
|
||
import numpy as np
|
||
import pyaudio
|
||
import requests
|
||
|
||
from enhanced_voice_detector import EnhancedVoiceDetector
|
||
from process_logger import ProcessLogger
|
||
|
||
|
||
# ========== Greeting缓存管理工具函数 ==========
|
||
def get_greeting_cache_path(character_name):
    """Build the path of the cached greeting WAV file for a character.

    The result is ``greeting_cache/<character_name>.wav`` joined with the
    platform separator; nothing on disk is touched.
    """
    wav_name = "{}.wav".format(character_name)
    return os.path.join("greeting_cache", wav_name)
|
||
|
||
def greeting_cache_exists(character_name):
    """Report whether a greeting cache file is present for the character."""
    return os.path.exists(get_greeting_cache_path(character_name))
|
||
|
||
def load_cached_audio(character_name):
    """Load the cached greeting audio for a character.

    Args:
        character_name: Name used to derive the cache file name.

    Returns:
        The raw bytes of the cache file, or ``None`` when the file is missing
        or cannot be read.
    """
    cache_dir = "greeting_cache"
    cache_path = get_greeting_cache_path(character_name)

    try:
        # The cache directory is still created on read, as before, but now
        # inside the guarded region so a failure here (e.g. a read-only
        # working directory) yields None instead of an uncaught exception.
        os.makedirs(cache_dir, exist_ok=True)

        if not os.path.exists(cache_path):
            return None

        with open(cache_path, 'rb') as f:
            audio_data = f.read()
    except OSError as e:
        print(f"❌ 加载缓存音频失败: {e}")
        return None

    print(f"✅ 已加载角色 {character_name} 的缓存音频: {len(audio_data)} 字节")
    return audio_data
|
||
|
||
def save_greeting_cache(character_name, audio_data):
    """Write greeting audio bytes into the character's cache file.

    Creates the ``greeting_cache`` directory when needed.

    Returns:
        ``True`` when the file was written, ``False`` when any step failed
        (the failure is printed, not raised).
    """
    try:
        os.makedirs("greeting_cache", exist_ok=True)
        target = get_greeting_cache_path(character_name)
        with open(target, 'wb') as out:
            out.write(audio_data)
        print(f"✅ 已保存角色 {character_name} 的greeting音频到缓存: {len(audio_data)} 字节")
    except Exception as exc:
        print(f"❌ 保存缓存音频失败: {exc}")
        return False
    else:
        return True
|
||
|
||
# ========== 录音文件清理机制 ==========
|
||
|
||
def cleanup_recordings(retention_hours=24, dry_run=False):
    """Delete ``recording_*.wav`` files whose mtime is older than a cutoff.

    Files are looked up in the current working directory.

    Args:
        retention_hours: Files modified within this many hours are kept.
        dry_run: When true, only report what would be deleted.

    Returns:
        dict: Statistics with the keys ``total_files``, ``files_to_delete``,
        ``files_deleted``, ``total_size_mb``, ``freed_space_mb`` and
        ``deleted_files``; or ``{'error': <message>}`` on an unexpected failure.
    """
    bytes_per_mb = 1024 * 1024

    try:
        cutoff_time = time.time() - (retention_hours * 3600)
        recording_files = glob.glob("recording_*.wav")

        stats = {
            'total_files': len(recording_files),
            'files_to_delete': 0,
            'files_deleted': 0,
            'total_size_mb': 0.0,
            'freed_space_mb': 0.0,
            'deleted_files': []
        }

        if not recording_files:
            print("🧹 未找到录音文件")
            return stats

        # Total size is informational: a file that vanished or cannot be
        # stat'ed is simply left out of the sum.
        for file_path in recording_files:
            try:
                stats['total_size_mb'] += os.path.getsize(file_path) / bytes_per_mb
            except OSError:
                continue

        print(f"🧹 开始清理录音文件(保留 {retention_hours} 小时内的文件)")
        print(f"📊 找到 {stats['total_files']} 个录音文件,总大小: {stats['total_size_mb']:.2f} MB")

        if dry_run:
            print("🔍 模拟运行模式,不会实际删除文件")

        for file_path in recording_files:
            try:
                if os.path.getmtime(file_path) >= cutoff_time:
                    continue  # still inside the retention window

                stats['files_to_delete'] += 1
                file_size = os.path.getsize(file_path)
                stats['freed_space_mb'] += file_size / bytes_per_mb

                if dry_run:
                    print(f"🔍 标记删除: {file_path} ({file_size / 1024:.1f} KB)")
                    continue

                os.remove(file_path)
                stats['files_deleted'] += 1
                stats['deleted_files'].append(os.path.basename(file_path))
                print(f"🗑️ 已删除: {file_path} ({file_size / 1024:.1f} KB)")
            except OSError as e:
                # One bad file must not stop the rest of the sweep.
                print(f"❌ 处理文件 {file_path} 时出错: {e}")
                continue

        print("📊 清理完成:")
        print(f" - 总文件数: {stats['total_files']}")
        print(f" - 应删除文件数: {stats['files_to_delete']}")
        if not dry_run:
            print(f" - 实际删除文件数: {stats['files_deleted']}")
            print(f" - 释放空间: {stats['freed_space_mb']:.2f} MB")

        return stats

    except Exception as e:
        print(f"❌ 清理录音文件时出错: {e}")
        return {'error': str(e)}
|
||
|
||
def cleanup_recordings_by_count(max_files=50, dry_run=False):
    """Keep only the newest ``max_files`` ``recording_*.wav`` files.

    Files are looked up in the current working directory and ranked by mtime.

    Args:
        max_files: Number of most recently modified files to keep. Must not be
            negative; a negative value is rejected and nothing is deleted.
        dry_run: When true, only report what would be deleted.

    Returns:
        dict: Statistics with the keys ``total_files``, ``files_to_delete``,
        ``files_deleted``, ``total_size_mb``, ``freed_space_mb`` and
        ``deleted_files``; or ``{'error': <message>}`` on failure.
    """
    bytes_per_mb = 1024 * 1024

    def _mtime_or_zero(path):
        # A file that disappears between glob() and sort() must not abort the
        # whole run; rank it as oldest and let the delete loop report it.
        try:
            return os.path.getmtime(path)
        except OSError:
            return 0.0

    try:
        # A negative count would turn the slice below into "[-n:]", which
        # selects the wrong files for deletion; refuse instead.
        if max_files < 0:
            raise ValueError(f"max_files 不能为负数: {max_files}")

        recording_files = glob.glob("recording_*.wav")

        stats = {
            'total_files': len(recording_files),
            'files_to_delete': 0,
            'files_deleted': 0,
            'total_size_mb': 0.0,
            'freed_space_mb': 0.0,
            'deleted_files': []
        }

        if not recording_files:
            print("🧹 未找到录音文件")
            return stats

        # Newest first, so everything past index max_files is surplus.
        recording_files.sort(key=_mtime_or_zero, reverse=True)

        for file_path in recording_files:
            try:
                stats['total_size_mb'] += os.path.getsize(file_path) / bytes_per_mb
            except OSError:
                continue

        print(f"🧹 开始按数量清理录音文件(保留最新的 {max_files} 个文件)")
        print(f"📊 找到 {stats['total_files']} 个录音文件,总大小: {stats['total_size_mb']:.2f} MB")

        if dry_run:
            print("🔍 模拟运行模式,不会实际删除文件")

        files_to_delete = recording_files[max_files:]
        stats['files_to_delete'] = len(files_to_delete)

        for file_path in files_to_delete:
            try:
                file_size = os.path.getsize(file_path)
                stats['freed_space_mb'] += file_size / bytes_per_mb

                if dry_run:
                    print(f"🔍 标记删除: {file_path} ({file_size / 1024:.1f} KB)")
                    continue

                os.remove(file_path)
                stats['files_deleted'] += 1
                stats['deleted_files'].append(os.path.basename(file_path))
                print(f"🗑️ 已删除: {file_path} ({file_size / 1024:.1f} KB)")
            except OSError as e:
                print(f"❌ 删除文件 {file_path} 时出错: {e}")
                continue

        print("📊 清理完成:")
        print(f" - 总文件数: {stats['total_files']}")
        print(f" - 应删除文件数: {stats['files_to_delete']}")
        if not dry_run:
            print(f" - 实际删除文件数: {stats['files_deleted']}")
            print(f" - 释放空间: {stats['freed_space_mb']:.2f} MB")

        return stats

    except Exception as e:
        print(f"❌ 按数量清理录音文件时出错: {e}")
        return {'error': str(e)}
|
||
|
||
def list_recordings():
    """Collect metadata for every ``recording_*.wav`` in the working directory.

    Returns:
        list: One dict per readable file with the keys ``filename``, ``path``,
        ``size_kb``, ``size_mb``, ``created_time``, ``modified_time`` and
        ``age_hours``, ordered by the formatted modification time with the
        most recent first. Returns an empty list on an unexpected failure.
    """
    try:
        entries = []

        for path in glob.glob("recording_*.wav"):
            try:
                info = os.stat(path)
                created = time.localtime(info.st_ctime)
                modified = time.localtime(info.st_mtime)
                entry = {
                    'filename': os.path.basename(path),
                    'path': path,
                    'size_kb': info.st_size / 1024,
                    'size_mb': info.st_size / (1024 * 1024),
                    'created_time': time.strftime('%Y-%m-%d %H:%M:%S', created),
                    'modified_time': time.strftime('%Y-%m-%d %H:%M:%S', modified),
                    'age_hours': (time.time() - info.st_mtime) / 3600
                }
            except Exception as exc:
                # Skip files that cannot be inspected; keep listing the rest.
                print(f"❌ 获取文件信息 {path} 时出错: {exc}")
            else:
                entries.append(entry)

        # The timestamp format sorts lexicographically in chronological order.
        return sorted(entries, key=lambda item: item['modified_time'], reverse=True)

    except Exception as exc:
        print(f"❌ 列出录音文件时出错: {exc}")
        return []
|
||
|
||
def print_recording_summary():
    """Print a summary of the recording files returned by ``list_recordings``.

    Shows the file count, total size, the newest and oldest file, and up to
    the first five entries of the listing.
    """
    recordings = list_recordings()

    if not recordings:
        print("📁 未找到录音文件")
        return

    total_size_mb = sum(r['size_mb'] for r in recordings)
    # The smallest age is the most recent file. The original had these two
    # selections swapped, so the "newest"/"oldest" labels were reversed.
    newest_file = min(recordings, key=lambda x: x['age_hours'])
    oldest_file = max(recordings, key=lambda x: x['age_hours'])

    print("📁 录音文件摘要:")
    print(f" - 文件数量: {len(recordings)}")
    print(f" - 总大小: {total_size_mb:.2f} MB")
    print(f" - 最新文件: {newest_file['filename']} ({newest_file['created_time']})")
    print(f" - 最旧文件: {oldest_file['filename']} ({oldest_file['created_time']}, {oldest_file['age_hours']:.1f} 小时前)")

    # Show the first five entries of the listing.
    print("📄 最近的文件:")
    for i, recording in enumerate(recordings[:5]):
        print(f" {i+1}. {recording['filename']} - {recording['size_mb']:.2f} MB - {recording['created_time']}")

    if len(recordings) > 5:
        print(f" ... 还有 {len(recordings) - 5} 个文件")
|
||
|
||
|
||
|
||
class RecordingState(Enum):
    """Enumeration of recording states."""
    IDLE = "idle"
    RECORDING = "recording"
    PROCESSING = "processing"
    PLAYING = "playing"
|
||
|
||
@dataclass
class AudioSegment:
    """Data structure describing one audio segment."""
    audio_data: bytes
    start_time: float
    end_time: float
    duration: float
    # Optional extra information; defaults to None rather than an empty dict.
    metadata: Optional[Dict[str, Any]] = None
|
||
|
||
@dataclass
class ControlCommand:
    """Data structure for a control command."""
    # Command name; InputProcess._check_commands dispatches on this string.
    command: str
    # Optional command arguments; defaults to None.
    parameters: Optional[Dict[str, Any]] = None
|
||
|
||
@dataclass
class ProcessEvent:
    """Data structure for an event sent from a worker process."""
    event_type: str
    # Optional binary payload (e.g. recorded audio for 'recording_complete').
    data: Optional[bytes] = None
    # Optional details about the event; defaults to None.
    metadata: Optional[Dict[str, Any]] = None
|
||
|
||
class InputProcess:
    """Input process: microphone capture plus voice-activity detection.

    Commands are read from ``command_queue`` (main process -> input process)
    and results are reported as ``ProcessEvent`` objects on ``event_queue``
    (input process -> main process). A daemon thread consumes queued TTS
    tasks.

    Args:
        command_queue: Queue of ``ControlCommand``-like objects exposing a
            ``command`` attribute.
        event_queue: Queue that receives ``ProcessEvent`` instances.
        config: Optional settings. Missing keys fall back to
            ``_get_default_config()``. The keys ``tts_speaker``,
            ``tts_url``, ``tts_app_id``, ``tts_access_key``,
            ``tts_resource_id``, ``tts_app_key`` and ``tts_timeout`` override
            the built-in TTS settings.
    """

    def __init__(self, command_queue: mp.Queue, event_queue: mp.Queue, config: Optional[Dict[str, Any]] = None):
        self.command_queue = command_queue  # main process -> input process
        self.event_queue = event_queue      # input process -> main process

        # Overlay the caller's settings on the defaults so that a partial
        # config cannot cause a KeyError when e.g. 'min_recording_time' is
        # read while recording.
        self.config = {**self._get_default_config(), **(config or {})}

        self.logger = ProcessLogger("InputProcess")

        # Audio capture parameters.
        self.FORMAT = pyaudio.paInt16
        self.CHANNELS = 1
        self.RATE = 16000
        self.CHUNK_SIZE = 1024

        # State flags and buffers.
        self.recording_enabled = False   # off until an external command enables it
        self.is_recording = False
        self.recording_buffer = []
        self.pre_record_buffer = []
        self.silence_start_time = None
        self.recording_start_time = None

        # Pre-record window, taken from the config so that it matches the
        # duration reported when a recording starts (default 2.0 seconds).
        self.pre_record_duration = float(self.config['pre_record_duration'])
        self.pre_record_max_frames = max(
            1, int(self.pre_record_duration * self.RATE / self.CHUNK_SIZE))

        self.voice_detector = EnhancedVoiceDetector(
            sample_rate=self.RATE,
            chunk_size=self.CHUNK_SIZE
        )

        # Recording stops after this many consecutive non-voice chunks. One
        # chunk is 1024 frames at 16 kHz (64 ms), so 30 chunks is about 1.9 s
        # of audio.
        self.consecutive_silence_count = 0
        self.silence_threshold_count = 30

        # PyAudio handles, created lazily by _setup_audio().
        self.audio = None
        self.input_stream = None

        self.running = True

        # TTS worker thread and its bounded task queue.
        self.tts_worker_running = True
        self.tts_worker_thread = None
        self.tts_task_queue = mp.Queue(maxsize=20)

        # TTS settings.
        # NOTE(review): the fallback credentials below are hard-coded in the
        # source file; supply them through `config` and rotate the embedded
        # values rather than relying on them.
        self.tts_url = self.config.get(
            'tts_url', "https://openspeech.bytedance.com/api/v3/tts/unidirectional")
        self.tts_app_id = self.config.get('tts_app_id', "8718217928")
        self.tts_access_key = self.config.get('tts_access_key', "ynJMX-5ix1FsJvswC9KTNlGUdubcchqc")
        self.tts_resource_id = self.config.get('tts_resource_id', "volc.service_type.10029")
        self.tts_app_key = self.config.get('tts_app_key', "aGjiRDfUWi")
        self.tts_speaker = self.config.get('tts_speaker', "zh_female_wanqudashu_moon_bigtts")
        # (connect, read) timeout in seconds so a stalled connection cannot
        # block the TTS worker thread forever.
        self.tts_timeout = self.config.get('tts_timeout', (5, 30))

        self._start_tts_worker()

    def _get_default_config(self) -> Dict[str, Any]:
        """Return the default configuration."""
        return {
            'zcr_min': 2400,            # ZCR lower bound tuned for 16 kHz
            'zcr_max': 12000,           # ZCR upper bound tuned for 16 kHz
            'min_recording_time': 2.0,  # minimum recording length in seconds
            'max_recording_time': 30.0,
            'silence_threshold': 3.0,
            'pre_record_duration': 2.0
        }

    def run(self):
        """Main loop of the input process.

        Polls the command queue, processes one audio chunk when recording is
        enabled or the detector is calibrating, then sleeps briefly. Resources
        are released when the loop ends.
        """
        self.logger.info("输入进程启动")
        self._setup_audio()

        try:
            while self.running:
                self._check_commands()

                # Audio is needed while recording is enabled, and also while
                # the detector is calibrating.
                calibrating = getattr(self.voice_detector, 'calibration_mode', False)
                if self.recording_enabled or calibrating:
                    self._process_audio()

                # Short sleep to keep CPU usage down.
                time.sleep(0.01)

        except KeyboardInterrupt:
            self.logger.info("输入进程收到中断信号")
        except Exception as e:
            self.logger.error(f"输入进程错误: {e}")
        finally:
            self._cleanup()
            self.logger.info("输入进程退出")

    def start_calibration(self):
        """Reset the voice detector and put it into calibration mode.

        Returns:
            ``True`` when calibration was started, ``False`` when no detector
            is available.
        """
        detector = getattr(self, 'voice_detector', None)
        if not detector:
            return False

        detector.reset()
        detector.calibration_mode = True
        detector.calibration_samples = 0

        # Calibration consumes audio, so make sure a stream is open.
        if not self.input_stream:
            self._setup_audio()

        print("🎙️ 输入进程:开始语音检测器校准")
        return True

    def start_monitoring(self):
        """Enable listening; returns ``False`` if it was already enabled."""
        if self.recording_enabled:
            return False

        self.recording_enabled = True
        # Reopen the stream and drop stale buffers before listening again.
        self._cleanup_audio_stream()
        self._reset_recording_state()
        self._setup_audio()
        print("🎙️ 输入进程:开始音频监听")
        return True

    def stop_monitoring(self):
        """Disable listening; returns ``False`` if it was already disabled."""
        if not self.recording_enabled:
            return False

        self.recording_enabled = False
        if self.is_recording:
            self._stop_recording()
        self._cleanup_audio_stream()
        print("🎙️ 输入进程:停止音频监听")
        return True

    def get_calibration_status(self):
        """Return calibration mode, progress ratio and sample counts."""
        detector = getattr(self, 'voice_detector', None)
        if not detector:
            return {'calibrating': False, 'progress': 0, 'samples': 0, 'required': 0}

        samples = detector.calibration_samples
        required = detector.required_calibration
        return {
            'calibrating': detector.calibration_mode,
            # Guard against a detector that reports zero required samples.
            'progress': samples / required if required else 0,
            'samples': samples,
            'required': required
        }

    def get_monitoring_status(self):
        """Return whether listening is enabled, recording, and a stream is open."""
        return {
            'enabled': self.recording_enabled,
            'recording': self.is_recording,
            'audio_stream_active': self.input_stream is not None
        }

    def _reset_recording_state(self):
        """Drop buffered audio and reset the per-recording counters."""
        self.recording_buffer = []
        self.pre_record_buffer = []
        self.is_recording = False
        self.silence_start_time = None
        self.consecutive_silence_count = 0

    def _setup_audio(self):
        """Open the microphone input stream.

        Raises:
            Exception: whatever PyAudio raised, after logging it.
        """
        try:
            # Reuse the existing PyAudio instance: creating a new one on every
            # re-enable would leave the previous instance unterminated.
            if self.audio is None:
                self.audio = pyaudio.PyAudio()
            if self.input_stream is not None:
                self._cleanup_audio_stream()
            self.input_stream = self.audio.open(
                format=self.FORMAT,
                channels=self.CHANNELS,
                rate=self.RATE,
                input=True,
                frames_per_buffer=self.CHUNK_SIZE
            )
            self.logger.info("音频设备初始化成功")
        except Exception as e:
            self.logger.error(f"音频设备初始化失败: {e}")
            raise

    def _cleanup_audio_stream(self):
        """Close the input stream while keeping the PyAudio instance alive."""
        stream = self.input_stream
        if not stream:
            return

        # Forget the handle first so a failing stop/close cannot leave a
        # half-closed stream behind for the next read.
        self.input_stream = None
        print("🎙️ 输入进程:关闭音频输入流")
        try:
            try:
                stream.stop_stream()
            finally:
                stream.close()
        except Exception as e:
            self.logger.warning(f"关闭音频流时出错: {e}")

    def _cleanup(self):
        """Stop the TTS worker and release all audio resources.

        The class used to define ``_cleanup`` twice, with the later definition
        silently replacing the earlier one; this is the single merged version.
        """
        self.tts_worker_running = False
        worker = self.tts_worker_thread
        if worker:
            try:
                # Wake the worker immediately. Non-blocking: if the queue is
                # full the worker still exits once its 1 s poll notices the
                # cleared flag.
                self.tts_task_queue.put_nowait(None)
            except queue.Full:
                pass
            worker.join(timeout=2.0)

        self._cleanup_audio_stream()

        if self.audio:
            try:
                self.audio.terminate()
            except Exception as e:
                self.logger.warning(f"终止PyAudio实例时出错: {e}")
            self.audio = None

    def _check_commands(self):
        """Drain the command queue and execute each command.

        ``shutdown`` and ``emergency_stop`` end the drain immediately; any
        commands still queued are handled on the next call.
        """
        try:
            while True:
                command = self.command_queue.get_nowait()
                name = command.command

                if name == 'enable_recording':
                    if not self.recording_enabled:
                        # Going from disabled to enabled: reopen the stream
                        # and drop stale audio so old data is not recorded.
                        print("🎙️ 输入进程:重新启用录音功能,重新初始化音频流")
                        self._cleanup_audio_stream()
                        self._reset_recording_state()
                        print("🎙️ 输入进程:已清空所有音频缓冲区")
                        self._setup_audio()
                    self.recording_enabled = True
                    self.logger.info("录音功能已启用")

                elif name == 'disable_recording':
                    self.recording_enabled = False
                    # Finish and send a recording that is in progress.
                    if self.is_recording:
                        self._stop_recording()
                    # Close the stream while disabled to avoid echo pickup.
                    self._cleanup_audio_stream()
                    self.logger.info("录音功能已禁用")

                elif name == 'start_calibration':
                    if self.start_calibration():
                        self.logger.info("校准已启动")
                    else:
                        self.logger.error("校准启动失败")

                elif name == 'start_monitoring':
                    if self.start_monitoring():
                        self.logger.info("监听已启动")
                    else:
                        self.logger.error("监听启动失败")

                elif name == 'stop_monitoring':
                    if self.stop_monitoring():
                        self.logger.info("监听已停止")
                    else:
                        self.logger.error("监听停止失败")

                elif name == 'get_calibration_status':
                    self.event_queue.put(ProcessEvent(
                        event_type='calibration_status',
                        metadata=self.get_calibration_status()
                    ))

                elif name == 'get_monitoring_status':
                    self.event_queue.put(ProcessEvent(
                        event_type='monitoring_status',
                        metadata=self.get_monitoring_status()
                    ))

                elif name == 'shutdown':
                    self.logger.info("收到关闭命令")
                    self.running = False
                    return

                elif name == 'emergency_stop':
                    print("🚨 输入进程收到紧急停止命令!")
                    self._handle_emergency_stop()
                    return

                elif name == 'cleanup_recordings':
                    print("🧹 输入进程收到清理录音文件命令")
                    self._perform_cleanup()

                elif name == 'list_recordings':
                    print("📋 输入进程收到列出录音文件命令")
                    self._list_recordings_info()

        except queue.Empty:
            pass

    def _process_audio(self):
        """Read one chunk from the stream and feed it to the detector."""
        try:
            if not self.input_stream:
                return

            data = self.input_stream.read(self.CHUNK_SIZE, exception_on_overflow=False)
            if len(data) == 0:
                return

            self._update_pre_record_buffer(data)

            detection_result = self.voice_detector.is_voice_advanced(data)
            is_voice = detection_result['is_voice']

            self._print_calibration_progress()

            if self.is_recording:
                self._handle_recording_chunk(data, detection_result, is_voice)
            else:
                self._handle_listening_chunk(detection_result, is_voice)

        except Exception as e:
            print(f"🎙️ 输入进程音频处理错误: {e}")
            # Stream-related failures: try to reopen the device.
            message = str(e).lower()
            if "stream" in message or "audio" in message:
                print("🎙️ 输入进程:音频流异常,尝试重新初始化")
                self._cleanup_audio_stream()
                if self.recording_enabled:
                    self._setup_audio()

    def _print_calibration_progress(self):
        """Print the calibration percentage while the detector calibrates."""
        detector = self.voice_detector
        if not getattr(detector, 'calibration_mode', False):
            return
        if not hasattr(detector, 'calibration_samples'):
            return

        required = getattr(detector, 'required_calibration', 0)
        progress = detector.calibration_samples / required * 100 if required else 0
        print(f"\r🎙️ 校准中... {progress:.1f}%", end='', flush=True)

    def _handle_recording_chunk(self, data, detection_result, is_voice):
        """Append a chunk to the active recording and check the stop rules."""
        self.recording_buffer.append(data)

        if is_voice:
            self.silence_start_time = None
            self.consecutive_silence_count = 0
        else:
            self.consecutive_silence_count += 1
            if self.silence_start_time is None:
                self.silence_start_time = time.time()

        recording_duration = time.time() - self.recording_start_time
        should_stop = False

        # Stop on sustained silence, but only after the minimum length.
        if (self.consecutive_silence_count >= self.silence_threshold_count and
                recording_duration >= self.config['min_recording_time']):
            should_stop = True
            print("🎙️ 输入进程:静音检测触发停止录音")

        # Stop when the maximum length is reached.
        if recording_duration >= self.config['max_recording_time']:
            should_stop = True
            print("🎙️ 输入进程:达到最大录音时间")

        if should_stop:
            self._stop_recording()
            return

        confidence = detection_result.get('confidence', 0.0)
        energy = detection_result.get('energy', 0.0)
        zcr = detection_result.get('zcr', 0.0)
        status = (f"\r🎙️ 录音中... {recording_duration:.1f}s | "
                  f"能量: {energy:.0f} | ZCR: {zcr:.0f} | "
                  f"语音: {is_voice} | 置信度: {confidence:.2f}")
        print(status, end='', flush=True)

    def _handle_listening_chunk(self, detection_result, is_voice):
        """Start recording on voice; otherwise print the listening status."""
        if is_voice:
            self._start_recording()
            return

        # .get() keeps the status line working even if the detector result
        # lacks the calibration keys.
        if detection_result.get('calibrating', False):
            progress = detection_result.get('calibration_progress', 0.0) * 100
            status = f"\r🎙️ 校准中... {progress:.0f}%"
        else:
            confidence = detection_result.get('confidence', 0.0)
            energy = detection_result.get('energy', 0.0)
            zcr = detection_result.get('zcr', 0.0)
            buffer_usage = len(self.pre_record_buffer) / self.pre_record_max_frames * 100
            status = (f"\r🎙️ 监听中... 能量: {energy:.0f} | ZCR: {zcr:.0f} | "
                      f"语音: {is_voice} | 置信度: {confidence:.2f} | "
                      f"缓冲: {buffer_usage:.0f}%")

        print(status, end='', flush=True)

    def _update_pre_record_buffer(self, audio_data: bytes):
        """Append a chunk to the pre-record buffer, dropping the oldest one
        once the buffer exceeds ``pre_record_max_frames`` chunks."""
        self.pre_record_buffer.append(audio_data)

        if len(self.pre_record_buffer) > self.pre_record_max_frames:
            self.pre_record_buffer.pop(0)

    def _start_recording(self):
        """Begin a recording, seeded with the pre-record buffer."""
        if not self.recording_enabled:
            return

        self.is_recording = True
        self.recording_buffer = []
        self.recording_start_time = time.time()
        self.silence_start_time = None
        self.consecutive_silence_count = 0

        # Move the buffered lead-in audio into the recording.
        self.recording_buffer.extend(self.pre_record_buffer)
        self.pre_record_buffer.clear()

        print(f"🎙️ 输入进程:开始录音(包含预录音 {self.config['pre_record_duration']}秒)")

    def _stop_recording(self):
        """Finish the active recording, save it and report it to the main process."""
        if not self.is_recording:
            return

        self.is_recording = False

        if self.recording_buffer:
            audio_data = b''.join(self.recording_buffer)
            # 16-bit mono: two bytes per frame.
            duration = len(audio_data) / (self.RATE * 2)

            filename = self._save_recording(audio_data)

            self.event_queue.put(ProcessEvent(
                event_type='recording_complete',
                data=audio_data,
                metadata={
                    'duration': duration,
                    'start_time': self.recording_start_time,
                    'filename': filename
                }
            ))

            print(f"📝 输入进程:录音完成,时长 {duration:.2f} 秒")

        self.recording_buffer = []
        self.pre_record_buffer = []

    def _save_recording(self, audio_data: bytes) -> Optional[str]:
        """Write the audio to ``recording_<timestamp>.wav``.

        Returns:
            The file name, or ``None`` when saving failed.
        """
        try:
            timestamp = time.strftime("%Y%m%d_%H%M%S")
            filename = f"recording_{timestamp}.wav"

            with wave.open(filename, 'wb') as wf:
                wf.setnchannels(self.CHANNELS)
                # Module-level helper: works even if the PyAudio instance has
                # already been released.
                wf.setsampwidth(pyaudio.get_sample_size(self.FORMAT))
                wf.setframerate(self.RATE)
                wf.writeframes(audio_data)

            print(f"💾 输入进程:录音已保存到 {filename}")
            return filename

        except Exception as e:
            print(f"❌ 输入进程保存录音失败: {e}")
            return None

    def _perform_cleanup(self):
        """Run the recording-file cleanup with its default settings.

        ``_check_commands`` calls this method for the ``cleanup_recordings``
        command, but the class did not define it, so the command raised
        ``AttributeError``.
        """
        try:
            cleanup_recordings()
        except Exception as e:
            print(f"❌ 清理录音文件时出错: {e}")

    def _list_recordings_info(self):
        """Print the recording-file summary."""
        try:
            print_recording_summary()
        except Exception as e:
            print(f"❌ 列出录音文件信息时出错: {e}")

    def _start_tts_worker(self):
        """Start the daemon thread that consumes the TTS task queue."""
        self.tts_worker_thread = threading.Thread(target=self._tts_worker, daemon=True)
        self.tts_worker_thread.start()
        self.logger.info("TTS工作线程已启动")

    def _tts_worker(self):
        """Worker loop: take tasks off the queue until stopped.

        A ``None`` task is the stop signal; the 1 s poll also lets the loop
        notice ``tts_worker_running`` being cleared.
        """
        while self.tts_worker_running:
            try:
                task = self.tts_task_queue.get(timeout=1.0)
                if task is None:
                    break

                task_type, content = task
                if task_type == "tts_sentence":
                    self._generate_tts_audio(content)

            except queue.Empty:
                continue
            except Exception as e:
                self.logger.error(f"TTS工作线程错误: {e}")
                time.sleep(0.1)

    def _add_tts_task(self, content):
        """Queue a sentence for TTS; returns ``False`` when the queue is full."""
        try:
            self.tts_task_queue.put_nowait(("tts_sentence", content))
            return True
        except queue.Full:
            self.logger.warning("TTS任务队列已满,丢弃任务")
            return False

    def _generate_tts_audio(self, text):
        """Request TTS audio for ``text`` and consume the streamed response.

        The received audio chunks are only counted and logged here; they are
        not forwarded anywhere.

        Returns:
            ``True`` when at least one audio chunk was received, otherwise
            ``False`` (including on any request or parsing failure).
        """
        try:
            self.logger.info(f"生成TTS音频: {text[:50]}...")

            headers = {
                "X-Api-App-Id": self.tts_app_id,
                "X-Api-Access-Key": self.tts_access_key,
                "X-Api-Resource-Id": self.tts_resource_id,
                "X-Api-App-Key": self.tts_app_key,
                "Content-Type": "application/json",
                "Connection": "keep-alive"
            }

            # Serialise with json.dumps: the hand-written literal carried a
            # stray trailing '"}' and was therefore not valid JSON.
            additions = json.dumps({
                "explicit_language": "zh",
                "disable_markdown_filter": True,
                "enable_timestamp": True
            })

            payload = {
                "user": {
                    "uid": "input_process_tts"
                },
                "req_params": {
                    "text": text,
                    "speaker": self.tts_speaker,
                    "audio_params": {
                        "format": "pcm",
                        "sample_rate": 16000,
                        "enable_timestamp": True
                    },
                    "additions": additions
                }
            }

            session = requests.Session()
            # Pre-bind so the finally block is safe when post() itself raises.
            response = None
            try:
                response = session.post(
                    self.tts_url,
                    headers=headers,
                    json=payload,
                    stream=True,
                    timeout=self.tts_timeout
                )

                if response.status_code != 200:
                    self.logger.error(f"TTS请求失败: {response.status_code}")
                    return False

                total_audio_size = 0
                chunk_count = 0

                self.logger.info("开始接收TTS音频流...")

                for chunk in response.iter_lines(decode_unicode=True):
                    if not chunk:
                        continue

                    try:
                        data = json.loads(chunk)
                    except json.JSONDecodeError:
                        continue

                    code = data.get("code", 0)

                    if code == 0 and "data" in data and data["data"]:
                        chunk_audio = base64.b64decode(data["data"])
                        audio_size = len(chunk_audio)
                        total_audio_size += audio_size
                        chunk_count += 1

                        self.logger.debug(f"接收音频块: {audio_size} 字节")

                        # Report progress only every fifth chunk.
                        if chunk_count % 5 == 0:
                            progress = f"生成音频: {chunk_count} 块 | {total_audio_size / 1024:.1f} KB"
                            self.logger.info(progress)
                        continue

                    if code == 0 and "sentence" in data and data["sentence"]:
                        # Sentence metadata is ignored.
                        continue

                    if code == 20000000:
                        # End-of-stream marker.
                        print(f"🎵 收到TTS流结束信号,总共处理了 {chunk_count} 个音频块")
                        print(f"🎵 总音频大小: {total_audio_size} 字节 ({total_audio_size/1024:.1f} KB)")
                        break

                    if code > 0:
                        self.logger.error(f"TTS错误响应: {data}")
                        break

                self.logger.info(f"TTS音频生成完成: {chunk_count} 块, 总大小: {total_audio_size / 1024:.1f} KB")

                return chunk_count > 0

            finally:
                if response is not None:
                    response.close()
                session.close()

        except Exception as e:
            self.logger.error(f"TTS音频生成失败: {e}")
            return False

    def _handle_emergency_stop(self):
        """Emergency stop: drop any recording and close the input stream.

        Unlike ``_stop_recording`` this discards buffered audio without
        saving it or sending an event.
        """
        print("🚨 输入进程开始执行紧急停止...")

        try:
            # 1. Stop recording.
            print("🛑 立即停止录音...")
            if self.is_recording:
                self.is_recording = False
                print("✅ 录音状态已重置")

            # 2. Disable recording.
            print("🛑 立即禁用录音功能...")
            self.recording_enabled = False
            print("✅ 录音功能已禁用")

            # 3. Discard buffered audio.
            print("🛑 清空所有录音缓冲区...")
            self.recording_buffer.clear()
            self.pre_record_buffer.clear()
            print("✅ 录音缓冲区已清空")

            # 4. Close the input stream (the key step).
            print("🛑 立即关闭音频流...")
            self._cleanup_audio_stream()
            print("✅ 音频流已关闭")

            # 5. Reset the remaining recording state.
            print("🛑 重置录音相关状态...")
            self.silence_start_time = None
            self.consecutive_silence_count = 0
            self.recording_start_time = None
            print("✅ 录音状态已重置")

            # 6. Give the audio device a moment to stop completely.
            print("⏱️ 等待音频设备完全停止...")
            time.sleep(0.3)

            print("🚨 输入进程紧急停止完成")

        except Exception as e:
            print(f"❌ 输入进程紧急停止时出错: {e}")
            import traceback
            traceback.print_exc()

    def process_tts_request(self, text):
        """Public entry point: queue ``text`` for TTS generation."""
        return self._add_tts_task(text)
|
||
|
||
class OutputProcess:
|
||
"""输出进程 - 借鉴 recorder.py 的优秀架构"""
|
||
|
||
def __init__(self, audio_queue: mp.Queue, config: Optional[Dict[str, Any]] = None, event_queue: Optional[mp.Queue] = None):
    """Initialise playback state and start the playback and TTS worker threads.

    Args:
        audio_queue: Main process -> output process queue of audio to play.
        config: Optional settings; when empty or ``None`` the defaults from
            ``_get_default_config()`` are used. The keys ``tts_speaker``,
            ``tts_url``, ``tts_app_id``, ``tts_access_key``,
            ``tts_resource_id`` and ``tts_app_key`` override the built-in
            TTS settings.
        event_queue: Output process -> main process queue for notifications.
    """
    print("🔊 OutputProcess __init__ 开始执行...")
    self.audio_queue = audio_queue  # main process -> output process
    self.event_queue = event_queue  # output process -> main process
    self.config = config or self._get_default_config()

    self.logger = ProcessLogger("OutputProcess")
    print("🔊 OutputProcess 基本初始化完成")

    # Audio playback parameters.
    self.FORMAT = pyaudio.paInt16
    self.CHANNELS = 1
    self.RATE = 16000
    self.CHUNK_SIZE = 1024

    # Playback state.
    self.is_playing = False
    self.currently_playing = False       # second flag for the double state check
    self.playback_buffer = []
    self.total_chunks_played = 0
    self.total_audio_size = 0
    self.last_playback_time = 0          # timestamp of the last playback
    self.playback_cooldown_period = 0.05 # seconds between playbacks (echo guard)
    self.playback_completed = False
    self.end_signal_received = False
    self.end_signal_time = 0

    # Pre-load buffering of audio chunks.
    self.preload_buffer = []
    self.preload_size = 3

    # Lock for audio-queue operations; per the original note it is created
    # later, in run().
    self.audio_queue_lock = None

    # Sentence buffering for TTS.
    self.tts_buffer = []
    self.tts_buffer_max_size = 5
    self.tts_buffer_min_size = 2
    self.tts_accumulation_time = 0.15    # accumulation window in seconds
    self.tts_last_trigger_time = 0
    self.tts_pending_sentences = []
    self.min_buffer_size = 2

    self.audio_device_healthy = True

    # Playback worker thread.
    self.playback_worker_running = True
    self.playback_worker_thread = None

    # Playback-completion detection.
    self.last_audio_time = 0             # when audio data was last received
    self.playback_timeout = 5.0          # seconds
    self.completion_sent = False         # prevents duplicate completion events

    # Performance counters.
    self.performance_stats = {
        'total_chunks_played': 0,
        'total_audio_size': 0,
        'avg_buffer_size': 0,
        'max_buffer_size': 0,
        'tts_wait_time': 0,
        'playback_delay': 0
    }

    # Extended completion-detection flags.
    self.llm_generation_complete = False
    self.tts_generation_complete = False
    self.all_audio_received = False
    self.pre_buffer_empty = False
    self.playback_buffer_empty = False
    self.no_active_playback = False
    self.last_audio_chunk_time = 0       # 0 means nothing has been played yet

    # PyAudio handles.
    self.audio = None
    self.output_stream = None

    self.running = True

    # TTS worker thread and its bounded task queue.
    self.tts_worker_running = True
    self.tts_worker_thread = None
    self.tts_task_queue = mp.Queue(maxsize=20)

    # TTS settings, overridable through `config` in the same way as
    # tts_speaker.
    # NOTE(review): the fallback credentials below are hard-coded in the
    # source file; supply them through `config` and rotate the embedded
    # values rather than relying on them.
    overrides = config or {}
    self.tts_url = overrides.get(
        'tts_url', "https://openspeech.bytedance.com/api/v3/tts/unidirectional")
    self.tts_app_id = overrides.get('tts_app_id', "8718217928")
    self.tts_access_key = overrides.get('tts_access_key', "ynJMX-5ix1FsJvswC9KTNlGUdubcchqc")
    self.tts_resource_id = overrides.get('tts_resource_id', "volc.service_type.10029")
    self.tts_app_key = overrides.get('tts_app_key', "aGjiRDfUWi")
    self.tts_speaker = overrides.get('tts_speaker', "zh_female_wanqudashu_moon_bigtts")

    # Start the playback thread first, then the TTS thread. A failure is
    # printed with its traceback and construction continues.
    print("🔊 准备启动播放工作线程...")
    try:
        self._start_playback_worker()
        print("🔊 播放工作线程已启动,准备启动TTS工作线程...")
        self._start_tts_worker()
        print("🔊 TTS工作线程已启动")
    except Exception as e:
        print(f"❌ 工作线程启动失败: {e}")
        import traceback
        traceback.print_exc()
|
||
|
||
def _start_playback_worker(self):
    """Create and start the daemon thread that runs ``_playback_worker``.

    The thread handle is kept on ``self.playback_worker_thread`` so that
    ``_cleanup`` can join it later. Progress is echoed to stdout and the
    start is recorded through ``self.logger``.
    """
    print("🔊 创建播放工作线程...")
    self.playback_worker_thread = threading.Thread(target=self._playback_worker, daemon=True)
    print("🔊 启动播放工作线程...")
    self.playback_worker_thread.start()
    print("🔊 播放工作线程已启动")
    self.logger.info("播放工作线程已启动")
|
||
|
||
def _playback_worker(self):
    """Playback thread body: write chunks from ``self.playback_buffer`` to a PyAudio output stream.

    The thread first waits (at most 10 s) for ``self.audio`` to be set and for
    ``self.running`` to be true, opens its own output stream, then loops while
    ``self.playback_worker_running`` is true:

    * a non-empty chunk popped from ``self.playback_buffer`` is written to the
      stream with a blocking ``write``; ``currently_playing``,
      ``last_audio_chunk_time``, ``last_playback_time`` and
      ``performance_stats`` are updated around it;
    * when the playback buffer is empty, ``currently_playing`` is cleared once
      both buffers have been empty for more than 1 s, or up to three chunks
      are pulled over from ``self.preload_buffer``.

    On exit the stream is stopped and closed and ``currently_playing`` is
    forced to ``False``.

    NOTE(review): a chunk popped while the cooldown window is active is
    discarded, not re-queued. Confirm that dropping audio is intended.
    """
    print("🔊 播放工作线程开始运行...")

    # Wait for the audio device and initialisation to be ready (10 s at most).
    max_wait_time = 10
    wait_start_time = time.time()

    while (self.audio is None or not self.running) and (time.time() - wait_start_time) < max_wait_time:
        time.sleep(0.1)
        if (time.time() - wait_start_time) % 1 < 0.1:  # report roughly once per second
            print(f"🔊 仍在等待音频设备就绪... audio={self.audio is not None}, running={self.running}")

    if self.audio is None:
        print("❌ 音频设备未就绪,播放工作线程退出")
        self.logger.error("音频设备未就绪,播放工作线程退出")
        return

    print(f"✅ 音频设备已就绪,耗时: {time.time() - wait_start_time:.1f}秒")

    print("🔊 音频设备已就绪,开始创建播放流")

    # Open the output stream owned by this thread.
    playback_stream = None
    try:
        playback_stream = self.audio.open(
            format=self.FORMAT,
            channels=self.CHANNELS,
            rate=self.RATE,
            output=True,
            frames_per_buffer=512,
            start=True  # start the stream immediately
        )
        print("🔊 音频播放流已创建")
    except Exception as e:
        print(f"❌ 创建音频播放流失败: {e}")
        self.logger.error(f"创建音频播放流失败: {e}")
        return

    chunks_played = 0
    total_size = 0

    try:
        while self.playback_worker_running:
            try:
                if self.playback_buffer:
                    audio_chunk = self.playback_buffer.pop(0)

                    if audio_chunk and len(audio_chunk) > 0:
                        # Cooldown check: skip playback shortly after the previous write.
                        current_time = time.time()
                        time_since_last_play = current_time - self.last_playback_time
                        in_cooldown = (self.last_playback_time > 0 and
                                       time_since_last_play < self.playback_cooldown_period)

                        if in_cooldown:
                            if chunks_played == 0:  # only announce before anything was played
                                print(f"🔊 播放冷却中,跳过播放 ({time_since_last_play:.1f}s < {self.playback_cooldown_period}s)")
                            self.logger.debug(f"播放冷却中,跳过播放 ({time_since_last_play:.1f}s < {self.playback_cooldown_period}s)")
                            continue

                        if chunks_played == 0:  # verbose output for the very first chunk only
                            print(f"🔊 开始播放音频块 {chunks_played + 1}")
                            print(f"🔊 添加0.5秒缓冲时间,避免破音...")
                            time.sleep(0.5)  # 0.5 s of lead-in before the first write

                        # Make sure the playing flag is raised.
                        if not self.currently_playing:
                            self.currently_playing = False  # explicit False first, then a short pause
                            time.sleep(0.1)
                            self.currently_playing = True
                            self.last_audio_chunk_time = time.time()

                            # Any audio reaching the device marks audio as received.
                            if not self.all_audio_received:
                                self.all_audio_received = True
                                print(f"🎵 音频开始播放,设置all_audio_received=True")

                            print(f"🎵 播放状态变化: currently_playing = True (开始播放)")
                            print(f"🎵 设置last_audio_chunk_time = {self.last_audio_chunk_time}")

                        # The first chunk must not arm the cooldown window.
                        if chunks_played == 0:
                            self.last_playback_time = 0
                        else:
                            self.last_playback_time = current_time

                        # Blocking write: returns once the chunk has been handed to the device.
                        playback_stream.write(audio_chunk)
                        chunks_played += 1
                        total_size += len(audio_chunk)

                        self.performance_stats['total_chunks_played'] += 1
                        self.performance_stats['total_audio_size'] += len(audio_chunk)

                        # Throttled progress output.
                        if chunks_played % 10 == 0 or chunks_played <= 3:
                            progress = f"🔊 播放工作: {chunks_played} 块 | {total_size / 1024:.1f} KB"
                            print(f"\r{progress}", end='', flush=True)

                        # currently_playing deliberately stays True here; it is only
                        # cleared by the idle branch below once playback has really ended.

                        # Last chunk: flag completion and let the main loop finish playback.
                        if (len(self.playback_buffer) == 0 and
                                len(self.preload_buffer) == 0 and
                                self.tts_task_queue.qsize() == 0 and
                                not self.playback_completed):  # avoid setting it twice
                            self.playback_completed = True
                    else:
                        # Empty chunk: leave the state untouched and move on.
                        continue
                else:
                    # Playback buffer is empty.
                    if self.currently_playing:
                        time_since_last_chunk = time.time() - self.last_audio_chunk_time
                        # Stop only when both buffers are empty and the last start is > 1 s old.
                        if (len(self.playback_buffer) == 0 and
                                len(self.preload_buffer) == 0 and
                                time_since_last_chunk > 1.0):
                            self.currently_playing = False
                            print(f"🎵 播放状态变化: currently_playing = False (播放缓冲区和预加载缓冲区都为空)")
                        elif len(self.playback_buffer) == 0 and len(self.preload_buffer) > 0:
                            # Pull up to three preloaded chunks over; keep currently_playing True.
                            transfer_count = min(3, len(self.preload_buffer))
                            for _ in range(transfer_count):
                                if self.preload_buffer:
                                    self.playback_buffer.append(self.preload_buffer.pop(0))
                    # Short sleep to keep CPU usage low while idle.
                    time.sleep(0.01)
                    continue

            except Exception as e:
                print(f"❌ 播放工作线程错误: {e}")
                self.logger.error(f"播放工作线程错误: {e}")
                # After an error, clear the flag only when both buffers are empty
                # and nothing has started playing for a longer grace period.
                if self.currently_playing:
                    time_since_last_chunk = time.time() - self.last_audio_chunk_time
                    if (len(self.playback_buffer) == 0 and
                            len(self.preload_buffer) == 0 and
                            time_since_last_chunk > 1.5):
                        self.currently_playing = False
                        print(f"🎵 播放状态变化: currently_playing = False (异常情况处理,缓冲区为空且{time_since_last_chunk:.1f}秒无播放)")
                time.sleep(0.1)

        print(f"\n✅ 播放工作线程结束: 总计 {chunks_played} 块, {total_size / 1024:.1f} KB")

    finally:
        # Leave a consistent state behind when the thread ends.
        if self.currently_playing:
            self.currently_playing = False
            print(f"🎵 播放状态变化: currently_playing = False (播放工作线程结束)")
        if playback_stream:
            try:
                playback_stream.stop_stream()
                playback_stream.close()
            except Exception as e:
                # Shutdown stays best-effort, but the failure is no longer silent.
                self.logger.error(f"关闭音频播放流失败: {e}")
            print("🔊 音频播放流已关闭")
|
||
|
||
def _get_default_config(self) -> Dict[str, Any]:
    """Return the built-in default configuration.

    Returns:
        A new dict with ``buffer_size``, ``show_progress`` and
        ``progress_interval`` (the last two are read by ``_show_progress``).
    """
    return {
        'buffer_size': 1000,
        'show_progress': True,
        'progress_interval': 100,
    }
|
||
|
||
def run(self):
    """Main loop of the output process.

    Initialises the audio device, then loops while ``self.running`` is true:
    drains the inbound queue, checks for playback completion, prints a status
    line while TTS or playback is busy, promotes preloaded audio to the
    playback buffer, and updates progress and performance statistics.
    ``_cleanup`` always runs on the way out.

    NOTE(review): the promotion step below stamps ``last_playback_time`` with
    the current time, whereas ``_process_audio_queue`` resets it to 0 on the
    same transition specifically to avoid arming the cooldown. Confirm which
    is intended.
    """
    self.logger.info("输出进程启动")
    self._setup_audio()

    # The lock is created here rather than in __init__ to avoid pickling errors.
    self.audio_queue_lock = threading.Lock()

    try:
        while self.running:
            # 1. Receive data from the audio queue.
            self._process_audio_queue()

            # 2. Completion check once the end signal has been seen.
            if self.end_signal_received and self._check_enhanced_playback_completion():
                self._finish_playback()

            # 3. Device health and cooldown window (echo prevention).
            current_time = time.time()
            time_since_last_play = current_time - self.last_playback_time
            in_cooldown = (self.last_playback_time > 0 and
                           time_since_last_play < self.playback_cooldown_period)

            tts_active = not self.tts_task_queue.empty()
            playback_active = self.currently_playing or len(self.playback_buffer) > 0

            # While anything is busy, show a status line but keep checking for completion.
            if not self.audio_device_healthy or tts_active or playback_active or in_cooldown:
                tts_queue_size = self.tts_task_queue.qsize()
                queue_size = len(self.playback_buffer) + len(self.preload_buffer)

                if not self.audio_device_healthy:
                    status = f"🔧 设备重置中... TTS: {tts_queue_size} 播放: {queue_size}"
                elif tts_active:
                    status = f"🎵 TTS生成中... TTS: {tts_queue_size} 播放: {queue_size}"
                elif in_cooldown:
                    cooldown_time = self.playback_cooldown_period - time_since_last_play
                    status = f"🔊 播放冷却中... {cooldown_time:.1f}s TTS: {tts_queue_size} 播放: {queue_size}"
                else:
                    playing_status = "播放中" if self.currently_playing else "等待播放"
                    status = f"🔊 {playing_status}... TTS: {tts_queue_size} 播放: {queue_size}"

                print(f"\r{status}", end='', flush=True)

                # Completion must be checked even while playing.
                if self.end_signal_received and self._check_enhanced_playback_completion():
                    self._finish_playback()

                time.sleep(0.1)  # longer delay while busy to reduce CPU usage
                continue

            # Promote preloaded audio as soon as the minimum buffer is available.
            if (not self.is_playing and
                    len(self.preload_buffer) >= self.min_buffer_size and
                    len(self.playback_buffer) == 0):
                self.playback_buffer.extend(self.preload_buffer)
                self.preload_buffer.clear()
                self.is_playing = True
                self.last_playback_time = time.time()
                print(f"🎵 主循环检测:开始播放预加载音频,播放缓冲区大小: {len(self.playback_buffer)}")

            # 4. Progress display and performance statistics.
            self._show_progress()
            self._update_performance_stats()

            # 5. Completion check regardless of state.
            if self.end_signal_received and self._check_enhanced_playback_completion():
                self._finish_playback()

            # 6. Sleep briefly while playing for responsiveness, longer when idle.
            if self.is_playing and (self.playback_buffer or self.preload_buffer):
                time.sleep(0.005)
            else:
                time.sleep(0.02)

    except KeyboardInterrupt:
        self.logger.info("输出进程收到中断信号")
    except Exception as e:
        self.logger.error(f"输出进程错误: {e}")
        import traceback
        print(f"❌ 输出进程错误详情: {traceback.format_exc()}")
    finally:
        self._cleanup()
        self.logger.info("输出进程退出")
|
||
|
||
def _setup_audio(self):
    """Create the PyAudio instance used for output.

    Only ``self.audio`` is created here; ``self.output_stream`` is set to
    ``None`` because the playback worker thread opens and owns the stream.

    Raises:
        Exception: whatever ``pyaudio.PyAudio()`` raises, re-raised after
            being logged and printed.
    """
    try:
        print("🔊 开始初始化音频设备...")
        self.audio = pyaudio.PyAudio()
        print(f"🔊 PyAudio实例已创建: {self.audio}")

        # No output stream at this level: the playback worker thread creates its own.
        self.output_stream = None

        self.logger.info("音频设备初始化成功")
        print("🔊 音频设备初始化完成")
    except Exception as e:
        self.logger.error(f"音频设备初始化失败: {e}")
        print(f"❌ 音频设备初始化失败: {e}")
        raise
|
||
|
||
def _process_audio_queue(self):
    """Drain ``self.audio_queue`` and dispatch each item.

    Item kinds handled:

    * ``None``: end signal. Completion flags are re-armed; if playback is not
      yet complete the signal is put back on the queue once.
    * ``str`` commands, selected by prefix: ``METADATA:``, ``STREAMING_TEXT:``,
      ``GREETING_TEXT:``, ``COMPLETE_TEXT:``, ``UPDATE_TTS_SPEAKER:``,
      ``FLUSH_TTS_BUFFER:``, ``LLM_COMPLETE:``, ``TTS_COMPLETE:``,
      ``ALL_AUDIO_RECEIVED:``, ``EMERGENCY_STOP:``.
    * ``bytes``: an audio chunk, appended to ``self.preload_buffer`` and moved
      on to ``self.playback_buffer`` according to the buffering rules.

    The loop ends at the first empty-queue timeout after at least one item
    was processed, or when playback completion is detected. Any unexpected
    exception is logged and swallowed.
    """

    def reset_for_new_turn():
        # A text command arriving after an end signal starts a new dialogue turn.
        if self.end_signal_received:
            print(f"📥 检测到新对话开始,重置end_signal_received状态")
            self.end_signal_received = False
            self.all_audio_received = False
            self.completion_sent = False

    try:
        processed_count = 0
        end_signal_received = False

        while self.running:
            try:
                # Short timeout keeps the loop responsive.
                audio_data = self.audio_queue.get(timeout=0.05)
                processed_count += 1

                # Throttled logging.
                if processed_count <= 3 or processed_count % 50 == 0:
                    print(f"📥 输出进程收到第 {processed_count} 个数据包")

                if audio_data is None:
                    # End signal.
                    if end_signal_received:
                        print(f"📥 输出进程已收到过结束信号,忽略重复信号")
                        continue

                    print(f"📥 输出进程收到结束信号")
                    end_signal_received = True
                    self.end_signal_received = True
                    self.end_signal_time = time.time()  # when the end signal arrived

                    # all_audio_received is left as it is: a True set earlier through
                    # ALL_AUDIO_RECEIVED (speech-recognition failure path) must survive.
                    if self.all_audio_received:
                        print(f"🔧 保持all_audio_received=True(已通过语音识别失败处理设置)")
                    print(f"📥 收到结束信号,状态变化:")
                    print(f" - end_signal_received: True")
                    print(f" - all_audio_received: {self.all_audio_received} ({'保持True' if self.all_audio_received else '延迟设置'})")
                    print(f" - completion_sent: False")
                    print(f" - playback_completed: False")

                    # Re-arm the completion flags.
                    self.completion_sent = False
                    self.playback_completed = False
                    print(f"📥 已重置所有播放完成相关标志")

                    if self._check_enhanced_playback_completion():
                        self._finish_playback()
                        return
                    else:
                        print(f"📥 延迟处理结束信号 - 等待LLM、TTS和播放完成")
                        # Put the signal back so it is seen again later.
                        self.audio_queue.put(None)
                        time.sleep(0.05)
                        continue

                if isinstance(audio_data, str) and audio_data.startswith("METADATA:"):
                    metadata = audio_data[9:]  # strip "METADATA:"
                    print(f"📥 输出进程收到元数据: {metadata}")
                    continue

                # Streaming text command.
                if isinstance(audio_data, str) and audio_data.startswith("STREAMING_TEXT:"):
                    streaming_text = audio_data[15:]  # strip "STREAMING_TEXT:"
                    print(f"📥 输出进程收到流式文本: {streaming_text}")

                    reset_for_new_turn()

                    self.process_streaming_text(streaming_text)
                    continue

                if isinstance(audio_data, str) and audio_data.startswith("GREETING_TEXT:"):
                    # Greeting text, processed with cache support.
                    try:
                        # Payload after the prefix is "<text>:<character>".
                        # NOTE(review): a ':' inside the greeting text would be taken
                        # as the separator. Confirm the sender never emits one.
                        greeting_parts = audio_data[14:].split(':', 2)  # strip "GREETING_TEXT:"
                        greeting_text = greeting_parts[0]
                        character_name = greeting_parts[1] if len(greeting_parts) > 1 else "unknown"
                        print(f"📥 输出进程收到打招呼文本: {greeting_text} (角色: {character_name})")

                        reset_for_new_turn()

                        print(f"🔊 准备处理打招呼文本,当前tts_speaker: {self.tts_speaker}")
                        self.process_greeting_text(greeting_text, character_name)
                        print(f"✅ 打招呼文本处理完成")
                    except Exception as e:
                        print(f"❌ 处理打招呼文本失败: {e}")
                    continue

                if isinstance(audio_data, str) and audio_data.startswith("COMPLETE_TEXT:"):
                    complete_text = audio_data[14:]  # strip "COMPLETE_TEXT:"
                    print(f"📥 输出进程收到完整文本: {complete_text}")

                    reset_for_new_turn()

                    print(f"🔊 准备处理完整文本,当前tts_speaker: {self.tts_speaker}")
                    self.process_complete_text(complete_text)
                    print(f"✅ 完整文本处理完成")
                    continue

                if isinstance(audio_data, str) and audio_data.startswith("UPDATE_TTS_SPEAKER:"):
                    new_speaker = audio_data[19:]  # strip "UPDATE_TTS_SPEAKER:"
                    print(f"📥 输出进程收到TTS speaker更新: {new_speaker}")
                    self.tts_speaker = new_speaker
                    print(f"✅ TTS speaker已更新为: {self.tts_speaker}")
                    continue

                if isinstance(audio_data, str) and audio_data.startswith("FLUSH_TTS_BUFFER:"):
                    print(f"📥 输出进程收到刷新缓冲区命令")
                    self._flush_tts_buffer()
                    continue

                if isinstance(audio_data, str) and audio_data.startswith("LLM_COMPLETE:"):
                    print(f"📥 输出进程收到LLM生成完成信号")
                    self.llm_generation_complete = True
                    # A TTS-complete flag left set while the TTS queue is empty is cleared here.
                    if self.tts_generation_complete and self.tts_task_queue.qsize() == 0:
                        print(f"📥 LLM完成但TTS队列为空,重置TTS完成状态为False")
                        self.tts_generation_complete = False
                    continue

                if isinstance(audio_data, str) and audio_data.startswith("TTS_COMPLETE:"):
                    print(f"📥 输出进程收到TTS生成完成信号")
                    self.tts_generation_complete = True
                    continue

                if isinstance(audio_data, str) and audio_data.startswith("ALL_AUDIO_RECEIVED:"):
                    # Sent when speech recognition failed.
                    print(f"📥 输出进程收到ALL_AUDIO_RECEIVED信号")
                    self.all_audio_received = True
                    print(f"🔧 设置all_audio_received=True(语音识别失败处理)")
                    continue

                if isinstance(audio_data, str) and audio_data.startswith("EMERGENCY_STOP:"):
                    # Used on NFC switch to stop all audio activity at once.
                    print(f"🚨 输出进程收到紧急停止命令!")
                    self._handle_emergency_stop()
                    continue

                # Audio chunk.
                if isinstance(audio_data, bytes):
                    self.last_audio_time = time.time()

                    # New audio after "all received" re-opens the stream.
                    if self.all_audio_received:
                        print(f"📥 收到新音频数据,重置all_audio_received状态为False")
                        self.all_audio_received = False

                    if processed_count <= 3 or processed_count % 50 == 0:
                        print(f"📥 输出进程收到音频数据: {len(audio_data)} 字节")

                    self.preload_buffer.append(audio_data)
                    print(f"🎵 音频数据已添加到预加载缓冲区,当前大小: {len(self.preload_buffer)}/{self.preload_size}")

                    if not self.is_playing and len(self.preload_buffer) >= self.preload_size:
                        print(f"🎵 预加载缓冲区达到{self.preload_size}个块,开始首次播放...")
                        # First start: hand the whole preload buffer to the playback worker.
                        self.playback_buffer.extend(self.preload_buffer)
                        self.preload_buffer.clear()
                        self.is_playing = True
                        self.last_playback_time = 0  # do not arm the cooldown right away

                        self.last_audio_chunk_time = time.time()
                        print(f"🎵 预加载完成时设置last_audio_chunk_time = {self.last_audio_chunk_time}")

                        # Fallback: mark audio as received on buffer hand-over.
                        if not self.all_audio_received:
                            self.all_audio_received = True
                            print(f"🎵 缓冲区转移完成,设置all_audio_received=True(备用机制)")
                        # The playback worker polls playback_buffer itself, so nothing to signal.
                        print(f"🎵 开始播放音频(预加载完成),播放缓冲区大小: {len(self.playback_buffer)}")
                    elif self.is_playing and len(self.playback_buffer) < 4 and len(self.preload_buffer) > 0:
                        # While playing, top the playback buffer up by at most three chunks.
                        transfer_count = min(3, len(self.preload_buffer))
                        for _ in range(transfer_count):
                            if self.preload_buffer:
                                self.playback_buffer.append(self.preload_buffer.pop(0))
                    elif end_signal_received and not self.is_playing and len(self.playback_buffer) == 0 and len(self.preload_buffer) > 0:
                        # After the end signal, force whatever was preloaded into playback.
                        self.playback_buffer.extend(self.preload_buffer)
                        self.preload_buffer.clear()
                        self.is_playing = True
                        self.last_playback_time = 0

                        self.last_audio_chunk_time = time.time()
                        print(f"🎵 强制转移时设置last_audio_chunk_time = {self.last_audio_chunk_time}")

                        if not self.all_audio_received:
                            self.all_audio_received = True
                            print(f"🎵 强制转移预加载缓冲区后,设置all_audio_received=True(备用机制)")

                        print(f"🎵 强制开始播放音频,播放缓冲区大小: {len(self.playback_buffer)}")

                else:
                    print(f"📥 输出进程收到未知类型数据: {type(audio_data)}")

            except queue.Empty:
                if end_signal_received:
                    if self._check_enhanced_playback_completion():
                        self._finish_playback()
                        return
                    else:
                        tts_queue_size = self.tts_task_queue.qsize()
                        playback_queue_size = len(self.playback_buffer) + len(self.preload_buffer)
                        if tts_queue_size == 0 and len(self.playback_buffer) == 0 and len(self.preload_buffer) > 0:
                            # Playback buffer drained but preloaded chunks remain: move up to
                            # three across. This test must precede the generic "still pending"
                            # test below, which would otherwise always win and leave a
                            # below-threshold remainder in the preload buffer.
                            print(f"📥 播放缓冲区为空但预加载缓冲区有 {len(self.preload_buffer)} 个数据块,转移数据到播放缓冲区")
                            transfer_count = min(len(self.preload_buffer), 3)
                            for _ in range(transfer_count):
                                if self.preload_buffer:
                                    self.playback_buffer.append(self.preload_buffer.pop(0))
                            print(f"📥 已转移 {transfer_count} 个数据块到播放缓冲区")
                            if not self.currently_playing and len(self.playback_buffer) > 0:
                                # The playback worker picks the data up on its own.
                                print(f"📥 转移数据后,确保播放状态正确(播放缓冲区有数据但currently_playing=False)")
                            time.sleep(0.2)
                        elif playback_queue_size > 0:
                            print(f"📥 还有 {playback_queue_size} 个音频块待播放,等待播放完成")
                            time.sleep(0.2)

                # Top-up rules applied on every empty-queue timeout.
                if not self.is_playing and len(self.preload_buffer) >= self.min_buffer_size:
                    # First start in minimum-buffer mode.
                    self.playback_buffer.extend(self.preload_buffer)
                    self.preload_buffer.clear()
                    self.is_playing = True
                    self.last_playback_time = 0  # do not arm the cooldown right away

                    self.last_audio_chunk_time = time.time()
                    print(f"🎵 最小缓冲区模式下设置last_audio_chunk_time = {self.last_audio_chunk_time}")

                    if not self.all_audio_received:
                        self.all_audio_received = True
                        print(f"🎵 音频开始播放(最小缓冲区模式),设置all_audio_received=True(备用机制)")
                    print(f"🎵 开始播放音频(最小缓冲区满足)")
                    print(f"🔍 已重置last_playback_time,避免立即触发冷却期")
                elif self.is_playing and len(self.playback_buffer) < 2 and len(self.preload_buffer) > 0:
                    # While playing, move one chunk to keep the playback buffer from running dry.
                    transfer_count = min(1, len(self.preload_buffer))
                    for _ in range(transfer_count):
                        if self.preload_buffer:
                            self.playback_buffer.append(self.preload_buffer.pop(0))

                # Leave once something was processed; otherwise keep polling.
                if processed_count > 0:
                    break
                else:
                    # Completion is checked even when nothing was processed.
                    if self.end_signal_received:
                        if self._check_enhanced_playback_completion():
                            self._finish_playback()

                    time.sleep(0.01)

    except Exception as e:
        self.logger.error(f"处理音频队列时出错: {e}")
        print(f"❌ 输出进程处理队列时出错: {e}")
|
||
|
||
def _play_audio(self):
    """Coordinate buffer state; the actual writing is done by the playback worker thread.

    Two independent checks are made:

    1. If nothing is playing, the playback buffer is empty and the preload
       buffer holds at least ``self.min_buffer_size`` chunks, everything
       preloaded is moved to the playback buffer and ``is_playing`` is set.
    2. If ``is_playing`` is set but both buffers are empty and the worker is
       not playing, ``is_playing`` is cleared after a 0.1 s grace period.
       No completion event is sent from here.
    """
    # Promote preloaded data once the minimum amount is buffered.
    if (not self.is_playing and
            len(self.preload_buffer) >= self.min_buffer_size and
            not self.playback_buffer):
        self.playback_buffer.extend(self.preload_buffer)
        self.preload_buffer.clear()
        self.is_playing = True
        self.last_playback_time = time.time()
        print(f"🎵 主线程检测:开始播放预加载音频,播放缓冲区大小: {len(self.playback_buffer)}")

    # Everything drained and the worker idle: playback has ended.
    if (self.is_playing and
            not self.playback_buffer and
            not self.preload_buffer and
            not self.currently_playing):
        # Brief wait, then re-check in case new data arrived in the meantime.
        time.sleep(0.1)
        if not self.playback_buffer and not self.preload_buffer:
            self.is_playing = False
            print(f"🎵 播放缓冲区已清空,准备发送完成事件")
            # The completion event itself is sent by the end-signal handling
            # in _process_audio_queue, not here.
|
||
|
||
def _show_progress(self):
    """Print a one-line progress report every ``progress_interval`` chunks.

    Nothing is printed unless ``self.config['show_progress']`` is truthy and
    ``self.total_chunks_played`` is a positive multiple of
    ``self.config['progress_interval']``. The line starts with a carriage
    return and has no trailing newline, so it overwrites the previous one.
    """
    if (self.config['show_progress'] and
            self.total_chunks_played > 0 and
            self.total_chunks_played % self.config['progress_interval'] == 0):
        progress = f"🔊 播放进度: {self.total_chunks_played} 块 | {self.total_audio_size / 1024:.1f} KB"
        print(f"\r{progress}", end='', flush=True)
|
||
|
||
def _finish_playback(self):
    """Finish the current playback: clear buffers, reset flags, notify the main process.

    Does nothing if ``self.completion_sent`` is already set. Otherwise both
    audio buffers are cleared, every completion-tracking flag is reset, a
    ``playback_complete`` ``ProcessEvent`` is put on ``self.event_queue``
    (when one is configured), and the call then blocks for one second.
    ``completion_sent`` is set only after the event was queued successfully.
    """
    # Guard against sending the completion event twice.
    if self.completion_sent:
        print("📡 输出进程:完成事件已发送过,跳过重复发送")
        return

    print("📡 输出进程:开始执行播放完成逻辑")

    self.is_playing = False
    self.playback_buffer.clear()
    self.preload_buffer.clear()
    self.last_playback_time = 0

    # Reset every flag used by the enhanced completion check.
    self.llm_generation_complete = False
    self.tts_generation_complete = False
    self.all_audio_received = False
    self.pre_buffer_empty = False
    self.playback_buffer_empty = False
    self.no_active_playback = False
    self.end_signal_received = False
    self.playback_completed = False
    self.end_signal_time = 0

    print("📡 输出进程:已重置所有播放完成状态标志")

    # Tell the main process that playback has completed.
    if self.event_queue:
        try:
            completion_event = ProcessEvent(
                event_type='playback_complete',
                metadata={
                    'completion_time': time.time()
                }
            )
            self.event_queue.put(completion_event)
            print("📡 输出进程:已发送播放完成事件到主进程")
            self.completion_sent = True
        except Exception as e:
            print(f"❌ 输出进程:发送播放完成事件失败: {e}")
            self.logger.error(f"发送播放完成事件失败: {e}")
    else:
        print("⚠️ 输出进程:未设置事件队列,无法通知主进程播放完成")

    # Extra wait so the audio device has fully stopped.
    print("📡 等待1秒确保音频完全播放完毕...")
    time.sleep(1.0)
    print("📡 输出进程:播放完成逻辑执行完毕")
|
||
|
||
def _check_enhanced_playback_completion(self):
    """Decide whether playback of the current response has completed.

    Considers the LLM/TTS completion flags, the TTS task queue and text
    buffer, both audio buffers, the worker's ``currently_playing`` flag and
    the time since a chunk last started playing. As a side effect the
    ``pre_buffer_empty``, ``playback_buffer_empty`` and ``no_active_playback``
    attributes are refreshed, and one path sleeps for 0.3 s.

    Returns:
        ``True`` when playback can be finished, ``False`` while anything is
        still pending. Always ``False`` before the end signal was received.
    """
    if not self.end_signal_received:
        return False

    # Refresh the derived state flags.
    self.pre_buffer_empty = (len(self.preload_buffer) == 0)
    self.playback_buffer_empty = (len(self.playback_buffer) == 0)
    self.no_active_playback = (not self.currently_playing)

    tts_queue_size = self.tts_task_queue.qsize()

    current_time = time.time()
    time_since_last_chunk = current_time - self.last_audio_chunk_time

    # TTS counts as generating while tasks are queued, text is still buffered,
    # or the TTS-complete signal has not arrived. No auto-correction is applied.
    tts_is_generating = (tts_queue_size > 0 or len(self.tts_buffer) > 0 or not self.tts_generation_complete)

    all_conditions_met = (
        self.llm_generation_complete and
        self.tts_generation_complete and
        self.all_audio_received and
        self.pre_buffer_empty and
        self.playback_buffer_empty and
        self.no_active_playback and
        tts_queue_size == 0 and
        not tts_is_generating
    )

    if all_conditions_met:
        print(f"✅ 所有播放完成条件已满足,进行时间检查...")
        print(f" - last_audio_chunk_time: {self.last_audio_chunk_time}")
        print(f" - time_since_last_chunk: {time_since_last_chunk:.3f}秒")
        print(f" - 需要等待时间: 1.0秒")

        # Extra time check so the last chunk has really finished playing.
        if self.last_audio_chunk_time > 0 and time_since_last_chunk > 1.0:
            print(f"✅ 所有播放完成条件已满足,且{time_since_last_chunk:.2f}秒无新音频,可以结束播放")
            return True
        elif self.last_audio_chunk_time > 0 and time_since_last_chunk <= 1.0:
            print(f"⏳ 所有条件满足但等待时间不足(已等待{time_since_last_chunk:.2f}秒,需要1.0秒)...")
            return False
        elif self.last_audio_chunk_time == 0:
            # Playback never started.
            print(f"🔍 从未开始播放的情况,检查是否有音频数据...")
            if len(self.playback_buffer) == 0 and len(self.preload_buffer) == 0:
                print(f"🔍 播放缓冲区和预加载缓冲区都为空")
                if self.tts_task_queue.qsize() == 0 and len(self.tts_buffer) == 0:
                    print(f"🔍 TTS队列和缓冲区都为空")

                    # Give TTS time to produce audio after the end signal. If no
                    # end-signal time was recorded, start the clock now; measuring
                    # against a default of 0 would look like an instant timeout.
                    end_signal_time = getattr(self, 'end_signal_time', None)
                    if end_signal_time is None:
                        end_signal_time = self.end_signal_time = time.time()
                    time_since_end_signal = time.time() - end_signal_time

                    print(f"🔍 距离收到结束信号: {time_since_end_signal:.2f}秒")

                    if time_since_end_signal < 5.0:
                        # Very recent end signal: TTS may still be generating.
                        print(f"⏳ 刚收到结束信号,等待TTS生成音频(已等待{time_since_end_signal:.1f}秒)...")
                        return False
                    elif time_since_end_signal < 10.0:
                        print(f"⏳ 已等待{time_since_end_signal:.1f}秒,继续等待TTS生成...")
                        return False
                    else:
                        # More than 10 s without audio: assume TTS failed and give up.
                        print(f"⚠️ 已等待{time_since_end_signal:.1f}秒仍无音频数据,可能TTS失败,强制结束")
                        return True
                else:
                    print(f"🔍 还有TTS任务待处理: tts_queue_size={tts_queue_size}, tts_buffer_size={len(self.tts_buffer)}")
                    print(f"⏳ 从未开始播放但还有TTS任务待处理,等待TTS生成...")
                    return False
            else:
                print(f"🔍 还有音频数据: playback_buffer={len(self.playback_buffer)}, preload_buffer={len(self.preload_buffer)}")
                print(f"⏳ 从未开始播放但还有音频数据,等待播放开始...")
                return False
        else:
            print(f"⏳ 所有条件满足但等待音频完全播放(最后播放于{time_since_last_chunk:.2f}秒前)...")
            return False

    # LLM and TTS are done but audio may still be buffered or playing.
    if (self.llm_generation_complete and
            self.tts_generation_complete and
            self.all_audio_received and
            tts_queue_size == 0 and
            not tts_is_generating):

        if self.pre_buffer_empty and self.playback_buffer_empty:
            if self.no_active_playback:
                # Make sure the last chunk has had time to finish.
                if self.last_audio_chunk_time > 0:
                    time_since_last_chunk = time.time() - self.last_audio_chunk_time
                    if time_since_last_chunk > 0.8:
                        print(f"✅ LLM和TTS完成,所有缓冲区已清空,播放器空闲,最后播放于{time_since_last_chunk:.1f}秒前")
                        return True
                    else:
                        print(f"⏳ 等待最后音频播放完成(最后播放于{time_since_last_chunk:.1f}秒前,需要0.8秒)...")
                        return False
                else:
                    # Playback never started.
                    print(f"⚠️ LLM和TTS完成,缓冲区清空,但从未开始播放,可能播放失败")
                    return True
            else:
                print(f"⏳ 等待最后的音频播放完成(currently_playing={self.currently_playing})...")
                time.sleep(0.3)
                # Re-read the worker state after the pause.
                self.no_active_playback = (not self.currently_playing)
                if self.no_active_playback:
                    if self.last_audio_chunk_time > 0:
                        time_since_last_chunk = time.time() - self.last_audio_chunk_time
                        if time_since_last_chunk > 0.5:
                            print(f"✅ 最后的音频播放完成(最后播放于{time_since_last_chunk:.1f}秒前)")
                            return True
                        else:
                            print(f"⏳ 仍在等待音频完全播放完成(最后播放于{time_since_last_chunk:.1f}秒前)...")
                            return False
                    else:
                        print(f"⚠️ 播放器空闲但从未开始播放,可能播放失败")
                        return True
                else:
                    print(f"⏳ 播放器仍在活跃状态,继续等待...")
                    return False
        else:
            print(f"⏳ 等待缓冲区数据播放完成 - 预缓冲: {len(self.preload_buffer)}, 播放缓冲: {len(self.playback_buffer)}")
            return False

    # Still waiting for the LLM.
    if not self.llm_generation_complete:
        print(f"⏳ 等待LLM生成完成...")
        return False

    # LLM done, still waiting for TTS.
    if not self.tts_generation_complete:
        return False

    # Still waiting for the remaining audio.
    if not self.all_audio_received:
        print(f"⏳ 等待所有音频接收完成...")
        return False

    return False
|
||
|
||
|
||
# 注意:简化播放完成检测已移除,统一使用增强播放完成检测 _check_enhanced_playback_completion()
|
||
|
||
|
||
def _cleanup(self):
    """Stop the worker threads and release the audio resources.

    Order: stop and join the playback worker (3 s timeout), stop and join the
    TTS worker (2 s timeout), close ``self.output_stream`` if one exists, then
    terminate the PyAudio instance. Errors while closing the stream or
    terminating PyAudio are printed and do not abort the cleanup.
    """
    print("🔊 开始清理输出进程资源...")

    # Stop the playback worker thread.
    self.playback_worker_running = False
    if self.playback_worker_thread:
        print("🔊 等待播放工作线程退出...")
        self.playback_worker_thread.join(timeout=3.0)

    # Stop the TTS worker thread.
    self.tts_worker_running = False
    if self.tts_worker_thread:
        print("🔊 等待TTS工作线程退出...")
        try:
            # Non-blocking: a blocking put() on a full bounded queue would hang
            # the shutdown. The worker also exits via tts_worker_running.
            self.tts_task_queue.put_nowait(None)  # end-of-work sentinel
        except queue.Full:
            pass
        self.tts_worker_thread.join(timeout=2.0)

    # Close the process-level output stream, if any.
    if self.output_stream:
        try:
            self.output_stream.stop_stream()
            self.output_stream.close()
            print("🔊 主进程输出流已关闭")
        except Exception as e:
            print(f"⚠️ 关闭主进程输出流时出错: {e}")

    # Terminate the PyAudio instance.
    if self.audio:
        try:
            self.audio.terminate()
            print("🔊 PyAudio实例已终止")
        except Exception as e:
            print(f"⚠️ 终止PyAudio实例时出错: {e}")

    print("🔊 输出进程资源清理完成")
|
||
|
||
def _start_tts_worker(self):
    """Create and start the daemon thread that runs ``_tts_worker``.

    The thread handle is kept on ``self.tts_worker_thread`` so that
    ``_cleanup`` can join it later.
    """
    self.tts_worker_thread = threading.Thread(target=self._tts_worker, daemon=True)
    self.tts_worker_thread.start()
    self.logger.info("TTS工作线程已启动")
|
||
|
||
def _tts_worker(self):
|
||
"""TTS工作线程 - 处理TTS任务队列"""
|
||
while self.tts_worker_running:
|
||
try:
|
||
# 从队列获取任务
|
||
task = self.tts_task_queue.get(timeout=1.0)
|
||
if task is None: # 结束信号
|
||
break
|
||
|
||
task_type = task[0]
|
||
if task_type == "tts_sentence":
|
||
# 生成音频数据并发送到播放队列
|
||
if len(task) == 3:
|
||
# 带角色名称的TTS任务
|
||
_, text, character_name = task
|
||
self._generate_tts_audio(text, character_name)
|
||
else:
|
||
# 普通TTS任务
|
||
_, content = task
|
||
self._generate_tts_audio(content)
|
||
elif task_type == "cached_audio":
|
||
# 处理缓存音频
|
||
_, text, audio_data, character_name = task
|
||
self._process_cached_audio(text, audio_data, character_name)
|
||
|
||
except queue.Empty:
|
||
continue
|
||
except Exception as e:
|
||
self.logger.error(f"TTS工作线程错误: {e}")
|
||
time.sleep(0.1)
|
||
|
||
def _handle_emergency_stop(self):
|
||
"""处理紧急停止命令 - 立即停止所有音频活动"""
|
||
print("🚨 输出进程开始执行紧急停止...")
|
||
|
||
try:
|
||
# 1. 立即停止音频播放
|
||
print("🛑 立即停止音频播放...")
|
||
if self.currently_playing:
|
||
self.currently_playing = False
|
||
print("✅ 播放状态已重置")
|
||
|
||
if self.is_playing:
|
||
self.is_playing = False
|
||
print("✅ 播放状态标志已重置")
|
||
|
||
# 2. 清空所有音频缓冲区
|
||
print("🛑 清空所有音频缓冲区...")
|
||
self.playback_buffer.clear()
|
||
self.preload_buffer.clear()
|
||
self.tts_buffer.clear()
|
||
print("✅ 音频缓冲区已清空")
|
||
|
||
# 3. 停止所有TTS生成
|
||
print("🛑 停止所有TTS生成...")
|
||
self.tts_generation_complete = True
|
||
self.llm_generation_complete = True
|
||
self.all_audio_received = True
|
||
print("✅ TTS生成状态已重置")
|
||
|
||
# 4. 重置所有播放完成检测状态
|
||
print("🛑 重置播放完成检测状态...")
|
||
self.end_signal_received = False
|
||
self.completion_sent = False
|
||
self.playback_completed = False
|
||
self.end_signal_time = 0
|
||
self.last_audio_chunk_time = 0
|
||
self.last_audio_time = 0
|
||
print("✅ 播放完成检测状态已重置")
|
||
|
||
# 5. 重置首次播放状态
|
||
print("🛑 重置首次播放状态...")
|
||
if hasattr(self, '_first_playback_started'):
|
||
self._first_playback_started = False
|
||
if hasattr(self, '_first_text_time'):
|
||
delattr(self, '_first_text_time')
|
||
print("✅ 首次播放状态已重置")
|
||
|
||
# 6. 清空TTS任务队列
|
||
print("🛑 清空TTS任务队列...")
|
||
try:
|
||
while not self.tts_task_queue.empty():
|
||
try:
|
||
self.tts_task_queue.get_nowait()
|
||
except queue.Empty:
|
||
break
|
||
print("✅ TTS任务队列已清空")
|
||
except Exception as e:
|
||
print(f"⚠️ 清空TTS队列时出错: {e}")
|
||
|
||
# 7. 重置播放冷却期
|
||
print("🛑 重置播放冷却期...")
|
||
self.last_playback_time = 0
|
||
print("✅ 播放冷却期已重置")
|
||
|
||
# 8. 等待一小段时间确保所有音频停止
|
||
print("⏱️ 等待音频完全停止...")
|
||
time.sleep(0.3)
|
||
|
||
print("🚨 输出进程紧急停止完成")
|
||
|
||
except Exception as e:
|
||
print(f"❌ 输出进程紧急停止时出错: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
def _add_tts_task(self, content, character_name=None):
|
||
"""添加TTS任务到队列"""
|
||
try:
|
||
# 如果指定了角色名称,先检查缓存
|
||
if character_name:
|
||
cached_audio = load_cached_audio(character_name)
|
||
if cached_audio:
|
||
print(f"🎵 使用缓存音频,角色: {character_name}")
|
||
self.tts_task_queue.put_nowait(("cached_audio", content, cached_audio, character_name))
|
||
return True
|
||
else:
|
||
print(f"🎵 无缓存音频,正常生成TTS,角色: {character_name}")
|
||
self.tts_task_queue.put_nowait(("tts_sentence", content, character_name))
|
||
return True
|
||
else:
|
||
# 正常TTS任务
|
||
self.tts_task_queue.put_nowait(("tts_sentence", content))
|
||
return True
|
||
except queue.Full:
|
||
self.logger.warning("TTS任务队列已满,丢弃任务")
|
||
return False
|
||
|
||
def _process_cached_audio(self, text, audio_data, character_name):
|
||
"""处理缓存音频数据 - 直接发送到播放队列"""
|
||
try:
|
||
print(f"🎵 处理缓存音频: {text[:30]}... (角色: {character_name})")
|
||
print(f"🎵 缓存音频大小: {len(audio_data)} 字节")
|
||
|
||
# 重置播放状态(与TTS生成保持一致)
|
||
self.completion_sent = False
|
||
self._last_audio_size = 0
|
||
self.tts_generation_complete = False
|
||
|
||
# 将缓存音频数据直接添加到预加载缓冲区
|
||
print(f"🎵 将缓存音频添加到预加载缓冲区")
|
||
self.preload_buffer.append(audio_data)
|
||
|
||
# 立即设置TTS生成完成状态(因为缓存音频相当于已经生成完成)
|
||
self.tts_generation_complete = True
|
||
|
||
# 重要修复:不在这里立即设置all_audio_received=True
|
||
# 等待音频实际开始播放时再设置,避免时序问题
|
||
print(f"🎵 缓存音频已准备好,等待播放开始时设置all_audio_received")
|
||
|
||
# 检查是否应该开始播放(关键修复)
|
||
if (not self.is_playing and len(self.preload_buffer) >= self.preload_size):
|
||
print(f"🎵 缓存音频预加载完成,开始播放...")
|
||
# 将预加载的数据移到播放缓冲区
|
||
self.playback_buffer.extend(self.preload_buffer)
|
||
self.preload_buffer.clear()
|
||
self.is_playing = True
|
||
self.last_playback_time = 0 # 重置播放时间,避免冷却期
|
||
|
||
# 关键修复:在音频开始播放时设置last_audio_chunk_time
|
||
self.last_audio_chunk_time = time.time()
|
||
print(f"🎵 设置last_audio_chunk_time = {self.last_audio_chunk_time}")
|
||
|
||
# 现在可以设置all_audio_received为True
|
||
self.all_audio_received = True
|
||
print(f"🎵 缓存音频开始播放,设置all_audio_received=True")
|
||
|
||
self.logger.info("开始播放缓存音频(预加载完成)")
|
||
elif (not self.is_playing and len(self.preload_buffer) > 0):
|
||
print(f"🎵 缓存音频已添加但未达到预加载大小,强制开始播放...")
|
||
# 对于缓存音频,直接开始播放
|
||
self.playback_buffer.extend(self.preload_buffer)
|
||
self.preload_buffer.clear()
|
||
self.is_playing = True
|
||
self.last_playback_time = 0 # 重置播放时间,避免冷却期
|
||
|
||
# 关键修复:在音频开始播放时设置last_audio_chunk_time
|
||
self.last_audio_chunk_time = time.time()
|
||
print(f"🎵 设置last_audio_chunk_time = {self.last_audio_chunk_time}")
|
||
|
||
# 现在可以设置all_audio_received为True
|
||
self.all_audio_received = True
|
||
print(f"🎵 缓存音频强制开始播放,设置all_audio_received=True")
|
||
|
||
self.logger.info("强制开始播放缓存音频")
|
||
|
||
# 发送TTS完成信号到主队列
|
||
try:
|
||
tts_complete_command = "TTS_COMPLETE:"
|
||
self.audio_queue.put(tts_complete_command)
|
||
print(f"🎵 缓存音频已发送TTS完成信号到主队列")
|
||
except Exception as e:
|
||
print(f"❌ 发送TTS完成信号失败: {e}")
|
||
|
||
print(f"✅ 缓存音频处理完成")
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"缓存音频处理失败: {e}")
|
||
# 即使失败也要设置TTS完成状态,避免系统卡住
|
||
self.tts_generation_complete = True
|
||
# 失败时也要设置all_audio_received避免死锁
|
||
self.all_audio_received = True
|
||
print(f"🔧 缓存音频处理失败,设置完成状态为True")
|
||
|
||
def _generate_tts_audio(self, text, character_name=None):
|
||
"""生成TTS音频数据并发送到统一播放队列 - 借鉴 recorder.py 的流式处理"""
|
||
try:
|
||
print(f"🎵 OutputProcess开始生成TTS音频: {text[:50]}...")
|
||
print(f"🎵 文本总长度: {len(text)} 字符")
|
||
|
||
# 简化:直接使用预加载缓冲区
|
||
print(f"🎵 开始生成TTS音频,将直接添加到预加载缓冲区")
|
||
|
||
# 重置播放状态
|
||
self.completion_sent = False
|
||
self._last_audio_size = 0
|
||
self.tts_generation_complete = False
|
||
|
||
# 用于收集音频数据以保存缓存
|
||
all_audio_data = []
|
||
|
||
# 构建请求头
|
||
headers = {
|
||
"X-Api-App-Id": self.tts_app_id,
|
||
"X-Api-Access-Key": self.tts_access_key,
|
||
"X-Api-Resource-Id": self.tts_resource_id,
|
||
"X-Api-App-Key": self.tts_app_key,
|
||
"Content-Type": "application/json",
|
||
"Connection": "keep-alive"
|
||
}
|
||
|
||
# 构建请求参数
|
||
payload = {
|
||
"user": {
|
||
"uid": "output_process_tts"
|
||
},
|
||
"req_params": {
|
||
"text": text,
|
||
"speaker": self.tts_speaker,
|
||
"audio_params": {
|
||
"format": "pcm",
|
||
"sample_rate": 16000,
|
||
"enable_timestamp": True
|
||
},
|
||
"additions": "{\"explicit_language\":\"zh\",\"disable_markdown_filter\":true, \"enable_timestamp\":true}\"}"
|
||
}
|
||
}
|
||
|
||
# 发送请求
|
||
session = requests.Session()
|
||
try:
|
||
response = session.post(self.tts_url, headers=headers, json=payload, stream=True)
|
||
|
||
if response.status_code != 200:
|
||
self.logger.error(f"TTS请求失败: {response.status_code}")
|
||
return False
|
||
|
||
print(f"🎵 TTS请求成功,状态码: {response.status_code}")
|
||
|
||
# 处理流式响应 - 借鉴 recorder.py 的优化策略
|
||
total_audio_size = 0
|
||
chunk_count = 0
|
||
success_count = 0
|
||
|
||
print(f"🎵 开始接收TTS音频流...")
|
||
|
||
for chunk in response.iter_lines(decode_unicode=True):
|
||
if not chunk:
|
||
continue
|
||
|
||
try:
|
||
data = json.loads(chunk)
|
||
if data.get("code", 0) == 0 and "data" in data and data["data"]:
|
||
chunk_audio = base64.b64decode(data["data"])
|
||
audio_size = len(chunk_audio)
|
||
total_audio_size += audio_size
|
||
chunk_count += 1
|
||
|
||
# 检查音频数据是否异常小
|
||
if audio_size < 100:
|
||
print(f"⚠️ 警告:音频块 {chunk_count} 大小异常小: {audio_size} 字节")
|
||
|
||
# 检查是否连续收到小音频块(但不警告最后一个块,通常较小)
|
||
if hasattr(self, '_last_audio_size') and self._last_audio_size > 0 and chunk_count < 15: # 不检查最后几个块
|
||
if audio_size < self._last_audio_size * 0.5:
|
||
print(f"⚠️ 警告:音频块 {chunk_count} 大小突然减小: {self._last_audio_size} -> {audio_size}")
|
||
self._last_audio_size = audio_size
|
||
|
||
# 简化:直接添加到预加载缓冲区
|
||
try:
|
||
# 添加到预加载缓冲区
|
||
self.preload_buffer.append(chunk_audio)
|
||
# 收集音频数据用于缓存
|
||
all_audio_data.append(chunk_audio)
|
||
success_count += 1
|
||
|
||
|
||
# 检查是否应该开始播放
|
||
if (not self.is_playing and
|
||
len(self.preload_buffer) >= self.preload_size):
|
||
# 将预加载的数据移到播放缓冲区
|
||
self.playback_buffer.extend(self.preload_buffer)
|
||
self.preload_buffer.clear()
|
||
self.is_playing = True
|
||
self.last_playback_time = time.time()
|
||
self.logger.info("开始播放TTS音频(预加载完成)")
|
||
|
||
# 减少进度显示频率
|
||
if chunk_count % 50 == 0: # 进一步减少显示频率
|
||
progress = (f"🎵 TTS生成中: {chunk_count} 块 | {total_audio_size / 1024:.1f} KB")
|
||
print(f"\r{progress}", end='', flush=True)
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"添加音频到预加载缓冲区失败: {e}")
|
||
continue
|
||
|
||
continue
|
||
|
||
if data.get("code", 0) == 0 and "sentence" in data and data["sentence"]:
|
||
# 处理句子信息
|
||
continue
|
||
|
||
if data.get("code", 0) == 20000000:
|
||
# 真正的结束信号
|
||
print(f"🎵 收到TTS流结束信号,总共处理了 {chunk_count} 个音频块")
|
||
print(f"🎵 总音频大小: {total_audio_size} 字节 ({total_audio_size/1024:.1f} KB)")
|
||
break
|
||
|
||
if data.get("code", 0) > 0:
|
||
self.logger.error(f"TTS错误响应: {data}")
|
||
break
|
||
|
||
except json.JSONDecodeError:
|
||
continue
|
||
|
||
# 处理剩余的预加载数据
|
||
if self.preload_buffer:
|
||
print(f"🔊 将剩余的 {len(self.preload_buffer)} 个音频块转移到播放缓冲区")
|
||
self.playback_buffer.extend(self.preload_buffer)
|
||
self.preload_buffer.clear()
|
||
if not self.is_playing:
|
||
self.is_playing = True
|
||
self.last_playback_time = time.time()
|
||
self.logger.info("开始播放TTS音频(处理剩余预加载)")
|
||
else:
|
||
print(f"⚠️ TTS生成完成但预加载缓冲区为空!")
|
||
|
||
success_rate = (success_count / chunk_count * 100) if chunk_count > 0 else 0
|
||
self.logger.info(f"TTS音频生成完成: {chunk_count} 块, 成功率 {success_rate:.1f}% | 总大小: {total_audio_size / 1024:.1f} KB")
|
||
|
||
# 保存音频到缓存(如果指定了角色名称且有音频数据)
|
||
if character_name and all_audio_data and success_count > 0:
|
||
try:
|
||
# 合并所有音频数据
|
||
complete_audio = b''.join(all_audio_data)
|
||
if save_greeting_cache(character_name, complete_audio):
|
||
print(f"✅ TTS音频已保存到缓存,角色: {character_name}")
|
||
else:
|
||
print(f"⚠️ TTS音频缓存保存失败,角色: {character_name}")
|
||
except Exception as e:
|
||
print(f"❌ 保存TTS音频缓存时出错: {e}")
|
||
|
||
# 通知自己TTS生成已完成
|
||
self.tts_generation_complete = True
|
||
print(f"🎵 OutputProcess TTS生成已完成")
|
||
|
||
# 关键修复:如果TTS生成完成但没有生成任何音频数据,设置all_audio_received为True
|
||
# 这解决了语音转文字失败时的死锁问题
|
||
if success_count == 0:
|
||
print(f"🔧 TTS生成完成但没有音频数据,设置all_audio_received=True以避免死锁")
|
||
self.all_audio_received = True
|
||
|
||
# 发送TTS完成信号到主队列
|
||
try:
|
||
tts_complete_command = "TTS_COMPLETE:"
|
||
self.audio_queue.put(tts_complete_command)
|
||
print(f"🎵 已发送TTS完成信号到主队列")
|
||
except Exception as e:
|
||
print(f"❌ 发送TTS完成信号失败: {e}")
|
||
|
||
# 简化:直接使用统一播放完成检测机制
|
||
self.logger.info("TTS生成完成,等待统一播放完成检测机制处理...")
|
||
|
||
return success_count > 0
|
||
|
||
finally:
|
||
response.close()
|
||
session.close()
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"TTS音频生成失败: {e}")
|
||
# 即使失败也要设置TTS完成状态,避免系统卡住
|
||
self.tts_generation_complete = True
|
||
# 关键修复:TTS生成失败时也要设置all_audio_received为True以避免死锁
|
||
print(f"🔧 TTS生成失败,设置all_audio_received=True以避免死锁")
|
||
self.all_audio_received = True
|
||
# 发送TTS完成信号到主队列
|
||
try:
|
||
tts_complete_command = "TTS_COMPLETE:"
|
||
self.audio_queue.put(tts_complete_command)
|
||
print(f"🎵 TTS生成失败,但已发送TTS完成信号到主队列")
|
||
except Exception as send_error:
|
||
print(f"❌ 发送TTS完成信号失败: {send_error}")
|
||
return False
|
||
|
||
# ========== 智能句子缓冲系统 - 从 recorder.py 借鉴 ==========
|
||
|
||
def _should_trigger_tts(self, sentence):
|
||
"""智能判断是否应该触发TTS - 确保首句足够长且积累足够内容"""
|
||
current_time = time.time()
|
||
|
||
# 修改策略:允许合理的TTS并行生成,但控制队列长度
|
||
tts_queue_size = self.tts_task_queue.qsize()
|
||
if tts_queue_size >= 3: # 允许最多3个TTS任务在队列中
|
||
print(f"🎵 TTS队列达到上限: 当前队列大小={tts_queue_size}, 暂时跳过")
|
||
return False
|
||
|
||
# 首次播放的特殊处理:确保有足够内容
|
||
if not hasattr(self, '_first_playback_started') or not self._first_playback_started:
|
||
total_buffered_text = ''.join(self.tts_buffer) + sentence
|
||
|
||
# 首次播放必须满足以下条件之一:
|
||
# 1. 总文本长度超过40字符且至少有2个句子
|
||
if len(total_buffered_text) >= 40 and len(self.tts_buffer) >= 1:
|
||
print(f"🎵 首次播放触发:总长度{len(total_buffered_text)}字符,{len(self.tts_buffer)+1}个句子")
|
||
self._first_playback_started = True
|
||
return True
|
||
# 2. 有1个完整长句子(超过25字符)
|
||
elif len(sentence) >= 25 and self._is_complete_sentence(sentence) and len(self.tts_buffer) >= 1:
|
||
print(f"🎵 首次播放触发:长句子{len(sentence)}字符+缓冲内容")
|
||
self._first_playback_started = True
|
||
return True
|
||
# 3. 缓冲区达到最大值
|
||
elif len(self.tts_buffer) >= self.tts_buffer_max_size:
|
||
print(f"🎵 首次播放触发:缓冲区达到最大值{len(self.tts_buffer)}")
|
||
self._first_playback_started = True
|
||
return True
|
||
# 4. 超过5秒还没触发(防止无限等待)
|
||
elif hasattr(self, '_first_text_time') and (current_time - self._first_text_time) > 5.0:
|
||
print(f"🎵 首次播放触发:超时5秒,当前长度{len(total_buffered_text)}字符")
|
||
self._first_playback_started = True
|
||
return True
|
||
else:
|
||
# 首次播放前记录第一个文本的时间
|
||
if not hasattr(self, '_first_text_time'):
|
||
self._first_text_time = current_time
|
||
return False
|
||
|
||
# 非首次播放的正常逻辑
|
||
|
||
# 再次检查TTS队列状态(防止在首次播放检查后队列又有变化)
|
||
tts_queue_size = self.tts_task_queue.qsize()
|
||
if tts_queue_size >= 3: # 保持一致的队列限制
|
||
print(f"🎵 TTS队列达到上限(非首次): 当前队列大小={tts_queue_size}, 暂时跳过")
|
||
return False
|
||
|
||
# 检查缓冲区大小
|
||
if len(self.tts_buffer) >= self.tts_buffer_max_size:
|
||
return True
|
||
|
||
# 检查时间窗口
|
||
time_since_last = current_time - self.tts_last_trigger_time
|
||
if time_since_last >= self.tts_accumulation_time and len(self.tts_buffer) >= self.tts_buffer_min_size:
|
||
return True
|
||
|
||
# 检查是否为完整句子(使用严格的检测)
|
||
if self._is_complete_sentence(sentence):
|
||
return True
|
||
|
||
# 检查句子特征 - 优化:降低长句子触发阈值(30字符以上)
|
||
if len(sentence) > 30: # 超过30字符的句子立即触发
|
||
return True
|
||
|
||
# 中等长度句子(20-30字符)如果有结束标点也触发
|
||
if len(sentence) > 20:
|
||
end_punctuations = ['。', '!', '?', '.', '!', '?']
|
||
if any(sentence.strip().endswith(p) for p in end_punctuations):
|
||
return True
|
||
|
||
# 优化:即使短句子,如果缓冲区有内容且时间过半也触发
|
||
if len(self.tts_buffer) > 0 and time_since_last >= (self.tts_accumulation_time * 0.7):
|
||
return True
|
||
|
||
# 短句子只在缓冲区较多或时间窗口到期时触发
|
||
return False
|
||
|
||
def _process_tts_buffer(self):
|
||
"""处理TTS缓冲区 - 发送累积的句子到TTS"""
|
||
if not self.tts_buffer:
|
||
return
|
||
|
||
# 合并缓冲区的句子
|
||
combined_text = ''.join(self.tts_buffer)
|
||
print(f"🔊 合并后的文本: '{combined_text}' (长度: {len(combined_text)})")
|
||
|
||
# 重置TTS生成完成状态 - 关键修复
|
||
self.tts_generation_complete = False
|
||
|
||
# 添加到TTS任务队列
|
||
if self._add_tts_task(combined_text):
|
||
print(f"🎵 触发TTS: {combined_text[:50]}...")
|
||
self.tts_last_trigger_time = time.time()
|
||
else:
|
||
print(f"❌ 添加TTS任务失败")
|
||
|
||
# 清空缓冲区
|
||
self.tts_buffer.clear()
|
||
|
||
def _add_sentence_to_buffer(self, sentence):
|
||
"""添加句子到智能缓冲区 - 核心方法"""
|
||
if not sentence.strip():
|
||
print(f"🔊 句子为空,不添加到缓冲区")
|
||
return
|
||
|
||
self.tts_buffer.append(sentence)
|
||
|
||
# 检查是否应该触发TTS
|
||
should_trigger = self._should_trigger_tts(sentence)
|
||
if should_trigger:
|
||
self._process_tts_buffer()
|
||
|
||
def _flush_tts_buffer(self):
|
||
"""强制刷新TTS缓冲区 - 确保所有内容都被处理"""
|
||
if self.tts_buffer:
|
||
self._process_tts_buffer()
|
||
|
||
def _is_complete_sentence(self, text):
|
||
"""检测是否为完整句子 - 从 recorder.py 完全移植"""
|
||
import re
|
||
|
||
if not text or len(text.strip()) == 0:
|
||
return False
|
||
|
||
# 增加最小长度要求 - 至少10个字符才考虑作为完整句子
|
||
if len(text.strip()) < 10:
|
||
return False
|
||
|
||
# 句子结束标点符号
|
||
sentence_endings = r'[。!?.!?]'
|
||
|
||
# 检查是否以句子结束符结尾
|
||
if re.search(sentence_endings + r'\s*$', text):
|
||
# 对于以结束符结尾的句子,要求至少15个字符
|
||
if len(text.strip()) >= 15:
|
||
return True
|
||
|
||
# 检查是否包含句子结束符(可能在句子中间)
|
||
if re.search(sentence_endings, text):
|
||
# 如果后面有其他标点或字符,可能不是完整句子
|
||
remaining_text = re.split(sentence_endings, text, 1)[-1]
|
||
if len(remaining_text.strip()) > 0:
|
||
return False
|
||
# 对于包含结束符的句子,要求至少20个字符
|
||
if len(text.strip()) >= 20:
|
||
return True
|
||
|
||
# 对于较长的文本(超过50字符),即使没有结束符也可以考虑
|
||
if len(text.strip()) >= 50:
|
||
return True
|
||
|
||
# 对于中等长度的文本,如果包含常见完整句式模式
|
||
if len(text.strip()) >= 20:
|
||
common_patterns = [
|
||
r'^[是的有没有来去在把被让叫请使].*[的得了吗呢吧啊呀]',
|
||
r'^(你好|谢谢|再见|是的|不是|好的|没问题)',
|
||
r'^[\u4e00-\u9fff]{4,8}[的得了]$' # 4-8个中文字+的/了/得
|
||
]
|
||
|
||
for pattern in common_patterns:
|
||
if re.match(pattern, text):
|
||
return True
|
||
|
||
return False
|
||
|
||
def _filter_parentheses_content(self, text):
|
||
"""过滤文本中的括号内容(包括中文和英文括号)- 从 recorder.py 移植"""
|
||
import re
|
||
|
||
# 移除中文括号内容:(内容)
|
||
filtered_text = re.sub(r'([^)]*)', '', text)
|
||
# 移除英文括号内容:(content)
|
||
filtered_text = re.sub(r'\([^)]*\)', '', filtered_text)
|
||
# 移除方括号内容:【内容】
|
||
filtered_text = re.sub(r'【[^】]*】', '', filtered_text)
|
||
# 移除方括号内容:[content]
|
||
filtered_text = re.sub(r'\[[^\]]*\]', '', filtered_text)
|
||
# 移除书名号内容:「内容」
|
||
filtered_text = re.sub(r'「[^」]*」', '', filtered_text)
|
||
|
||
# 清理多余空格
|
||
filtered_text = re.sub(r'\s+', ' ', filtered_text).strip()
|
||
|
||
return filtered_text
|
||
|
||
def process_streaming_text(self, text):
    """Public entry point for one fragment of streamed LLM text.

    The fragment is stripped and filtered through
    ``_filter_parentheses_content``. If the LLM, TTS and audio-received
    flags are all still set (i.e. this is the first fragment of a new
    reply), the completion-detection and first-playback state is reset.
    The filtered text then goes to ``_add_sentence_to_buffer``.
    """
    print(f"🔊 收到流式文本: '{text}' (长度: {len(text)})")
    if not text.strip():
        print(f"🔊 流式文本为空,跳过处理")
        return

    # Drop bracketed asides.
    filtered_text = self._filter_parentheses_content(text.strip())
    print(f"🔊 过滤后的文本: '{filtered_text}' (长度: {len(filtered_text)})")

    if not filtered_text:
        print(f"🔊 过滤后文本为空,不添加到缓冲区")
        return

    # All three "complete" flags still set means nothing of this reply
    # has been seen yet.
    previous_turn_finished = all(
        getattr(self, flag)
        for flag in ("llm_generation_complete",
                     "tts_generation_complete",
                     "all_audio_received")
    )
    if previous_turn_finished:
        print(f"🔊 首次收到流式文本,重置所有播放完成检测状态")
        for flag in ("llm_generation_complete",
                     "tts_generation_complete",
                     "all_audio_received",
                     "end_signal_received",
                     "completion_sent"):
            setattr(self, flag, False)
        # Start accumulating content for the first playback again.
        self._first_playback_started = False
        if hasattr(self, '_first_text_time'):
            del self._first_text_time
        print(f"🔊 已重置首次播放状态,新的对话将重新积累内容")

    print(f"🔊 添加文本到智能缓冲区")
    self._add_sentence_to_buffer(filtered_text)
|
||
|
||
def process_complete_text(self, text):
    """Speak a complete piece of text immediately.

    The text is stripped and filtered through
    ``_filter_parentheses_content``. If anything remains, all
    completion-detection and first-playback state is reset, the text is
    appended to the TTS buffer and the buffer is processed right away.
    """
    print(f"🔊 process_complete_text 被调用,输入文本: {text}")
    if not text.strip():
        print("🔊 文本为空,返回")
        return

    # Drop bracketed asides.
    filtered_text = self._filter_parentheses_content(text.strip())
    print(f"🔊 过滤后文本: {filtered_text}")

    if not filtered_text:
        print("🔊 过滤后文本为空,不添加到缓冲区")
        return

    # A new reply starts: clear every completion flag.
    for flag in ("llm_generation_complete",
                 "tts_generation_complete",
                 "all_audio_received",
                 "end_signal_received",
                 "completion_sent"):
        setattr(self, flag, False)
    # Reset the first-playback bookkeeping.
    self._first_playback_started = False
    if hasattr(self, '_first_text_time'):
        del self._first_text_time
    print(f"🔊 处理完整文本:已重置所有播放完成检测状态")

    # Buffer the text and process the buffer without waiting.
    print(f"🔊 添加文本到TTS缓冲区: {filtered_text}")
    self.tts_buffer.append(filtered_text)
    print(f"🔊 当前TTS缓冲区大小: {len(self.tts_buffer)}")
    print(f"🔊 开始处理TTS缓冲区...")
    self._process_tts_buffer()
    print(f"🔊 TTS缓冲区处理完成")
|
||
|
||
def process_greeting_text(self, text, character_name):
    """Speak a greeting for ``character_name``, using the audio cache.

    Works like ``process_complete_text`` but processes the buffer through
    ``_process_tts_buffer_with_cache`` so the character name travels with
    the TTS task.
    """
    print(f"🔊 process_greeting_text 被调用,输入文本: {text} (角色: {character_name})")
    if not text.strip():
        print("🔊 文本为空,返回")
        return

    # Drop bracketed asides.
    filtered_text = self._filter_parentheses_content(text.strip())
    print(f"🔊 过滤后文本: {filtered_text}")

    if not filtered_text:
        print("🔊 过滤后文本为空,不添加到缓冲区")
        return

    # A new reply starts: clear every completion flag.
    for flag in ("llm_generation_complete",
                 "tts_generation_complete",
                 "all_audio_received",
                 "end_signal_received",
                 "completion_sent"):
        setattr(self, flag, False)
    # Reset the first-playback bookkeeping.
    self._first_playback_started = False
    if hasattr(self, '_first_text_time'):
        del self._first_text_time
    print(f"🔊 处理打招呼文本:已重置所有播放完成检测状态")

    # Buffer the greeting; the character name is passed on for caching.
    print(f"🔊 添加打招呼文本到TTS缓冲区: {filtered_text} (角色: {character_name})")
    self.tts_buffer.append(filtered_text)
    print(f"🔊 当前TTS缓冲区大小: {len(self.tts_buffer)}")
    print(f"🔊 开始处理TTS缓冲区(带缓存支持)...")

    self._process_tts_buffer_with_cache(character_name)
    print(f"🔊 TTS缓冲区处理完成")
|
||
|
||
def _process_tts_buffer_with_cache(self, character_name):
|
||
"""处理TTS缓冲区 - 发送累积的句子到TTS(带缓存支持)"""
|
||
if not self.tts_buffer:
|
||
return
|
||
|
||
# 合并缓冲区的句子
|
||
combined_text = ''.join(self.tts_buffer)
|
||
print(f"🔊 合并后的文本: '{combined_text}' (长度: {len(combined_text)})")
|
||
|
||
# 重置TTS生成完成状态 - 关键修复
|
||
self.tts_generation_complete = False
|
||
|
||
# 添加到TTS任务队列(带角色名称用于缓存)
|
||
if self._add_tts_task(combined_text, character_name):
|
||
print(f"🎵 触发TTS: {combined_text[:50]}... (角色: {character_name})")
|
||
self.tts_last_trigger_time = time.time()
|
||
else:
|
||
print(f"❌ 添加TTS任务失败")
|
||
|
||
# 清空缓冲区
|
||
self.tts_buffer.clear()
|
||
|
||
# ========== 原有方法保持不变 ==========
|
||
|
||
def _update_performance_stats(self):
|
||
"""更新性能统计信息"""
|
||
current_buffer_size = len(self.playback_buffer) + len(self.preload_buffer)
|
||
self.performance_stats['avg_buffer_size'] = (
|
||
self.performance_stats['avg_buffer_size'] * 0.9 + current_buffer_size * 0.1
|
||
)
|
||
if current_buffer_size > self.performance_stats['max_buffer_size']:
|
||
self.performance_stats['max_buffer_size'] = current_buffer_size
|
||
|
||
# 每5秒打印一次性能统计
|
||
if hasattr(self, '_last_stats_time'):
|
||
if time.time() - self._last_stats_time >= 5.0:
|
||
self._print_performance_stats()
|
||
self._last_stats_time = time.time()
|
||
else:
|
||
self._last_stats_time = time.time()
|
||
|
||
def _print_performance_stats(self):
|
||
"""打印性能统计信息"""
|
||
stats = self.performance_stats
|
||
print(f"📊 性能统计: 平均缓冲={stats['avg_buffer_size']:.1f}, "
|
||
f"最大缓冲={stats['max_buffer_size']}, 已播放={stats['total_chunks_played']}块, "
|
||
f"音频大小={stats['total_audio_size']/1024:.1f}KB")
|
||
|
||
def process_tts_request(self, text):
    """Backward-compatible public TTS entry point.

    Delegates to ``process_complete_text`` and always returns True.
    """
    # Routed through the sentence-buffer pipeline.
    self.process_complete_text(text)
    return True
|
||
|
||
if __name__ == "__main__":
    # Running the module directly only prints a notice; the classes are
    # meant to be used from a multi-process host.
    for notice in ("音频进程模块测试", "这个模块应该在多进程环境中运行"):
        print(notice)
|