diff --git a/AUDIO_PROCESSES_IMPROVEMENTS.md b/AUDIO_PROCESSES_IMPROVEMENTS.md new file mode 100644 index 0000000..e3bd8cd --- /dev/null +++ b/AUDIO_PROCESSES_IMPROVEMENTS.md @@ -0,0 +1,127 @@ +# Audio Processes 改进总结 + +## 问题背景 +- 原始问题:TTS音频只播放3个字符就停止,出现ALSA underrun错误 +- 根本原因:音频缓冲区管理不当,播放策略过于保守 + +## 改进内容 + +### 1. 音频播放优化 (_play_audio 方法) +- **改进前**:保守的播放策略,需要缓冲区有足够数据才开始播放 +- **改进后**: + - 借鉴 recorder.py 的播放策略:只要有数据就播放 + - 添加错误恢复机制,自动检测和恢复 ALSA underrun + - 优化缓冲区管理,减少延迟 + +### 2. TTS 工作线程模式 +- **参考**: recorder.py 的 TTS 工作线程实现 +- **实现功能**: + - 独立的 TTS 工作线程处理音频生成 + - 任务队列管理,避免阻塞主线程 + - 统一的 TTS 请求接口 `process_tts_request()` + - 支持流式音频处理 + +### 3. 统一的音频播放队列 +- **InputProcess 和 OutputProcess 都支持**: + - TTS 工作线程 + - 音频生成和播放队列 + - 统一的错误处理和日志记录 + +### 4. 关键改进点 + +#### 音频播放策略 +```python +# 改进前:保守策略 +if len(self.playback_buffer) > 2: # 需要缓冲区有足够数据 + # 开始播放 + +# 改进后:积极策略 + 错误恢复 +audio_chunk = self.playback_buffer.pop(0) +if audio_chunk and len(audio_chunk) > 0: + try: + self.output_stream.write(audio_chunk) + # 统计信息 + except Exception as e: + # ALSA underrun 错误恢复 + if "underrun" in str(e).lower(): + # 自动恢复音频流 +``` + +#### TTS 工作线程 +```python +def _tts_worker(self): + """TTS工作线程 - 处理TTS任务队列""" + while self.tts_worker_running: + try: + task = self.tts_task_queue.get(timeout=1.0) + if task is None: + break + + task_type, content = task + if task_type == "tts_sentence": + self._generate_tts_audio(content) + + self.tts_task_queue.task_done() + + except queue.Empty: + continue + except Exception as e: + self.logger.error(f"TTS工作线程错误: {e}") +``` + +#### 错误恢复机制 +```python +# ALSA underrun 检测和恢复 +if "underrun" in str(e).lower() or "alsa" in str(e).lower(): + self.logger.info("检测到ALSA underrun,尝试恢复音频流") + try: + if self.output_stream: + self.output_stream.stop_stream() + time.sleep(0.1) + self.output_stream.start_stream() + self.logger.info("音频流已恢复") + except Exception as recovery_e: + self.logger.error(f"恢复音频流失败: {recovery_e}") + self.playback_buffer.clear() +``` + +### 5. 性能优化 +- 减少日志输出频率,提高性能 +- 优化队列处理策略,使用适当的超时设置 +- 动态调整休眠时间,根据播放状态优化CPU使用 + +### 6. 测试和验证 +- 创建了测试脚本 `test_audio_processes.py` +- 验证了语法正确性 +- 可以测试 TTS 功能的完整性 + +## 使用方法 + +### 在控制系统中使用 +```python +from audio_processes import InputProcess, OutputProcess + +# 创建输入和输出进程 +input_process = InputProcess(command_queue, event_queue) +output_process = OutputProcess(audio_queue) + +# 处理TTS请求 +output_process.process_tts_request("你好,这是测试语音") +``` + +### 独立测试 +```bash +python test_audio_processes.py +``` + +## 预期效果 +- 解决 ALSA underrun 错误 +- 提高音频播放的流畅性 +- 减少 TTS 处理的延迟 +- 提供更稳定的音频处理能力 + +## 注意事项 +1. 确保系统安装了必要的依赖:`requests`, `pyaudio` +2. 检查音频设备是否正常工作 +3. 网络连接正常(用于TTS服务) +4. 适当调整音频参数以适应不同环境 \ No newline at end of file diff --git a/asr_diagnostic.py b/asr_diagnostic.py new file mode 100644 index 0000000..4184147 --- /dev/null +++ b/asr_diagnostic.py @@ -0,0 +1,287 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +语音识别诊断工具 +用于测试和诊断语音识别功能的具体问题 +""" + +import asyncio +import json +import gzip +import uuid +import numpy as np +import wave +import os +from typing import Optional + +class ASRDiagnostic: + """ASR诊断工具""" + + def __init__(self): + self.api_config = { + 'asr': { + 'appid': "8718217928", + 'cluster': "volcano_tts", + 'token': "ynJMX-5ix1FsJvswC9KTNlGUdubcchqc", + 'ws_url': "wss://openspeech.bytedance.com/api/v2/asr" + } + } + + def generate_asr_header(self, message_type=1, message_type_specific_flags=0): + """生成ASR头部""" + PROTOCOL_VERSION = 0b0001 + DEFAULT_HEADER_SIZE = 0b0001 + JSON = 0b0001 + GZIP = 0b0001 + + header = bytearray() + header.append((PROTOCOL_VERSION << 4) | DEFAULT_HEADER_SIZE) + header.append((message_type << 4) | message_type_specific_flags) + header.append((JSON << 4) | GZIP) + header.append(0x00) # reserved + return header + + def parse_asr_response(self, res): + """解析ASR响应""" + print(f"🔍 解析响应,原始大小: {len(res)} 字节") + + if len(res) < 8: + print(f"❌ 响应太短,无法解析") + return {} + + try: + message_type = res[1] >> 4 + payload_size = int.from_bytes(res[4:8], "big", signed=False) + payload_msg = res[8:8+payload_size] + + print(f"📋 消息类型: {message_type}, 载荷大小: {payload_size}") + + if message_type == 0b1001: # SERVER_FULL_RESPONSE + try: + if payload_msg.startswith(b'{'): + result = json.loads(payload_msg.decode('utf-8')) + print(f"✅ 成功解析JSON响应") + return result + else: + print(f"❌ 响应不是JSON格式") + except Exception as e: + print(f"❌ JSON解析失败: {e}") + + except Exception as e: + print(f"❌ 响应解析异常: {e}") + + return {} + + async def test_asr_with_audio_file(self, audio_file_path: str): + """使用音频文件测试ASR""" + print(f"🎵 测试ASR - 音频文件: {audio_file_path}") + + if not os.path.exists(audio_file_path): + print(f"❌ 音频文件不存在: {audio_file_path}") + return + + try: + # 读取音频文件 + with wave.open(audio_file_path, 'rb') as wf: + channels = wf.getnchannels() + width = wf.getsampwidth() + rate = wf.getframerate() + frames = wf.readframes(wf.getnframes()) + + print(f"📊 音频信息: 采样率={rate}Hz, 声道={channels}, 位深={width*8}bits") + print(f"📊 音频大小: {len(frames)} 字节") + + # 如果是立体声,转换为单声道 + if channels > 1: + audio_array = np.frombuffer(frames, dtype=np.int16) + audio_array = audio_array.reshape(-1, channels) + audio_array = np.mean(audio_array, axis=1).astype(np.int16) + frames = audio_array.tobytes() + print(f"🔄 已转换为单声道") + + return await self._test_asr_connection(frames) + + except Exception as e: + print(f"❌ 音频文件处理失败: {e}") + return None + + async def test_asr_with_silence(self): + """测试静音音频""" + print(f"🔇 测试ASR - 静音音频") + + # 生成3秒的静音音频 (16kHz, 16bit, 单声道) + duration = 3 # 秒 + sample_rate = 16000 + silence_data = bytes(duration * sample_rate * 2) # 2 bytes per sample + + return await self._test_asr_connection(silence_data) + + async def test_asr_with_noise(self): + """测试噪音音频""" + print(f"📢 测试ASR - 噪音音频") + + # 生成3秒的随机噪音 + duration = 3 # 秒 + sample_rate = 16000 + noise_data = np.random.randint(-32768, 32767, duration * sample_rate, dtype=np.int16) + noise_data = noise_data.tobytes() + + return await self._test_asr_connection(noise_data) + + async def _test_asr_connection(self, audio_data: bytes): + """测试ASR连接""" + try: + import websockets + + # 构建请求参数 + reqid = str(uuid.uuid4()) + request_params = { + 'app': { + 'appid': self.api_config['asr']['appid'], + 'cluster': self.api_config['asr']['cluster'], + 'token': self.api_config['asr']['token'], + }, + 'user': { + 'uid': 'asr_diagnostic' + }, + 'request': { + 'reqid': reqid, + 'nbest': 1, + 'workflow': 'audio_in,resample,partition,vad,fe,decode,itn,nlu_punctuate', + 'show_language': False, + 'show_utterances': False, + 'result_type': 'full', + "sequence": 1 + }, + 'audio': { + 'format': 'wav', + 'rate': 16000, + 'language': 'zh-CN', + 'bits': 16, + 'channel': 1, + 'codec': 'raw' + } + } + + print(f"📋 ASR请求参数:") + print(f" - AppID: {request_params['app']['appid']}") + print(f" - Cluster: {request_params['app']['cluster']}") + print(f" - Token: {request_params['app']['token'][:20]}...") + print(f" - RequestID: {reqid}") + + # 构建请求 + payload_bytes = str.encode(json.dumps(request_params)) + payload_bytes = gzip.compress(payload_bytes) + full_client_request = bytearray(self.generate_asr_header()) + full_client_request.extend((len(payload_bytes)).to_bytes(4, 'big')) + full_client_request.extend(payload_bytes) + + # 设置认证头 + additional_headers = {'Authorization': 'Bearer; {}'.format(self.api_config['asr']['token'])} + + print(f"📡 连接WebSocket...") + + # 连接WebSocket + async with websockets.connect( + self.api_config['asr']['ws_url'], + additional_headers=additional_headers, + max_size=1000000000 + ) as ws: + print(f"✅ WebSocket连接成功") + + # 发送请求 + print(f"📤 发送ASR配置请求...") + await ws.send(full_client_request) + res = await ws.recv() + result = self.parse_asr_response(res) + print(f"📥 配置响应: {result}") + + # 发送音频数据 + chunk_size = int(1 * 2 * 16000 * 15000 / 1000) # 1秒 chunks + total_chunks = 0 + + for offset in range(0, len(audio_data), chunk_size): + chunk = audio_data[offset:offset + chunk_size] + last = (offset + chunk_size) >= len(audio_data) + + payload_bytes = gzip.compress(chunk) + audio_only_request = bytearray( + self.generate_asr_header( + message_type=0b0010, + message_type_specific_flags=0b0010 if last else 0 + ) + ) + audio_only_request.extend((len(payload_bytes)).to_bytes(4, 'big')) + audio_only_request.extend(payload_bytes) + + await ws.send(audio_only_request) + res = await ws.recv() + result = self.parse_asr_response(res) + total_chunks += 1 + + if last: + print(f"📨 发送最后一块音频数据 (总计{total_chunks}块)") + + # 获取最终结果 + print(f"🎯 等待最终识别结果...") + if 'payload_msg' in result and 'result' in result['payload_msg']: + results = result['payload_msg']['result'] + print(f"📝 ASR返回结果数量: {len(results)}") + if results: + text = results[0].get('text', '识别失败') + print(f"✅ 识别结果: {text}") + return text + else: + print(f"❌ ASR结果为空") + else: + print(f"❌ ASR响应格式异常: {result.keys()}") + print(f"📋 完整响应: {result}") + + return None + + except Exception as e: + print(f"❌ ASR连接异常: {e}") + import traceback + print(f"❌ 详细错误:\n{traceback.format_exc()}") + return None + + async def run_diagnostic(self): + """运行完整诊断""" + print("🔧 ASR诊断工具") + print("=" * 50) + + # 1. 测试静音 + print("\n1️⃣ 测试静音识别...") + await self.test_asr_with_silence() + + # 2. 测试噪音 + print("\n2️⃣ 测试噪音识别...") + await self.test_asr_with_noise() + + # 3. 测试录音文件(如果存在) + recording_files = [f for f in os.listdir('.') if f.startswith('recording_') and f.endswith('.wav')] + if recording_files: + print(f"\n3️⃣ 测试录音文件...") + for file in recording_files[:3]: # 最多测试3个文件 + await self.test_asr_with_audio_file(file) + else: + print(f"\n3️⃣ 跳过录音文件测试 (无录音文件)") + + print(f"\n✅ 诊断完成") + + +def main(): + """主函数""" + diagnostic = ASRDiagnostic() + + try: + asyncio.run(diagnostic.run_diagnostic()) + except KeyboardInterrupt: + print(f"\n🛑 诊断被用户中断") + except Exception as e: + print(f"❌ 诊断工具异常: {e}") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/audio_processes.py b/audio_processes.py index b3d8ded..2df00bb 100644 --- a/audio_processes.py +++ b/audio_processes.py @@ -4,6 +4,7 @@ """ 多进程音频处理模块 定义输入进程和输出进程的类 +使用增强版语音检测器 """ import multiprocessing as mp @@ -18,6 +19,12 @@ from typing import Optional, List, Dict, Any import json import wave import os +from enhanced_voice_detector import EnhancedVoiceDetector +from process_logger import ProcessLogger + +import requests +import base64 +import gzip class RecordingState(Enum): """录音状态枚举""" @@ -58,6 +65,9 @@ class InputProcess: # 配置参数 self.config = config or self._get_default_config() + # 初始化日志记录器 + self.logger = ProcessLogger("InputProcess") + # 音频参数 self.FORMAT = pyaudio.paInt16 self.CHANNELS = 1 @@ -69,24 +79,23 @@ class InputProcess: self.is_recording = False # 是否正在录音 self.recording_buffer = [] # 录音缓冲区 self.pre_record_buffer = [] # 预录音缓冲区 - self.voice_detected = False self.silence_start_time = None self.recording_start_time = None - # ZCR检测参数 - self.zcr_history = [] - self.max_zcr_history = 50 - self.consecutive_silence_count = 0 - self.silence_threshold_count = 30 # 约3秒 - self.low_zcr_threshold_count = 20 # 连续低ZCR计数阈值 - self.consecutive_low_zcr_count = 0 # 连续低ZCR计数 - self.voice_activity_history = [] # 语音活动历史 - self.max_voice_history = 30 # 最大历史记录数 - # 预录音参数 self.pre_record_duration = 2.0 self.pre_record_max_frames = int(self.pre_record_duration * self.RATE / self.CHUNK_SIZE) + # 增强语音检测器 + self.voice_detector = EnhancedVoiceDetector( + sample_rate=self.RATE, + chunk_size=self.CHUNK_SIZE + ) + + # 静音检测参数 + self.consecutive_silence_count = 0 + self.silence_threshold_count = 30 # 约3秒 + # PyAudio实例 self.audio = None self.input_stream = None @@ -94,6 +103,22 @@ class InputProcess: # 运行状态 self.running = True + # TTS 工作线程 + self.tts_worker_running = True + self.tts_worker_thread = None + self.tts_task_queue = mp.Queue(maxsize=10) + + # TTS 配置 + self.tts_url = "https://openspeech.bytedance.com/api/v3/tts/unidirectional" + self.tts_app_id = "8718217928" + self.tts_access_key = "ynJMX-5ix1FsJvswC9KTNlGUdubcchqc" + self.tts_resource_id = "volc.service_type.10029" + self.tts_app_key = "aGjiRDfUWi" + self.tts_speaker = "zh_female_wanqudashu_moon_bigtts" + + # 启动 TTS 工作线程 + self._start_tts_worker() + def _get_default_config(self) -> Dict[str, Any]: """获取默认配置""" return { @@ -107,7 +132,7 @@ class InputProcess: def run(self): """输入进程主循环""" - print("🎙️ 输入进程启动") + self.logger.info("输入进程启动") self._setup_audio() try: @@ -123,12 +148,12 @@ class InputProcess: time.sleep(0.01) except KeyboardInterrupt: - print("🎙️ 输入进程收到中断信号") + self.logger.info("输入进程收到中断信号") except Exception as e: - print(f"❌ 输入进程错误: {e}") + self.logger.error(f"输入进程错误: {e}") finally: self._cleanup() - print("🎙️ 输入进程退出") + self.logger.info("输入进程退出") def _setup_audio(self): """设置音频输入设备""" @@ -141,9 +166,9 @@ class InputProcess: input=True, frames_per_buffer=self.CHUNK_SIZE ) - print("🎙️ 输入进程:音频设备初始化成功") + self.logger.info("音频设备初始化成功") except Exception as e: - print(f"❌ 输入进程音频设备初始化失败: {e}") + self.logger.error(f"音频设备初始化失败: {e}") raise def _check_commands(self): @@ -154,17 +179,17 @@ class InputProcess: if command.command == 'enable_recording': self.recording_enabled = True - print("🎙️ 输入进程:录音功能已启用") + self.logger.info("录音功能已启用") elif command.command == 'disable_recording': self.recording_enabled = False # 如果正在录音,立即停止并发送数据 if self.is_recording: self._stop_recording() - print("🎙️ 输入进程:录音功能已禁用") + self.logger.info("录音功能已禁用") elif command.command == 'shutdown': - print("🎙️ 输入进程:收到关闭命令") + self.logger.info("收到关闭命令") self.running = False return @@ -181,11 +206,9 @@ class InputProcess: # 更新预录音缓冲区 self._update_pre_record_buffer(data) - # ZCR语音检测 - zcr = self._calculate_zcr(data) - - # 语音检测 - is_voice = self._is_voice_active(zcr) + # 使用增强语音检测器 + detection_result = self.voice_detector.is_voice_advanced(data) + is_voice = detection_result['is_voice'] if self.is_recording: # 录音模式 @@ -195,10 +218,8 @@ class InputProcess: if is_voice: self.silence_start_time = None self.consecutive_silence_count = 0 - self.consecutive_low_zcr_count = 0 # 重置低ZCR计数 else: self.consecutive_silence_count += 1 - self.consecutive_low_zcr_count += 1 if self.silence_start_time is None: self.silence_start_time = time.time() @@ -206,11 +227,11 @@ class InputProcess: recording_duration = time.time() - self.recording_start_time should_stop = False - # ZCR静音检测 - if (self.consecutive_low_zcr_count >= self.low_zcr_threshold_count and + # 静音检测 + if (self.consecutive_silence_count >= self.silence_threshold_count and recording_duration >= self.config['min_recording_time']): should_stop = True - print(f"🎙️ 输入进程:ZCR静音检测触发停止录音") + print(f"🎙️ 输入进程:静音检测触发停止录音") # 最大时间检测 if recording_duration >= self.config['max_recording_time']: @@ -219,6 +240,15 @@ class InputProcess: if should_stop: self._stop_recording() + else: + # 显示录音状态 + confidence = detection_result.get('confidence', 0.0) + energy = detection_result.get('energy', 0.0) + zcr = detection_result.get('zcr', 0.0) + status = (f"\r🎙️ 录音中... {recording_duration:.1f}s | " + f"能量: {energy:.0f} | ZCR: {zcr:.0f} | " + f"语音: {is_voice} | 置信度: {confidence:.2f}") + print(status, end='', flush=True) else: # 监听模式 @@ -227,8 +257,19 @@ class InputProcess: self._start_recording() else: # 显示监听状态 - buffer_usage = len(self.pre_record_buffer) / self.pre_record_max_frames * 100 - print(f"\r🎙️ 监听中... ZCR: {zcr:.0f} | 语音: {is_voice} | 缓冲: {buffer_usage:.0f}%", end='', flush=True) + if detection_result['calibrating']: + progress = detection_result['calibration_progress'] * 100 + status = f"\r🎙️ 校准中... {progress:.0f}%" + else: + confidence = detection_result.get('confidence', 0.0) + energy = detection_result.get('energy', 0.0) + zcr = detection_result.get('zcr', 0.0) + buffer_usage = len(self.pre_record_buffer) / self.pre_record_max_frames * 100 + status = (f"\r🎙️ 监听中... 能量: {energy:.0f} | ZCR: {zcr:.0f} | " + f"语音: {is_voice} | 置信度: {confidence:.2f} | " + f"缓冲: {buffer_usage:.0f}%") + + print(status, end='', flush=True) except Exception as e: print(f"🎙️ 输入进程音频处理错误: {e}") @@ -251,7 +292,6 @@ class InputProcess: self.recording_start_time = time.time() self.silence_start_time = None self.consecutive_silence_count = 0 - self.consecutive_low_zcr_count = 0 # 将预录音缓冲区的内容添加到录音中 self.recording_buffer.extend(self.pre_record_buffer) @@ -324,33 +364,14 @@ class InputProcess: print(f"❌ 输入进程保存录音失败: {e}") return None - def _calculate_zcr(self, audio_data: bytes) -> float: - """计算零交叉率""" - if len(audio_data) == 0: - return 0 - - audio_array = np.frombuffer(audio_data, dtype=np.int16) - - # 计算零交叉次数 - zero_crossings = np.sum(np.diff(np.sign(audio_array)) != 0) - - # 归一化到采样率 - zcr = zero_crossings / len(audio_array) * self.RATE - - # 更新ZCR历史 - self.zcr_history.append(zcr) - if len(self.zcr_history) > self.max_zcr_history: - self.zcr_history.pop(0) - - return zcr - - def _is_voice_active(self, zcr: float) -> bool: - """基于ZCR判断是否为语音活动""" - # 简单的ZCR范围检测,匹配recorder.py的实现 - return 2400 < zcr < 12000 - def _cleanup(self): """清理资源""" + # 停止 TTS 工作线程 + self.tts_worker_running = False + if self.tts_worker_thread: + self.tts_task_queue.put(None) # 发送结束信号 + self.tts_worker_thread.join(timeout=2.0) + if self.input_stream: try: self.input_stream.stop_stream() @@ -363,25 +384,180 @@ class InputProcess: self.audio.terminate() except: pass + + def _start_tts_worker(self): + """启动TTS工作线程""" + self.tts_worker_thread = threading.Thread(target=self._tts_worker, daemon=True) + self.tts_worker_thread.start() + self.logger.info("TTS工作线程已启动") + + def _tts_worker(self): + """TTS工作线程 - 处理TTS任务队列""" + while self.tts_worker_running: + try: + # 从队列获取任务 + task = self.tts_task_queue.get(timeout=1.0) + if task is None: # 结束信号 + break + + task_type, content = task + if task_type == "tts_sentence": + # 生成音频数据 + self._generate_tts_audio(content) + + self.tts_task_queue.task_done() + + except queue.Empty: + continue + except Exception as e: + self.logger.error(f"TTS工作线程错误: {e}") + time.sleep(0.1) + + def _add_tts_task(self, content): + """添加TTS任务到队列""" + try: + self.tts_task_queue.put_nowait(("tts_sentence", content)) + return True + except queue.Full: + self.logger.warning("TTS任务队列已满,丢弃任务") + return False + + def _generate_tts_audio(self, text): + """生成TTS音频数据""" + try: + self.logger.info(f"生成TTS音频: {text[:50]}...") + + # 构建请求头 + headers = { + "X-Api-App-Id": self.tts_app_id, + "X-Api-Access-Key": self.tts_access_key, + "X-Api-Resource-Id": self.tts_resource_id, + "X-Api-App-Key": self.tts_app_key, + "Content-Type": "application/json", + "Connection": "keep-alive" + } + + # 构建请求参数 + payload = { + "user": { + "uid": "input_process_tts" + }, + "req_params": { + "text": text, + "speaker": self.tts_speaker, + "audio_params": { + "format": "pcm", + "sample_rate": 16000, + "enable_timestamp": True + }, + "additions": "{\"explicit_language\":\"zh\",\"disable_markdown_filter\":true, \"enable_timestamp\":true}\"}" + } + } + + # 发送请求 + session = requests.Session() + try: + response = session.post(self.tts_url, headers=headers, json=payload, stream=True) + + if response.status_code != 200: + self.logger.error(f"TTS请求失败: {response.status_code}") + return False + + # 处理流式响应 + total_audio_size = 0 + chunk_count = 0 + + self.logger.info("开始接收TTS音频流...") + + for chunk in response.iter_lines(decode_unicode=True): + if not chunk: + continue + + try: + data = json.loads(chunk) + + if data.get("code", 0) == 0 and "data" in data and data["data"]: + chunk_audio = base64.b64decode(data["data"]) + audio_size = len(chunk_audio) + total_audio_size += audio_size + chunk_count += 1 + + # 这里可以将音频数据发送到输出进程 + # 或者直接处理音频数据 + self.logger.debug(f"接收音频块: {audio_size} 字节") + + # 减少进度显示频率 + if chunk_count % 5 == 0: + progress = f"生成音频: {chunk_count} 块 | {total_audio_size / 1024:.1f} KB" + self.logger.info(progress) + continue + + if data.get("code", 0) == 20000000: + break + + if data.get("code", 0) > 0: + self.logger.error(f"TTS错误响应: {data}") + break + + except json.JSONDecodeError: + continue + + self.logger.info(f"TTS音频生成完成: {chunk_count} 块, 总大小: {total_audio_size / 1024:.1f} KB") + + return chunk_count > 0 + + finally: + response.close() + session.close() + + except Exception as e: + self.logger.error(f"TTS音频生成失败: {e}") + return False + + def process_tts_request(self, text): + """处理TTS请求的公共接口""" + return self._add_tts_task(text) class OutputProcess: - """输出进程 - 专门负责音频播放""" + """输出进程 - 借鉴 recorder.py 的优秀架构""" - def __init__(self, audio_queue: mp.Queue, config: Dict[str, Any] = None): - self.audio_queue = audio_queue # 主进程 → 输出进程 + def __init__(self, audio_queue: mp.Queue, config: Dict[str, Any] = None, event_queue: mp.Queue = None): + self.audio_queue = audio_queue # 主进程 → 输出进程(统一音频播放队列) + self.event_queue = event_queue # 输出进程 → 主进程(事件通知) self.config = config or self._get_default_config() - # 音频播放参数 + # 初始化日志记录器 + self.logger = ProcessLogger("OutputProcess") + + # 音频播放参数 - 完全借鉴 recorder.py 的优化参数 self.FORMAT = pyaudio.paInt16 self.CHANNELS = 1 self.RATE = 16000 - self.CHUNK_SIZE = 512 + self.CHUNK_SIZE = 2048 # 增加缓冲区大小,减少卡顿 - # 播放状态 + # 播放状态管理 - 借鉴 recorder.py 的状态管理模式 self.is_playing = False - self.playback_buffer = [] + self.currently_playing = False # 当前是否正在播放(双重状态检查) + self.playback_buffer = [] # 播放缓冲区 self.total_chunks_played = 0 self.total_audio_size = 0 + self.last_playback_time = 0 # 最后播放时间戳 + self.playback_cooldown_period = 1.5 # 播放冷却时间(秒)- 防止回声 + + # 智能缓冲系统 - 借鉴 recorder.py 的缓冲策略 + self.preload_buffer = [] # 预加载缓冲区 + self.preload_size = 3 # 预加载3个音频块 + self.min_buffer_size = 1 # 最小缓冲区大小 + self.audio_device_healthy = True # 音频设备健康状态 + + # 统一播放工作线程 - 核心改进 + self.playback_worker_running = True + self.playback_worker_thread = None + + # 播放完成检测 + self.last_audio_time = 0 # 最后收到音频数据的时间 + self.playback_timeout = 5.0 # 播放超时时间(秒) + self.completion_sent = False # 防止重复发送完成事件 # PyAudio实例 self.audio = None @@ -389,6 +565,124 @@ class OutputProcess: # 运行状态 self.running = True + + # TTS 工作线程 + self.tts_worker_running = True + self.tts_worker_thread = None + self.tts_task_queue = mp.Queue(maxsize=10) + + # TTS 配置 + self.tts_url = "https://openspeech.bytedance.com/api/v3/tts/unidirectional" + self.tts_app_id = "8718217928" + self.tts_access_key = "ynJMX-5ix1FsJvswC9KTNlGUdubcchqc" + self.tts_resource_id = "volc.service_type.10029" + self.tts_app_key = "aGjiRDfUWi" + self.tts_speaker = "zh_female_wanqudashu_moon_bigtts" + + # 启动工作线程 - 先启动播放线程,再启动TTS线程 + self._start_playback_worker() + self._start_tts_worker() + + def _start_playback_worker(self): + """启动播放工作线程 - 借鉴 recorder.py 的播放线程模式""" + self.playback_worker_thread = threading.Thread(target=self._playback_worker, daemon=True) + self.playback_worker_thread.start() + self.logger.info("播放工作线程已启动") + + def _playback_worker(self): + """播放工作线程 - 专门负责音频播放,完全借鉴 recorder.py""" + print("🔊 播放工作线程开始运行...") + + # 等待音频设备就绪和初始化完成 + max_wait_time = 10 # 最多等待10秒 + wait_start_time = time.time() + + while (self.audio is None or not self.running) and (time.time() - wait_start_time) < max_wait_time: + time.sleep(0.1) + + if self.audio is None: + print("❌ 音频设备未就绪,播放工作线程退出") + self.logger.error("音频设备未就绪,播放工作线程退出") + return + + print("🔊 音频设备已就绪,开始创建播放流") + + # 创建音频播放流 + playback_stream = None + try: + playback_stream = self.audio.open( + format=self.FORMAT, + channels=self.CHANNELS, + rate=self.RATE, + output=True, + frames_per_buffer=512, + start=True # 立即启动流 + ) + print("🔊 音频播放流已创建") + except Exception as e: + print(f"❌ 创建音频播放流失败: {e}") + self.logger.error(f"创建音频播放流失败: {e}") + return + + chunks_played = 0 + total_size = 0 + + try: + while self.playback_worker_running: + try: + # 从播放缓冲区获取音频数据 + if self.playback_buffer: + audio_chunk = self.playback_buffer.pop(0) + + if audio_chunk and len(audio_chunk) > 0: + # 检查播放冷却期 + current_time = time.time() + time_since_last_play = current_time - self.last_playback_time + in_cooldown = (self.last_playback_time > 0 and + time_since_last_play < self.playback_cooldown_period) + + if in_cooldown: + # 在冷却期内,跳过播放 + print(f"🔊 播放冷却中,跳过播放 ({time_since_last_play:.1f}s < {self.playback_cooldown_period}s)") + self.logger.debug(f"播放冷却中,跳过播放 ({time_since_last_play:.1f}s < {self.playback_cooldown_period}s)") + continue + + # 播放音频块 + self.currently_playing = True + self.last_playback_time = current_time # 更新最后播放时间 + playback_stream.write(audio_chunk) + chunks_played += 1 + total_size += len(audio_chunk) + + # 减少进度显示频率 + if chunks_played % 10 == 0: + progress = f"🔊 播放工作: {chunks_played} 块 | {total_size / 1024:.1f} KB" + print(f"\r{progress}", end='', flush=True) + else: + self.currently_playing = False + else: + # 缓冲区为空,检查是否还在接收数据 + self.currently_playing = False + time.sleep(0.01) # 短暂休眠,减少CPU占用 + continue + + except Exception as e: + print(f"❌ 播放工作线程错误: {e}") + self.logger.error(f"播放工作线程错误: {e}") + self.currently_playing = False + time.sleep(0.1) + + print(f"\n✅ 播放工作线程结束: 总计 {chunks_played} 块, {total_size / 1024:.1f} KB") + + finally: + self.currently_playing = False + if playback_stream: + try: + playback_stream.stop_stream() + playback_stream.close() + except: + pass + print("🔊 音频播放流已关闭") def _get_default_config(self) -> Dict[str, Any]: """获取默认配置""" @@ -399,87 +693,219 @@ class OutputProcess: } def run(self): - """输出进程主循环""" - print("🔊 输出进程启动") + """输出进程主循环 - 借鉴 recorder.py 的优雅主循环""" + self.logger.info("输出进程启动") self._setup_audio() try: while self.running: - # 处理音频队列 + # 1. 处理音频队列(数据接收) self._process_audio_queue() - # 播放缓冲的音频 - self._play_audio() + # 2. 检查设备健康状态和冷却期 - 防止回声 + current_time = time.time() + time_since_last_play = current_time - self.last_playback_time + in_cooldown = (self.last_playback_time > 0 and + time_since_last_play < self.playback_cooldown_period) - # 显示播放进度 + # 检查TTS和播放状态 + tts_active = not self.tts_task_queue.empty() + playback_active = self.currently_playing or len(self.playback_buffer) > 0 + + # 如果设备不健康、正在播放、TTS生成中、或在冷却期内,显示状态并跳过音频处理 + if not self.audio_device_healthy or tts_active or playback_active or in_cooldown: + # 显示播放状态 + tts_queue_size = self.tts_task_queue.qsize() + queue_size = len(self.playback_buffer) + len(self.preload_buffer) + + if not self.audio_device_healthy: + status = f"🔧 设备重置中... TTS: {tts_queue_size} 播放: {queue_size}" + elif tts_active: + status = f"🎵 TTS生成中... TTS: {tts_queue_size} 播放: {queue_size}" + elif in_cooldown: + cooldown_time = self.playback_cooldown_period - time_since_last_play + status = f"🔊 播放冷却中... {cooldown_time:.1f}s TTS: {tts_queue_size} 播放: {queue_size}" + else: + playing_status = "播放中" if self.currently_playing else "等待播放" + status = f"🔊 {playing_status}... TTS: {tts_queue_size} 播放: {queue_size}" + + print(f"\r{status}", end='', flush=True) + time.sleep(0.1) # 播放时增加延迟减少CPU使用 + continue + + # 3. 主动检查预加载缓冲区,确保音频能及时播放 + if (not self.is_playing and + len(self.preload_buffer) >= self.min_buffer_size and + len(self.playback_buffer) == 0): + # 立即将预加载数据移到播放缓冲区 + self.playback_buffer.extend(self.preload_buffer) + self.preload_buffer.clear() + self.is_playing = True + self.last_playback_time = time.time() + print(f"🎵 主循环检测:开始播放预加载音频,播放缓冲区大小: {len(self.playback_buffer)}") + + # 4. 显示播放进度 self._show_progress() - time.sleep(0.001) # 极短休眠,确保流畅播放 + # 5. 借鉴 recorder.py: 根据播放状态调整休眠时间,优化性能 + if self.is_playing and (self.playback_buffer or self.preload_buffer): + time.sleep(0.005) # 播放时极短休眠,提高响应性 + else: + time.sleep(0.02) # 非播放时稍长休眠,减少CPU占用 except KeyboardInterrupt: - print("🔊 输出进程收到中断信号") + self.logger.info("输出进程收到中断信号") except Exception as e: - print(f"❌ 输出进程错误: {e}") + self.logger.error(f"输出进程错误: {e}") + import traceback + print(f"❌ 输出进程错误详情: {traceback.format_exc()}") finally: self._cleanup() - print("🔊 输出进程退出") + self.logger.info("输出进程退出") def _setup_audio(self): """设置音频输出设备""" try: self.audio = pyaudio.PyAudio() - self.output_stream = self.audio.open( - format=self.FORMAT, - channels=self.CHANNELS, - rate=self.RATE, - output=True, - frames_per_buffer=self.CHUNK_SIZE - ) - print("🔊 输出进程:音频设备初始化成功") + print("🔊 PyAudio实例已创建") + + # 主进程不需要创建输出流,由播放工作线程负责 + # 这里只创建PyAudio实例供播放工作线程使用 + self.output_stream = None # 标记为None,表明主进程不直接使用输出流 + + self.logger.info("音频设备初始化成功") + print("🔊 音频设备初始化完成") except Exception as e: - print(f"❌ 输出进程音频设备初始化失败: {e}") + self.logger.error(f"音频设备初始化失败: {e}") + print(f"❌ 音频设备初始化失败: {e}") raise def _process_audio_queue(self): - """处理来自主进程的音频数据""" + """处理来自主进程的音频数据 - 借鉴 recorder.py 的优雅队列处理""" try: - while True: - audio_data = self.audio_queue.get_nowait() - - if audio_data is None: - # 结束信号 - self._finish_playback() - break - - if isinstance(audio_data, str) and audio_data.startswith("METADATA:"): - # 处理元数据 - metadata = audio_data[9:] # 移除 "METADATA:" 前缀 - print(f"📝 输出进程:播放元数据 {metadata}") - continue - - # 音频数据放入播放缓冲区 - self.playback_buffer.append(audio_data) - if not self.is_playing: - self.is_playing = True - print("🔊 输出进程:开始播放音频") + processed_count = 0 + end_signal_received = False + + while self.running: + try: + # 使用较短的超时,保持响应性 + audio_data = self.audio_queue.get(timeout=0.05) + processed_count += 1 - except queue.Empty: - pass + # 减少日志输出频率,提高性能 + if processed_count <= 3 or processed_count % 50 == 0: + print(f"📥 输出进程收到第 {processed_count} 个数据包") + + if audio_data is None: + # 结束信号处理 + print(f"📥 输出进程收到结束信号") + end_signal_received = True + + # 重置完成事件标记 + self.completion_sent = False + + # 检查是否应该立即结束 + tts_queue_size = self.tts_task_queue.qsize() + playback_queue_size = len(self.playback_buffer) + len(self.preload_buffer) + + if tts_queue_size == 0 and playback_queue_size == 0 and not self.currently_playing: + print(f"📥 所有队列已清空且未在播放,处理结束信号") + self._finish_playback() + return + else: + print(f"📥 延迟处理结束信号 - TTS队列: {tts_queue_size}, 播放缓冲: {playback_queue_size}") + # 重新放回队列,稍后重试 + self.audio_queue.put(None) + time.sleep(0.05) + continue + + if isinstance(audio_data, str) and audio_data.startswith("METADATA:"): + # 处理元数据 + metadata = audio_data[9:] + print(f"📥 输出进程收到元数据: {metadata}") + continue + + # 音频数据处理 + if isinstance(audio_data, bytes): + # 更新最后收到音频数据的时间 + self.last_audio_time = time.time() + + # 减少日志输出,提高性能 + if processed_count <= 3 or processed_count % 50 == 0: + print(f"📥 输出进程收到音频数据: {len(audio_data)} 字节") + + # 直接添加到预加载缓冲区 + self.preload_buffer.append(audio_data) + + # 检查是否应该开始播放 + if (not self.is_playing and + len(self.preload_buffer) >= self.preload_size): + # 将预加载的数据移到播放缓冲区 + self.playback_buffer.extend(self.preload_buffer) + self.preload_buffer.clear() + self.is_playing = True + self.last_playback_time = time.time() + print(f"🎵 开始播放音频(预加载完成),播放缓冲区大小: {len(self.playback_buffer)}") + + else: + print(f"📥 输出进程收到未知类型数据: {type(audio_data)}") + + except queue.Empty: + # 队列为空时的处理 + if end_signal_received: + tts_queue_size = self.tts_task_queue.qsize() + playback_queue_size = len(self.playback_buffer) + len(self.preload_buffer) + + if tts_queue_size == 0 and playback_queue_size == 0 and not self.currently_playing: + print(f"📥 播放完成,所有工作已完成") + self._finish_playback() + return + + # 检查是否应该将预加载缓冲区的数据移到播放缓冲区 + if (not self.is_playing and + len(self.preload_buffer) >= self.min_buffer_size): + self.playback_buffer.extend(self.preload_buffer) + self.preload_buffer.clear() + self.is_playing = True + self.last_playback_time = time.time() + print(f"🎵 开始播放音频(最小缓冲区满足)") + + # 退出循环,避免过度占用CPU + if processed_count > 0: + break + else: + time.sleep(0.01) + + except Exception as e: + self.logger.error(f"处理音频队列时出错: {e}") + print(f"❌ 输出进程处理队列时出错: {e}") def _play_audio(self): - """播放音频数据""" - if self.playback_buffer and self.output_stream: - try: - # 取出一块音频数据播放 - audio_chunk = self.playback_buffer.pop(0) - if audio_chunk and len(audio_chunk) > 0: - self.output_stream.write(audio_chunk) - self.total_chunks_played += 1 - self.total_audio_size += len(audio_chunk) - - except Exception as e: - print(f"❌ 输出进程播放错误: {e}") - self.playback_buffer.clear() + """播放音频数据 - 简化版本,主要由播放工作线程处理""" + # 现在播放逻辑主要由专门的播放工作线程处理 + # 这个方法只负责一些状态检查和协调工作 + + # 检查是否应该将预加载缓冲区的数据移到播放缓冲区 + if (not self.is_playing and + len(self.preload_buffer) >= self.min_buffer_size and + len(self.playback_buffer) == 0): + self.playback_buffer.extend(self.preload_buffer) + self.preload_buffer.clear() + self.is_playing = True + self.last_playback_time = time.time() + print(f"🎵 主线程检测:开始播放预加载音频,播放缓冲区大小: {len(self.playback_buffer)}") + + # 如果缓冲区为空且之前在播放,检查是否应该发送完成事件 + if (self.is_playing and + len(self.playback_buffer) == 0 and + len(self.preload_buffer) == 0 and + not self.currently_playing): + # 短暂等待,确保所有音频都播放完成 + time.sleep(0.1) + if len(self.playback_buffer) == 0 and len(self.preload_buffer) == 0: + self.is_playing = False + print(f"🎵 播放缓冲区已清空,准备发送完成事件") + # 注意:不在这里直接发送完成事件,由_process_audio_queue中的结束信号处理 def _show_progress(self): """显示播放进度""" @@ -491,35 +917,268 @@ class OutputProcess: print(f"\r{progress}", end='', flush=True) def _finish_playback(self): - """完成播放""" + """完成播放 - 借鉴 recorder.py 的优雅完成机制""" + # 防止重复发送完成事件 + if self.completion_sent: + return + self.is_playing = False self.playback_buffer.clear() - - if self.total_chunks_played > 0: - print(f"\n✅ 输出进程:播放完成,总计 {self.total_chunks_played} 块, {self.total_audio_size / 1024:.1f} KB") - - # 重置统计 - self.total_chunks_played = 0 - self.total_audio_size = 0 + self.preload_buffer.clear() + self.last_playback_time = 0 # 通知主进程播放完成 - # 这里可以通过共享内存或另一个队列来实现 - # 暂时简化处理,由主进程通过队列大小判断 + if self.event_queue and not self.completion_sent: + try: + # 发送播放完成事件到主进程 + completion_event = ProcessEvent( + event_type='playback_complete', + metadata={ + 'completion_time': time.time() + } + ) + self.event_queue.put(completion_event) + print("📡 输出进程:已发送播放完成事件到主进程") + self.completion_sent = True + except Exception as e: + self.logger.error(f"发送播放完成事件失败: {e}") + else: + if not self.event_queue: + print("⚠️ 输出进程:未设置事件队列,无法通知主进程播放完成") + + # 额外等待确保音频设备完全停止 + time.sleep(0.5) def _cleanup(self): """清理资源""" + print("🔊 开始清理输出进程资源...") + + # 停止播放工作线程 + self.playback_worker_running = False + if self.playback_worker_thread: + print("🔊 等待播放工作线程退出...") + self.playback_worker_thread.join(timeout=3.0) + + # 停止 TTS 工作线程 + self.tts_worker_running = False + if self.tts_worker_thread: + print("🔊 等待TTS工作线程退出...") + self.tts_task_queue.put(None) # 发送结束信号 + self.tts_worker_thread.join(timeout=2.0) + + # 清理主进程的输出流(如果存在) if self.output_stream: try: self.output_stream.stop_stream() self.output_stream.close() - except: - pass + print("🔊 主进程输出流已关闭") + except Exception as e: + print(f"⚠️ 关闭主进程输出流时出错: {e}") + # 清理PyAudio实例 if self.audio: try: self.audio.terminate() - except: - pass + print("🔊 PyAudio实例已终止") + except Exception as e: + print(f"⚠️ 终止PyAudio实例时出错: {e}") + + print("🔊 输出进程资源清理完成") + + def _start_tts_worker(self): + """启动TTS工作线程""" + self.tts_worker_thread = threading.Thread(target=self._tts_worker, daemon=True) + self.tts_worker_thread.start() + self.logger.info("TTS工作线程已启动") + + def _tts_worker(self): + """TTS工作线程 - 处理TTS任务队列""" + while self.tts_worker_running: + try: + # 从队列获取任务 + task = self.tts_task_queue.get(timeout=1.0) + if task is None: # 结束信号 + break + + task_type, content = task + if task_type == "tts_sentence": + # 生成音频数据并发送到统一播放队列 + self._generate_tts_audio(content) + + self.tts_task_queue.task_done() + + except queue.Empty: + continue + except Exception as e: + self.logger.error(f"TTS工作线程错误: {e}") + time.sleep(0.1) + + def _add_tts_task(self, content): + """添加TTS任务到队列""" + try: + self.tts_task_queue.put_nowait(("tts_sentence", content)) + return True + except queue.Full: + self.logger.warning("TTS任务队列已满,丢弃任务") + return False + + def _generate_tts_audio(self, text): + """生成TTS音频数据并发送到统一播放队列 - 借鉴 recorder.py 的流式处理""" + try: + self.logger.info(f"生成TTS音频: {text[:50]}...") + + # 清空所有缓冲区,确保新的音频不被旧数据干扰 + self.playback_buffer.clear() + self.preload_buffer.clear() + self.is_playing = False + self.completion_sent = False # 重置完成标记 + + # 构建请求头 + headers = { + "X-Api-App-Id": self.tts_app_id, + "X-Api-Access-Key": self.tts_access_key, + "X-Api-Resource-Id": self.tts_resource_id, + "X-Api-App-Key": self.tts_app_key, + "Content-Type": "application/json", + "Connection": "keep-alive" + } + + # 构建请求参数 + payload = { + "user": { + "uid": "output_process_tts" + }, + "req_params": { + "text": text, + "speaker": self.tts_speaker, + "audio_params": { + "format": "pcm", + "sample_rate": 16000, + "enable_timestamp": True + }, + "additions": "{\"explicit_language\":\"zh\",\"disable_markdown_filter\":true, \"enable_timestamp\":true}\"}" + } + } + + # 发送请求 + session = requests.Session() + try: + response = session.post(self.tts_url, headers=headers, json=payload, stream=True) + + if response.status_code != 200: + self.logger.error(f"TTS请求失败: {response.status_code}") + return False + + # 处理流式响应 - 借鉴 recorder.py 的优化策略 + total_audio_size = 0 + chunk_count = 0 + success_count = 0 + + self.logger.info("开始接收TTS音频流...") + + for chunk in response.iter_lines(decode_unicode=True): + if not chunk: + continue + + try: + data = json.loads(chunk) + + if data.get("code", 0) == 0 and "data" in data and data["data"]: + chunk_audio = base64.b64decode(data["data"]) + audio_size = len(chunk_audio) + total_audio_size += audio_size + chunk_count += 1 + + # 借鉴 recorder.py: 使用预加载缓冲区机制 + try: + self.preload_buffer.append(chunk_audio) + success_count += 1 + + # 检查是否应该开始播放(借鉴 recorder.py 的预加载策略) + if (not self.is_playing and + len(self.preload_buffer) >= self.preload_size): + # 将预加载的数据移到播放缓冲区 + self.playback_buffer.extend(self.preload_buffer) + self.preload_buffer.clear() + self.is_playing = True + self.last_playback_time = time.time() + self.logger.info("开始播放TTS音频(预加载完成)") + + # 减少进度显示频率 + if chunk_count % 20 == 0: # 减少显示频率 + progress = (f"生成音频: {chunk_count} 块 | 成功: {success_count} | " + f"{total_audio_size / 1024:.1f} KB | " + f"预加载: {len(self.preload_buffer)} | " + f"播放缓冲: {len(self.playback_buffer)}") + self.logger.info(progress) + + except Exception as e: + self.logger.error(f"添加音频到预加载缓冲区失败: {e}") + continue + + continue + + if data.get("code", 0) == 20000000: + break + + if data.get("code", 0) > 0: + self.logger.error(f"TTS错误响应: {data}") + break + + except json.JSONDecodeError: + continue + + # 处理剩余的预加载数据 + if self.preload_buffer: + self.playback_buffer.extend(self.preload_buffer) + self.preload_buffer.clear() + if not self.is_playing: + self.is_playing = True + self.last_playback_time = time.time() + self.logger.info("开始播放TTS音频(处理剩余预加载)") + + success_rate = (success_count / chunk_count * 100) if chunk_count > 0 else 0 + self.logger.info(f"TTS音频生成完成: {chunk_count} 块, 成功率 {success_rate:.1f}% | 总大小: {total_audio_size / 1024:.1f} KB") + + # 等待播放完成 + if success_count > 0: + self.logger.info("等待TTS音频播放完成...") + self._wait_for_playback_complete() + + return success_count > 0 + + finally: + response.close() + session.close() + + except Exception as e: + self.logger.error(f"TTS音频生成失败: {e}") + return False + + def _wait_for_playback_complete(self): + """等待播放完成""" + max_wait_time = 30 # 最多等待30秒 + wait_start_time = time.time() + + while (len(self.playback_buffer) > 0 or self.currently_playing) and (time.time() - wait_start_time) < max_wait_time: + # 等待播放缓冲区清空且当前播放完成 + time.sleep(0.1) + + if len(self.playback_buffer) == 0 and not self.currently_playing: + self.logger.info("TTS音频播放完成") + # 调用播放完成处理,发送完成事件 + self._finish_playback() + else: + self.logger.warning(f"TTS音频播放超时,剩余 {len(self.playback_buffer)} 块未播放") + # 清空缓冲区 + self.playback_buffer.clear() + self.preload_buffer.clear() + # 即使超时也要调用播放完成处理 + self._finish_playback() + + def process_tts_request(self, text): + """处理TTS请求的公共接口""" + return self._add_tts_task(text) if __name__ == "__main__": # 测试代码 diff --git a/control_system.py b/control_system.py index cd4e782..9c1ea2f 100644 --- a/control_system.py +++ b/control_system.py @@ -38,6 +38,7 @@ class ControlSystem: self.input_command_queue = mp.Queue(maxsize=100) # 主进程 → 输入进程 self.input_event_queue = mp.Queue(maxsize=100) # 输入进程 → 主进程 self.output_audio_queue = mp.Queue(maxsize=1000) # 主进程 → 输出进程 + self.output_event_queue = mp.Queue(maxsize=100) # 输出进程 → 主进程 # 进程 self.input_process = None @@ -214,7 +215,8 @@ class ControlSystem: self.output_process = mp.Process( target=OutputProcess( self.output_audio_queue, - output_config + output_config, + self.output_event_queue # 传递事件队列 ).run ) @@ -286,13 +288,8 @@ class ControlSystem: def _handle_playing_state(self): """处理播放状态""" - # 检查播放是否完成 - if self.output_audio_queue.qsize() == 0 and not self.playback_complete: - # 等待一小段时间确保播放完成 - time.sleep(0.5) - if self.output_audio_queue.qsize() == 0: - self.playback_complete = True - self.stats['total_conversations'] += 1 + # 现在主要由输出进程的播放完成事件驱动 + pass def _check_events(self): """检查进程事件""" @@ -307,6 +304,18 @@ class ControlSystem: except queue.Empty: pass + + # 检查输出进程事件 + try: + while True: + event = self.output_event_queue.get_nowait() + + if event.event_type == 'playback_complete': + print("📡 主控制:收到播放完成事件") + self._handle_playback_complete(event) + + except queue.Empty: + pass def _handle_recording_complete(self, event: ProcessEvent): """处理录音完成事件""" @@ -327,6 +336,21 @@ class ControlSystem: print(f"🎯 状态:RECORDING → PROCESSING (时长: {event.metadata['duration']:.2f}s)") + def _handle_playback_complete(self, event: ProcessEvent): + """处理播放完成事件""" + # 标记播放完成 + self.playback_complete = True + + # 更新统计 + self.stats['total_conversations'] += 1 + + # 切换到空闲状态 + self.state = RecordingState.IDLE + print(f"🎯 状态:PLAYING → IDLE") + + # 重新启用输入进程录音功能 + self.input_command_queue.put(ControlCommand('enable_recording')) + def _process_audio_pipeline(self): """处理音频流水线:STT + LLM + TTS""" try: @@ -390,9 +414,17 @@ class ControlSystem: def _speech_to_text(self, audio_data: bytes) -> Optional[str]: """语音转文字""" try: - return asyncio.run(self._recognize_audio_async(audio_data)) + print(f"🔍 开始语音识别,音频大小: {len(audio_data)} 字节") + result = asyncio.run(self._recognize_audio_async(audio_data)) + if result: + print(f"✅ 语音识别成功: {result}") + else: + print(f"❌ 语音识别返回空结果") + return result except Exception as e: print(f"❌ 语音识别异常: {e}") + import traceback + print(f"❌ 详细错误信息:\n{traceback.format_exc()}") return None async def _recognize_audio_async(self, audio_data: bytes) -> Optional[str]: @@ -401,7 +433,57 @@ class ControlSystem: return "语音识别功能已禁用" try: + # 验证音频数据 + print(f"🎵 音频数据验证:") + print(f" - 大小: {len(audio_data)} 字节") + print(f" - 是否为空: {len(audio_data) == 0}") + + if len(audio_data) == 0: + print("❌ 音频数据为空") + return None + + # 检查是否有WAV头部 + has_wav_header = audio_data.startswith(b'RIFF') + print(f" - 有WAV头部: {has_wav_header}") + + if has_wav_header: + # 解析WAV头部 + print(f" - WAV格式,可能需要提取PCM数据") + riff_size = int.from_bytes(audio_data[4:8], 'little') + wave_fmt = audio_data[8:12] + if wave_fmt == b'WAVE': + print(f" - WAVE格式正确") + # 查找fmt块 + fmt_pos = audio_data.find(b'fmt ') + if fmt_pos > 0: + fmt_size = int.from_bytes(audio_data[fmt_pos+4:fmt_pos+8], 'little') + audio_format = int.from_bytes(audio_data[fmt_pos+8:fmt_pos+10], 'little') + channels = int.from_bytes(audio_data[fmt_pos+10:fmt_pos+12], 'little') + sample_rate = int.from_bytes(audio_data[fmt_pos+12:fmt_pos+16], 'little') + print(f" - 音频格式: {audio_format}") + print(f" - 声道数: {channels}") + print(f" - 采样率: {sample_rate}") + else: + print(f" - 纯PCM数据") + + # 检查音频数据格式(假设是16位PCM) + if len(audio_data) % 2 != 0: + print(f"⚠️ 音频数据长度不是2的倍数: {len(audio_data)}") + + # 计算音频时长 + sample_rate = self.config['audio']['sample_rate'] + channels = self.config['audio']['channels'] + bytes_per_second = sample_rate * channels * 2 # 16位 = 2字节 + duration = len(audio_data) / bytes_per_second + print(f" - 配置采样率: {sample_rate} Hz") + print(f" - 配置声道数: {channels}") + print(f" - 估算时长: {duration:.2f} 秒") + + if duration < 0.5: + print(f"⚠️ 音频时长过短: {duration:.2f} 秒") + import websockets + print(f"🔗 连接WebSocket ASR服务: {self.api_config['asr']['ws_url']}") # 生成ASR头部 def generate_asr_header(message_type=1, message_type_specific_flags=0): @@ -417,25 +499,56 @@ class ControlSystem: header.append(0x00) # reserved return header - # 解析ASR响应 + # 解析ASR响应 - 基于recorder.py的工作实现 def parse_asr_response(res): - # 简化的响应解析 - if len(res) < 8: - return {} - + """解析ASR响应""" + PROTOCOL_VERSION = res[0] >> 4 + header_size = res[0] & 0x0f message_type = res[1] >> 4 - payload_size = int.from_bytes(res[4:8], "big", signed=False) - payload_msg = res[8:8+payload_size] + message_type_specific_flags = res[1] & 0x0f + serialization_method = res[2] >> 4 + message_compression = res[2] & 0x0f + reserved = res[3] + header_extensions = res[4:header_size * 4] + payload = res[header_size * 4:] + result = {} + payload_msg = None + payload_size = 0 + + print(f"🔍 响应头信息: message_type={message_type}, compression={message_compression}, serialization={serialization_method}") if message_type == 0b1001: # SERVER_FULL_RESPONSE - try: - if payload_msg.startswith(b'{'): - result = json.loads(payload_msg.decode('utf-8')) - return result - except: - pass + payload_size = int.from_bytes(payload[:4], "big", signed=True) + payload_msg = payload[4:] + print(f"📦 Full响应: payload_size={payload_size}") + elif message_type == 0b1011: # SERVER_ACK + seq = int.from_bytes(payload[:4], "big", signed=True) + result['seq'] = seq + if len(payload) >= 8: + payload_size = int.from_bytes(payload[4:8], "big", signed=False) + payload_msg = payload[8:] + print(f"📦 ACK响应: seq={seq}, payload_size={payload_size}") + elif message_type == 0b1111: # SERVER_ERROR_RESPONSE + code = int.from_bytes(payload[:4], "big", signed=False) + result['code'] = code + payload_size = int.from_bytes(payload[4:8], "big", signed=False) + payload_msg = payload[8:] + print(f"❌ 错误响应: code={code}") - return {} + if payload_msg is None: + return result + + if message_compression == 0b0001: # GZIP + payload_msg = gzip.decompress(payload_msg) + print(f"📦 解压后大小: {len(payload_msg)} 字节") + + if serialization_method == 0b0001: # JSON + payload_msg = json.loads(str(payload_msg, "utf-8")) + print(f"📋 解析后的JSON: {json.dumps(payload_msg, indent=2, ensure_ascii=False)}") + + result['payload_msg'] = payload_msg + result['payload_size'] = payload_size + return result # 构建请求参数 reqid = str(uuid.uuid4()) @@ -458,7 +571,7 @@ class ControlSystem: "sequence": 1 }, 'audio': { - 'format': 'wav', + 'format': 'pcm', 'rate': self.config['audio']['sample_rate'], 'language': 'zh-CN', 'bits': 16, @@ -468,6 +581,14 @@ class ControlSystem: } # 构建请求 + print(f"📋 ASR请求参数:") + print(f" - audio.format: {request_params['audio']['format']}") + print(f" - audio.rate: {request_params['audio']['rate']}") + print(f" - audio.channel: {request_params['audio']['channel']}") + print(f" - audio.bits: {request_params['audio']['bits']}") + print(f" - audio.codec: {request_params['audio']['codec']}") + print(f" - request.workflow: {request_params['request']['workflow']}") + payload_bytes = str.encode(json.dumps(request_params)) payload_bytes = gzip.compress(payload_bytes) full_client_request = bytearray(generate_asr_header()) @@ -478,43 +599,136 @@ class ControlSystem: additional_headers = {'Authorization': 'Bearer; {}'.format(self.api_config['asr']['token'])} # 连接WebSocket + print(f"📡 尝试连接WebSocket...") + print(f"🔗 WebSocket URL: {self.api_config['asr']['ws_url']}") + print(f"📋 Headers: {additional_headers}") async with websockets.connect( self.api_config['asr']['ws_url'], additional_headers=additional_headers, - max_size=1000000000 + max_size=1000000000, + ping_interval=20, + ping_timeout=60 ) as ws: + print(f"✅ WebSocket连接成功") + # 发送请求 + print(f"📤 发送ASR请求...") + print(f"📦 请求大小: {len(full_client_request)} 字节") await ws.send(full_client_request) res = await ws.recv() + print(f"📥 收到ASR响应,大小: {len(res)} 字节") result = parse_asr_response(res) + print(f"🔍 解析ASR响应: {result}") - # 发送音频数据 + # 发送音频数据 - 基于recorder.py实现 chunk_size = int(self.config['audio']['channels'] * 2 * self.config['audio']['sample_rate'] * 15000 / 1000) + print(f"🎵 开始发送音频数据:") + print(f" - 总大小: {len(audio_data)} 字节") + print(f" - 分块大小: {chunk_size} 字节") + print(f" - 预计分块数: {(len(audio_data) + chunk_size - 1) // chunk_size}") + + total_chunks = (len(audio_data) + chunk_size - 1) // chunk_size + chunks_sent = 0 + for offset in range(0, len(audio_data), chunk_size): + chunks_sent += 1 chunk = audio_data[offset:offset + chunk_size] last = (offset + chunk_size) >= len(audio_data) - payload_bytes = gzip.compress(chunk) - audio_only_request = bytearray( - generate_asr_header( - message_type=0b0010, - message_type_specific_flags=0b0010 if last else 0 - ) - ) - audio_only_request.extend((len(payload_bytes)).to_bytes(4, 'big')) - audio_only_request.extend(payload_bytes) + print(f"📦 发送第 {chunks_sent}/{total_chunks} 块:") + print(f" - 当前块大小: {len(chunk)} 字节") + print(f" - 偏移量: {offset}-{offset + len(chunk)}") + print(f" - 是否最后一块: {last}") - await ws.send(audio_only_request) - res = await ws.recv() - result = parse_asr_response(res) + try: + payload_bytes = gzip.compress(chunk) + audio_only_request = bytearray( + generate_asr_header( + message_type=0b0010, + message_type_specific_flags=0b0010 if last else 0 + ) + ) + audio_only_request.extend((len(payload_bytes)).to_bytes(4, 'big')) + audio_only_request.extend(payload_bytes) + + print(f" - 压缩后大小: {len(payload_bytes)} 字节") + print(f" - 总请求数据大小: {len(audio_only_request)} 字节") + + await ws.send(audio_only_request) + print(f" ✅ 第 {chunks_sent} 块发送成功") + + # 等待服务器响应 + try: + res = await asyncio.wait_for(ws.recv(), timeout=30.0) + print(f" 📥 收到第 {chunks_sent} 块响应,大小: {len(res)} 字节") + result = parse_asr_response(res) + print(f" 🔍 第 {chunks_sent} 块响应解析: {result}") + except asyncio.TimeoutError: + print(f" ⏰ 第 {chunks_sent} 块响应超时") + raise Exception("音频块响应超时") + + # 检查每个响应是否有错误 + if 'code' in result: + print(f" 🔍 第 {chunks_sent} 块响应码: {result['code']}") + if result['code'] != 1000: + print(f" ❌ 第 {chunks_sent} 块数据发送失败: {result}") + return None + + if 'payload_msg' in result and result['payload_msg'].get('code') != 1000: + print(f" ❌ 第 {chunks_sent} 块数据发送失败: {result['payload_msg']}") + return None + + except Exception as chunk_error: + print(f" ❌ 第 {chunks_sent} 块发送异常: {chunk_error}") + raise chunk_error + + if last: + print(f"📨 发送最后一块音频数据完成") + print(f"🎯 所有音频数据发送完成,共发送 {chunks_sent} 块") - # 获取最终结果 - if 'payload_msg' in result and 'result' in result['payload_msg']: - results = result['payload_msg']['result'] - if results: - return results[0].get('text', '识别失败') + # 检查最后一个响应中是否包含识别结果 + print(f"🎯 检查最终识别结果...") + print(f"📋 最后一个响应: {result}") + + if 'payload_msg' in result: + payload_msg = result['payload_msg'] + print(f"📋 最终Payload结构: {list(payload_msg.keys()) if isinstance(payload_msg, dict) else type(payload_msg)}") + print(f"📋 最终Payload内容: {payload_msg}") + + if isinstance(payload_msg, dict): + # 检查响应码 + if 'code' in payload_msg: + code = payload_msg['code'] + print(f"🔢 最终响应码: {code}") + if code == 1000: + print(f"✅ ASR识别成功") + else: + print(f"❌ ASR服务返回错误: {payload_msg.get('message', '未知错误')}") + return None + + # 查找结果 - 与recorder.py保持一致 + if 'result' in payload_msg: + results = payload_msg['result'] + print(f"📝 找到结果字段 'result': {results}") + if isinstance(results, list) and results: + text = results[0].get('text', '识别失败') + print(f"✅ 提取识别文本: {text}") + return text + elif isinstance(results, str): + print(f"✅ 提取识别文本: {results}") + return results + else: + print(f"❌ 未找到result字段,可用字段: {list(payload_msg.keys())}") + print(f"完整payload: {json.dumps(payload_msg, indent=2, ensure_ascii=False)}") + else: + print(f"❌ Payload不是字典类型: {type(payload_msg)}") + else: + print(f"❌ 响应中没有payload_msg字段") + print(f"可用字段: {list(result.keys())}") + if 'code' in result: + print(f"错误码: {result['code']}") return None @@ -580,9 +794,12 @@ class ControlSystem: try: print("🎵 开始文本转语音") + print(f"📝 待转换文本: {text}") # 发送元数据 - self.output_audio_queue.put(f"METADATA:{text[:30]}...") + metadata_msg = f"METADATA:{text[:30]}..." + print(f"📦 发送元数据: {metadata_msg}") + self.output_audio_queue.put(metadata_msg) # 构建请求头 headers = { @@ -614,6 +831,7 @@ class ControlSystem: # 发送请求 session = requests.Session() try: + print(f"🌐 发送TTS请求到: {self.api_config['tts']['url']}") response = session.post( self.api_config['tts']['url'], headers=headers, @@ -625,16 +843,22 @@ class ControlSystem: print(f"❌ TTS请求失败: {response.status_code}") return False + print(f"✅ TTS请求成功,开始接收音频流") + # 处理流式响应 total_audio_size = 0 chunk_count = 0 + queue_size_before = self.output_audio_queue.qsize() for chunk in response.iter_lines(decode_unicode=True): if not chunk: continue + print(f"🔍 原始TTS响应块 {chunk_count + 1}: {chunk[:100]}...") + try: data = json.loads(chunk) + print(f"🔍 解析后的TTS块 {chunk_count + 1}: {data}") if data.get("code", 0) == 0 and "data" in data and data["data"]: chunk_audio = base64.b64decode(data["data"]) @@ -642,24 +866,42 @@ class ControlSystem: total_audio_size += audio_size chunk_count += 1 + # 检查队列状态 + current_queue_size = self.output_audio_queue.qsize() + print(f"📦 发送音频块 {chunk_count}: {audio_size} 字节, 队列大小: {current_queue_size}") + # 发送到输出进程 self.output_audio_queue.put(chunk_audio) + # 检查是否发送成功 + new_queue_size = self.output_audio_queue.qsize() + if new_queue_size == current_queue_size + 1: + print(f"✅ 音频块 {chunk_count} 发送成功") + else: + print(f"⚠️ 音频块 {chunk_count} 发送后队列大小异常: {current_queue_size} -> {new_queue_size}") + # 显示进度 - if chunk_count % 10 == 0: + if chunk_count % 5 == 0: # 更频繁显示进度 progress = f"📥 TTS生成: {chunk_count} 块 | {total_audio_size / 1024:.1f} KB" print(f"\r{progress}", end='', flush=True) - if data.get("code", 0) == 20000000: + elif data.get("code", 0) == 20000000: + print(f"🏁 收到TTS结束信号") break + elif data.get("code", 0) > 0: + print(f"❌ TTS错误响应: {data}") - except json.JSONDecodeError: + except json.JSONDecodeError as e: + print(f"❌ JSON解析错误: {e}") + print(f"原始数据: {chunk}") continue print(f"\n✅ TTS音频生成完成: {chunk_count} 块, {total_audio_size / 1024:.1f} KB") + print(f"📊 队列大小变化: {queue_size_before} -> {self.output_audio_queue.qsize()}") - # 发送结束信号 - self.output_audio_queue.put(None) + # 不再在这里发送结束信号,让输出进程自然播放完所有音频 + print(f"📦 TTS音频数据已全部发送,等待输出进程播放完成") + print(f"📊 音频队列当前大小: {self.output_audio_queue.qsize()}") return chunk_count > 0 @@ -669,6 +911,8 @@ class ControlSystem: except Exception as e: print(f"❌ 文本转语音失败: {e}") + import traceback + print(f"❌ 详细错误: {traceback.format_exc()}") return False def _display_status(self): diff --git a/enhanced_voice_detector.py b/enhanced_voice_detector.py new file mode 100644 index 0000000..4e4b399 --- /dev/null +++ b/enhanced_voice_detector.py @@ -0,0 +1,377 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +高级语音检测器 +结合能量+ZCR双重检测的自适应语音检测算法 +针对16000Hz采样率优化 +""" + +import numpy as np +import time +from collections import deque +from typing import Dict, Any, Optional +import pyaudio + +class EnhancedVoiceDetector: + """增强版语音检测器""" + + def __init__(self, sample_rate=16000, chunk_size=1024): + self.sample_rate = sample_rate + self.chunk_size = chunk_size + + # 历史数据窗口 + self.energy_window = deque(maxlen=100) + self.zcr_window = deque(maxlen=100) + + # 统计信息 + self.energy_stats = { + 'mean': 0, 'std': 0, 'min': float('inf'), 'max': 0, + 'median': 0, 'q75': 0, 'q25': 0 + } + self.zcr_stats = { + 'mean': 0, 'std': 0, 'min': float('inf'), 'max': 0, + 'median': 0, 'q75': 0, 'q25': 0 + } + + # 检测参数 + self.calibration_mode = True + self.calibration_samples = 0 + self.required_calibration = 100 # 需要100个样本来校准 + + # 自适应参数 - 调整为更敏感 + self.energy_multiplier = 1.0 # 能量阈值倍数(降低) + self.zcr_std_multiplier = 1.0 # ZCR标准差倍数(降低) + self.min_energy_threshold = 80 # 最小能量阈值(降低) + self.consecutive_voice_threshold = 2 # 连续语音检测阈值(降低) + self.consecutive_silence_threshold = 15 # 连续静音检测阈值(增加) + + # 状态跟踪 + self.consecutive_voice_count = 0 + self.consecutive_silence_count = 0 + self.last_voice_time = 0 + + # 调试信息 + self.debug_mode = True + self.voice_count = 0 + self.total_samples = 0 + self._last_voice_state = False + + def calculate_energy(self, audio_data: bytes) -> float: + """计算音频能量(RMS)""" + if len(audio_data) == 0: + return 0 + + audio_array = np.frombuffer(audio_data, dtype=np.int16) + # RMS能量计算 + rms = np.sqrt(np.mean(audio_array.astype(float) ** 2)) + return rms + + def calculate_zcr(self, audio_data: bytes) -> float: + """计算零交叉率""" + if len(audio_data) == 0: + return 0 + + audio_array = np.frombuffer(audio_data, dtype=np.int16) + zero_crossings = np.sum(np.diff(np.sign(audio_array)) != 0) + zcr = zero_crossings / len(audio_array) * self.sample_rate + return zcr + + def update_statistics(self, energy: float, zcr: float): + """更新统计信息""" + self.energy_window.append(energy) + self.zcr_window.append(zcr) + + if len(self.energy_window) >= 20: + # 计算详细统计信息 + energy_array = np.array(self.energy_window) + zcr_array = np.array(self.zcr_window) + + # 基础统计 + self.energy_stats['mean'] = np.mean(energy_array) + self.energy_stats['std'] = np.std(energy_array) + self.energy_stats['min'] = np.min(energy_array) + self.energy_stats['max'] = np.max(energy_array) + self.energy_stats['median'] = np.median(energy_array) + self.energy_stats['q25'] = np.percentile(energy_array, 25) + self.energy_stats['q75'] = np.percentile(energy_array, 75) + + self.zcr_stats['mean'] = np.mean(zcr_array) + self.zcr_stats['std'] = np.std(zcr_array) + self.zcr_stats['min'] = np.min(zcr_array) + self.zcr_stats['max'] = np.max(zcr_array) + self.zcr_stats['median'] = np.median(zcr_array) + self.zcr_stats['q25'] = np.percentile(zcr_array, 25) + self.zcr_stats['q75'] = np.percentile(zcr_array, 75) + + def get_adaptive_thresholds(self) -> Dict[str, float]: + """获取自适应阈值""" + if len(self.energy_window) < 30: + # 使用更敏感的固定阈值 + return { + 'energy_threshold': 120, + 'zcr_min': 2000, + 'zcr_max': 13000 + } + + # 计算动态能量阈值 - 使用更合理的算法 + # 基于中位数和标准差,但使用更保守的倍数 + base_energy_threshold = (self.energy_stats['median'] + + self.energy_multiplier * self.energy_stats['std']) + + # 使用四分位数来避免异常值影响 + q75 = self.energy_stats['q75'] + q25 = self.energy_stats['q25'] + iqr = q75 - q25 # 四分位距 + + # 基于IQR的鲁棒阈值 - 更敏感 + iqr_threshold = q75 + 0.5 * iqr + + # 结合两种方法的阈值 - 使用更低的阈值 + energy_threshold = max(self.min_energy_threshold, + min(base_energy_threshold * 0.7, iqr_threshold)) + + # 计算动态ZCR阈值 + zcr_center = self.zcr_stats['median'] + zcr_spread = self.zcr_std_multiplier * self.zcr_stats['std'] + + # 确保ZCR范围在合理区间内 - 更宽松 + zcr_min = max(1500, min(3000, zcr_center - zcr_spread)) + zcr_max = min(14000, max(6000, zcr_center + zcr_spread * 2.0)) + + # 确保最小范围 + if zcr_max - zcr_min < 2000: + zcr_max = zcr_min + 2000 + + return { + 'energy_threshold': energy_threshold, + 'zcr_min': zcr_min, + 'zcr_max': zcr_max + } + + def is_voice_basic(self, energy: float, zcr: float) -> bool: + """基础语音检测(单帧)""" + thresholds = self.get_adaptive_thresholds() + + # 能量检测 + energy_ok = energy > thresholds['energy_threshold'] + + # ZCR检测 + zcr_ok = thresholds['zcr_min'] < zcr < thresholds['zcr_max'] + + # 双重条件 + return energy_ok and zcr_ok + + def is_voice_advanced(self, audio_data: bytes) -> Dict[str, Any]: + """高级语音检测(带状态跟踪)""" + # 计算特征 + energy = self.calculate_energy(audio_data) + zcr = self.calculate_zcr(audio_data) + + # 更新统计 + self.update_statistics(energy, zcr) + + # 总样本计数 + self.total_samples += 1 + + # 校准模式 + if self.calibration_mode: + self.calibration_samples += 1 + if self.calibration_samples >= self.required_calibration: + self.calibration_mode = False + if self.debug_mode: + print(f"\n🎯 校准完成!") + print(f" 能量统计: {self.energy_stats['median']:.0f}±{self.energy_stats['std']:.0f}") + print(f" ZCR统计: {self.zcr_stats['median']:.0f}±{self.zcr_stats['std']:.0f}") + + return { + 'is_voice': False, + 'energy': energy, + 'zcr': zcr, + 'calibrating': True, + 'calibration_progress': self.calibration_samples / self.required_calibration, + 'confidence': 0.0 + } + + # 基础检测 + is_voice_frame = self.is_voice_basic(energy, zcr) + + # 状态机处理 + if is_voice_frame: + self.consecutive_voice_count += 1 + self.consecutive_silence_count = 0 + self.last_voice_time = time.time() + else: + self.consecutive_silence_count += 1 + if self.consecutive_silence_count >= self.consecutive_silence_threshold: + self.consecutive_voice_count = 0 + + # 最终决策(需要连续检测到语音) + final_voice_detected = self.consecutive_voice_count >= self.consecutive_voice_threshold + + if final_voice_detected and not hasattr(self, '_last_voice_state') or not self._last_voice_state: + self.voice_count += 1 + + # 更新最后状态 + self._last_voice_state = final_voice_detected + + # 计算置信度 + thresholds = self.get_adaptive_thresholds() + energy_confidence = min(1.0, energy / thresholds['energy_threshold']) + zcr_confidence = 1.0 if thresholds['zcr_min'] < zcr < thresholds['zcr_max'] else 0.0 + confidence = (energy_confidence + zcr_confidence) / 2 + + return { + 'is_voice': final_voice_detected, + 'energy': energy, + 'zcr': zcr, + 'confidence': confidence, + 'energy_threshold': thresholds['energy_threshold'], + 'zcr_min': thresholds['zcr_min'], + 'zcr_max': thresholds['zcr_max'], + 'consecutive_voice_count': self.consecutive_voice_count, + 'consecutive_silence_count': self.consecutive_silence_count, + 'calibrating': False, + 'voice_detection_rate': self.voice_count / self.total_samples if self.total_samples > 0 else 0 + } + + def get_debug_info(self) -> str: + """获取调试信息""" + if self.calibration_mode: + return f"校准中: {self.calibration_samples}/{self.required_calibration}" + + thresholds = self.get_adaptive_thresholds() + return (f"能量阈值: {thresholds['energy_threshold']:.0f} | " + f"ZCR范围: {thresholds['zcr_min']:.0f}-{thresholds['zcr_max']:.0f} | " + f"检测率: {self.voice_count}/{self.total_samples} ({self.voice_count/self.total_samples*100:.1f}%)") + + def reset(self): + """重置检测器状态""" + self.energy_window.clear() + self.zcr_window.clear() + self.calibration_mode = True + self.calibration_samples = 0 + self.consecutive_voice_count = 0 + self.consecutive_silence_count = 0 + self.voice_count = 0 + self.total_samples = 0 + + +class VoiceDetectorTester: + """语音检测器测试器""" + + def __init__(self): + self.detector = EnhancedVoiceDetector() + + def run_test(self, duration=30): + """运行测试""" + print("🎙️ 增强版语音检测器测试") + print("=" * 50) + print("📊 检测算法: 能量+ZCR双重检测") + print("📈 采样率: 16000Hz") + print("🔄 自适应阈值: 启用") + print("⏱️ 测试时长: 30秒") + print("💡 请说话测试检测效果...") + print("🛑 按 Ctrl+C 提前结束") + print("=" * 50) + + try: + # 初始化音频 + audio = pyaudio.PyAudio() + stream = audio.open( + format=pyaudio.paInt16, + channels=1, + rate=16000, + input=True, + frames_per_buffer=1024 + ) + + start_time = time.time() + voice_segments = [] + current_segment = None + + while time.time() - start_time < duration: + # 读取音频数据 + data = stream.read(1024, exception_on_overflow=False) + + # 检测语音 + result = self.detector.is_voice_advanced(data) + + # 处理语音段 + if result['is_voice']: + if current_segment is None: + current_segment = { + 'start_time': time.time(), + 'start_sample': self.detector.total_samples + } + else: + if current_segment is not None: + current_segment['end_time'] = time.time() + current_segment['end_sample'] = self.detector.total_samples + current_segment['duration'] = current_segment['end_time'] - current_segment['start_time'] + voice_segments.append(current_segment) + current_segment = None + + # 显示状态 + if result['calibrating']: + progress = result['calibration_progress'] * 100 + status = f"\r🔧 校准中: {progress:.0f}% | 能量: {result['energy']:.0f} | ZCR: {result['zcr']:.0f}" + else: + status_icon = "🎤" if result['is_voice'] else "🔇" + status_color = "\033[92m" if result['is_voice'] else "\033[90m" + reset_color = "\033[0m" + + status = (f"{status_color}{status_icon} " + f"能量: {result['energy']:.0f}/{result['energy_threshold']:.0f} | " + f"ZCR: {result['zcr']:.0f} ({result['zcr_min']:.0f}-{result['zcr_max']:.0f}) | " + f"置信度: {result['confidence']:.2f} | " + f"连续: {result['consecutive_voice_count']}/{result['consecutive_silence_count']}{reset_color}") + + print(f"\r{status}", end='', flush=True) + + time.sleep(0.01) + + # 结束当前段 + if current_segment is not None: + current_segment['end_time'] = time.time() + current_segment['duration'] = current_segment['end_time'] - current_segment['start_time'] + voice_segments.append(current_segment) + + # 显示统计结果 + print(f"\n\n📊 测试结果统计:") + print(f" 总检测时长: {duration}秒") + print(f" 检测到语音段: {len(voice_segments)}") + print(f" 总语音时长: {sum(s['duration'] for s in voice_segments):.1f}秒") + print(f" 语音占比: {sum(s['duration'] for s in voice_segments)/duration*100:.1f}%") + print(f" 平均置信度: {np.mean([r['confidence'] for r in [self.detector.is_voice_advanced(b'test') for _ in range(10)]]):.2f}") + + if voice_segments: + print(f" 平均语音段时长: {np.mean([s['duration'] for s in voice_segments]):.1f}秒") + print(f" 最长语音段: {max(s['duration'] for s in voice_segments):.1f}秒") + + print(f"\n🎯 检测器状态:") + print(f" {self.detector.get_debug_info()}") + + except KeyboardInterrupt: + print(f"\n\n🛑 测试被用户中断") + except Exception as e: + print(f"\n\n❌ 测试出错: {e}") + finally: + try: + if 'stream' in locals(): + stream.stop_stream() + stream.close() + if 'audio' in locals(): + audio.terminate() + except: + pass + + +def main(): + """主函数""" + tester = VoiceDetectorTester() + tester.run_test() + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/process_logger.py b/process_logger.py new file mode 100644 index 0000000..996a4df --- /dev/null +++ b/process_logger.py @@ -0,0 +1,98 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +日志配置模块 +为多进程录音系统提供日志记录功能 +""" + +import logging +import os +import sys +from datetime import datetime +from typing import Optional + +def setup_process_logger(process_name: str, log_dir: str = "logs") -> logging.Logger: + """ + 为进程设置日志记录器 + + Args: + process_name: 进程名称(用于日志文件名) + log_dir: 日志目录路径 + + Returns: + 配置好的日志记录器 + """ + # 创建日志目录 + if not os.path.exists(log_dir): + os.makedirs(log_dir) + + # 生成日志文件名(包含时间戳) + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + log_file = os.path.join(log_dir, f"{process_name}_{timestamp}.log") + + # 创建日志记录器 + logger = logging.getLogger(f"{process_name}_logger") + logger.setLevel(logging.DEBUG) + + # 清除现有的处理器 + logger.handlers.clear() + + # 文件处理器(记录所有级别) + file_handler = logging.FileHandler(log_file, encoding='utf-8') + file_handler.setLevel(logging.DEBUG) + + # 控制台处理器(只记录INFO及以上级别) + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setLevel(logging.INFO) + + # 创建格式化器 + file_formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) + + console_formatter = logging.Formatter( + '%(asctime)s - %(levelname)s - %(message)s', + datefmt='%H:%M:%S' + ) + + # 设置格式化器 + file_handler.setFormatter(file_formatter) + console_handler.setFormatter(console_formatter) + + # 添加处理器 + logger.addHandler(file_handler) + logger.addHandler(console_handler) + + logger.info(f"日志系统初始化完成 - 进程: {process_name}") + logger.info(f"日志文件: {log_file}") + + return logger + +class ProcessLogger: + """进程日志包装器""" + + def __init__(self, process_name: str, log_dir: str = "logs"): + self.process_name = process_name + self.logger = setup_process_logger(process_name, log_dir) + + def debug(self, message: str): + """调试日志""" + self.logger.debug(f"[{self.process_name}] {message}") + + def info(self, message: str): + """信息日志""" + self.logger.info(f"[{self.process_name}] {message}") + + def warning(self, message: str): + """警告日志""" + self.logger.warning(f"[{self.process_name}] {message}") + + def error(self, message: str): + """错误日志""" + self.logger.error(f"[{self.process_name}] {message}") + + def critical(self, message: str): + """严重错误日志""" + self.logger.critical(f"[{self.process_name}] {message}") \ No newline at end of file diff --git a/quick_test.py b/quick_test.py deleted file mode 100644 index 456ba0d..0000000 --- a/quick_test.py +++ /dev/null @@ -1,123 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -""" -快速测试脚本 -用于验证多进程录音系统的基础功能 -""" - -import time -import multiprocessing as mp -from audio_processes import InputProcess, OutputProcess - -def test_audio_processes(): - """测试音频进程类""" - print("🧪 测试音频进程类...") - - # 创建测试队列 - command_queue = mp.Queue() - event_queue = mp.Queue() - audio_queue = mp.Queue() - - # 创建进程配置 - config = { - 'zcr_min': 3000, - 'zcr_max': 10000, - 'min_recording_time': 3.0, - 'max_recording_time': 10.0, # 缩短测试时间 - 'silence_threshold': 3.0, - 'pre_record_duration': 2.0, - 'voice_activation_threshold': 5, # 降低阈值便于测试 - 'calibration_samples': 50, # 减少校准时间 - 'adaptive_threshold': True - } - - # 创建输入进程 - input_process = InputProcess(command_queue, event_queue, config) - - # 创建输出进程 - output_process = OutputProcess(audio_queue) - - print("✅ 音频进程类创建成功") - - # 测试配置加载 - print("📋 测试配置:") - print(f" ZCR范围: {config['zcr_min']} - {config['zcr_max']}") - print(f" 校准样本数: {config['calibration_samples']}") - print(f" 语音激活阈值: {config['voice_activation_threshold']}") - - return True - -def test_dependencies(): - """测试依赖库""" - print("🔍 检查依赖库...") - - dependencies = { - 'numpy': False, - 'pyaudio': False, - 'requests': False, - 'websockets': False - } - - try: - import numpy - dependencies['numpy'] = True - print("✅ numpy") - except ImportError: - print("❌ numpy") - - try: - import pyaudio - dependencies['pyaudio'] = True - print("✅ pyaudio") - except ImportError: - print("❌ pyaudio") - - try: - import requests - dependencies['requests'] = True - print("✅ requests") - except ImportError: - print("❌ requests") - - try: - import websockets - dependencies['websockets'] = True - print("✅ websockets") - except ImportError: - print("❌ websockets") - - missing = [dep for dep, installed in dependencies.items() if not installed] - if missing: - print(f"❌ 缺少依赖: {', '.join(missing)}") - return False - else: - print("✅ 所有依赖都已安装") - return True - -def main(): - """主测试函数""" - print("🚀 多进程录音系统快速测试") - print("=" * 50) - - # 测试依赖 - if not test_dependencies(): - print("❌ 依赖检查失败") - return False - - print() - - # 测试音频进程 - if not test_audio_processes(): - print("❌ 音频进程测试失败") - return False - - print() - print("✅ 所有测试通过!") - print("💡 现在可以运行主程序:") - print(" python multiprocess_recorder.py") - - return True - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/start_with_logging.py b/start_with_logging.py new file mode 100644 index 0000000..1476e88 --- /dev/null +++ b/start_with_logging.py @@ -0,0 +1,113 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +启动脚本示例 +演示如何使用带日志记录的多进程录音系统 +""" + +import os +import sys +import argparse +from datetime import datetime + +def ensure_logs_directory(): + """确保日志目录存在""" + log_dir = "logs" + if not os.path.exists(log_dir): + os.makedirs(log_dir) + print(f"✅ 创建日志目录: {log_dir}") + return log_dir + +def cleanup_old_logs(log_dir="logs", max_files=10): + """清理旧的日志文件""" + if not os.path.exists(log_dir): + return + + log_files = [] + for file in os.listdir(log_dir): + if file.endswith('.log'): + file_path = os.path.join(log_dir, file) + log_files.append((file_path, os.path.getmtime(file_path))) + + # 按修改时间排序,删除最旧的文件 + log_files.sort(key=lambda x: x[1]) + + while len(log_files) > max_files: + oldest_file = log_files[0][0] + try: + os.remove(oldest_file) + print(f"🗑️ 删除旧日志文件: {oldest_file}") + log_files.pop(0) + except Exception as e: + print(f"⚠️ 删除日志文件失败 {oldest_file}: {e}") + break + +def main(): + """主函数""" + parser = argparse.ArgumentParser( + description='带日志记录的多进程录音系统启动器', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +使用示例: + python start_with_logging.py # 使用默认设置 + python start_with_logging.py --clean-logs # 清理旧日志 + python start_with_logging.py --log-dir my_logs # 指定日志目录 + """ + ) + + parser.add_argument('--character', '-c', type=str, default='libai', + help='选择角色 (默认: libai)') + parser.add_argument('--log-dir', type=str, default='logs', + help='日志目录路径 (默认: logs)') + parser.add_argument('--clean-logs', action='store_true', + help='清理旧日志文件') + parser.add_argument('--max-log-files', type=int, default=10, + help='保留的最大日志文件数量 (默认: 10)') + parser.add_argument('--config', type=str, + help='配置文件路径') + parser.add_argument('--verbose', '-v', action='store_true', + help='详细输出') + + args = parser.parse_args() + + print("🚀 带日志记录的多进程录音系统") + print("=" * 60) + + # 确保日志目录存在 + log_dir = ensure_logs_directory() + + # 清理旧日志文件 + if args.clean_logs: + cleanup_old_logs(log_dir, args.max_log_files) + + # 显示日志配置信息 + print(f"📁 日志目录: {log_dir}") + print(f"🎭 角色: {args.character}") + print("=" * 60) + + # 导入主模块并启动 + try: + # 修改sys.argv以传递参数给主程序 + sys.argv = ['multiprocess_recorder.py'] + if args.character: + sys.argv.extend(['-c', args.character]) + if args.config: + sys.argv.extend(['--config', args.config]) + if args.verbose: + sys.argv.append('--verbose') + + # 导入并运行主程序 + import multiprocess_recorder + multiprocess_recorder.main() + + except KeyboardInterrupt: + print("\n👋 用户中断") + except Exception as e: + print(f"❌ 启动失败: {e}") + if args.verbose: + import traceback + traceback.print_exc() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/test_voice_detection.py b/test_voice_detection.py deleted file mode 100644 index c0d3c42..0000000 --- a/test_voice_detection.py +++ /dev/null @@ -1,194 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -""" -语音检测测试脚本 -用于测试和调试ZCR语音检测功能 -""" - -import numpy as np -import time -import pyaudio -from audio_processes import InputProcess -import multiprocessing as mp -import queue - -class VoiceDetectionTester: - """语音检测测试器""" - - def __init__(self): - self.FORMAT = pyaudio.paInt16 - self.CHANNELS = 1 - self.RATE = 16000 - self.CHUNK_SIZE = 1024 - - # 测试参数 - self.test_duration = 30 # 测试30秒 - self.zcr_history = [] - self.voice_count = 0 - - # 音频设备 - self.audio = None - self.stream = None - - def setup_audio(self): - """设置音频设备""" - try: - self.audio = pyaudio.PyAudio() - self.stream = self.audio.open( - format=self.FORMAT, - channels=self.CHANNELS, - rate=self.RATE, - input=True, - frames_per_buffer=self.CHUNK_SIZE - ) - print("✅ 音频设备初始化成功") - return True - except Exception as e: - print(f"❌ 音频设备初始化失败: {e}") - return False - - def calculate_zcr(self, audio_data): - """计算零交叉率""" - if len(audio_data) == 0: - return 0 - - audio_array = np.frombuffer(audio_data, dtype=np.int16) - zero_crossings = np.sum(np.diff(np.sign(audio_array)) != 0) - zcr = zero_crossings / len(audio_array) * self.RATE - return zcr - - def test_detection(self): - """测试语音检测""" - print("🎙️ 开始语音检测测试") - print("=" * 50) - - # 环境校准阶段 - print("🔍 第一阶段:环境噪音校准 (10秒)") - print("请保持安静,不要说话...") - - calibration_samples = [] - start_time = time.time() - - try: - while time.time() - start_time < 10: - data = self.stream.read(self.CHUNK_SIZE, exception_on_overflow=False) - if len(data) > 0: - zcr = self.calculate_zcr(data) - calibration_samples.append(zcr) - - # 显示进度 - progress = (time.time() - start_time) / 10 * 100 - print(f"\r校准进度: {progress:.1f}%", end='', flush=True) - - time.sleep(0.01) - - print("\n✅ 环境校准完成") - - # 计算统计数据 - if calibration_samples: - avg_zcr = np.mean(calibration_samples) - std_zcr = np.std(calibration_samples) - min_zcr = min(calibration_samples) - max_zcr = max(calibration_samples) - - print(f"📊 环境噪音统计:") - print(f" 平均ZCR: {avg_zcr:.0f}") - print(f" 标准差: {std_zcr:.0f}") - print(f" 最小值: {min_zcr:.0f}") - print(f" 最大值: {max_zcr:.0f}") - - # 建议的检测阈值 - suggested_min = max(2400, avg_zcr + 2 * std_zcr) - suggested_max = min(12000, avg_zcr + 6 * std_zcr) - - print(f"\n🎯 建议的语音检测阈值:") - print(f" 最小阈值: {suggested_min:.0f}") - print(f" 最大阈值: {suggested_max:.0f}") - - # 测试检测 - print(f"\n🎙️ 第二阶段:语音检测测试 (20秒)") - print("现在请说话,测试语音检测...") - - voice_threshold = suggested_min - silence_threshold = suggested_max - - consecutive_voice = 0 - voice_detected = False - - test_start = time.time() - - while time.time() - test_start < 20: - data = self.stream.read(self.CHUNK_SIZE, exception_on_overflow=False) - if len(data) > 0: - zcr = self.calculate_zcr(data) - - # 简单的语音检测 - is_voice = voice_threshold < zcr < silence_threshold - - if is_voice: - consecutive_voice += 1 - if consecutive_voice >= 5 and not voice_detected: - voice_detected = True - self.voice_count += 1 - print(f"\n🎤 检测到语音 #{self.voice_count}! ZCR: {zcr:.0f}") - else: - consecutive_voice = 0 - if voice_detected: - voice_detected = False - print(f" 语音结束,持续时间: {time.time() - last_voice_time:.1f}秒") - - if voice_detected: - last_voice_time = time.time() - - # 实时显示ZCR值 - status = "🎤" if voice_detected else "🔇" - print(f"\r{status} ZCR: {zcr:.0f} | 阈值: {voice_threshold:.0f}-{silence_threshold:.0f} | " - f"连续语音: {consecutive_voice}/5", end='', flush=True) - - time.sleep(0.01) - - print(f"\n\n✅ 测试完成!共检测到 {self.voice_count} 次语音") - - except KeyboardInterrupt: - print("\n🛑 测试被用户中断") - except Exception as e: - print(f"\n❌ 测试过程中出错: {e}") - - def cleanup(self): - """清理资源""" - if self.stream: - try: - self.stream.stop_stream() - self.stream.close() - except: - pass - - if self.audio: - try: - self.audio.terminate() - except: - pass - - def run_test(self): - """运行完整测试""" - print("🚀 语音检测测试工具") - print("=" * 60) - - if not self.setup_audio(): - print("❌ 无法初始化音频设备,测试终止") - return - - try: - self.test_detection() - finally: - self.cleanup() - print("\n👋 测试结束") - -def main(): - """主函数""" - tester = VoiceDetectionTester() - tester.run_test() - -if __name__ == "__main__": - main() \ No newline at end of file