Local-Voice/audio_processes.py
2025-09-26 13:21:15 +08:00

2831 lines
133 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
多进程音频处理模块
定义输入进程和输出进程的类
使用增强版语音检测器
"""
import base64
import glob
import gzip
import json
import multiprocessing as mp
import os
import queue
import threading
import time
import wave
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional
import numpy as np
import pyaudio
import requests
from enhanced_voice_detector import EnhancedVoiceDetector
from process_logger import ProcessLogger
# ========== Greeting缓存管理工具函数 ==========
def get_greeting_cache_path(character_name):
    """Return the path of the cached greeting WAV file for *character_name*.

    The file lives in the ``greeting_cache`` directory, relative to the
    current working directory, and is named ``<character_name>.wav``.
    """
    file_name = f"{character_name}.wav"
    return os.path.join("greeting_cache", file_name)
def greeting_cache_exists(character_name):
    """Return True if a cached greeting file exists for *character_name*."""
    return os.path.exists(get_greeting_cache_path(character_name))
def load_cached_audio(character_name):
    """Read the cached greeting audio for *character_name*.

    As a side effect the ``greeting_cache`` directory is created if missing.

    Returns:
        bytes | None: the raw file contents, or None when no cache file exists
        or reading it fails (the error is printed).
    """
    os.makedirs("greeting_cache", exist_ok=True)
    path = get_greeting_cache_path(character_name)
    if not os.path.exists(path):
        return None

    try:
        with open(path, 'rb') as handle:
            payload = handle.read()
        print(f"✅ 已加载角色 {character_name} 的缓存音频: {len(payload)} 字节")
        return payload
    except Exception as exc:
        print(f"❌ 加载缓存音频失败: {exc}")
        return None
def save_greeting_cache(character_name, audio_data):
    """Store greeting audio bytes in the per-character cache file.

    The data is first written to a temporary sibling file and then moved into
    place with ``os.replace``. A crash or error mid-write therefore cannot
    leave a truncated cache file that ``load_cached_audio`` would later return
    as if it were valid audio.

    Args:
        character_name: Character whose greeting is cached (file name stem).
        audio_data: Raw bytes to store.

    Returns:
        bool: True on success, False if writing failed (the error is printed).
    """
    tmp_path = None
    try:
        cache_path = get_greeting_cache_path(character_name)
        # Make sure the cache directory exists.
        os.makedirs(os.path.dirname(cache_path) or ".", exist_ok=True)
        tmp_path = cache_path + ".tmp"
        with open(tmp_path, 'wb') as f:
            f.write(audio_data)
        # Atomic on the same filesystem: readers see either the old or the new file.
        os.replace(tmp_path, cache_path)
        print(f"✅ 已保存角色 {character_name} 的greeting音频到缓存: {len(audio_data)} 字节")
        return True
    except Exception as e:
        print(f"❌ 保存缓存音频失败: {e}")
        # Best effort: do not leave a partially written temp file behind.
        if tmp_path is not None:
            try:
                os.remove(tmp_path)
            except OSError:
                pass
        return False
# ========== 录音文件清理机制 ==========
def cleanup_recordings(retention_hours=24, dry_run=False, pattern="recording_*.wav"):
    """Delete recording files whose modification time is older than a cutoff.

    Args:
        retention_hours: Files modified within this many hours are kept (default 24).
        dry_run: If True, only report which files would be deleted.
        pattern: glob pattern selecting the recording files. The default
            matches ``recording_*.wav`` in the current working directory, which
            is where ``InputProcess._save_recording`` writes them.

    Returns:
        dict: ``total_files``, ``files_to_delete``, ``files_deleted``,
        ``total_size_mb``, ``freed_space_mb`` and ``deleted_files`` (base
        names). ``freed_space_mb`` only counts files actually removed (or, in
        dry-run mode, files that would be removed). Returns
        ``{'error': message}`` if an unexpected error occurs.
    """
    try:
        cutoff_time = time.time() - retention_hours * 3600
        recording_files = glob.glob(pattern)
        stats = {
            'total_files': len(recording_files),
            'files_to_delete': 0,
            'files_deleted': 0,
            'total_size_mb': 0.0,
            'freed_space_mb': 0.0,
            'deleted_files': [],
        }
        if not recording_files:
            print("🧹 未找到录音文件")
            return stats

        for file_path in recording_files:
            try:
                stats['total_size_mb'] += os.path.getsize(file_path) / (1024 * 1024)
            except OSError:
                # The file vanished or is unreadable; leave it out of the total.
                pass

        print(f"🧹 开始清理录音文件(保留 {retention_hours} 小时内的文件)")
        print(f"📊 找到 {stats['total_files']} 个录音文件,总大小: {stats['total_size_mb']:.2f} MB")
        if dry_run:
            print("🔍 模拟运行模式,不会实际删除文件")

        for file_path in recording_files:
            try:
                if os.path.getmtime(file_path) >= cutoff_time:
                    continue  # still inside the retention window
                stats['files_to_delete'] += 1
                file_size = os.path.getsize(file_path)
                if dry_run:
                    stats['freed_space_mb'] += file_size / (1024 * 1024)
                    print(f"🔍 标记删除: {file_path} ({file_size / 1024:.1f} KB)")
                else:
                    os.remove(file_path)
                    # Count the space only after the removal succeeded.
                    stats['freed_space_mb'] += file_size / (1024 * 1024)
                    stats['files_deleted'] += 1
                    stats['deleted_files'].append(os.path.basename(file_path))
                    print(f"🗑️ 已删除: {file_path} ({file_size / 1024:.1f} KB)")
            except OSError as e:
                print(f"❌ 处理文件 {file_path} 时出错: {e}")

        print("📊 清理完成:")
        print(f" - 总文件数: {stats['total_files']}")
        print(f" - 应删除文件数: {stats['files_to_delete']}")
        if not dry_run:
            print(f" - 实际删除文件数: {stats['files_deleted']}")
        print(f" - 释放空间: {stats['freed_space_mb']:.2f} MB")
        return stats
    except Exception as e:
        print(f"❌ 清理录音文件时出错: {e}")
        return {'error': str(e)}
def cleanup_recordings_by_count(max_files=50, dry_run=False, pattern="recording_*.wav"):
    """Keep only the newest *max_files* recording files and delete the rest.

    Args:
        max_files: Number of newest files to keep (default 50). It must be
            >= 0. A negative value used to slice from the end of the list and
            delete an arbitrary subset; it now yields an error result and
            deletes nothing.
        dry_run: If True, only report which files would be deleted.
        pattern: glob pattern selecting the recording files (default
            ``recording_*.wav`` in the current working directory).

    Returns:
        dict: ``total_files``, ``files_to_delete``, ``files_deleted``,
        ``total_size_mb``, ``freed_space_mb`` and ``deleted_files``, or
        ``{'error': message}`` on invalid input or an unexpected error.
    """
    try:
        if max_files < 0:
            raise ValueError(f"max_files must be >= 0, got {max_files}")

        recording_files = glob.glob(pattern)
        stats = {
            'total_files': len(recording_files),
            'files_to_delete': 0,
            'files_deleted': 0,
            'total_size_mb': 0.0,
            'freed_space_mb': 0.0,
            'deleted_files': [],
        }
        if not recording_files:
            print("🧹 未找到录音文件")
            return stats

        def _mtime(path):
            # A file that disappears after glob() sorts as the oldest instead of
            # aborting the whole sort with an OSError.
            try:
                return os.path.getmtime(path)
            except OSError:
                return float('-inf')

        recording_files.sort(key=_mtime, reverse=True)  # newest first

        for file_path in recording_files:
            try:
                stats['total_size_mb'] += os.path.getsize(file_path) / (1024 * 1024)
            except OSError:
                pass  # vanished/unreadable file: excluded from the total

        print(f"🧹 开始按数量清理录音文件(保留最新的 {max_files} 个文件)")
        print(f"📊 找到 {stats['total_files']} 个录音文件,总大小: {stats['total_size_mb']:.2f} MB")
        if dry_run:
            print("🔍 模拟运行模式,不会实际删除文件")

        files_to_delete = recording_files[max_files:]
        stats['files_to_delete'] = len(files_to_delete)
        for file_path in files_to_delete:
            try:
                file_size = os.path.getsize(file_path)
                if dry_run:
                    stats['freed_space_mb'] += file_size / (1024 * 1024)
                    print(f"🔍 标记删除: {file_path} ({file_size / 1024:.1f} KB)")
                else:
                    os.remove(file_path)
                    # Count the space only after the removal succeeded.
                    stats['freed_space_mb'] += file_size / (1024 * 1024)
                    stats['files_deleted'] += 1
                    stats['deleted_files'].append(os.path.basename(file_path))
                    print(f"🗑️ 已删除: {file_path} ({file_size / 1024:.1f} KB)")
            except OSError as e:
                print(f"❌ 删除文件 {file_path} 时出错: {e}")

        print("📊 清理完成:")
        print(f" - 总文件数: {stats['total_files']}")
        print(f" - 应删除文件数: {stats['files_to_delete']}")
        if not dry_run:
            print(f" - 实际删除文件数: {stats['files_deleted']}")
        print(f" - 释放空间: {stats['freed_space_mb']:.2f} MB")
        return stats
    except Exception as e:
        print(f"❌ 按数量清理录音文件时出错: {e}")
        return {'error': str(e)}
def list_recordings(pattern="recording_*.wav"):
    """List recording files with size and timestamp information.

    Args:
        pattern: glob pattern selecting the recording files (default
            ``recording_*.wav`` in the current working directory).

    Returns:
        list[dict]: one dict per file with ``filename``, ``path``, ``size_kb``,
        ``size_mb``, ``created_time``, ``modified_time`` (formatted local time
        strings) and ``age_hours`` (hours since last modification). The list is
        sorted newest first by the exact modification time. Files that cannot
        be stat'ed are skipped with a printed message. Returns an empty list on
        unexpected errors.

    Note:
        ``created_time`` is derived from ``st_ctime``, which on most Unix
        systems is the inode change time rather than the creation time.
    """
    try:
        now = time.time()
        entries = []
        for file_path in glob.glob(pattern):
            try:
                st = os.stat(file_path)
                info = {
                    'filename': os.path.basename(file_path),
                    'path': file_path,
                    'size_kb': st.st_size / 1024,
                    'size_mb': st.st_size / (1024 * 1024),
                    'created_time': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(st.st_ctime)),
                    'modified_time': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(st.st_mtime)),
                    'age_hours': (now - st.st_mtime) / 3600,
                }
            except (OSError, OverflowError, ValueError) as e:
                print(f"❌ 获取文件信息 {file_path} 时出错: {e}")
                continue
            entries.append((st.st_mtime, info))

        # Sort on the raw mtime: the formatted string only has 1-second resolution.
        entries.sort(key=lambda entry: entry[0], reverse=True)
        return [info for _, info in entries]
    except Exception as e:
        print(f"❌ 列出录音文件时出错: {e}")
        return []
def print_recording_summary(recordings=None):
    """Print a summary of the recording files.

    Args:
        recordings: Optional list in the format returned by ``list_recordings()``
            (sorted newest first). When None, ``list_recordings()`` is called.
    """
    if recordings is None:
        recordings = list_recordings()
    if not recordings:
        print("📁 未找到录音文件")
        return

    total_size_mb = sum(r['size_mb'] for r in recordings)
    # The newest file has the smallest age. The previous code had these swapped.
    newest_file = min(recordings, key=lambda r: r['age_hours'])
    oldest_file = max(recordings, key=lambda r: r['age_hours'])

    print("📁 录音文件摘要:")
    print(f" - 文件数量: {len(recordings)}")
    print(f" - 总大小: {total_size_mb:.2f} MB")
    print(f" - 最新文件: {newest_file['filename']} ({newest_file['created_time']})")
    print(f" - 最旧文件: {oldest_file['filename']} ({oldest_file['created_time']}, {oldest_file['age_hours']:.1f} 小时前)")

    # Show up to the 5 most recent files (the list is expected newest first).
    print("📄 最近的文件:")
    for i, recording in enumerate(recordings[:5], start=1):
        print(f" {i}. {recording['filename']} - {recording['size_mb']:.2f} MB - {recording['created_time']}")
    if len(recordings) > 5:
        print(f" ... 还有 {len(recordings) - 5} 个文件")
class RecordingState(Enum):
    """Recorder state enumeration; each member's value is its lowercase name."""

    IDLE = "idle"
    RECORDING = "recording"
    PROCESSING = "processing"
    PLAYING = "playing"
@dataclass
class AudioSegment:
    """A recorded block of audio with its timing information.

    Attributes:
        audio_data: Raw audio bytes.
        start_time: Start timestamp (``time.time()`` value when created by
            ``InputProcess``).
        end_time: End timestamp.
        duration: Length of the audio in seconds.
        metadata: Optional free-form extra information; None when absent.
    """
    audio_data: bytes
    start_time: float
    end_time: float
    duration: float
    # Optional[...] because the default is None (implicit Optional is not valid typing).
    metadata: Optional[Dict[str, Any]] = None
@dataclass
class ControlCommand:
    """A command sent from the main process to a worker process.

    Attributes:
        command: Command name (e.g. ``'enable_recording'``, ``'shutdown'``).
        parameters: Optional command arguments; None when the command takes none.
    """
    command: str
    # Optional[...] because the default is None (implicit Optional is not valid typing).
    parameters: Optional[Dict[str, Any]] = None
@dataclass
class ProcessEvent:
    """An event sent from a worker process back to the main process.

    Attributes:
        event_type: Event name (e.g. ``'recording_complete'``).
        data: Optional binary payload, such as recorded audio bytes.
        metadata: Optional free-form details; None when absent.
    """
    event_type: str
    data: Optional[bytes] = None
    # Optional[...] because the default is None (implicit Optional is not valid typing).
    metadata: Optional[Dict[str, Any]] = None
class InputProcess:
    """Input process: microphone capture, voice detection and utterance recording.

    The owner sends ``ControlCommand`` objects on ``command_queue``. This
    process replies with ``ProcessEvent`` objects on ``event_queue``
    (``recording_complete``, ``calibration_status``, ``monitoring_status``).
    A daemon thread started in ``__init__`` consumes ``tts_task_queue`` and
    calls the TTS HTTP API.

    NOTE(review): the TTS thread is started in ``__init__``. If this object is
    created in one process and ``run()`` executes in another (e.g. a
    ``multiprocessing.Process`` child), the thread does not exist in the child.
    Thread objects also cannot be pickled for the ``spawn`` start method.
    Confirm how this class is launched.
    """

    # (connect, read) timeouts in seconds for the streaming TTS request, so a
    # stalled connection cannot block the TTS worker thread indefinitely.
    TTS_REQUEST_TIMEOUT = (10, 60)

    # TTS service defaults; every key can be overridden through ``config``.
    # NOTE(review): credentials are hard-coded in source control. Consider
    # rotating them and supplying them via config or environment instead.
    _DEFAULT_TTS_SETTINGS = {
        'tts_url': "https://openspeech.bytedance.com/api/v3/tts/unidirectional",
        'tts_app_id': "8718217928",
        'tts_access_key': "ynJMX-5ix1FsJvswC9KTNlGUdubcchqc",
        'tts_resource_id': "volc.service_type.10029",
        'tts_app_key': "aGjiRDfUWi",
        'tts_speaker': "zh_female_wanqudashu_moon_bigtts",
    }

    def __init__(self, command_queue: mp.Queue, event_queue: mp.Queue, config: Optional[Dict[str, Any]] = None):
        """Set up state, the voice detector and the TTS worker thread.

        Args:
            command_queue: Commands from the main process to this process.
            event_queue: Events from this process to the main process.
            config: Optional overrides merged on top of ``_get_default_config()``.
                May also contain any key of ``_DEFAULT_TTS_SETTINGS``.
        """
        self.command_queue = command_queue
        self.event_queue = event_queue

        # Merge overrides onto the defaults so a partial config cannot cause a
        # KeyError later (e.g. self.config['min_recording_time']).
        self.config = {**self._get_default_config(), **(config or {})}

        self.logger = ProcessLogger("InputProcess")

        # Audio format: 16 kHz, mono, 16-bit samples, 1024 frames per read.
        self.FORMAT = pyaudio.paInt16
        self.CHANNELS = 1
        self.RATE = 16000
        self.CHUNK_SIZE = 1024

        # Recording state.
        self.recording_enabled = False  # disabled until a command enables it
        self.is_recording = False
        self.recording_buffer = []
        self.pre_record_buffer = []
        self.silence_start_time = None
        self.recording_start_time = None

        # The pre-record buffer holds the most recent `pre_record_duration`
        # seconds of audio. It is prepended when a recording starts.
        self.pre_record_duration = float(self.config['pre_record_duration'])
        self.pre_record_max_frames = int(self.pre_record_duration * self.RATE / self.CHUNK_SIZE)

        self.voice_detector = EnhancedVoiceDetector(
            sample_rate=self.RATE,
            chunk_size=self.CHUNK_SIZE,
        )

        # Stop after 30 consecutive non-voice chunks: 30 * 1024 / 16000 ≈ 1.9 s.
        # NOTE(review): config['silence_threshold'] (3.0 s) is not used here.
        self.consecutive_silence_count = 0
        self.silence_threshold_count = 30

        self.audio = None
        self.input_stream = None
        self.running = True

        # TTS worker thread and its bounded task queue.
        self.tts_worker_running = True
        self.tts_worker_thread = None
        self.tts_task_queue = mp.Queue(maxsize=20)

        settings = self._DEFAULT_TTS_SETTINGS
        self.tts_url = self.config.get('tts_url', settings['tts_url'])
        self.tts_app_id = self.config.get('tts_app_id', settings['tts_app_id'])
        self.tts_access_key = self.config.get('tts_access_key', settings['tts_access_key'])
        self.tts_resource_id = self.config.get('tts_resource_id', settings['tts_resource_id'])
        self.tts_app_key = self.config.get('tts_app_key', settings['tts_app_key'])
        self.tts_speaker = self.config.get('tts_speaker', settings['tts_speaker'])

        self._start_tts_worker()

    def _get_default_config(self) -> Dict[str, Any]:
        """Return the default recording configuration (times in seconds)."""
        return {
            'zcr_min': 2400,  # ZCR lower bound tuned for 16 kHz audio
            'zcr_max': 12000,  # ZCR upper bound tuned for 16 kHz audio
            'min_recording_time': 2.0,
            'max_recording_time': 30.0,
            'silence_threshold': 3.0,
            'pre_record_duration': 2.0,
        }

    def run(self):
        """Main loop: handle commands, then read audio while enabled or calibrating."""
        self.logger.info("输入进程启动")
        self._setup_audio()
        try:
            while self.running:
                self._check_commands()
                # Audio is read while recording is enabled, or while the detector
                # is calibrating (calibration needs audio samples).
                should_process_audio = (self.recording_enabled or
                                        (hasattr(self.voice_detector, 'calibration_mode') and
                                         self.voice_detector.calibration_mode))
                if should_process_audio:
                    self._process_audio()
                time.sleep(0.01)  # brief sleep to limit CPU usage
        except KeyboardInterrupt:
            self.logger.info("输入进程收到中断信号")
        except Exception as e:
            self.logger.error(f"输入进程错误: {e}")
        finally:
            self._cleanup()
            self.logger.info("输入进程退出")

    def start_calibration(self):
        """Reset the voice detector and put it into calibration mode.

        Returns:
            bool: True if calibration started, False if there is no detector.
        """
        if hasattr(self, 'voice_detector') and self.voice_detector:
            self.voice_detector.reset()
            self.voice_detector.calibration_mode = True
            self.voice_detector.calibration_samples = 0
            # Calibration needs audio, so make sure an input stream exists.
            if not self.input_stream:
                self._setup_audio()
            print("🎙️ 输入进程:开始语音检测器校准")
            return True
        return False

    def _reset_recording_state(self):
        """Drop all buffered audio and reset the per-utterance recording state."""
        self.recording_buffer = []
        self.pre_record_buffer = []
        self.is_recording = False
        self.silence_start_time = None
        self.consecutive_silence_count = 0

    def start_monitoring(self):
        """Enable listening with a fresh input stream and empty buffers.

        Returns:
            bool: True if monitoring was started, False if it was already on.
        """
        if self.recording_enabled:
            return False
        self.recording_enabled = True
        self._cleanup_audio_stream()
        self._reset_recording_state()
        self._setup_audio()
        print("🎙️ 输入进程:开始音频监听")
        return True

    def stop_monitoring(self):
        """Disable listening, finish any active recording and close the stream.

        Returns:
            bool: True if monitoring was stopped, False if it was already off.
        """
        if not self.recording_enabled:
            return False
        self.recording_enabled = False
        if self.is_recording:
            self._stop_recording()
        self._cleanup_audio_stream()
        print("🎙️ 输入进程:停止音频监听")
        return True

    def get_calibration_status(self):
        """Return the detector's calibration state (progress = samples / required)."""
        detector = getattr(self, 'voice_detector', None)
        if not detector:
            return {'calibrating': False, 'progress': 0, 'samples': 0, 'required': 0}
        required = detector.required_calibration
        samples = detector.calibration_samples
        return {
            'calibrating': detector.calibration_mode,
            # Guard against a detector that reports zero required samples.
            'progress': samples / required if required else 0,
            'samples': samples,
            'required': required,
        }

    def get_monitoring_status(self):
        """Return whether monitoring is enabled, recording is active and a stream is open."""
        return {
            'enabled': self.recording_enabled,
            'recording': self.is_recording,
            'audio_stream_active': self.input_stream is not None,
        }

    def _setup_audio(self):
        """Open the microphone input stream, creating the PyAudio instance if needed.

        The PyAudio instance is reused across calls. Previously every
        enable/start_monitoring created a new one and never terminated the old
        one. It is only terminated in ``_cleanup``.

        Raises:
            Exception: Whatever PyAudio raises; it is logged and re-raised.
        """
        try:
            # Close any stream left open by a previous call so it is not leaked.
            self._cleanup_audio_stream()
            if self.audio is None:
                self.audio = pyaudio.PyAudio()
            self.input_stream = self.audio.open(
                format=self.FORMAT,
                channels=self.CHANNELS,
                rate=self.RATE,
                input=True,
                frames_per_buffer=self.CHUNK_SIZE,
            )
            self.logger.info("音频设备初始化成功")
        except Exception as e:
            self.logger.error(f"音频设备初始化失败: {e}")
            raise

    def _cleanup_audio_stream(self):
        """Close the input stream but keep the PyAudio instance alive.

        ``input_stream`` is cleared even if closing fails. Otherwise
        ``_process_audio`` would keep reading from a broken stream.
        """
        if not self.input_stream:
            return
        try:
            print("🎙️ 输入进程:关闭音频输入流")
            self.input_stream.stop_stream()
            self.input_stream.close()
        except Exception as e:
            self.logger.warning(f"关闭音频流时出错: {e}")
        finally:
            self.input_stream = None

    def _cleanup(self):
        """Release all resources: TTS worker thread, input stream and PyAudio.

        Replaces two former ``_cleanup`` definitions. The second silently
        overrode the first, so the stream and PyAudio references were never
        cleared.
        """
        print("🎙️ 输入进程:开始清理资源")
        self.tts_worker_running = False
        if self.tts_worker_thread:
            try:
                self.tts_task_queue.put_nowait(None)  # wake the worker so it exits promptly
            except queue.Full:
                pass  # the worker still exits: it re-checks tts_worker_running each loop
            self.tts_worker_thread.join(timeout=2.0)
        self._cleanup_audio_stream()
        if self.audio:
            try:
                print("🎙️ 输入进程终止PyAudio实例")
                self.audio.terminate()
            except Exception:
                pass  # best effort during shutdown
            self.audio = None
        print("🎙️ 输入进程:资源清理完成")

    def _check_commands(self):
        """Drain and execute all pending commands from ``command_queue``."""
        try:
            while True:
                command = self.command_queue.get_nowait()
                name = command.command
                if name == 'enable_recording':
                    if not self.recording_enabled:
                        # Going from disabled to enabled: rebuild the input stream.
                        print("🎙️ 输入进程:重新启用录音功能,重新初始化音频流")
                        self._cleanup_audio_stream()
                        # Drop old audio so nothing captured earlier gets recorded.
                        self._reset_recording_state()
                        print("🎙️ 输入进程:已清空所有音频缓冲区")
                        self._setup_audio()
                    self.recording_enabled = True
                    self.logger.info("录音功能已启用")
                elif name == 'disable_recording':
                    self.recording_enabled = False
                    # An active recording is finished and sent immediately.
                    if self.is_recording:
                        self._stop_recording()
                    # Close the stream while disabled to avoid picking up echo.
                    self._cleanup_audio_stream()
                    self.logger.info("录音功能已禁用")
                elif name == 'start_calibration':
                    if self.start_calibration():
                        self.logger.info("校准已启动")
                    else:
                        self.logger.error("校准启动失败")
                elif name == 'start_monitoring':
                    if self.start_monitoring():
                        self.logger.info("监听已启动")
                    else:
                        self.logger.error("监听启动失败")
                elif name == 'stop_monitoring':
                    if self.stop_monitoring():
                        self.logger.info("监听已停止")
                    else:
                        self.logger.error("监听停止失败")
                elif name == 'get_calibration_status':
                    self.event_queue.put(ProcessEvent(
                        event_type='calibration_status',
                        metadata=self.get_calibration_status(),
                    ))
                elif name == 'get_monitoring_status':
                    self.event_queue.put(ProcessEvent(
                        event_type='monitoring_status',
                        metadata=self.get_monitoring_status(),
                    ))
                elif name == 'shutdown':
                    self.logger.info("收到关闭命令")
                    self.running = False
                    return
                elif name == 'emergency_stop':
                    # Stop all audio activity immediately.
                    print("🚨 输入进程收到紧急停止命令!")
                    self._handle_emergency_stop()
                    return
                elif name == 'cleanup_recordings':
                    print("🧹 输入进程收到清理录音文件命令")
                    self._perform_cleanup(command.parameters)
                elif name == 'list_recordings':
                    print("📋 输入进程收到列出录音文件命令")
                    self._list_recordings_info()
        except queue.Empty:
            pass

    def _process_audio(self):
        """Read one chunk and advance the listen/record state machine.

        While recording, each chunk is appended to ``recording_buffer``. The
        recording stops after ``silence_threshold_count`` consecutive
        non-voice chunks (once ``min_recording_time`` has elapsed) or at
        ``max_recording_time``. While listening, a voice chunk starts a new
        recording. A one-line status is printed in both modes.
        """
        try:
            if not self.input_stream:
                return
            data = self.input_stream.read(self.CHUNK_SIZE, exception_on_overflow=False)
            if len(data) == 0:
                return

            self._update_pre_record_buffer(data)

            detection_result = self.voice_detector.is_voice_advanced(data)
            is_voice = detection_result['is_voice']

            # Show calibration progress while the detector is calibrating.
            if hasattr(self.voice_detector, 'calibration_mode') and self.voice_detector.calibration_mode:
                if hasattr(self.voice_detector, 'calibration_samples'):
                    required = getattr(self.voice_detector, 'required_calibration', 0)
                    progress = (self.voice_detector.calibration_samples / required * 100) if required else 0
                    status = f"\r🎙️ 校准中... {progress:.1f}%"
                    print(status, end='', flush=True)

            if self.is_recording:
                self.recording_buffer.append(data)

                if is_voice:
                    self.silence_start_time = None
                    self.consecutive_silence_count = 0
                else:
                    self.consecutive_silence_count += 1
                    if self.silence_start_time is None:
                        self.silence_start_time = time.time()

                recording_duration = time.time() - self.recording_start_time
                should_stop = False
                if (self.consecutive_silence_count >= self.silence_threshold_count and
                        recording_duration >= self.config['min_recording_time']):
                    should_stop = True
                    print("🎙️ 输入进程:静音检测触发停止录音")
                if recording_duration >= self.config['max_recording_time']:
                    should_stop = True
                    print("🎙️ 输入进程:达到最大录音时间")

                if should_stop:
                    self._stop_recording()
                else:
                    confidence = detection_result.get('confidence', 0.0)
                    energy = detection_result.get('energy', 0.0)
                    zcr = detection_result.get('zcr', 0.0)
                    status = (f"\r🎙️ 录音中... {recording_duration:.1f}s | "
                              f"能量: {energy:.0f} | ZCR: {zcr:.0f} | "
                              f"语音: {is_voice} | 置信度: {confidence:.2f}")
                    print(status, end='', flush=True)
            elif is_voice:
                # Listening mode: voice detected, so start recording.
                self._start_recording()
            else:
                if detection_result['calibrating']:
                    progress = detection_result['calibration_progress'] * 100
                    status = f"\r🎙️ 校准中... {progress:.0f}%"
                else:
                    confidence = detection_result.get('confidence', 0.0)
                    energy = detection_result.get('energy', 0.0)
                    zcr = detection_result.get('zcr', 0.0)
                    # max(..., 1) guards a zero-length pre-record window.
                    buffer_usage = len(self.pre_record_buffer) / max(self.pre_record_max_frames, 1) * 100
                    status = (f"\r🎙️ 监听中... 能量: {energy:.0f} | ZCR: {zcr:.0f} | "
                              f"语音: {is_voice} | 置信度: {confidence:.2f} | "
                              f"缓冲: {buffer_usage:.0f}%")
                print(status, end='', flush=True)
        except Exception as e:
            print(f"🎙️ 输入进程音频处理错误: {e}")
            message = str(e).lower()
            # On stream problems, try to rebuild the input stream.
            if "stream" in message or "audio" in message:
                print("🎙️ 输入进程:音频流异常,尝试重新初始化")
                self._cleanup_audio_stream()
                if self.recording_enabled:
                    try:
                        self._setup_audio()
                    except Exception:
                        # _setup_audio already logged the failure. Do not let it
                        # escape and kill the process; the stream can be rebuilt
                        # by a later enable_recording/start_monitoring command.
                        pass

    def _update_pre_record_buffer(self, audio_data: bytes):
        """Append a chunk, keeping at most ``pre_record_max_frames`` recent chunks."""
        self.pre_record_buffer.append(audio_data)
        if len(self.pre_record_buffer) > self.pre_record_max_frames:
            self.pre_record_buffer.pop(0)

    def _start_recording(self):
        """Start a recording seeded with the pre-record buffer (no-op when disabled)."""
        if not self.recording_enabled:
            return
        self.is_recording = True
        self.recording_buffer = []
        self.recording_start_time = time.time()
        self.silence_start_time = None
        self.consecutive_silence_count = 0
        # Prepend the audio captured just before voice was detected.
        self.recording_buffer.extend(self.pre_record_buffer)
        self.pre_record_buffer.clear()
        print(f"🎙️ 输入进程:开始录音(包含预录音 {self.config['pre_record_duration']}秒)")

    def _stop_recording(self):
        """Finish the current recording, save it and send a ``recording_complete`` event."""
        if not self.is_recording:
            return
        self.is_recording = False

        if self.recording_buffer:
            audio_data = b''.join(self.recording_buffer)
            # 16-bit mono: 2 bytes per sample.
            duration = len(audio_data) / (self.RATE * 2)
            filename = self._save_recording(audio_data)
            self.event_queue.put(ProcessEvent(
                event_type='recording_complete',
                data=audio_data,
                metadata={
                    'duration': duration,
                    'start_time': self.recording_start_time,
                    'filename': filename,
                },
            ))
            print(f"📝 输入进程:录音完成,时长 {duration:.2f}")

        self.recording_buffer = []
        self.pre_record_buffer = []

    def _save_recording(self, audio_data: bytes) -> Optional[str]:
        """Write *audio_data* to ``recording_<timestamp>.wav`` in the working directory.

        Returns:
            The file name, or None if saving failed (the error is printed).
        """
        try:
            timestamp = time.strftime("%Y%m%d_%H%M%S")
            filename = f"recording_{timestamp}.wav"
            with wave.open(filename, 'wb') as wf:
                wf.setnchannels(self.CHANNELS)
                # Module-level helper: works even if self.audio was already released.
                wf.setsampwidth(pyaudio.get_sample_size(self.FORMAT))
                wf.setframerate(self.RATE)
                wf.writeframes(audio_data)
            print(f"💾 输入进程:录音已保存到 {filename}")
            return filename
        except Exception as e:
            print(f"❌ 输入进程保存录音失败: {e}")
            return None

    def _perform_cleanup(self, parameters: Optional[Dict[str, Any]] = None):
        """Handle the 'cleanup_recordings' command by deleting expired recordings.

        Args:
            parameters: Optional dict; ``retention_hours`` (default 24) and
                ``dry_run`` (default False) are forwarded to ``cleanup_recordings``.

        Returns:
            The statistics dict from ``cleanup_recordings``.
        """
        params = parameters or {}
        return cleanup_recordings(
            retention_hours=params.get('retention_hours', 24),
            dry_run=params.get('dry_run', False),
        )

    def _list_recordings_info(self):
        """Print a summary of the recording files."""
        try:
            print_recording_summary()
        except Exception as e:
            print(f"❌ 列出录音文件信息时出错: {e}")

    def _start_tts_worker(self):
        """Start the daemon thread that processes ``tts_task_queue``."""
        self.tts_worker_thread = threading.Thread(target=self._tts_worker, daemon=True)
        self.tts_worker_thread.start()
        self.logger.info("TTS工作线程已启动")

    def _tts_worker(self):
        """Process TTS tasks until stopped; a ``None`` task ends the loop."""
        while self.tts_worker_running:
            try:
                task = self.tts_task_queue.get(timeout=1.0)
                if task is None:  # shutdown sentinel
                    break
                task_type, content = task
                if task_type == "tts_sentence":
                    self._generate_tts_audio(content)
            except queue.Empty:
                continue
            except Exception as e:
                self.logger.error(f"TTS工作线程错误: {e}")
                time.sleep(0.1)

    def _add_tts_task(self, content):
        """Queue a TTS sentence without blocking; return False if the queue is full."""
        try:
            self.tts_task_queue.put_nowait(("tts_sentence", content))
            return True
        except queue.Full:
            self.logger.warning("TTS任务队列已满丢弃任务")
            return False

    def _build_tts_request(self, text):
        """Return the ``(headers, payload)`` pair for a TTS request for *text*.

        ``additions`` is produced with ``json.dumps``. The former hand-written
        string ended in a stray ``"}`` and was not valid JSON.
        """
        headers = {
            "X-Api-App-Id": self.tts_app_id,
            "X-Api-Access-Key": self.tts_access_key,
            "X-Api-Resource-Id": self.tts_resource_id,
            "X-Api-App-Key": self.tts_app_key,
            "Content-Type": "application/json",
            "Connection": "keep-alive",
        }
        additions = json.dumps({
            "explicit_language": "zh",
            "disable_markdown_filter": True,
            "enable_timestamp": True,
        })
        payload = {
            "user": {
                "uid": "input_process_tts",
            },
            "req_params": {
                "text": text,
                "speaker": self.tts_speaker,
                "audio_params": {
                    "format": "pcm",
                    "sample_rate": 16000,
                    "enable_timestamp": True,
                },
                "additions": additions,
            },
        }
        return headers, payload

    def _generate_tts_audio(self, text):
        """Request TTS audio for *text* over the streaming HTTP API.

        Each response line is parsed as JSON. Audio chunks are base64-decoded
        and counted, but this method does not forward the decoded audio
        anywhere.

        Returns:
            bool: True if at least one audio chunk was received. False on a
            non-200 status, no audio, or any exception (which is logged).
        """
        try:
            self.logger.info(f"生成TTS音频: {text[:50]}...")
            headers, payload = self._build_tts_request(text)
            # Context managers close the session and the streamed response even
            # if post() raises. The old `finally: response.close()` could hit an
            # unbound name in that case.
            with requests.Session() as session:
                with session.post(self.tts_url, headers=headers, json=payload,
                                  stream=True, timeout=self.TTS_REQUEST_TIMEOUT) as response:
                    if response.status_code != 200:
                        self.logger.error(f"TTS请求失败: {response.status_code}")
                        return False

                    total_audio_size = 0
                    chunk_count = 0
                    self.logger.info("开始接收TTS音频流...")
                    for chunk in response.iter_lines(decode_unicode=True):
                        if not chunk:
                            continue
                        try:
                            data = json.loads(chunk)
                        except json.JSONDecodeError:
                            continue

                        code = data.get("code", 0)
                        if code == 0 and data.get("data"):
                            chunk_audio = base64.b64decode(data["data"])
                            audio_size = len(chunk_audio)
                            total_audio_size += audio_size
                            chunk_count += 1
                            self.logger.debug(f"接收音频块: {audio_size} 字节")
                            if chunk_count % 5 == 0:  # throttle progress logging
                                progress = f"生成音频: {chunk_count} 块 | {total_audio_size / 1024:.1f} KB"
                                self.logger.info(progress)
                            continue
                        if code == 0 and data.get("sentence"):
                            continue  # sentence/timestamp info is not used here
                        if code == 20000000:
                            # End-of-stream marker.
                            print(f"🎵 收到TTS流结束信号总共处理了 {chunk_count} 个音频块")
                            print(f"🎵 总音频大小: {total_audio_size} 字节 ({total_audio_size/1024:.1f} KB)")
                            break
                        if code > 0:
                            self.logger.error(f"TTS错误响应: {data}")
                            break

                    self.logger.info(f"TTS音频生成完成: {chunk_count} 块, 总大小: {total_audio_size / 1024:.1f} KB")
                    return chunk_count > 0
        except Exception as e:
            self.logger.error(f"TTS音频生成失败: {e}")
            return False

    def _handle_emergency_stop(self):
        """Stop all recording immediately, discarding buffered audio (nothing is sent)."""
        print("🚨 输入进程开始执行紧急停止...")
        try:
            # 1. Stop the current recording without emitting it.
            print("🛑 立即停止录音...")
            self.is_recording = False
            print("✅ 录音状态已重置")

            # 2. Disable recording.
            print("🛑 立即禁用录音功能...")
            self.recording_enabled = False
            print("✅ 录音功能已禁用")

            # 3. Drop buffered audio.
            print("🛑 清空所有录音缓冲区...")
            self.recording_buffer.clear()
            self.pre_record_buffer.clear()
            print("✅ 录音缓冲区已清空")

            # 4. Close the input stream.
            print("🛑 立即关闭音频流...")
            self._cleanup_audio_stream()
            print("✅ 音频流已关闭")

            # 5. Reset the remaining recording state.
            print("🛑 重置录音相关状态...")
            self.silence_start_time = None
            self.consecutive_silence_count = 0
            self.recording_start_time = None
            print("✅ 录音状态已重置")

            # 6. Give the audio device a moment to settle.
            print("⏱️ 等待音频设备完全停止...")
            time.sleep(0.3)
            print("🚨 输入进程紧急停止完成")
        except Exception as e:
            print(f"❌ 输入进程紧急停止时出错: {e}")
            import traceback
            traceback.print_exc()

    def process_tts_request(self, text):
        """Public entry point: queue *text* for TTS; returns False if the queue is full."""
        return self._add_tts_task(text)
class OutputProcess:
"""输出进程 - 借鉴 recorder.py 的优秀架构"""
def __init__(self, audio_queue: mp.Queue, config: Optional[Dict[str, Any]] = None, event_queue: mp.Queue = None):
    """Initialise playback state and start the playback and TTS worker threads.

    Args:
        audio_queue: Queue from the main process carrying audio to play.
        config: Optional overrides merged on top of ``_get_default_config()``.
            May also provide ``tts_speaker`` and the TTS connection settings
            (``tts_url``, ``tts_app_id``, ``tts_access_key``,
            ``tts_resource_id``, ``tts_app_key``).
        event_queue: Optional queue used to send events to the main process.

    NOTE(review): both worker threads start here. The playback thread waits
    up to 10 s for ``self.audio``, presumably set by ``_setup_audio()`` in
    ``run()``. If this object is constructed in a different process from the
    one that calls ``run()``, confirm that the threads run where playback
    actually happens.
    """
    print("🔊 OutputProcess __init__ 开始执行...")
    self.audio_queue = audio_queue  # main process -> output process (single playback queue)
    self.event_queue = event_queue  # output process -> main process (event notifications)
    # Merge overrides onto the defaults so a partial config keeps the default keys.
    self.config = {**self._get_default_config(), **(config or {})}

    self.logger = ProcessLogger("OutputProcess")
    print("🔊 OutputProcess 基本初始化完成")

    # Playback format: 16 kHz, mono, 16-bit samples.
    self.FORMAT = pyaudio.paInt16
    self.CHANNELS = 1
    self.RATE = 16000
    self.CHUNK_SIZE = 1024

    # Playback state.
    self.is_playing = False
    self.currently_playing = False  # maintained by the playback worker thread
    self.playback_buffer = []  # chunks consumed by the playback worker
    self.total_chunks_played = 0
    self.total_audio_size = 0
    self.last_playback_time = 0  # reference timestamp for the cooldown check
    self.playback_cooldown_period = 0.05  # seconds
    self.playback_completed = False
    self.end_signal_received = False
    self.end_signal_time = 0

    # Preload buffer: chunks accumulated before being moved to playback_buffer.
    self.preload_buffer = []
    self.preload_size = 3

    # Created in run(): a threading.Lock cannot be pickled.
    self.audio_queue_lock = None

    # Sentence buffering for TTS.
    self.tts_buffer = []
    self.tts_buffer_max_size = 5
    self.tts_buffer_min_size = 2
    self.tts_accumulation_time = 0.15  # seconds
    self.tts_last_trigger_time = 0
    self.tts_pending_sentences = []
    self.min_buffer_size = 2  # preload chunks required before run() starts playback
    self.audio_device_healthy = True

    # Playback worker thread.
    self.playback_worker_running = True
    self.playback_worker_thread = None

    # Completion detection.
    self.last_audio_time = 0
    self.playback_timeout = 5.0  # seconds
    self.completion_sent = False  # guards against sending the completion event twice

    # Performance counters.
    self.performance_stats = {
        'total_chunks_played': 0,
        'total_audio_size': 0,
        'avg_buffer_size': 0,
        'max_buffer_size': 0,
        'tts_wait_time': 0,
        'playback_delay': 0,
    }

    # Flags used by the enhanced playback-completion check.
    self.llm_generation_complete = False
    self.tts_generation_complete = False
    self.all_audio_received = False
    self.pre_buffer_empty = False
    self.playback_buffer_empty = False
    self.no_active_playback = False
    self.last_audio_chunk_time = 0  # 0 means nothing has been played yet

    # PyAudio handles.
    self.audio = None
    self.output_stream = None

    self.running = True

    # TTS worker thread and its bounded task queue.
    self.tts_worker_running = True
    self.tts_worker_thread = None
    self.tts_task_queue = mp.Queue(maxsize=20)

    # TTS service settings, overridable via config.
    # NOTE(review): credentials are hard-coded in source control. Consider
    # rotating them and supplying them via config or environment instead.
    self.tts_url = self.config.get('tts_url', "https://openspeech.bytedance.com/api/v3/tts/unidirectional")
    self.tts_app_id = self.config.get('tts_app_id', "8718217928")
    self.tts_access_key = self.config.get('tts_access_key', "ynJMX-5ix1FsJvswC9KTNlGUdubcchqc")
    self.tts_resource_id = self.config.get('tts_resource_id', "volc.service_type.10029")
    self.tts_app_key = self.config.get('tts_app_key', "aGjiRDfUWi")
    self.tts_speaker = self.config.get('tts_speaker', "zh_female_wanqudashu_moon_bigtts")

    # Start the playback thread first, then the TTS thread.
    print("🔊 准备启动播放工作线程...")
    try:
        self._start_playback_worker()
        print("🔊 播放工作线程已启动准备启动TTS工作线程...")
        self._start_tts_worker()
        print("🔊 TTS工作线程已启动")
    except Exception as e:
        print(f"❌ 工作线程启动失败: {e}")
        import traceback
        traceback.print_exc()
def _start_playback_worker(self):
"""启动播放工作线程 - 借鉴 recorder.py 的播放线程模式"""
print("🔊 创建播放工作线程...")
self.playback_worker_thread = threading.Thread(target=self._playback_worker, daemon=True)
print("🔊 启动播放工作线程...")
self.playback_worker_thread.start()
print("🔊 播放工作线程已启动")
self.logger.info("播放工作线程已启动")
def _playback_worker(self):
"""Playback worker thread: writes chunks from self.playback_buffer to a PyAudio output stream.

It first waits up to 10 s for self.audio to be set and self.running to be
true, then opens a 16-bit mono output stream (frames_per_buffer=512). It loops
while self.playback_worker_running is true. It moves up to 3 chunks from
preload_buffer to playback_buffer when playback_buffer is empty, maintains
currently_playing / last_audio_chunk_time / all_audio_received /
playback_completed, and closes the stream on exit.

NOTE(review): the indentation of this method was lost in SOURCE, so the
nesting of some statements is ambiguous. Examples: the `elif` before the
preload-transfer branch, and the lines that follow `if not
self.currently_playing:`. The code is therefore left verbatim.

NOTE(review): in the cooldown branch the chunk has already been popped from
playback_buffer and `continue` discards it without re-queuing. run() sets
last_playback_time = time.time() when it moves preload data into
playback_buffer, so the next chunk popped within playback_cooldown_period
(0.05 s) would be dropped. Consider re-inserting it at index 0 and sleeping
for the remaining cooldown.

NOTE(review): per the multiprocessing docs, Queue.qsize() may raise
NotImplementedError on macOS. Here that would be caught by the generic handler
on every pass. `self.tts_task_queue.empty()` is the portable check — confirm
the target platforms.
"""
print("🔊 播放工作线程开始运行...")
# Wait for the audio device (self.audio) to be set and for self.running to be true.
max_wait_time = 10 # wait at most 10 seconds
wait_start_time = time.time()
while (self.audio is None or not self.running) and (time.time() - wait_start_time) < max_wait_time:
time.sleep(0.1)
if (time.time() - wait_start_time) % 1 < 0.1: # print the status roughly once per second
print(f"🔊 仍在等待音频设备就绪... audio={self.audio is not None}, running={self.running}")
if self.audio is None:
print("❌ 音频设备未就绪,播放工作线程退出")
self.logger.error("音频设备未就绪,播放工作线程退出")
return
print(f"✅ 音频设备已就绪,耗时: {time.time() - wait_start_time:.1f}")
print("🔊 音频设备已就绪,开始创建播放流")
# Create the output stream.
playback_stream = None
try:
playback_stream = self.audio.open(
format=self.FORMAT,
channels=self.CHANNELS,
rate=self.RATE,
output=True,
frames_per_buffer=512,
start=True # start the stream immediately
)
print("🔊 音频播放流已创建")
except Exception as e:
print(f"❌ 创建音频播放流失败: {e}")
self.logger.error(f"创建音频播放流失败: {e}")
return
chunks_played = 0
total_size = 0
try:
while self.playback_worker_running:
try:
# Take the next chunk from the playback buffer.
if self.playback_buffer:
audio_chunk = self.playback_buffer.pop(0)
if audio_chunk and len(audio_chunk) > 0:
# Cooldown check relative to last_playback_time (0 disables it).
current_time = time.time()
time_since_last_play = current_time - self.last_playback_time
in_cooldown = (self.last_playback_time > 0 and
time_since_last_play < self.playback_cooldown_period)
if in_cooldown:
# In cooldown: skip playback. NOTE(review): the popped chunk is not put back, so it is lost.
if chunks_played == 0: # only print while nothing has been played yet
print(f"🔊 播放冷却中,跳过播放 ({time_since_last_play:.1f}s < {self.playback_cooldown_period}s)")
self.logger.debug(f"播放冷却中,跳过播放 ({time_since_last_play:.1f}s < {self.playback_cooldown_period}s)")
continue
# Play the chunk.
if chunks_played == 0: # extra logging before the very first chunk
print(f"🔊 开始播放音频块 {chunks_played + 1}")
print(f"🔊 添加0.5秒缓冲时间,避免破音...")
time.sleep(0.5) # 0.5 s delay before the first chunk (the message says it avoids crackling)
# Make sure the playing state is set.
if not self.currently_playing:
self.currently_playing = False # explicitly False first so the transition to True is observable
time.sleep(0.1) # brief pause
self.currently_playing = True
self.last_audio_chunk_time = time.time() # record the playback timestamp
# Once any audio plays, mark all_audio_received as True.
if not self.all_audio_received:
self.all_audio_received = True
print(f"🎵 音频开始播放设置all_audio_received=True")
print(f"🎵 播放状态变化: currently_playing = True (开始播放)")
print(f"🎵 设置last_audio_chunk_time = {self.last_audio_chunk_time}")
# The first chunk does not start a cooldown; later chunks do.
if chunks_played == 0:
self.last_playback_time = 0 # first chunk: no cooldown
else:
self.last_playback_time = current_time # timestamp taken before this chunk's write
# Blocking write of the chunk to the output stream.
playback_stream.write(audio_chunk)
chunks_played += 1
total_size += len(audio_chunk)
# Update performance counters.
self.performance_stats['total_chunks_played'] += 1
self.performance_stats['total_audio_size'] += len(audio_chunk)
# Print progress for the first 3 chunks, then every 10th.
if chunks_played % 10 == 0 or chunks_played <= 3:
progress = f"🔊 播放工作: {chunks_played} 块 | {total_size / 1024:.1f} KB"
print(f"\r{progress}", end='', flush=True)
# currently_playing is intentionally NOT reset here; it stays True until completion is certain.
# If nothing else is queued (both buffers and the TTS queue), flag playback as completed.
if (len(self.playback_buffer) == 0 and
len(self.preload_buffer) == 0 and
self.tts_task_queue.qsize() == 0 and
not self.playback_completed): # set only once
self.playback_completed = True
# _finish_playback is left to the main loop in run().
else:
# Empty chunk: leave the playing state unchanged and move on.
continue
else:
# Playback buffer is empty: sleep briefly to limit CPU usage.
# currently_playing is cleared only when both buffers are empty and nothing has played recently.
if self.currently_playing:
# Check both buffers and the time since last_audio_chunk_time.
time_since_last_chunk = time.time() - self.last_audio_chunk_time
# Stop only when both buffers are empty and more than 1 s has passed.
if (len(self.playback_buffer) == 0 and
len(self.preload_buffer) == 0 and
time_since_last_chunk > 1.0):
self.currently_playing = False
print(f"🎵 播放状态变化: currently_playing = False (播放缓冲区和预加载缓冲区都为空)")
elif len(self.playback_buffer) == 0 and len(self.preload_buffer) > 0:
# playback_buffer is empty but preload_buffer has data: move some chunks over.
transfer_count = min(3, len(self.preload_buffer)) # move up to 3 chunks at a time
for _ in range(transfer_count):
if self.preload_buffer:
self.playback_buffer.append(self.preload_buffer.pop(0))
# currently_playing is left as-is because there is new data to play.
time.sleep(0.01)
continue
except Exception as e:
print(f"❌ 播放工作线程错误: {e}")
self.logger.error(f"播放工作线程错误: {e}")
# On error, clear currently_playing only if both buffers are empty and playback has been idle.
if self.currently_playing:
time_since_last_chunk = time.time() - self.last_audio_chunk_time
if (len(self.playback_buffer) == 0 and
len(self.preload_buffer) == 0 and
time_since_last_chunk > 1.5): # longer idle threshold (1.5 s) on the error path
self.currently_playing = False
print(f"🎵 播放状态变化: currently_playing = False (异常情况处理,缓冲区为空且{time_since_last_chunk:.1f}秒无播放)")
else:
# Keep the playing state and continue.
pass
time.sleep(0.1)
print(f"\n✅ 播放工作线程结束: 总计 {chunks_played} 块, {total_size / 1024:.1f} KB")
finally:
# On thread exit, make sure currently_playing is cleared and the stream is closed.
if self.currently_playing:
self.currently_playing = False
print(f"🎵 播放状态变化: currently_playing = False (播放工作线程结束)")
if playback_stream:
try:
playback_stream.stop_stream()
playback_stream.close()
except: # NOTE(review): bare except also swallows KeyboardInterrupt/SystemExit; `except Exception:` is safer
pass
print("🔊 音频播放流已关闭")
def _get_default_config(self) -> Dict[str, Any]:
"""获取默认配置"""
return {
'buffer_size': 1000,
'show_progress': True,
'progress_interval': 100
}
def run(self):
    """Main loop of the output process.

    Sets up PyAudio via ``_setup_audio()``, creates ``audio_queue_lock`` and then
    repeats until ``self.running`` is False:

    1. ``_process_audio_queue()`` handles incoming commands and audio;
    2. if an end signal is pending, run the completion check and finish playback;
    3. while the device is unhealthy, TTS jobs are queued, playback is active or
       the post-playback cooldown is running: print a status line, re-check
       completion, sleep 0.1 s and start the next iteration;
    4. otherwise move preloaded audio into the playback buffer once at least
       ``min_buffer_size`` chunks are waiting, call ``_show_progress`` and
       ``_update_performance_stats``, re-check completion and sleep 5 ms while
       playing (20 ms when idle).

    ``_cleanup()`` always runs on exit.

    NOTE(review): ``_process_audio_queue()`` only returns after it has handled at
    least one message (or finished playback / ``running`` became False), so the
    later steps do not run while the queue stays idle. Confirm this is intended.
    """
    self.logger.info("输出进程启动")
    self._setup_audio()
    # The lock must be created here in run() rather than earlier, to avoid pickle
    # errors when the process object is handed to the child process.
    self.audio_queue_lock = threading.Lock()
    try:
        while self.running:
            # 1. Receive data/commands from the main process.
            self._process_audio_queue()
            # 2. Playback-completion check (enhanced detection).
            if self.end_signal_received:
                if self._check_enhanced_playback_completion():
                    self._finish_playback()
            # 3. Device health and cooldown after playback (echo prevention).
            current_time = time.time()
            time_since_last_play = current_time - self.last_playback_time
            in_cooldown = (self.last_playback_time > 0 and
                           time_since_last_play < self.playback_cooldown_period)
            # TTS and playback activity.
            tts_active = not self.tts_task_queue.empty()
            playback_active = self.currently_playing or len(self.playback_buffer) > 0
            # While busy (device unhealthy, playing, TTS generating, or cooling
            # down) only show status, but keep checking for playback completion.
            if not self.audio_device_healthy or tts_active or playback_active or in_cooldown:
                # Status line.
                tts_queue_size = self.tts_task_queue.qsize()
                queue_size = len(self.playback_buffer) + len(self.preload_buffer)
                if not self.audio_device_healthy:
                    status = f"🔧 设备重置中... TTS: {tts_queue_size} 播放: {queue_size}"
                elif tts_active:
                    status = f"🎵 TTS生成中... TTS: {tts_queue_size} 播放: {queue_size}"
                elif in_cooldown:
                    cooldown_time = self.playback_cooldown_period - time_since_last_play
                    status = f"🔊 播放冷却中... {cooldown_time:.1f}s TTS: {tts_queue_size} 播放: {queue_size}"
                else:
                    playing_status = "播放中" if self.currently_playing else "等待播放"
                    status = f"🔊 {playing_status}... TTS: {tts_queue_size} 播放: {queue_size}"
                print(f"\r{status}", end='', flush=True)
                # Completion must still be detected while playback is active.
                if self.end_signal_received:
                    # Enhanced playback-completion detection.
                    if self._check_enhanced_playback_completion():
                        self._finish_playback()
                time.sleep(0.1)  # Longer sleep while busy to reduce CPU usage.
                continue
            # 4. Proactively move preloaded audio so playback starts promptly.
            if (not self.is_playing and
                    len(self.preload_buffer) >= self.min_buffer_size and
                    len(self.playback_buffer) == 0):
                # Move all preloaded data into the playback buffer at once.
                self.playback_buffer.extend(self.preload_buffer)
                self.preload_buffer.clear()
                self.is_playing = True
                self.last_playback_time = time.time()
                print(f"🎵 主循环检测:开始播放预加载音频,播放缓冲区大小: {len(self.playback_buffer)}")
            # 5. Progress display and performance monitoring.
            self._show_progress()
            self._update_performance_stats()
            # 6. Check for completion regardless of state.
            if self.end_signal_received:
                # Enhanced playback-completion detection.
                if self._check_enhanced_playback_completion():
                    self._finish_playback()
            # 7. Adapt sleep time to playback state.
            if self.is_playing and (self.playback_buffer or self.preload_buffer):
                time.sleep(0.005)  # Very short sleep while playing, for responsiveness.
            else:
                time.sleep(0.02)  # Slightly longer sleep when idle to reduce CPU usage.
    except KeyboardInterrupt:
        self.logger.info("输出进程收到中断信号")
    except Exception as e:
        self.logger.error(f"输出进程错误: {e}")
        import traceback
        print(f"❌ 输出进程错误详情: {traceback.format_exc()}")
    finally:
        self._cleanup()
        self.logger.info("输出进程退出")
def _setup_audio(self):
    """Create the PyAudio instance that the playback worker thread will use.

    The process itself never opens an output stream, so ``output_stream`` is
    left as ``None``; stream handling belongs to the playback worker.
    Initialisation errors are logged, printed and re-raised.
    """
    try:
        print("🔊 开始初始化音频设备...")
        pa_instance = pyaudio.PyAudio()
        self.audio = pa_instance
        print(f"🔊 PyAudio实例已创建: {pa_instance}")
        # No stream here on purpose: only the playback worker writes audio.
        self.output_stream = None
        self.logger.info("音频设备初始化成功")
        print("🔊 音频设备初始化完成")
    except Exception as exc:
        self.logger.error(f"音频设备初始化失败: {exc}")
        print(f"❌ 音频设备初始化失败: {exc}")
        raise
def _process_audio_queue(self):
    """Drain ``audio_queue`` and dispatch each message received from the main process.

    Message kinds handled:

    - ``None``: end-of-response signal. Sets ``end_signal_received`` and
      ``end_signal_time`` and resets ``completion_sent`` / ``playback_completed``.
      If the completion check already passes, playback is finished and the
      method returns; otherwise the ``None`` is re-queued.
    - ``"METADATA:..."``: printed only.
    - ``"STREAMING_TEXT:..."``, ``"GREETING_TEXT:<text>:<character>"``,
      ``"COMPLETE_TEXT:..."``: text for TTS. A pending end signal is cleared first,
      because a new turn has started.
    - ``"UPDATE_TTS_SPEAKER:..."``, ``"FLUSH_TTS_BUFFER:"``, ``"LLM_COMPLETE:"``,
      ``"TTS_COMPLETE:"``, ``"ALL_AUDIO_RECEIVED:"``, ``"EMERGENCY_STOP:"``:
      state/control commands.
    - ``bytes``: raw audio appended to ``preload_buffer`` and moved to
      ``playback_buffer`` once enough has been buffered.

    Returns when playback is finished, when ``self.running`` becomes False, or
    when the queue is empty after at least one message was handled.

    NOTE(review): if no message arrives, the loop keeps polling and does not
    return to the caller.

    Errors are logged and printed, not raised.
    """
    try:
        processed_count = 0
        # Local flag: was an end signal seen during *this* call (duplicates ignored).
        end_signal_received = False
        while self.running:
            try:
                # Short timeout keeps the loop responsive.
                audio_data = self.audio_queue.get(timeout=0.05)
                processed_count += 1
                # Throttle logging for performance.
                if processed_count <= 3 or processed_count % 50 == 0:
                    print(f"📥 输出进程收到第 {processed_count} 个数据包")
                if audio_data is None:
                    # End-of-response signal.
                    if end_signal_received:
                        print(f"📥 输出进程已收到过结束信号,忽略重复信号")
                        continue
                    print(f"📥 输出进程收到结束信号")
                    end_signal_received = True
                    self.end_signal_received = True
                    self.end_signal_time = time.time()  # Time at which the end signal arrived.
                    # If ALL_AUDIO_RECEIVED already set True (speech-recognition
                    # failure path), do not reset it to False.
                    # NOTE(review): the first branch is a no-op (value is already False).
                    if not self.all_audio_received:
                        self.all_audio_received = False
                    else:
                        print(f"🔧 保持all_audio_received=True已通过语音识别失败处理设置")
                    print(f"📥 收到结束信号,状态变化:")
                    print(f"   - end_signal_received: True")
                    print(f"   - all_audio_received: {self.all_audio_received} ({'保持True' if self.all_audio_received else '延迟设置'})")
                    print(f"   - completion_sent: False")
                    print(f"   - playback_completed: False")
                    # Reset the completion-event marker.
                    self.completion_sent = False
                    # Reset the playback-completed flag.
                    self.playback_completed = False
                    print(f"📥 已重置所有播放完成相关标志")
                    # Enhanced playback-completion detection.
                    if self._check_enhanced_playback_completion():
                        self._finish_playback()
                        return
                    else:
                        print(f"📥 延迟处理结束信号 - 等待LLM、TTS和播放完成")
                        # Put the signal back to retry later.
                        # NOTE(review): the re-queued None is read back later in this
                        # same call while the local end_signal_received is True, so it
                        # is discarded as a duplicate; this effectively only delays.
                        self.audio_queue.put(None)
                        time.sleep(0.05)
                        continue
                if isinstance(audio_data, str) and audio_data.startswith("METADATA:"):
                    # Metadata: printed only.
                    metadata = audio_data[9:]
                    print(f"📥 输出进程收到元数据: {metadata}")
                    continue
                # Streaming text from ControlSystem.
                if isinstance(audio_data, str) and audio_data.startswith("STREAMING_TEXT:"):
                    # Streaming text with smart buffering.
                    streaming_text = audio_data[15:]  # Strip the "STREAMING_TEXT:" prefix.
                    print(f"📥 输出进程收到流式文本: {streaming_text}")
                    # A new turn has started: clear the previous end signal.
                    if self.end_signal_received:
                        print(f"📥 检测到新对话开始重置end_signal_received状态")
                        self.end_signal_received = False
                        self.all_audio_received = False
                        self.completion_sent = False
                    self.process_streaming_text(streaming_text)
                    continue
                if isinstance(audio_data, str) and audio_data.startswith("GREETING_TEXT:"):
                    # Greeting text, with cache support.
                    # NOTE(review): split(':', 2) means a ':' inside the greeting text
                    # itself would be mistaken for the character-name separator.
                    try:
                        greeting_parts = audio_data[14:].split(':', 2)  # Strip the "GREETING_TEXT:" prefix.
                        greeting_text = greeting_parts[0]
                        character_name = greeting_parts[1] if len(greeting_parts) > 1 else "unknown"
                        print(f"📥 输出进程收到打招呼文本: {greeting_text} (角色: {character_name})")
                        # A new turn has started: clear the previous end signal.
                        if self.end_signal_received:
                            print(f"📥 检测到新对话开始重置end_signal_received状态")
                            self.end_signal_received = False
                            self.all_audio_received = False
                            self.completion_sent = False
                        # TTS processing with cache lookup.
                        print(f"🔊 准备处理打招呼文本当前tts_speaker: {self.tts_speaker}")
                        self.process_greeting_text(greeting_text, character_name)
                        print(f"✅ 打招呼文本处理完成")
                    except Exception as e:
                        print(f"❌ 处理打招呼文本失败: {e}")
                    continue
                if isinstance(audio_data, str) and audio_data.startswith("COMPLETE_TEXT:"):
                    # Complete text: forces the text buffer to be flushed.
                    complete_text = audio_data[14:]  # Strip the "COMPLETE_TEXT:" prefix.
                    print(f"📥 输出进程收到完整文本: {complete_text}")
                    # A new turn has started: clear the previous end signal.
                    if self.end_signal_received:
                        print(f"📥 检测到新对话开始重置end_signal_received状态")
                        self.end_signal_received = False
                        self.all_audio_received = False
                        self.completion_sent = False
                    print(f"🔊 准备处理完整文本当前tts_speaker: {self.tts_speaker}")
                    self.process_complete_text(complete_text)
                    print(f"✅ 完整文本处理完成")
                    continue
                if isinstance(audio_data, str) and audio_data.startswith("UPDATE_TTS_SPEAKER:"):
                    # Switch the TTS speaker.
                    new_speaker = audio_data[19:]  # Strip the 19-character "UPDATE_TTS_SPEAKER:" prefix.
                    print(f"📥 输出进程收到TTS speaker更新: {new_speaker}")
                    self.tts_speaker = new_speaker
                    print(f"✅ TTS speaker已更新为: {self.tts_speaker}")
                    continue
                if isinstance(audio_data, str) and audio_data.startswith("FLUSH_TTS_BUFFER:"):
                    # Flush the TTS text buffer so every pending piece is synthesized.
                    print(f"📥 输出进程收到刷新缓冲区命令")
                    self._flush_tts_buffer()
                    continue
                if isinstance(audio_data, str) and audio_data.startswith("LLM_COMPLETE:"):
                    # LLM generation finished.
                    print(f"📥 输出进程收到LLM生成完成信号")
                    self.llm_generation_complete = True
                    # If TTS looks complete but nothing is queued, assume TTS has not
                    # started yet and reset its completion flag.
                    # NOTE(review): if TTS_COMPLETE legitimately arrived before this
                    # message, the flag is cleared and nothing visible here sets it
                    # back to True unless another TTS job runs. Confirm ordering.
                    if self.tts_generation_complete and self.tts_task_queue.qsize() == 0:
                        print(f"📥 LLM完成但TTS队列为空重置TTS完成状态为False")
                        self.tts_generation_complete = False
                    continue
                if isinstance(audio_data, str) and audio_data.startswith("TTS_COMPLETE:"):
                    # TTS generation finished.
                    print(f"📥 输出进程收到TTS生成完成信号")
                    self.tts_generation_complete = True
                    continue
                if isinstance(audio_data, str) and audio_data.startswith("ALL_AUDIO_RECEIVED:"):
                    # Sent by the main process when speech recognition failed.
                    print(f"📥 输出进程收到ALL_AUDIO_RECEIVED信号")
                    self.all_audio_received = True
                    print(f"🔧 设置all_audio_received=True语音识别失败处理")
                    continue
                if isinstance(audio_data, str) and audio_data.startswith("EMERGENCY_STOP:"):
                    # Emergency stop, e.g. on an NFC switch: halt all audio activity.
                    print(f"🚨 输出进程收到紧急停止命令!")
                    self._handle_emergency_stop()
                    continue
                # Raw audio data.
                if isinstance(audio_data, bytes):
                    # Remember when audio last arrived.
                    self.last_audio_time = time.time()
                    # New audio after an earlier "all received" means more is coming.
                    if self.all_audio_received:
                        print(f"📥 收到新音频数据重置all_audio_received状态为False")
                        self.all_audio_received = False
                    # Throttle logging for performance.
                    if processed_count <= 3 or processed_count % 50 == 0:
                        print(f"📥 输出进程收到音频数据: {len(audio_data)} 字节")
                    # Append straight to the preload buffer.
                    self.preload_buffer.append(audio_data)
                    print(f"🎵 音频数据已添加到预加载缓冲区,当前大小: {len(self.preload_buffer)}/{self.preload_size}")
                    # Start playback, or top up the playback buffer.
                    if not self.is_playing and len(self.preload_buffer) >= self.preload_size:
                        print(f"🎵 预加载缓冲区达到{self.preload_size}个块,开始首次播放...")
                        # First start of playback.
                        self.playback_buffer.extend(self.preload_buffer)
                        self.preload_buffer.clear()
                        self.is_playing = True
                        self.last_playback_time = 0  # Reset so the cooldown does not trigger immediately.
                        # Also stamp last_audio_chunk_time when preloading completes.
                        self.last_audio_chunk_time = time.time()
                        print(f"🎵 预加载完成时设置last_audio_chunk_time = {self.last_audio_chunk_time}")
                        # Fallback: mark all audio as received on buffer transfer.
                        if not self.all_audio_received:
                            self.all_audio_received = True
                            print(f"🎵 缓冲区转移完成设置all_audio_received=True备用机制")
                        # The playback worker picks up the playback buffer by itself.
                        if not self.currently_playing:
                            pass
                        print(f"🎵 开始播放音频(预加载完成),播放缓冲区大小: {len(self.playback_buffer)}")
                    elif self.is_playing and len(self.playback_buffer) < 4 and len(self.preload_buffer) > 0:
                        # While playing, keep at least a few chunks in the playback
                        # buffer (move up to 3 at a time).
                        transfer_count = min(3, len(self.preload_buffer))
                        for _ in range(transfer_count):
                            if self.preload_buffer:
                                self.playback_buffer.append(self.preload_buffer.pop(0))
                    elif end_signal_received and not self.is_playing and len(self.playback_buffer) == 0 and len(self.preload_buffer) > 0:
                        # After an end signal, force-move any preloaded audio even if
                        # below the preload threshold.
                        self.playback_buffer.extend(self.preload_buffer)
                        self.preload_buffer.clear()
                        self.is_playing = True
                        self.last_playback_time = 0
                        # Also stamp last_audio_chunk_time on a forced transfer.
                        self.last_audio_chunk_time = time.time()
                        print(f"🎵 强制转移时设置last_audio_chunk_time = {self.last_audio_chunk_time}")
                        # Fallback: mark all audio as received after the forced transfer.
                        if not self.all_audio_received:
                            self.all_audio_received = True
                            print(f"🎵 强制转移预加载缓冲区后设置all_audio_received=True备用机制")
                        print(f"🎵 强制开始播放音频,播放缓冲区大小: {len(self.playback_buffer)}")
                else:
                    print(f"📥 输出进程收到未知类型数据: {type(audio_data)}")
            except queue.Empty:
                # Queue drained.
                if end_signal_received:
                    # Enhanced playback-completion detection.
                    if self._check_enhanced_playback_completion():
                        self._finish_playback()
                        return
                    else:
                        # Still audio to play: wait.
                        tts_queue_size = self.tts_task_queue.qsize()
                        playback_queue_size = len(self.playback_buffer) + len(self.preload_buffer)
                        if playback_queue_size > 0:
                            print(f"📥 还有 {playback_queue_size} 个音频块待播放,等待播放完成")
                            time.sleep(0.2)
                        elif tts_queue_size == 0 and len(self.playback_buffer) == 0 and len(self.preload_buffer) > 0:
                            # NOTE(review): unreachable as written — preload_buffer > 0
                            # already makes playback_queue_size > 0 above.
                            # Playback buffer empty but preload has data: move some over.
                            print(f"📥 播放缓冲区为空但预加载缓冲区有 {len(self.preload_buffer)} 个数据块,转移数据到播放缓冲区")
                            transfer_count = min(len(self.preload_buffer), 3)  # At most 3 chunks at a time.
                            for _ in range(transfer_count):
                                if self.preload_buffer:
                                    self.playback_buffer.append(self.preload_buffer.pop(0))
                            print(f"📥 已转移 {transfer_count} 个数据块到播放缓冲区")
                            # The playback worker starts by itself once it sees data.
                            if not self.currently_playing and len(self.playback_buffer) > 0:
                                print(f"📥 转移数据后确保播放状态正确播放缓冲区有数据但currently_playing=False")
                            time.sleep(0.2)
                # Top up the playback buffer from the preload buffer.
                if not self.is_playing and len(self.preload_buffer) >= self.min_buffer_size:
                    # First start in "minimum buffer" mode.
                    self.playback_buffer.extend(self.preload_buffer)
                    self.preload_buffer.clear()
                    self.is_playing = True
                    self.last_playback_time = 0  # Reset so the cooldown does not trigger immediately.
                    # Also stamp last_audio_chunk_time in minimum-buffer mode.
                    self.last_audio_chunk_time = time.time()
                    print(f"🎵 最小缓冲区模式下设置last_audio_chunk_time = {self.last_audio_chunk_time}")
                    # Fallback: mark all audio as received.
                    if not self.all_audio_received:
                        self.all_audio_received = True
                        print(f"🎵 音频开始播放最小缓冲区模式设置all_audio_received=True备用机制")
                    print(f"🎵 开始播放音频(最小缓冲区满足)")
                    print(f"🔍 已重置last_playback_time避免立即触发冷却期")
                elif self.is_playing and len(self.playback_buffer) < 2 and len(self.preload_buffer) > 0:
                    # While playing, move one chunk to avoid running dry.
                    transfer_count = min(1, len(self.preload_buffer))
                    for _ in range(transfer_count):
                        if self.preload_buffer:
                            self.playback_buffer.append(self.preload_buffer.pop(0))
                # Leave the loop once something was processed, to avoid hogging the CPU.
                if processed_count > 0:
                    break
                else:
                    # Nothing processed yet: still check for playback completion.
                    if self.end_signal_received:
                        # Enhanced playback-completion detection.
                        if self._check_enhanced_playback_completion():
                            self._finish_playback()
                    time.sleep(0.01)
    except Exception as e:
        self.logger.error(f"处理音频队列时出错: {e}")
        print(f"❌ 输出进程处理队列时出错: {e}")
def _play_audio(self):
    """Coordinate buffer state; the dedicated playback worker thread does the actual output.

    1. If not playing yet, the playback buffer is empty and at least
       ``min_buffer_size`` chunks are preloaded, move all preloaded chunks into
       the playback buffer and mark ``is_playing``.
    2. If ``is_playing`` is set but both buffers are empty and the worker reports
       ``currently_playing`` False, wait 0.1 s and clear ``is_playing`` if the
       buffers are still empty. No completion event is sent from here.
    """
    ready_to_start = (
        not self.is_playing
        and len(self.preload_buffer) >= self.min_buffer_size
        and len(self.playback_buffer) == 0
    )
    if ready_to_start:
        self.playback_buffer.extend(self.preload_buffer)
        self.preload_buffer.clear()
        self.is_playing = True
        self.last_playback_time = time.time()
        print(f"🎵 主线程检测:开始播放预加载音频,播放缓冲区大小: {len(self.playback_buffer)}")

    drained = len(self.playback_buffer) == 0 and len(self.preload_buffer) == 0
    if not self.is_playing or not drained or self.currently_playing:
        return

    # Give the last chunk a moment to finish before dropping the flag.
    time.sleep(0.1)
    still_drained = len(self.playback_buffer) == 0 and len(self.preload_buffer) == 0
    if still_drained:
        self.is_playing = False
        print(f"🎵 播放缓冲区已清空,准备发送完成事件")
def _show_progress(self):
    """Print a self-overwriting progress line every ``progress_interval`` played chunks.

    Silent when ``config['show_progress']`` is falsy or nothing has been played yet.
    """
    settings = self.config
    if not settings['show_progress']:
        return
    played = self.total_chunks_played
    if played <= 0 or played % settings['progress_interval'] != 0:
        return
    size_kb = self.total_audio_size / 1024
    progress = f"🔊 播放进度: {played} 块 | {size_kb:.1f} KB"
    print(f"\r{progress}", end='', flush=True)
def _finish_playback(self):
    """Tear down the finished response and tell the main process playback is done.

    No-op when ``completion_sent`` is already set. Otherwise it:
    - stops playback and empties both audio buffers;
    - clears every completion-tracking flag and ``end_signal_time``;
    - posts a ``ProcessEvent('playback_complete')`` on ``event_queue`` when one
      is configured (``completion_sent`` is set only if the put succeeds);
    - sleeps 1 s.
    """
    if self.completion_sent:
        print("📡 输出进程:完成事件已发送过,跳过重复发送")
        return

    print("📡 输出进程:开始执行播放完成逻辑")
    self.is_playing = False
    for buffer in (self.playback_buffer, self.preload_buffer):
        buffer.clear()
    self.last_playback_time = 0

    # Reset the enhanced completion-tracking flags.
    for flag_name in (
        'llm_generation_complete',
        'tts_generation_complete',
        'all_audio_received',
        'pre_buffer_empty',
        'playback_buffer_empty',
        'no_active_playback',
        'end_signal_received',
        'playback_completed',
    ):
        setattr(self, flag_name, False)
    self.end_signal_time = 0
    print("📡 输出进程:已重置所有播放完成状态标志")

    if not self.event_queue:
        print("⚠️ 输出进程:未设置事件队列,无法通知主进程播放完成")
    else:
        try:
            done_event = ProcessEvent(
                event_type='playback_complete',
                metadata={'completion_time': time.time()},
            )
            self.event_queue.put(done_event)
            print("📡 输出进程:已发送播放完成事件到主进程")
            self.completion_sent = True
        except Exception as exc:
            print(f"❌ 输出进程:发送播放完成事件失败: {exc}")
            self.logger.error(f"发送播放完成事件失败: {exc}")

    # Extra pause so the audio device has fully stopped.
    print("📡 等待1秒确保音频完全播放完毕...")
    time.sleep(1.0)
    print("📡 输出进程:播放完成逻辑执行完毕")
def _check_enhanced_playback_completion(self):
    """Decide whether the current response has completely finished playing.

    Always False unless ``end_signal_received`` is set. Refreshes
    ``pre_buffer_empty``, ``playback_buffer_empty`` and ``no_active_playback`` as a
    side effect.

    True requires LLM generation, TTS generation and audio reception all marked
    complete, no pending TTS work, and empty buffers. On top of that, one of:
    - playback ran and more than 1 s has passed since the last audio chunk;
    - playback never started and at least 10 s have passed since the end signal;
    - on the secondary path, the worker went idle during a 0.3 s re-check and
      more than 0.5 s has passed since the last chunk (or playback never started).

    NOTE: the secondary path blocks the caller for 0.3 s via ``time.sleep``.
    """
    if not self.end_signal_received:
        return False
    # Refresh the state snapshot.
    self.pre_buffer_empty = (len(self.preload_buffer) == 0)
    self.playback_buffer_empty = (len(self.playback_buffer) == 0)
    self.no_active_playback = (not self.currently_playing)  # Keep the state variable in sync.
    tts_queue_size = self.tts_task_queue.qsize()
    # Elapsed time since the last audio chunk was played.
    current_time = time.time()
    time_since_last_chunk = current_time - self.last_audio_chunk_time
    # TTS is still generating if there are queued tasks, buffered text, or no
    # TTS_COMPLETE yet. This is not auto-corrected; a real TTS_COMPLETE is required.
    tts_is_generating = (tts_queue_size > 0 or len(self.tts_buffer) > 0 or not self.tts_generation_complete)
    # all_audio_received is never forced here; timeouts below handle the
    # "no audio ever arrived" case.
    # NOTE(review): `tts_queue_size == 0` is implied by `not tts_is_generating`.
    all_conditions_met = (
        self.llm_generation_complete and
        self.tts_generation_complete and
        self.all_audio_received and
        self.pre_buffer_empty and
        self.playback_buffer_empty and
        self.no_active_playback and  # State variable.
        tts_queue_size == 0 and
        not tts_is_generating  # TTS must not still be generating.
    )
    if all_conditions_met:
        print(f"✅ 所有播放完成条件已满足,进行时间检查...")
        print(f"   - last_audio_chunk_time: {self.last_audio_chunk_time}")
        print(f"   - time_since_last_chunk: {time_since_last_chunk:.3f}")
        print(f"   - 需要等待时间: 1.0秒")
        # Extra time check so the last audio has really finished.
        if self.last_audio_chunk_time > 0 and time_since_last_chunk > 1.0:  # At least 1.0 s without new audio.
            print(f"✅ 所有播放完成条件已满足,且{time_since_last_chunk:.2f}秒无新音频,可以结束播放")
            return True
        elif self.last_audio_chunk_time > 0 and time_since_last_chunk <= 1.0:
            print(f"⏳ 所有条件满足但等待时间不足(已等待{time_since_last_chunk:.2f}需要1.0秒)...")
            return False
        elif self.last_audio_chunk_time == 0:
            print(f"🔍 从未开始播放的情况,检查是否有音频数据...")
            # Playback never started: is there any audio at all?
            # (Given all_conditions_met, buffers and TTS queue/text are already empty here.)
            if len(self.playback_buffer) == 0 and len(self.preload_buffer) == 0:
                print(f"🔍 播放缓冲区和预加载缓冲区都为空")
                # Any TTS work still queued?
                if self.tts_task_queue.qsize() == 0 and len(self.tts_buffer) == 0:
                    print(f"🔍 TTS队列和缓冲区都为空")
                    # Shortly after the end signal, give TTS time to produce audio.
                    # NOTE(review): the elapsed time is computed before the hasattr
                    # fallback, so a missing attribute yields ~epoch seconds and forces
                    # True. In the visible code end_signal_time is always set together
                    # with end_signal_received, so this should not trigger in practice.
                    time_since_end_signal = time.time() - getattr(self, 'end_signal_time', 0)
                    if not hasattr(self, 'end_signal_time'):
                        self.end_signal_time = time.time()
                    print(f"🔍 距离收到结束信号: {time_since_end_signal:.2f}")
                    # Under 5 s since the end signal: TTS may still be producing audio.
                    if time_since_end_signal < 5.0:
                        print(f"⏳ 刚收到结束信号等待TTS生成音频已等待{time_since_end_signal:.1f}秒)...")
                        return False
                    elif time_since_end_signal < 10.0:
                        print(f"⏳ 已等待{time_since_end_signal:.1f}继续等待TTS生成...")
                        return False
                    else:
                        # More than 10 s and still no audio: assume TTS failed, force finish.
                        print(f"⚠️ 已等待{time_since_end_signal:.1f}秒仍无音频数据可能TTS失败强制结束")
                        return True
                else:
                    print(f"🔍 还有TTS任务待处理: tts_queue_size={tts_queue_size}, tts_buffer_size={len(self.tts_buffer)}")
                    # TTS tasks are still pending.
                    print(f"⏳ 从未开始播放但还有TTS任务待处理等待TTS生成...")
                    return False
            else:
                print(f"🔍 还有音频数据: playback_buffer={len(self.playback_buffer)}, preload_buffer={len(self.preload_buffer)}")
                print(f"⏳ 从未开始播放但还有音频数据,等待播放开始...")
                return False
        else:
            print(f"⏳ 所有条件满足但等待音频完全播放(最后播放于{time_since_last_chunk:.2f}秒前)...")
            return False
    # LLM and TTS are done but audio may still be playing: wait for it.
    if (self.llm_generation_complete and
            self.tts_generation_complete and
            self.all_audio_received and
            tts_queue_size == 0 and
            not tts_is_generating):  # TTS must not still be generating.
        # NOTE(review): reaching here means all_conditions_met was False, so at least
        # one of pre_buffer_empty / playback_buffer_empty / no_active_playback is False.
        # The immediate `if self.no_active_playback:` branch below therefore appears
        # unreachable.
        if self.pre_buffer_empty and self.playback_buffer_empty:
            if self.no_active_playback:  # State variable.
                # Make sure the last audio really finished playing.
                if self.last_audio_chunk_time > 0:
                    time_since_last_chunk = time.time() - self.last_audio_chunk_time
                    if time_since_last_chunk > 0.8:  # 0.8 s margin so the audio fully plays.
                        print(f"✅ LLM和TTS完成所有缓冲区已清空播放器空闲最后播放于{time_since_last_chunk:.1f}秒前")
                        return True
                    else:
                        print(f"⏳ 等待最后音频播放完成(最后播放于{time_since_last_chunk:.1f}秒前需要0.8秒)...")
                        return False
                else:
                    # Playback never started.
                    print(f"⚠️ LLM和TTS完成缓冲区清空但从未开始播放可能播放失败")
                    return True
            else:
                print(f"⏳ 等待最后的音频播放完成currently_playing={self.currently_playing}...")
                # Blocks the caller briefly, then re-reads the worker state.
                time.sleep(0.3)
                # Refresh the state.
                self.no_active_playback = (not self.currently_playing)
                if self.no_active_playback:
                    if self.last_audio_chunk_time > 0:
                        time_since_last_chunk = time.time() - self.last_audio_chunk_time
                        if time_since_last_chunk > 0.5:
                            print(f"✅ 最后的音频播放完成(最后播放于{time_since_last_chunk:.1f}秒前)")
                            return True
                        else:
                            print(f"⏳ 仍在等待音频完全播放完成(最后播放于{time_since_last_chunk:.1f}秒前)...")
                            return False
                    else:
                        print(f"⚠️ 播放器空闲但从未开始播放,可能播放失败")
                        return True
                else:
                    print(f"⏳ 播放器仍在活跃状态,继续等待...")
                    return False
        else:
            print(f"⏳ 等待缓冲区数据播放完成 - 预缓冲: {len(self.preload_buffer)}, 播放缓冲: {len(self.playback_buffer)}")
            return False
    # LLM not finished yet.
    if not self.llm_generation_complete:
        print(f"⏳ 等待LLM生成完成...")
        return False
    # TTS not finished yet (LLM already done).
    if not self.tts_generation_complete:
        return False
    # Audio not completely received yet.
    if not self.all_audio_received:
        print(f"⏳ 等待所有音频接收完成...")
        return False
    return False
# 注意:简化播放完成检测已移除,统一使用增强播放完成检测 _check_enhanced_playback_completion()
def _cleanup(self):
    """Stop the worker threads and release audio resources.

    Order of operations:
    1. stop and join the playback worker (3 s timeout);
    2. stop and join the TTS worker (2 s timeout);
    3. close ``output_stream`` if one exists;
    4. terminate the PyAudio instance.

    Errors while closing audio objects are printed and ignored so cleanup
    always runs to the end.
    """
    print("🔊 开始清理输出进程资源...")
    # Stop the playback worker thread.
    self.playback_worker_running = False
    if self.playback_worker_thread:
        print("🔊 等待播放工作线程退出...")
        self.playback_worker_thread.join(timeout=3.0)
    # Stop the TTS worker thread.
    self.tts_worker_running = False
    if self.tts_worker_thread:
        print("🔊 等待TTS工作线程退出...")
        # Wake the worker with a None sentinel. Use put_nowait: a blocking put()
        # on a full bounded queue would hang cleanup forever. If the queue is
        # full, the worker still exits after its 1 s get() timeout because
        # tts_worker_running is now False.
        try:
            self.tts_task_queue.put_nowait(None)
        except queue.Full:
            pass
        self.tts_worker_thread.join(timeout=2.0)
    # Close the process-level output stream, if any.
    if self.output_stream:
        try:
            self.output_stream.stop_stream()
            self.output_stream.close()
            print("🔊 主进程输出流已关闭")
        except Exception as e:
            print(f"⚠️ 关闭主进程输出流时出错: {e}")
    # Release the PyAudio instance.
    if self.audio:
        try:
            self.audio.terminate()
            print("🔊 PyAudio实例已终止")
        except Exception as e:
            print(f"⚠️ 终止PyAudio实例时出错: {e}")
    print("🔊 输出进程资源清理完成")
def _start_tts_worker(self):
    """Launch ``_tts_worker`` on a daemon thread, keep a handle to it, and log the start."""
    worker = threading.Thread(target=self._tts_worker, daemon=True)
    self.tts_worker_thread = worker
    worker.start()
    self.logger.info("TTS工作线程已启动")
def _tts_worker(self):
    """Consume ``tts_task_queue`` until a ``None`` sentinel arrives or the worker is stopped.

    Supported task tuples:
      ("tts_sentence", text)                     -> _generate_tts_audio(text)
      ("tts_sentence", text, character_name)     -> _generate_tts_audio(text, character_name)
      ("cached_audio", text, audio, character)   -> _process_cached_audio(text, audio, character)
    Unknown task types are ignored. ``tts_worker_running`` is re-checked at least
    once per second because of the get() timeout. A failing task is logged,
    followed by a 0.1 s pause.
    """
    while self.tts_worker_running:
        try:
            job = self.tts_task_queue.get(timeout=1.0)
            if job is None:
                break
            kind = job[0]
            if kind == "tts_sentence":
                if len(job) == 3:
                    # Sentence tied to a character (enables greeting caching).
                    self._generate_tts_audio(job[1], job[2])
                else:
                    _, sentence = job
                    self._generate_tts_audio(sentence)
            elif kind == "cached_audio":
                _, text, audio_bytes, who = job
                self._process_cached_audio(text, audio_bytes, who)
        except queue.Empty:
            continue
        except Exception as exc:
            self.logger.error(f"TTS工作线程错误: {exc}")
            time.sleep(0.1)
def _handle_emergency_stop(self):
    """Abort all audio activity at once (e.g. on an NFC character switch).

    Clears the playing flags, the playback/preload/text buffers and pending TTS
    tasks. Marks LLM, TTS and audio reception as finished. Resets end-of-playback
    tracking, first-playback bookkeeping and the cooldown timer, then pauses
    0.3 s. Unexpected errors are printed with a traceback and not re-raised.
    """
    print("🚨 输出进程开始执行紧急停止...")
    try:
        # Step 1: drop the playing flags.
        print("🛑 立即停止音频播放...")
        if self.currently_playing:
            self.currently_playing = False
            print("✅ 播放状态已重置")
        if self.is_playing:
            self.is_playing = False
            print("✅ 播放状态标志已重置")

        # Step 2: discard buffered audio and pending text.
        print("🛑 清空所有音频缓冲区...")
        for pending in (self.playback_buffer, self.preload_buffer, self.tts_buffer):
            pending.clear()
        print("✅ 音频缓冲区已清空")

        # Step 3: treat generation as finished so nothing keeps waiting on it.
        print("🛑 停止所有TTS生成...")
        self.tts_generation_complete = self.llm_generation_complete = self.all_audio_received = True
        print("✅ TTS生成状态已重置")

        # Step 4: forget any end-of-playback progress.
        print("🛑 重置播放完成检测状态...")
        self.end_signal_received = self.completion_sent = self.playback_completed = False
        self.end_signal_time = self.last_audio_chunk_time = self.last_audio_time = 0
        print("✅ 播放完成检测状态已重置")

        # Step 5: the next utterance counts as a first playback again.
        print("🛑 重置首次播放状态...")
        if hasattr(self, '_first_playback_started'):
            self._first_playback_started = False
        if hasattr(self, '_first_text_time'):
            delattr(self, '_first_text_time')
        print("✅ 首次播放状态已重置")

        # Step 6: drop queued TTS jobs without blocking.
        print("🛑 清空TTS任务队列...")
        pending_jobs = self.tts_task_queue
        try:
            exhausted = False
            while not exhausted and not pending_jobs.empty():
                try:
                    pending_jobs.get_nowait()
                except queue.Empty:
                    exhausted = True
            print("✅ TTS任务队列已清空")
        except Exception as e:
            print(f"⚠️ 清空TTS队列时出错: {e}")

        # Step 7: no cooldown after an emergency stop.
        print("🛑 重置播放冷却期...")
        self.last_playback_time = 0
        print("✅ 播放冷却期已重置")

        # Step 8: short pause to let audio output stop.
        print("⏱️ 等待音频完全停止...")
        time.sleep(0.3)
        print("🚨 输出进程紧急停止完成")
    except Exception as e:
        print(f"❌ 输出进程紧急停止时出错: {e}")
        import traceback
        traceback.print_exc()
def _add_tts_task(self, content, character_name=None):
    """Enqueue a TTS job without blocking.

    With a ``character_name``, the greeting cache is consulted first:
    - cache hit: enqueue ``("cached_audio", content, audio, character_name)``;
    - cache miss: enqueue ``("tts_sentence", content, character_name)``.
    Without a name, a plain ``("tts_sentence", content)`` job is enqueued.

    Returns:
        True when the job was queued, False when the queue was full (job dropped).
    """
    try:
        if not character_name:
            job = ("tts_sentence", content)
        else:
            cached = load_cached_audio(character_name)
            if cached:
                print(f"🎵 使用缓存音频,角色: {character_name}")
                job = ("cached_audio", content, cached, character_name)
            else:
                print(f"🎵 无缓存音频正常生成TTS角色: {character_name}")
                job = ("tts_sentence", content, character_name)
        self.tts_task_queue.put_nowait(job)
        return True
    except queue.Full:
        self.logger.warning("TTS任务队列已满丢弃任务")
        return False
def _process_cached_audio(self, text, audio_data, character_name):
    """Feed pre-recorded (cached) greeting audio straight into the playback pipeline.

    Resets the per-utterance state the same way a fresh TTS request does and
    appends *audio_data* to the preload buffer. It then marks TTS as complete and,
    when nothing is playing yet, moves the preload buffer into the playback buffer.
    Starting playback also stamps ``last_audio_chunk_time`` and sets
    ``all_audio_received``. Finally, ``"TTS_COMPLETE:"`` is posted on
    ``audio_queue``. Any error marks TTS/audio as complete so the completion
    check cannot deadlock.
    """
    try:
        print(f"🎵 处理缓存音频: {text[:30]}... (角色: {character_name})")
        print(f"🎵 缓存音频大小: {len(audio_data)} 字节")
        # Same reset as before a regular TTS request.
        self.completion_sent = False
        self._last_audio_size = 0
        self.tts_generation_complete = False

        print(f"🎵 将缓存音频添加到预加载缓冲区")
        self.preload_buffer.append(audio_data)
        # Cached audio is already fully generated.
        self.tts_generation_complete = True
        # all_audio_received is only set once playback actually starts.
        print(f"🎵 缓存音频已准备好等待播放开始时设置all_audio_received")

        # (announcement, flag message, log message) for whichever start path applies.
        start_messages = None
        if not self.is_playing:
            if len(self.preload_buffer) >= self.preload_size:
                start_messages = (
                    f"🎵 缓存音频预加载完成,开始播放...",
                    f"🎵 缓存音频开始播放设置all_audio_received=True",
                    "开始播放缓存音频(预加载完成)",
                )
            elif len(self.preload_buffer) > 0:
                start_messages = (
                    f"🎵 缓存音频已添加但未达到预加载大小,强制开始播放...",
                    f"🎵 缓存音频强制开始播放设置all_audio_received=True",
                    "强制开始播放缓存音频",
                )

        if start_messages is not None:
            announce_msg, flag_msg, log_msg = start_messages
            print(announce_msg)
            self.playback_buffer.extend(self.preload_buffer)
            self.preload_buffer.clear()
            self.is_playing = True
            self.last_playback_time = 0  # Avoid triggering the cooldown period.
            self.last_audio_chunk_time = time.time()
            print(f"🎵 设置last_audio_chunk_time = {self.last_audio_chunk_time}")
            self.all_audio_received = True
            print(flag_msg)
            self.logger.info(log_msg)

        # Let the queue consumer record TTS completion.
        try:
            self.audio_queue.put("TTS_COMPLETE:")
            print(f"🎵 缓存音频已发送TTS完成信号到主队列")
        except Exception as e:
            print(f"❌ 发送TTS完成信号失败: {e}")
        print(f"✅ 缓存音频处理完成")
    except Exception as e:
        self.logger.error(f"缓存音频处理失败: {e}")
        # Mark everything finished so the system does not hang.
        self.tts_generation_complete = True
        self.all_audio_received = True
        print(f"🔧 缓存音频处理失败设置完成状态为True")
def _generate_tts_audio(self, text, character_name=None, timeout=(10.0, 60.0)):
    """Synthesize *text* with the streaming HTTP TTS service and queue the PCM audio.

    Each decoded audio chunk is appended to ``preload_buffer``. Once
    ``preload_size`` chunks are buffered and playback has not started, they are
    moved to ``playback_buffer`` and ``is_playing`` is set. Leftover chunks are
    flushed to the playback buffer when the stream ends. When *character_name*
    is given and audio was produced, the concatenated audio is written to the
    greeting cache.

    On every exit path ``tts_generation_complete`` ends up True and
    ``"TTS_COMPLETE:"`` is put on ``audio_queue``, so the playback-completion
    check can make progress.

    Args:
        text: Text to synthesize.
        character_name: Optional character whose greeting cache is (re)written
            with the generated audio.
        timeout: ``requests`` timeout as ``(connect, read)`` seconds. With
            ``stream=True`` the read timeout applies between received bytes, so a
            stalled server raises instead of blocking the TTS worker forever.

    Returns:
        True if at least one audio chunk was received, otherwise False.
    """
    try:
        print(f"🎵 OutputProcess开始生成TTS音频: {text[:50]}...")
        print(f"🎵 文本总长度: {len(text)} 字符")
        # Audio chunks go straight into the preload buffer.
        print(f"🎵 开始生成TTS音频将直接添加到预加载缓冲区")
        # Reset per-request playback state.
        self.completion_sent = False
        self._last_audio_size = 0
        self.tts_generation_complete = False
        # Every received chunk, kept so the whole utterance can be cached.
        all_audio_data = []
        headers = {
            "X-Api-App-Id": self.tts_app_id,
            "X-Api-Access-Key": self.tts_access_key,
            "X-Api-Resource-Id": self.tts_resource_id,
            "X-Api-App-Key": self.tts_app_key,
            "Content-Type": "application/json",
            "Connection": "keep-alive"
        }
        # ``additions`` is a JSON document carried as a string; serialising it with
        # json.dumps guarantees it is well-formed.
        additions = json.dumps({
            "explicit_language": "zh",
            "disable_markdown_filter": True,
            "enable_timestamp": True,
        })
        payload = {
            "user": {
                "uid": "output_process_tts"
            },
            "req_params": {
                "text": text,
                "speaker": self.tts_speaker,
                "audio_params": {
                    "format": "pcm",
                    "sample_rate": 16000,
                    "enable_timestamp": True
                },
                "additions": additions
            }
        }
        session = requests.Session()
        # Bound before the request so ``finally`` can tell whether post() succeeded.
        response = None
        try:
            response = session.post(self.tts_url, headers=headers, json=payload,
                                    stream=True, timeout=timeout)
            if response.status_code != 200:
                self.logger.error(f"TTS请求失败: {response.status_code}")
                # Still report completion; otherwise tts_generation_complete stays
                # False and the playback-completion check waits forever.
                self._report_tts_failure()
                return False
            print(f"🎵 TTS请求成功状态码: {response.status_code}")
            total_audio_size = 0
            chunk_count = 0
            success_count = 0
            print(f"🎵 开始接收TTS音频流...")
            for chunk in response.iter_lines(decode_unicode=True):
                if not chunk:
                    continue
                try:
                    data = json.loads(chunk)
                except json.JSONDecodeError:
                    continue
                code = data.get("code", 0)
                if code == 0 and data.get("data"):
                    chunk_audio = base64.b64decode(data["data"])
                    audio_size = len(chunk_audio)
                    total_audio_size += audio_size
                    chunk_count += 1
                    if audio_size < 100:
                        print(f"⚠️ 警告:音频块 {chunk_count} 大小异常小: {audio_size} 字节")
                    # Only early chunks are checked; the final chunks are normally smaller.
                    if self._last_audio_size > 0 and chunk_count < 15:
                        if audio_size < self._last_audio_size * 0.5:
                            print(f"⚠️ 警告:音频块 {chunk_count} 大小突然减小: {self._last_audio_size} -> {audio_size}")
                    self._last_audio_size = audio_size
                    try:
                        self.preload_buffer.append(chunk_audio)
                        all_audio_data.append(chunk_audio)
                        success_count += 1
                        # Start playback once enough audio is preloaded.
                        if (not self.is_playing and
                                len(self.preload_buffer) >= self.preload_size):
                            self.playback_buffer.extend(self.preload_buffer)
                            self.preload_buffer.clear()
                            self.is_playing = True
                            self.last_playback_time = time.time()
                            self.logger.info("开始播放TTS音频预加载完成")
                        # Sparse progress output.
                        if chunk_count % 50 == 0:
                            progress = (f"🎵 TTS生成中: {chunk_count} 块 | {total_audio_size / 1024:.1f} KB")
                            print(f"\r{progress}", end='', flush=True)
                    except Exception as e:
                        self.logger.error(f"添加音频到预加载缓冲区失败: {e}")
                    continue
                if code == 0 and data.get("sentence"):
                    # Sentence/timestamp metadata is not used here.
                    continue
                if code == 20000000:
                    # End-of-stream marker from the service.
                    print(f"🎵 收到TTS流结束信号总共处理了 {chunk_count} 个音频块")
                    print(f"🎵 总音频大小: {total_audio_size} 字节 ({total_audio_size/1024:.1f} KB)")
                    break
                if code > 0:
                    self.logger.error(f"TTS错误响应: {data}")
                    break
            # Flush whatever is still waiting in the preload buffer.
            if self.preload_buffer:
                print(f"🔊 将剩余的 {len(self.preload_buffer)} 个音频块转移到播放缓冲区")
                self.playback_buffer.extend(self.preload_buffer)
                self.preload_buffer.clear()
                if not self.is_playing:
                    self.is_playing = True
                    self.last_playback_time = time.time()
                    self.logger.info("开始播放TTS音频处理剩余预加载")
            else:
                print(f"⚠️ TTS生成完成但预加载缓冲区为空")
            success_rate = (success_count / chunk_count * 100) if chunk_count > 0 else 0
            self.logger.info(f"TTS音频生成完成: {chunk_count} 块, 成功率 {success_rate:.1f}% | 总大小: {total_audio_size / 1024:.1f} KB")
            # Cache the full utterance for this character, if any audio arrived.
            if character_name and all_audio_data and success_count > 0:
                try:
                    complete_audio = b''.join(all_audio_data)
                    if save_greeting_cache(character_name, complete_audio):
                        print(f"✅ TTS音频已保存到缓存角色: {character_name}")
                    else:
                        print(f"⚠️ TTS音频缓存保存失败角色: {character_name}")
                except Exception as e:
                    print(f"❌ 保存TTS音频缓存时出错: {e}")
            self.tts_generation_complete = True
            print(f"🎵 OutputProcess TTS生成已完成")
            # No audio at all (e.g. speech-to-text failed upstream): mark all audio
            # as received so the completion check cannot deadlock.
            if success_count == 0:
                print(f"🔧 TTS生成完成但没有音频数据设置all_audio_received=True以避免死锁")
                self.all_audio_received = True
            try:
                self.audio_queue.put("TTS_COMPLETE:")
                print(f"🎵 已发送TTS完成信号到主队列")
            except Exception as e:
                print(f"❌ 发送TTS完成信号失败: {e}")
            self.logger.info("TTS生成完成等待统一播放完成检测机制处理...")
            return success_count > 0
        finally:
            if response is not None:
                response.close()
            session.close()
    except Exception as e:
        self.logger.error(f"TTS音频生成失败: {e}")
        self._report_tts_failure()
        return False


def _report_tts_failure(self):
    """Mark TTS as finished after a failed request so playback completion cannot deadlock.

    Sets ``tts_generation_complete`` and ``all_audio_received`` and posts
    ``"TTS_COMPLETE:"`` on ``audio_queue``. A failure to post is only printed.
    """
    self.tts_generation_complete = True
    print(f"🔧 TTS生成失败设置all_audio_received=True以避免死锁")
    self.all_audio_received = True
    try:
        self.audio_queue.put("TTS_COMPLETE:")
        print(f"🎵 TTS生成失败但已发送TTS完成信号到主队列")
    except Exception as send_error:
        print(f"❌ 发送TTS完成信号失败: {send_error}")
# ========== 智能句子缓冲系统 - 从 recorder.py 借鉴 ==========
def _should_trigger_tts(self, sentence):
    """Decide whether the buffered sentences should be sent to TTS now.

    Precondition: ``sentence`` has already been appended to
    ``self.tts_buffer`` (``_add_sentence_to_buffer`` appends before calling
    this method), so the buffer alone holds all pending text.

    Rules, checked in order:

    * Never trigger while 3 or more TTS tasks are already queued.
    * Before the first playback of a reply, wait for enough content: at
      least 40 buffered characters spread over 2+ sentences, or a complete
      sentence of 25+ characters following earlier buffered text, or a full
      buffer, or more than 5 seconds since the first non-triggering call
      (only re-evaluated when another sentence arrives).
    * Afterwards, trigger on a full buffer, an elapsed accumulation window
      with at least ``tts_buffer_min_size`` sentences, a complete sentence,
      a sentence over 30 characters, a 21-30 character sentence ending in
      terminal punctuation, or 70% of the accumulation window elapsed.

    Args:
        sentence: The sentence most recently appended to ``self.tts_buffer``.

    Returns:
        True if the caller should flush the buffer to TTS now, else False.
    """
    current_time = time.time()

    # Allow some TTS work to overlap, but cap the number of pending tasks.
    tts_queue_size = self.tts_task_queue.qsize()
    if tts_queue_size >= 3:
        print(f"🎵 TTS队列达到上限: 当前队列大小={tts_queue_size}, 暂时跳过")
        return False

    # First playback of a reply: make sure enough content has accumulated.
    if not getattr(self, '_first_playback_started', False):
        # ``sentence`` is already the last buffer entry, so it must not be
        # added again here (doing so double-counted its length and made the
        # "at least 2 sentences" requirement always true).
        total_buffered_text = ''.join(self.tts_buffer)
        sentence_count = len(self.tts_buffer)
        # 1. At least 40 characters spread over at least 2 sentences.
        if len(total_buffered_text) >= 40 and sentence_count >= 2:
            print(f"🎵 首次播放触发:总长度{len(total_buffered_text)}字符,{sentence_count}个句子")
            self._first_playback_started = True
            return True
        # 2. A complete long sentence (25+ chars) following earlier buffered text.
        if len(sentence) >= 25 and self._is_complete_sentence(sentence) and sentence_count >= 2:
            print(f"🎵 首次播放触发:长句子{len(sentence)}字符+缓冲内容")
            self._first_playback_started = True
            return True
        # 3. The buffer is full.
        if sentence_count >= self.tts_buffer_max_size:
            print(f"🎵 首次播放触发:缓冲区达到最大值{len(self.tts_buffer)}")
            self._first_playback_started = True
            return True
        # 4. More than 5 seconds since buffering for the first playback began,
        #    to avoid waiting forever.
        if hasattr(self, '_first_text_time') and (current_time - self._first_text_time) > 5.0:
            print(f"🎵 首次播放触发超时5秒当前长度{len(total_buffered_text)}字符")
            self._first_playback_started = True
            return True
        # Remember when buffering for the first playback started.
        if not hasattr(self, '_first_text_time'):
            self._first_text_time = current_time
        return False

    # Normal (non-first) triggering.
    buffered_count = len(self.tts_buffer)
    if buffered_count >= self.tts_buffer_max_size:
        return True

    # Accumulation window elapsed and enough sentences are buffered.
    time_since_last = current_time - self.tts_last_trigger_time
    if time_since_last >= self.tts_accumulation_time and buffered_count >= self.tts_buffer_min_size:
        return True

    # Strict complete-sentence detection.
    if self._is_complete_sentence(sentence):
        return True

    # Long sentences (over 30 characters) trigger immediately.
    if len(sentence) > 30:
        return True

    # Medium sentences (21-30 characters) trigger only when they end with
    # terminal punctuation (same set as ``_is_complete_sentence``). The list
    # must not contain empty strings: ``str.endswith('')`` is always True.
    if len(sentence) > 20 and sentence.strip().endswith(('。', '!', '?', '.', '!', '?')):
        return True

    # Even short content goes out once 70% of the accumulation window passed.
    if buffered_count > 0 and time_since_last >= (self.tts_accumulation_time * 0.7):
        return True

    # Otherwise keep buffering.
    return False
def _process_tts_buffer(self):
    """Send everything accumulated in ``self.tts_buffer`` to TTS as one task.

    The buffered sentences are concatenated and handed to
    ``self._add_tts_task``. On success the trigger timestamp is refreshed.
    The buffer is emptied afterwards whether or not the task was accepted.
    Does nothing when the buffer is empty.
    """
    if not self.tts_buffer:
        return

    merged = ''.join(self.tts_buffer)
    print(f"🔊 合并后的文本: '{merged}' (长度: {len(merged)})")

    # A new TTS job is about to start, so generation is no longer complete.
    self.tts_generation_complete = False

    queued = self._add_tts_task(merged)
    if not queued:
        print(f"❌ 添加TTS任务失败")
    else:
        print(f"🎵 触发TTS: {merged[:50]}...")
        self.tts_last_trigger_time = time.time()

    # Drop the consumed sentences.
    self.tts_buffer.clear()
def _add_sentence_to_buffer(self, sentence):
    """Append ``sentence`` to the TTS buffer and flush if the trigger rules fire.

    Whitespace-only sentences are rejected. The sentence is appended first,
    and only then is ``_should_trigger_tts`` consulted.
    """
    if sentence.strip():
        self.tts_buffer.append(sentence)
        if self._should_trigger_tts(sentence):
            self._process_tts_buffer()
        return
    print(f"🔊 句子为空,不添加到缓冲区")
def _flush_tts_buffer(self):
    """Force any buffered sentences out to TTS, bypassing the trigger rules."""
    if not self.tts_buffer:
        return
    self._process_tts_buffer()
def _is_complete_sentence(self, text):
    """Heuristically decide whether ``text`` reads as a complete sentence.

    Ported from recorder.py. Falsy text, and text shorter than 10 characters
    after stripping, is never complete. Otherwise, in order:

    * ends with terminal punctuation (trailing whitespace allowed) and has
      at least 15 characters -> True;
    * contains terminal punctuation followed by more text -> False;
    * has at least 50 characters -> True;
    * has at least 20 characters and matches one of a few common Chinese
      sentence patterns -> True;
    * otherwise False.

    Args:
        text: Candidate sentence.

    Returns:
        bool
    """
    import re

    if not text:
        return False
    stripped_len = len(text.strip())
    # Require at least 10 characters before considering anything complete.
    if stripped_len < 10:
        return False

    # Terminal punctuation, full-width and ASCII.
    sentence_endings = r'[。!?.!?]'

    # Ends with a terminator: at least 15 characters are required.
    if re.search(sentence_endings + r'\s*$', text) and stripped_len >= 15:
        return True

    # Contains a terminator somewhere.
    if re.search(sentence_endings, text):
        # Anything after the first terminator means the text probably spans
        # more than one sentence / is a fragment. ``maxsplit`` is passed as a
        # keyword: the positional form is deprecated since Python 3.13.
        remaining_text = re.split(sentence_endings, text, maxsplit=1)[-1]
        if remaining_text.strip():
            return False
        # NOTE(review): reaching here means the only terminator is at the end
        # and the text has fewer than 15 characters, so this never succeeds.
        if stripped_len >= 20:
            return True

    # Long text counts as complete even without terminal punctuation.
    if stripped_len >= 50:
        return True

    # Medium-length text: accept a few common complete-sentence shapes.
    if stripped_len >= 20:
        common_patterns = [
            r'^[是的有没有来去在把被让叫请使].*[的得了吗呢吧啊呀]',
            r'^(你好|谢谢|再见|是的|不是|好的|没问题)',
            # 4-8 CJK characters followed by 的/了/得.
            # NOTE(review): this needs the whole text to be 5-9 characters,
            # which cannot hold under the >= 20 guard above.
            r'^[\u4e00-\u9fff]{4,8}[的得了]$',
        ]
        if any(re.match(pattern, text) for pattern in common_patterns):
            return True

    return False
def _filter_parentheses_content(self, text):
"""过滤文本中的括号内容(包括中文和英文括号)- 从 recorder.py 移植"""
import re
# 移除中文括号内容:(内容)
filtered_text = re.sub(r'[^]*', '', text)
# 移除英文括号内容:(content)
filtered_text = re.sub(r'\([^)]*\)', '', filtered_text)
# 移除方括号内容:【内容】
filtered_text = re.sub(r'【[^】]*】', '', filtered_text)
# 移除方括号内容:[content]
filtered_text = re.sub(r'\[[^\]]*\]', '', filtered_text)
# 移除书名号内容:「内容」
filtered_text = re.sub(r'「[^」]*」', '', filtered_text)
# 清理多余空格
filtered_text = re.sub(r'\s+', ' ', filtered_text).strip()
return filtered_text
def process_streaming_text(self, text):
    """Feed one chunk of streamed LLM text into the smart sentence buffer.

    Public entry point for streaming LLM output. Bracketed asides are
    removed first. If the LLM, TTS and audio-received completion flags are
    all still set (i.e. the previous reply finished), this chunk is treated
    as the start of a new reply: every playback-completion flag and the
    first-playback state are reset before the text is buffered.
    """
    print(f"🔊 收到流式文本: '{text}' (长度: {len(text)})")
    if not text.strip():
        print(f"🔊 流式文本为空,跳过处理")
        return

    cleaned = self._filter_parentheses_content(text.strip())
    print(f"🔊 过滤后的文本: '{cleaned}' (长度: {len(cleaned)})")
    if not cleaned:
        print(f"🔊 过滤后文本为空,不添加到缓冲区")
        return

    previous_reply_done = (self.llm_generation_complete and
                           self.tts_generation_complete and
                           self.all_audio_received)
    if previous_reply_done:
        print(f"🔊 首次收到流式文本,重置所有播放完成检测状态")
        for flag_name in ('llm_generation_complete', 'tts_generation_complete',
                          'all_audio_received', 'end_signal_received', 'completion_sent'):
            setattr(self, flag_name, False)
        # New reply: the first-playback accumulation starts over.
        self._first_playback_started = False
        if hasattr(self, '_first_text_time'):
            delattr(self, '_first_text_time')
        print(f"🔊 已重置首次播放状态,新的对话将重新积累内容")

    print(f"🔊 添加文本到智能缓冲区")
    self._add_sentence_to_buffer(cleaned)
def process_complete_text(self, text):
    """Send a complete (non-streamed) text to TTS immediately.

    Always starts a new reply: every playback-completion flag and the
    first-playback state are reset, the filtered text is appended to
    ``self.tts_buffer`` and the buffer is flushed right away, bypassing the
    trigger heuristics.
    """
    print(f"🔊 process_complete_text 被调用,输入文本: {text}")
    if not text.strip():
        print("🔊 文本为空,返回")
        return

    cleaned = self._filter_parentheses_content(text.strip())
    print(f"🔊 过滤后文本: {cleaned}")
    if not cleaned:
        print("🔊 过滤后文本为空,不添加到缓冲区")
        return

    # Reset all playback-completion detection state for the new reply.
    for flag_name in ('llm_generation_complete', 'tts_generation_complete',
                      'all_audio_received', 'end_signal_received', 'completion_sent'):
        setattr(self, flag_name, False)
    self._first_playback_started = False
    if hasattr(self, '_first_text_time'):
        delattr(self, '_first_text_time')
    print(f"🔊 处理完整文本:已重置所有播放完成检测状态")

    # Buffer the text and flush it unconditionally.
    print(f"🔊 添加文本到TTS缓冲区: {cleaned}")
    self.tts_buffer.append(cleaned)
    print(f"🔊 当前TTS缓冲区大小: {len(self.tts_buffer)}")
    print(f"🔊 开始处理TTS缓冲区...")
    self._process_tts_buffer()
    print(f"🔊 TTS缓冲区处理完成")
def process_greeting_text(self, text, character_name):
    """Speak a greeting for ``character_name``, with TTS caching support.

    Behaves like ``process_complete_text`` (resets all completion and
    first-playback state, buffers the filtered text and flushes at once),
    but flushes through ``_process_tts_buffer_with_cache`` so the character
    name travels with the TTS task.
    """
    print(f"🔊 process_greeting_text 被调用,输入文本: {text} (角色: {character_name})")
    if not text.strip():
        print("🔊 文本为空,返回")
        return

    greeting = self._filter_parentheses_content(text.strip())
    print(f"🔊 过滤后文本: {greeting}")
    if not greeting:
        print("🔊 过滤后文本为空,不添加到缓冲区")
        return

    # A greeting always begins a new exchange: clear every completion flag
    # (assigned left to right) and restart first-playback accumulation.
    self.llm_generation_complete = self.tts_generation_complete = \
        self.all_audio_received = self.end_signal_received = \
        self.completion_sent = False
    self._first_playback_started = False
    if hasattr(self, '_first_text_time'):
        delattr(self, '_first_text_time')
    print(f"🔊 处理打招呼文本:已重置所有播放完成检测状态")

    print(f"🔊 添加打招呼文本到TTS缓冲区: {greeting} (角色: {character_name})")
    self.tts_buffer.append(greeting)
    print(f"🔊 当前TTS缓冲区大小: {len(self.tts_buffer)}")
    print(f"🔊 开始处理TTS缓冲区带缓存支持...")
    self._process_tts_buffer_with_cache(character_name)
    print(f"🔊 TTS缓冲区处理完成")
def _process_tts_buffer_with_cache(self, character_name):
    """Flush ``self.tts_buffer`` to TTS, tagging the task with ``character_name``.

    Identical to ``_process_tts_buffer`` except that ``character_name`` is
    passed through to ``self._add_tts_task`` (presumably so the generated
    greeting audio can be cached per character — confirm in
    ``_add_tts_task``). The buffer is emptied whether or not the task was
    accepted; nothing happens when it is already empty.
    """
    if not self.tts_buffer:
        return

    merged = ''.join(self.tts_buffer)
    print(f"🔊 合并后的文本: '{merged}' (长度: {len(merged)})")

    # A new TTS job is about to start, so generation is no longer complete.
    self.tts_generation_complete = False

    queued = self._add_tts_task(merged, character_name)
    if not queued:
        print(f"❌ 添加TTS任务失败")
    else:
        print(f"🎵 触发TTS: {merged[:50]}... (角色: {character_name})")
        self.tts_last_trigger_time = time.time()

    self.tts_buffer.clear()
# ========== 原有方法保持不变 ==========
def _update_performance_stats(self):
    """Refresh buffer statistics and print a summary at most every 5 seconds.

    ``avg_buffer_size`` is an exponential moving average (weight 0.1 on the
    newest sample) of the combined playback + preload buffer length, and
    ``max_buffer_size`` tracks its peak. The first call only starts the
    5-second reporting clock.
    """
    stats = self.performance_stats
    buffered_now = len(self.playback_buffer) + len(self.preload_buffer)
    stats['avg_buffer_size'] = stats['avg_buffer_size'] * 0.9 + buffered_now * 0.1
    if buffered_now > stats['max_buffer_size']:
        stats['max_buffer_size'] = buffered_now

    if not hasattr(self, '_last_stats_time'):
        # First call: start the reporting clock without printing.
        self._last_stats_time = time.time()
        return
    if time.time() - self._last_stats_time >= 5.0:
        self._print_performance_stats()
        self._last_stats_time = time.time()
def _print_performance_stats(self):
    """Print a one-line summary of the playback performance counters."""
    counters = self.performance_stats
    avg_buffer = counters['avg_buffer_size']
    peak_buffer = counters['max_buffer_size']
    chunks_played = counters['total_chunks_played']
    size_kb = counters['total_audio_size'] / 1024
    print(f"📊 性能统计: 平均缓冲={avg_buffer:.1f}, "
          f"最大缓冲={peak_buffer}, 已播放={chunks_played}块, "
          f"音频大小={size_kb:.1f}KB")
def process_tts_request(self, text):
    """Backward-compatible TTS entry point.

    Delegates to ``process_complete_text`` (the smart-buffer path) and
    always reports success.
    """
    self.process_complete_text(text)
    accepted = True
    return accepted
if __name__ == "__main__":
    # Manual smoke check only; the module is meant to be driven by a
    # multiprocessing host rather than run directly.
    for _banner_line in ("音频进程模块测试", "这个模块应该在多进程环境中运行"):
        print(_banner_line)