Local-Voice/audio_processes.py
2025-09-21 03:00:11 +08:00

1186 lines
49 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
多进程音频处理模块
定义输入进程和输出进程的类
使用增强版语音检测器
"""
import multiprocessing as mp
import queue
import time
import threading
import numpy as np
import pyaudio
from enum import Enum
from dataclasses import dataclass
from typing import Optional, List, Dict, Any
import json
import wave
import os
from enhanced_voice_detector import EnhancedVoiceDetector
from process_logger import ProcessLogger
import requests
import base64
import gzip
class RecordingState(Enum):
"""录音状态枚举"""
IDLE = "idle"
RECORDING = "recording"
PROCESSING = "processing"
PLAYING = "playing"
@dataclass
class AudioSegment:
"""音频片段数据结构"""
audio_data: bytes
start_time: float
end_time: float
duration: float
metadata: Dict[str, Any] = None
@dataclass
class ControlCommand:
"""控制命令数据结构"""
command: str
parameters: Dict[str, Any] = None
@dataclass
class ProcessEvent:
"""进程事件数据结构"""
event_type: str
data: Optional[bytes] = None
metadata: Dict[str, Any] = None
class InputProcess:
"""输入进程 - 专门负责录音和语音检测"""
def __init__(self, command_queue: mp.Queue, event_queue: mp.Queue, config: Dict[str, Any] = None):
self.command_queue = command_queue # 主进程 → 输入进程
self.event_queue = event_queue # 输入进程 → 主进程
# 配置参数
self.config = config or self._get_default_config()
# 初始化日志记录器
self.logger = ProcessLogger("InputProcess")
# 音频参数
self.FORMAT = pyaudio.paInt16
self.CHANNELS = 1
self.RATE = 16000
self.CHUNK_SIZE = 1024
# 状态控制
self.recording_enabled = True # 是否允许录音
self.is_recording = False # 是否正在录音
self.recording_buffer = [] # 录音缓冲区
self.pre_record_buffer = [] # 预录音缓冲区
self.silence_start_time = None
self.recording_start_time = None
# 预录音参数
self.pre_record_duration = 2.0
self.pre_record_max_frames = int(self.pre_record_duration * self.RATE / self.CHUNK_SIZE)
# 增强语音检测器
self.voice_detector = EnhancedVoiceDetector(
sample_rate=self.RATE,
chunk_size=self.CHUNK_SIZE
)
# 静音检测参数
self.consecutive_silence_count = 0
self.silence_threshold_count = 30 # 约3秒
# PyAudio实例
self.audio = None
self.input_stream = None
# 运行状态
self.running = True
# TTS 工作线程
self.tts_worker_running = True
self.tts_worker_thread = None
self.tts_task_queue = mp.Queue(maxsize=10)
# TTS 配置
self.tts_url = "https://openspeech.bytedance.com/api/v3/tts/unidirectional"
self.tts_app_id = "8718217928"
self.tts_access_key = "ynJMX-5ix1FsJvswC9KTNlGUdubcchqc"
self.tts_resource_id = "volc.service_type.10029"
self.tts_app_key = "aGjiRDfUWi"
self.tts_speaker = "zh_female_wanqudashu_moon_bigtts"
# 启动 TTS 工作线程
self._start_tts_worker()
def _get_default_config(self) -> Dict[str, Any]:
"""获取默认配置"""
return {
'zcr_min': 2400, # 适应16kHz采样率的ZCR最小值
'zcr_max': 12000, # 适应16kHz采样率的ZCR最大值
'min_recording_time': 2.0, # 最小录音时间
'max_recording_time': 30.0,
'silence_threshold': 3.0,
'pre_record_duration': 2.0
}
def run(self):
"""输入进程主循环"""
self.logger.info("输入进程启动")
self._setup_audio()
try:
while self.running:
# 1. 检查主进程命令
self._check_commands()
# 2. 如果允许录音,处理音频
if self.recording_enabled:
self._process_audio()
# 3. 短暂休眠减少CPU占用
time.sleep(0.01)
except KeyboardInterrupt:
self.logger.info("输入进程收到中断信号")
except Exception as e:
self.logger.error(f"输入进程错误: {e}")
finally:
self._cleanup()
self.logger.info("输入进程退出")
def _setup_audio(self):
"""设置音频输入设备"""
try:
self.audio = pyaudio.PyAudio()
self.input_stream = self.audio.open(
format=self.FORMAT,
channels=self.CHANNELS,
rate=self.RATE,
input=True,
frames_per_buffer=self.CHUNK_SIZE
)
self.logger.info("音频设备初始化成功")
except Exception as e:
self.logger.error(f"音频设备初始化失败: {e}")
raise
def _check_commands(self):
"""检查主进程控制命令"""
try:
while True:
command = self.command_queue.get_nowait()
if command.command == 'enable_recording':
self.recording_enabled = True
self.logger.info("录音功能已启用")
elif command.command == 'disable_recording':
self.recording_enabled = False
# 如果正在录音,立即停止并发送数据
if self.is_recording:
self._stop_recording()
self.logger.info("录音功能已禁用")
elif command.command == 'shutdown':
self.logger.info("收到关闭命令")
self.running = False
return
except queue.Empty:
pass
def _process_audio(self):
"""处理音频数据"""
try:
data = self.input_stream.read(self.CHUNK_SIZE, exception_on_overflow=False)
if len(data) == 0:
return
# 更新预录音缓冲区
self._update_pre_record_buffer(data)
# 使用增强语音检测器
detection_result = self.voice_detector.is_voice_advanced(data)
is_voice = detection_result['is_voice']
if self.is_recording:
# 录音模式
self.recording_buffer.append(data)
# 静音检测
if is_voice:
self.silence_start_time = None
self.consecutive_silence_count = 0
else:
self.consecutive_silence_count += 1
if self.silence_start_time is None:
self.silence_start_time = time.time()
# 检查是否应该停止录音
recording_duration = time.time() - self.recording_start_time
should_stop = False
# 静音检测
if (self.consecutive_silence_count >= self.silence_threshold_count and
recording_duration >= self.config['min_recording_time']):
should_stop = True
print(f"🎙️ 输入进程:静音检测触发停止录音")
# 最大时间检测
if recording_duration >= self.config['max_recording_time']:
should_stop = True
print(f"🎙️ 输入进程:达到最大录音时间")
if should_stop:
self._stop_recording()
else:
# 显示录音状态
confidence = detection_result.get('confidence', 0.0)
energy = detection_result.get('energy', 0.0)
zcr = detection_result.get('zcr', 0.0)
status = (f"\r🎙️ 录音中... {recording_duration:.1f}s | "
f"能量: {energy:.0f} | ZCR: {zcr:.0f} | "
f"语音: {is_voice} | 置信度: {confidence:.2f}")
print(status, end='', flush=True)
else:
# 监听模式
if is_voice:
# 检测到语音,开始录音
self._start_recording()
else:
# 显示监听状态
if detection_result['calibrating']:
progress = detection_result['calibration_progress'] * 100
status = f"\r🎙️ 校准中... {progress:.0f}%"
else:
confidence = detection_result.get('confidence', 0.0)
energy = detection_result.get('energy', 0.0)
zcr = detection_result.get('zcr', 0.0)
buffer_usage = len(self.pre_record_buffer) / self.pre_record_max_frames * 100
status = (f"\r🎙️ 监听中... 能量: {energy:.0f} | ZCR: {zcr:.0f} | "
f"语音: {is_voice} | 置信度: {confidence:.2f} | "
f"缓冲: {buffer_usage:.0f}%")
print(status, end='', flush=True)
except Exception as e:
print(f"🎙️ 输入进程音频处理错误: {e}")
def _update_pre_record_buffer(self, audio_data: bytes):
"""更新预录音缓冲区"""
self.pre_record_buffer.append(audio_data)
# 保持缓冲区大小
if len(self.pre_record_buffer) > self.pre_record_max_frames:
self.pre_record_buffer.pop(0)
def _start_recording(self):
"""开始录音"""
if not self.recording_enabled:
return
self.is_recording = True
self.recording_buffer = []
self.recording_start_time = time.time()
self.silence_start_time = None
self.consecutive_silence_count = 0
# 将预录音缓冲区的内容添加到录音中
self.recording_buffer.extend(self.pre_record_buffer)
self.pre_record_buffer.clear()
print(f"🎙️ 输入进程:开始录音(包含预录音 {self.config['pre_record_duration']}秒)")
def _stop_recording(self):
"""停止录音并发送数据"""
if not self.is_recording:
return
self.is_recording = False
# 合并录音数据
if self.recording_buffer:
audio_data = b''.join(self.recording_buffer)
duration = len(audio_data) / (self.RATE * 2)
# 创建音频片段
segment = AudioSegment(
audio_data=audio_data,
start_time=self.recording_start_time,
end_time=time.time(),
duration=duration,
metadata={
'sample_rate': self.RATE,
'channels': self.CHANNELS,
'format': self.FORMAT,
'chunk_size': self.CHUNK_SIZE
}
)
# 保存录音文件(可选)
filename = self._save_recording(audio_data)
# 发送给主进程
self.event_queue.put(ProcessEvent(
event_type='recording_complete',
data=audio_data,
metadata={
'duration': duration,
'start_time': self.recording_start_time,
'filename': filename
}
))
print(f"📝 输入进程:录音完成,时长 {duration:.2f}")
# 清空缓冲区
self.recording_buffer = []
self.pre_record_buffer = []
def _save_recording(self, audio_data: bytes) -> str:
"""保存录音文件"""
try:
timestamp = time.strftime("%Y%m%d_%H%M%S")
filename = f"recording_{timestamp}.wav"
with wave.open(filename, 'wb') as wf:
wf.setnchannels(self.CHANNELS)
wf.setsampwidth(self.audio.get_sample_size(self.FORMAT))
wf.setframerate(self.RATE)
wf.writeframes(audio_data)
print(f"💾 输入进程:录音已保存到 {filename}")
return filename
except Exception as e:
print(f"❌ 输入进程保存录音失败: {e}")
return None
def _cleanup(self):
"""清理资源"""
# 停止 TTS 工作线程
self.tts_worker_running = False
if self.tts_worker_thread:
self.tts_task_queue.put(None) # 发送结束信号
self.tts_worker_thread.join(timeout=2.0)
if self.input_stream:
try:
self.input_stream.stop_stream()
self.input_stream.close()
except:
pass
if self.audio:
try:
self.audio.terminate()
except:
pass
def _start_tts_worker(self):
"""启动TTS工作线程"""
self.tts_worker_thread = threading.Thread(target=self._tts_worker, daemon=True)
self.tts_worker_thread.start()
self.logger.info("TTS工作线程已启动")
def _tts_worker(self):
"""TTS工作线程 - 处理TTS任务队列"""
while self.tts_worker_running:
try:
# 从队列获取任务
task = self.tts_task_queue.get(timeout=1.0)
if task is None: # 结束信号
break
task_type, content = task
if task_type == "tts_sentence":
# 生成音频数据
self._generate_tts_audio(content)
self.tts_task_queue.task_done()
except queue.Empty:
continue
except Exception as e:
self.logger.error(f"TTS工作线程错误: {e}")
time.sleep(0.1)
def _add_tts_task(self, content):
"""添加TTS任务到队列"""
try:
self.tts_task_queue.put_nowait(("tts_sentence", content))
return True
except queue.Full:
self.logger.warning("TTS任务队列已满丢弃任务")
return False
def _generate_tts_audio(self, text):
"""生成TTS音频数据"""
try:
self.logger.info(f"生成TTS音频: {text[:50]}...")
# 构建请求头
headers = {
"X-Api-App-Id": self.tts_app_id,
"X-Api-Access-Key": self.tts_access_key,
"X-Api-Resource-Id": self.tts_resource_id,
"X-Api-App-Key": self.tts_app_key,
"Content-Type": "application/json",
"Connection": "keep-alive"
}
# 构建请求参数
payload = {
"user": {
"uid": "input_process_tts"
},
"req_params": {
"text": text,
"speaker": self.tts_speaker,
"audio_params": {
"format": "pcm",
"sample_rate": 16000,
"enable_timestamp": True
},
"additions": "{\"explicit_language\":\"zh\",\"disable_markdown_filter\":true, \"enable_timestamp\":true}\"}"
}
}
# 发送请求
session = requests.Session()
try:
response = session.post(self.tts_url, headers=headers, json=payload, stream=True)
if response.status_code != 200:
self.logger.error(f"TTS请求失败: {response.status_code}")
return False
# 处理流式响应
total_audio_size = 0
chunk_count = 0
self.logger.info("开始接收TTS音频流...")
for chunk in response.iter_lines(decode_unicode=True):
if not chunk:
continue
try:
data = json.loads(chunk)
if data.get("code", 0) == 0 and "data" in data and data["data"]:
chunk_audio = base64.b64decode(data["data"])
audio_size = len(chunk_audio)
total_audio_size += audio_size
chunk_count += 1
# 这里可以将音频数据发送到输出进程
# 或者直接处理音频数据
self.logger.debug(f"接收音频块: {audio_size} 字节")
# 减少进度显示频率
if chunk_count % 5 == 0:
progress = f"生成音频: {chunk_count} 块 | {total_audio_size / 1024:.1f} KB"
self.logger.info(progress)
continue
if data.get("code", 0) == 20000000:
break
if data.get("code", 0) > 0:
self.logger.error(f"TTS错误响应: {data}")
break
except json.JSONDecodeError:
continue
self.logger.info(f"TTS音频生成完成: {chunk_count} 块, 总大小: {total_audio_size / 1024:.1f} KB")
return chunk_count > 0
finally:
response.close()
session.close()
except Exception as e:
self.logger.error(f"TTS音频生成失败: {e}")
return False
def process_tts_request(self, text):
"""处理TTS请求的公共接口"""
return self._add_tts_task(text)
class OutputProcess:
"""输出进程 - 借鉴 recorder.py 的优秀架构"""
def __init__(self, audio_queue: mp.Queue, config: Dict[str, Any] = None, event_queue: mp.Queue = None):
self.audio_queue = audio_queue # 主进程 → 输出进程(统一音频播放队列)
self.event_queue = event_queue # 输出进程 → 主进程(事件通知)
self.config = config or self._get_default_config()
# 初始化日志记录器
self.logger = ProcessLogger("OutputProcess")
# 音频播放参数 - 完全借鉴 recorder.py 的优化参数
self.FORMAT = pyaudio.paInt16
self.CHANNELS = 1
self.RATE = 16000
self.CHUNK_SIZE = 2048 # 增加缓冲区大小,减少卡顿
# 播放状态管理 - 借鉴 recorder.py 的状态管理模式
self.is_playing = False
self.currently_playing = False # 当前是否正在播放(双重状态检查)
self.playback_buffer = [] # 播放缓冲区
self.total_chunks_played = 0
self.total_audio_size = 0
self.last_playback_time = 0 # 最后播放时间戳
self.playback_cooldown_period = 1.5 # 播放冷却时间(秒)- 防止回声
# 智能缓冲系统 - 借鉴 recorder.py 的缓冲策略
self.preload_buffer = [] # 预加载缓冲区
self.preload_size = 3 # 预加载3个音频块
self.min_buffer_size = 1 # 最小缓冲区大小
self.audio_device_healthy = True # 音频设备健康状态
# 统一播放工作线程 - 核心改进
self.playback_worker_running = True
self.playback_worker_thread = None
# 播放完成检测
self.last_audio_time = 0 # 最后收到音频数据的时间
self.playback_timeout = 5.0 # 播放超时时间(秒)
self.completion_sent = False # 防止重复发送完成事件
# PyAudio实例
self.audio = None
self.output_stream = None
# 运行状态
self.running = True
# TTS 工作线程
self.tts_worker_running = True
self.tts_worker_thread = None
self.tts_task_queue = mp.Queue(maxsize=10)
# TTS 配置
self.tts_url = "https://openspeech.bytedance.com/api/v3/tts/unidirectional"
self.tts_app_id = "8718217928"
self.tts_access_key = "ynJMX-5ix1FsJvswC9KTNlGUdubcchqc"
self.tts_resource_id = "volc.service_type.10029"
self.tts_app_key = "aGjiRDfUWi"
self.tts_speaker = "zh_female_wanqudashu_moon_bigtts"
# 启动工作线程 - 先启动播放线程再启动TTS线程
self._start_playback_worker()
self._start_tts_worker()
def _start_playback_worker(self):
"""启动播放工作线程 - 借鉴 recorder.py 的播放线程模式"""
self.playback_worker_thread = threading.Thread(target=self._playback_worker, daemon=True)
self.playback_worker_thread.start()
self.logger.info("播放工作线程已启动")
def _playback_worker(self):
"""播放工作线程 - 专门负责音频播放,完全借鉴 recorder.py"""
print("🔊 播放工作线程开始运行...")
# 等待音频设备就绪和初始化完成
max_wait_time = 10 # 最多等待10秒
wait_start_time = time.time()
while (self.audio is None or not self.running) and (time.time() - wait_start_time) < max_wait_time:
time.sleep(0.1)
if self.audio is None:
print("❌ 音频设备未就绪,播放工作线程退出")
self.logger.error("音频设备未就绪,播放工作线程退出")
return
print("🔊 音频设备已就绪,开始创建播放流")
# 创建音频播放流
playback_stream = None
try:
playback_stream = self.audio.open(
format=self.FORMAT,
channels=self.CHANNELS,
rate=self.RATE,
output=True,
frames_per_buffer=512,
start=True # 立即启动流
)
print("🔊 音频播放流已创建")
except Exception as e:
print(f"❌ 创建音频播放流失败: {e}")
self.logger.error(f"创建音频播放流失败: {e}")
return
chunks_played = 0
total_size = 0
try:
while self.playback_worker_running:
try:
# 从播放缓冲区获取音频数据
if self.playback_buffer:
audio_chunk = self.playback_buffer.pop(0)
if audio_chunk and len(audio_chunk) > 0:
# 检查播放冷却期
current_time = time.time()
time_since_last_play = current_time - self.last_playback_time
in_cooldown = (self.last_playback_time > 0 and
time_since_last_play < self.playback_cooldown_period)
if in_cooldown:
# 在冷却期内,跳过播放
print(f"🔊 播放冷却中,跳过播放 ({time_since_last_play:.1f}s < {self.playback_cooldown_period}s)")
self.logger.debug(f"播放冷却中,跳过播放 ({time_since_last_play:.1f}s < {self.playback_cooldown_period}s)")
continue
# 播放音频块
self.currently_playing = True
self.last_playback_time = current_time # 更新最后播放时间
playback_stream.write(audio_chunk)
chunks_played += 1
total_size += len(audio_chunk)
# 减少进度显示频率
if chunks_played % 10 == 0:
progress = f"🔊 播放工作: {chunks_played} 块 | {total_size / 1024:.1f} KB"
print(f"\r{progress}", end='', flush=True)
else:
self.currently_playing = False
else:
# 缓冲区为空,检查是否还在接收数据
self.currently_playing = False
time.sleep(0.01) # 短暂休眠减少CPU占用
continue
except Exception as e:
print(f"❌ 播放工作线程错误: {e}")
self.logger.error(f"播放工作线程错误: {e}")
self.currently_playing = False
time.sleep(0.1)
print(f"\n✅ 播放工作线程结束: 总计 {chunks_played} 块, {total_size / 1024:.1f} KB")
finally:
self.currently_playing = False
if playback_stream:
try:
playback_stream.stop_stream()
playback_stream.close()
except:
pass
print("🔊 音频播放流已关闭")
def _get_default_config(self) -> Dict[str, Any]:
"""获取默认配置"""
return {
'buffer_size': 1000,
'show_progress': True,
'progress_interval': 100
}
def run(self):
"""输出进程主循环 - 借鉴 recorder.py 的优雅主循环"""
self.logger.info("输出进程启动")
self._setup_audio()
try:
while self.running:
# 1. 处理音频队列(数据接收)
self._process_audio_queue()
# 2. 检查设备健康状态和冷却期 - 防止回声
current_time = time.time()
time_since_last_play = current_time - self.last_playback_time
in_cooldown = (self.last_playback_time > 0 and
time_since_last_play < self.playback_cooldown_period)
# 检查TTS和播放状态
tts_active = not self.tts_task_queue.empty()
playback_active = self.currently_playing or len(self.playback_buffer) > 0
# 如果设备不健康、正在播放、TTS生成中、或在冷却期内显示状态并跳过音频处理
if not self.audio_device_healthy or tts_active or playback_active or in_cooldown:
# 显示播放状态
tts_queue_size = self.tts_task_queue.qsize()
queue_size = len(self.playback_buffer) + len(self.preload_buffer)
if not self.audio_device_healthy:
status = f"🔧 设备重置中... TTS: {tts_queue_size} 播放: {queue_size}"
elif tts_active:
status = f"🎵 TTS生成中... TTS: {tts_queue_size} 播放: {queue_size}"
elif in_cooldown:
cooldown_time = self.playback_cooldown_period - time_since_last_play
status = f"🔊 播放冷却中... {cooldown_time:.1f}s TTS: {tts_queue_size} 播放: {queue_size}"
else:
playing_status = "播放中" if self.currently_playing else "等待播放"
status = f"🔊 {playing_status}... TTS: {tts_queue_size} 播放: {queue_size}"
print(f"\r{status}", end='', flush=True)
time.sleep(0.1) # 播放时增加延迟减少CPU使用
continue
# 3. 主动检查预加载缓冲区,确保音频能及时播放
if (not self.is_playing and
len(self.preload_buffer) >= self.min_buffer_size and
len(self.playback_buffer) == 0):
# 立即将预加载数据移到播放缓冲区
self.playback_buffer.extend(self.preload_buffer)
self.preload_buffer.clear()
self.is_playing = True
self.last_playback_time = time.time()
print(f"🎵 主循环检测:开始播放预加载音频,播放缓冲区大小: {len(self.playback_buffer)}")
# 4. 显示播放进度
self._show_progress()
# 5. 借鉴 recorder.py: 根据播放状态调整休眠时间,优化性能
if self.is_playing and (self.playback_buffer or self.preload_buffer):
time.sleep(0.005) # 播放时极短休眠,提高响应性
else:
time.sleep(0.02) # 非播放时稍长休眠减少CPU占用
except KeyboardInterrupt:
self.logger.info("输出进程收到中断信号")
except Exception as e:
self.logger.error(f"输出进程错误: {e}")
import traceback
print(f"❌ 输出进程错误详情: {traceback.format_exc()}")
finally:
self._cleanup()
self.logger.info("输出进程退出")
def _setup_audio(self):
"""设置音频输出设备"""
try:
self.audio = pyaudio.PyAudio()
print("🔊 PyAudio实例已创建")
# 主进程不需要创建输出流,由播放工作线程负责
# 这里只创建PyAudio实例供播放工作线程使用
self.output_stream = None # 标记为None表明主进程不直接使用输出流
self.logger.info("音频设备初始化成功")
print("🔊 音频设备初始化完成")
except Exception as e:
self.logger.error(f"音频设备初始化失败: {e}")
print(f"❌ 音频设备初始化失败: {e}")
raise
def _process_audio_queue(self):
"""处理来自主进程的音频数据 - 借鉴 recorder.py 的优雅队列处理"""
try:
processed_count = 0
end_signal_received = False
while self.running:
try:
# 使用较短的超时,保持响应性
audio_data = self.audio_queue.get(timeout=0.05)
processed_count += 1
# 减少日志输出频率,提高性能
if processed_count <= 3 or processed_count % 50 == 0:
print(f"📥 输出进程收到第 {processed_count} 个数据包")
if audio_data is None:
# 结束信号处理
print(f"📥 输出进程收到结束信号")
end_signal_received = True
# 重置完成事件标记
self.completion_sent = False
# 检查是否应该立即结束
tts_queue_size = self.tts_task_queue.qsize()
playback_queue_size = len(self.playback_buffer) + len(self.preload_buffer)
if tts_queue_size == 0 and playback_queue_size == 0 and not self.currently_playing:
print(f"📥 所有队列已清空且未在播放,处理结束信号")
self._finish_playback()
return
else:
print(f"📥 延迟处理结束信号 - TTS队列: {tts_queue_size}, 播放缓冲: {playback_queue_size}")
# 重新放回队列,稍后重试
self.audio_queue.put(None)
time.sleep(0.05)
continue
if isinstance(audio_data, str) and audio_data.startswith("METADATA:"):
# 处理元数据
metadata = audio_data[9:]
print(f"📥 输出进程收到元数据: {metadata}")
continue
# 音频数据处理
if isinstance(audio_data, bytes):
# 更新最后收到音频数据的时间
self.last_audio_time = time.time()
# 减少日志输出,提高性能
if processed_count <= 3 or processed_count % 50 == 0:
print(f"📥 输出进程收到音频数据: {len(audio_data)} 字节")
# 直接添加到预加载缓冲区
self.preload_buffer.append(audio_data)
# 检查是否应该开始播放
if (not self.is_playing and
len(self.preload_buffer) >= self.preload_size):
# 将预加载的数据移到播放缓冲区
self.playback_buffer.extend(self.preload_buffer)
self.preload_buffer.clear()
self.is_playing = True
self.last_playback_time = time.time()
print(f"🎵 开始播放音频(预加载完成),播放缓冲区大小: {len(self.playback_buffer)}")
else:
print(f"📥 输出进程收到未知类型数据: {type(audio_data)}")
except queue.Empty:
# 队列为空时的处理
if end_signal_received:
tts_queue_size = self.tts_task_queue.qsize()
playback_queue_size = len(self.playback_buffer) + len(self.preload_buffer)
if tts_queue_size == 0 and playback_queue_size == 0 and not self.currently_playing:
print(f"📥 播放完成,所有工作已完成")
self._finish_playback()
return
# 检查是否应该将预加载缓冲区的数据移到播放缓冲区
if (not self.is_playing and
len(self.preload_buffer) >= self.min_buffer_size):
self.playback_buffer.extend(self.preload_buffer)
self.preload_buffer.clear()
self.is_playing = True
self.last_playback_time = time.time()
print(f"🎵 开始播放音频(最小缓冲区满足)")
# 退出循环避免过度占用CPU
if processed_count > 0:
break
else:
time.sleep(0.01)
except Exception as e:
self.logger.error(f"处理音频队列时出错: {e}")
print(f"❌ 输出进程处理队列时出错: {e}")
def _play_audio(self):
"""播放音频数据 - 简化版本,主要由播放工作线程处理"""
# 现在播放逻辑主要由专门的播放工作线程处理
# 这个方法只负责一些状态检查和协调工作
# 检查是否应该将预加载缓冲区的数据移到播放缓冲区
if (not self.is_playing and
len(self.preload_buffer) >= self.min_buffer_size and
len(self.playback_buffer) == 0):
self.playback_buffer.extend(self.preload_buffer)
self.preload_buffer.clear()
self.is_playing = True
self.last_playback_time = time.time()
print(f"🎵 主线程检测:开始播放预加载音频,播放缓冲区大小: {len(self.playback_buffer)}")
# 如果缓冲区为空且之前在播放,检查是否应该发送完成事件
if (self.is_playing and
len(self.playback_buffer) == 0 and
len(self.preload_buffer) == 0 and
not self.currently_playing):
# 短暂等待,确保所有音频都播放完成
time.sleep(0.1)
if len(self.playback_buffer) == 0 and len(self.preload_buffer) == 0:
self.is_playing = False
print(f"🎵 播放缓冲区已清空,准备发送完成事件")
# 注意不在这里直接发送完成事件由_process_audio_queue中的结束信号处理
def _show_progress(self):
"""显示播放进度"""
if (self.config['show_progress'] and
self.total_chunks_played > 0 and
self.total_chunks_played % self.config['progress_interval'] == 0):
progress = f"🔊 播放进度: {self.total_chunks_played} 块 | {self.total_audio_size / 1024:.1f} KB"
print(f"\r{progress}", end='', flush=True)
def _finish_playback(self):
"""完成播放 - 借鉴 recorder.py 的优雅完成机制"""
# 防止重复发送完成事件
if self.completion_sent:
return
self.is_playing = False
self.playback_buffer.clear()
self.preload_buffer.clear()
self.last_playback_time = 0
# 通知主进程播放完成
if self.event_queue and not self.completion_sent:
try:
# 发送播放完成事件到主进程
completion_event = ProcessEvent(
event_type='playback_complete',
metadata={
'completion_time': time.time()
}
)
self.event_queue.put(completion_event)
print("📡 输出进程:已发送播放完成事件到主进程")
self.completion_sent = True
except Exception as e:
self.logger.error(f"发送播放完成事件失败: {e}")
else:
if not self.event_queue:
print("⚠️ 输出进程:未设置事件队列,无法通知主进程播放完成")
# 额外等待确保音频设备完全停止
time.sleep(0.5)
def _cleanup(self):
"""清理资源"""
print("🔊 开始清理输出进程资源...")
# 停止播放工作线程
self.playback_worker_running = False
if self.playback_worker_thread:
print("🔊 等待播放工作线程退出...")
self.playback_worker_thread.join(timeout=3.0)
# 停止 TTS 工作线程
self.tts_worker_running = False
if self.tts_worker_thread:
print("🔊 等待TTS工作线程退出...")
self.tts_task_queue.put(None) # 发送结束信号
self.tts_worker_thread.join(timeout=2.0)
# 清理主进程的输出流(如果存在)
if self.output_stream:
try:
self.output_stream.stop_stream()
self.output_stream.close()
print("🔊 主进程输出流已关闭")
except Exception as e:
print(f"⚠️ 关闭主进程输出流时出错: {e}")
# 清理PyAudio实例
if self.audio:
try:
self.audio.terminate()
print("🔊 PyAudio实例已终止")
except Exception as e:
print(f"⚠️ 终止PyAudio实例时出错: {e}")
print("🔊 输出进程资源清理完成")
def _start_tts_worker(self):
"""启动TTS工作线程"""
self.tts_worker_thread = threading.Thread(target=self._tts_worker, daemon=True)
self.tts_worker_thread.start()
self.logger.info("TTS工作线程已启动")
def _tts_worker(self):
"""TTS工作线程 - 处理TTS任务队列"""
while self.tts_worker_running:
try:
# 从队列获取任务
task = self.tts_task_queue.get(timeout=1.0)
if task is None: # 结束信号
break
task_type, content = task
if task_type == "tts_sentence":
# 生成音频数据并发送到统一播放队列
self._generate_tts_audio(content)
self.tts_task_queue.task_done()
except queue.Empty:
continue
except Exception as e:
self.logger.error(f"TTS工作线程错误: {e}")
time.sleep(0.1)
def _add_tts_task(self, content):
"""添加TTS任务到队列"""
try:
self.tts_task_queue.put_nowait(("tts_sentence", content))
return True
except queue.Full:
self.logger.warning("TTS任务队列已满丢弃任务")
return False
def _generate_tts_audio(self, text):
"""生成TTS音频数据并发送到统一播放队列 - 借鉴 recorder.py 的流式处理"""
try:
self.logger.info(f"生成TTS音频: {text[:50]}...")
# 清空所有缓冲区,确保新的音频不被旧数据干扰
self.playback_buffer.clear()
self.preload_buffer.clear()
self.is_playing = False
self.completion_sent = False # 重置完成标记
# 构建请求头
headers = {
"X-Api-App-Id": self.tts_app_id,
"X-Api-Access-Key": self.tts_access_key,
"X-Api-Resource-Id": self.tts_resource_id,
"X-Api-App-Key": self.tts_app_key,
"Content-Type": "application/json",
"Connection": "keep-alive"
}
# 构建请求参数
payload = {
"user": {
"uid": "output_process_tts"
},
"req_params": {
"text": text,
"speaker": self.tts_speaker,
"audio_params": {
"format": "pcm",
"sample_rate": 16000,
"enable_timestamp": True
},
"additions": "{\"explicit_language\":\"zh\",\"disable_markdown_filter\":true, \"enable_timestamp\":true}\"}"
}
}
# 发送请求
session = requests.Session()
try:
response = session.post(self.tts_url, headers=headers, json=payload, stream=True)
if response.status_code != 200:
self.logger.error(f"TTS请求失败: {response.status_code}")
return False
# 处理流式响应 - 借鉴 recorder.py 的优化策略
total_audio_size = 0
chunk_count = 0
success_count = 0
self.logger.info("开始接收TTS音频流...")
for chunk in response.iter_lines(decode_unicode=True):
if not chunk:
continue
try:
data = json.loads(chunk)
if data.get("code", 0) == 0 and "data" in data and data["data"]:
chunk_audio = base64.b64decode(data["data"])
audio_size = len(chunk_audio)
total_audio_size += audio_size
chunk_count += 1
# 借鉴 recorder.py: 使用预加载缓冲区机制
try:
self.preload_buffer.append(chunk_audio)
success_count += 1
# 检查是否应该开始播放(借鉴 recorder.py 的预加载策略)
if (not self.is_playing and
len(self.preload_buffer) >= self.preload_size):
# 将预加载的数据移到播放缓冲区
self.playback_buffer.extend(self.preload_buffer)
self.preload_buffer.clear()
self.is_playing = True
self.last_playback_time = time.time()
self.logger.info("开始播放TTS音频预加载完成")
# 减少进度显示频率
if chunk_count % 20 == 0: # 减少显示频率
progress = (f"生成音频: {chunk_count} 块 | 成功: {success_count} | "
f"{total_audio_size / 1024:.1f} KB | "
f"预加载: {len(self.preload_buffer)} | "
f"播放缓冲: {len(self.playback_buffer)}")
self.logger.info(progress)
except Exception as e:
self.logger.error(f"添加音频到预加载缓冲区失败: {e}")
continue
continue
if data.get("code", 0) == 20000000:
break
if data.get("code", 0) > 0:
self.logger.error(f"TTS错误响应: {data}")
break
except json.JSONDecodeError:
continue
# 处理剩余的预加载数据
if self.preload_buffer:
self.playback_buffer.extend(self.preload_buffer)
self.preload_buffer.clear()
if not self.is_playing:
self.is_playing = True
self.last_playback_time = time.time()
self.logger.info("开始播放TTS音频处理剩余预加载")
success_rate = (success_count / chunk_count * 100) if chunk_count > 0 else 0
self.logger.info(f"TTS音频生成完成: {chunk_count} 块, 成功率 {success_rate:.1f}% | 总大小: {total_audio_size / 1024:.1f} KB")
# 等待播放完成
if success_count > 0:
self.logger.info("等待TTS音频播放完成...")
self._wait_for_playback_complete()
return success_count > 0
finally:
response.close()
session.close()
except Exception as e:
self.logger.error(f"TTS音频生成失败: {e}")
return False
def _wait_for_playback_complete(self):
"""等待播放完成"""
max_wait_time = 30 # 最多等待30秒
wait_start_time = time.time()
while (len(self.playback_buffer) > 0 or self.currently_playing) and (time.time() - wait_start_time) < max_wait_time:
# 等待播放缓冲区清空且当前播放完成
time.sleep(0.1)
if len(self.playback_buffer) == 0 and not self.currently_playing:
self.logger.info("TTS音频播放完成")
# 调用播放完成处理,发送完成事件
self._finish_playback()
else:
self.logger.warning(f"TTS音频播放超时剩余 {len(self.playback_buffer)} 块未播放")
# 清空缓冲区
self.playback_buffer.clear()
self.preload_buffer.clear()
# 即使超时也要调用播放完成处理
self._finish_playback()
def process_tts_request(self, text):
"""处理TTS请求的公共接口"""
return self._add_tts_task(text)
if __name__ == "__main__":
# 测试代码
print("音频进程模块测试")
print("这个模块应该在多进程环境中运行")