diff --git a/audio_processes.py b/audio_processes.py index 1838d96..735228c 100644 --- a/audio_processes.py +++ b/audio_processes.py @@ -114,7 +114,7 @@ class InputProcess: self.tts_access_key = "ynJMX-5ix1FsJvswC9KTNlGUdubcchqc" self.tts_resource_id = "volc.service_type.10029" self.tts_app_key = "aGjiRDfUWi" - self.tts_speaker = "zh_female_wanqudashu_moon_bigtts" + self.tts_speaker = config.get('tts_speaker', "zh_female_wanqudashu_moon_bigtts") if config else "zh_female_wanqudashu_moon_bigtts" # 启动 TTS 工作线程 self._start_tts_worker() @@ -448,8 +448,6 @@ class InputProcess: # 生成音频数据 self._generate_tts_audio(content) - self.tts_task_queue.task_done() - except queue.Empty: continue except Exception as e: @@ -591,10 +589,19 @@ class OutputProcess: self.playback_completed = False # 播放完成标志 self.end_signal_received = False # 结束信号接收标志 - # 智能缓冲系统 - 借鉴 recorder.py 的缓冲策略 - self.preload_buffer = [] # 预加载缓冲区 + # 智能缓冲系统 - 借鉴 recorder.py 的智能句子累积策略 + self.preload_buffer = [] # 预加载缓冲区(保留用于音频块) self.preload_size = 3 # 预加载3个音频块 + + # 智能句子缓冲系统 - 从 recorder.py 借鉴的核心机制 + self.tts_buffer = [] # 智能句子缓冲区 + self.tts_buffer_max_size = 3 # 最多缓冲3个句子 + self.tts_buffer_min_size = 1 # 最少1个句子 + self.tts_accumulation_time = 0.2 # 200ms积累窗口 + self.tts_last_trigger_time = 0 # 上次触发TTS的时间 + self.tts_pending_sentences = [] # 待处理的句子 self.min_buffer_size = 1 # 最小缓冲区大小 + self.audio_device_healthy = True # 音频设备健康状态 # 统一播放工作线程 - 核心改进 @@ -606,6 +613,14 @@ class OutputProcess: self.playback_timeout = 5.0 # 播放超时时间(秒) self.completion_sent = False # 防止重复发送完成事件 + # 增强的播放完成检测状态 + self.llm_generation_complete = False # LLM生成是否完成 + self.tts_generation_complete = False # TTS生成是否完成 + self.all_audio_received = False # 所有音频数据是否已接收 + self.pre_buffer_empty = False # 预缓冲区是否为空 + self.playback_buffer_empty = False # 播放缓冲区是否为空 + self.no_active_playback = False # 是否没有活跃的播放 + # PyAudio实例 self.audio = None self.output_stream = None @@ -624,7 +639,7 @@ class OutputProcess: self.tts_access_key = "ynJMX-5ix1FsJvswC9KTNlGUdubcchqc" self.tts_resource_id = "volc.service_type.10029" self.tts_app_key = "aGjiRDfUWi" - self.tts_speaker = "zh_female_wanqudashu_moon_bigtts" + self.tts_speaker = config.get('tts_speaker', "zh_female_wanqudashu_moon_bigtts") if config else "zh_female_wanqudashu_moon_bigtts" # 启动工作线程 - 先启动播放线程,再启动TTS线程 print("🔊 准备启动播放工作线程...") @@ -825,7 +840,10 @@ class OutputProcess: # 关键修复:即使正在播放,也要检查播放完成 if self.end_signal_received: - self._check_playback_completion() + # 使用增强的播放完成检测 + if self._check_enhanced_playback_completion(): + print(f"📥 播放状态时增强播放完成检测通过,处理结束信号") + self._finish_playback() time.sleep(0.1) # 播放时增加延迟减少CPU使用 continue @@ -846,7 +864,10 @@ class OutputProcess: # 5. 主动检查播放完成(无论什么状态都要检查) if self.end_signal_received: - self._check_playback_completion() + # 使用增强的播放完成检测 + if self._check_enhanced_playback_completion(): + print(f"📥 主循环增强播放完成检测通过,处理结束信号") + self._finish_playback() # 6. 借鉴 recorder.py: 根据播放状态调整休眠时间,优化性能 if self.is_playing and (self.playback_buffer or self.preload_buffer): @@ -907,6 +928,7 @@ class OutputProcess: print(f"📥 输出进程收到结束信号") end_signal_received = True self.end_signal_received = True + self.all_audio_received = True # 重置完成事件标记 self.completion_sent = False @@ -914,18 +936,13 @@ class OutputProcess: self.playback_completed = False print(f"📥 已重置所有播放完成相关标志") - # 检查是否应该立即结束 - tts_queue_size = self.tts_task_queue.qsize() - playback_queue_size = len(self.playback_buffer) + len(self.preload_buffer) - - if tts_queue_size == 0 and playback_queue_size == 0 and not self.currently_playing: - # 双重确认机制:所有队列已清空且播放工作线程已停止 - print(f"📥 双重确认通过:TTS队列={tts_queue_size}, 播放缓冲={len(self.playback_buffer)}, 预加载={len(self.preload_buffer)}, 正在播放={self.currently_playing}") - print(f"📥 所有音频已播放完成,处理结束信号") + # 使用增强的播放完成检测 + if self._check_enhanced_playback_completion(): + print(f"📥 增强播放完成检测通过,处理结束信号") self._finish_playback() return else: - print(f"📥 延迟处理结束信号 - TTS队列: {tts_queue_size}, 播放缓冲: {playback_queue_size}, 正在播放: {self.currently_playing}") + print(f"📥 延迟处理结束信号 - 等待LLM、TTS和播放完成") # 重新放回队列,稍后重试 self.audio_queue.put(None) time.sleep(0.05) @@ -937,6 +954,39 @@ class OutputProcess: print(f"📥 输出进程收到元数据: {metadata}") continue + # 处理来自ControlSystem的流式文本命令 + if isinstance(audio_data, str) and audio_data.startswith("STREAMING_TEXT:"): + # 流式文本处理 - 智能缓冲 + streaming_text = audio_data[15:] # 移除 "STREAMING_TEXT:" 前缀 + print(f"📥 输出进程收到流式文本: {streaming_text}") + self.process_streaming_text(streaming_text) + continue + + if isinstance(audio_data, str) and audio_data.startswith("COMPLETE_TEXT:"): + # 完整文本处理 - 强制刷新缓冲区 + complete_text = audio_data[14:] # 移除 "COMPLETE_TEXT:" 前缀 + print(f"📥 输出进程收到完整文本: {complete_text}") + self.process_complete_text(complete_text) + continue + + if isinstance(audio_data, str) and audio_data.startswith("FLUSH_TTS_BUFFER:"): + # 刷新TTS缓冲区 - 确保所有内容都被处理 + print(f"📥 输出进程收到刷新缓冲区命令") + self._flush_tts_buffer() + continue + + if isinstance(audio_data, str) and audio_data.startswith("LLM_COMPLETE:"): + # LLM生成完成信号 + print(f"📥 输出进程收到LLM生成完成信号") + self.llm_generation_complete = True + continue + + if isinstance(audio_data, str) and audio_data.startswith("TTS_COMPLETE:"): + # TTS生成完成信号 + print(f"📥 输出进程收到TTS生成完成信号") + self.tts_generation_complete = True + continue + # 音频数据处理 if isinstance(audio_data, bytes): # 更新最后收到音频数据的时间 @@ -985,44 +1035,28 @@ class OutputProcess: except queue.Empty: # 队列为空时的处理 if end_signal_received: - tts_queue_size = self.tts_task_queue.qsize() - playback_queue_size = len(self.playback_buffer) + len(self.preload_buffer) - - print(f"📥 队列空时检查播放完成: TTS={tts_queue_size}, 播放缓冲={len(self.playback_buffer)}, 预加载={len(self.preload_buffer)}, 正在播放={self.currently_playing}") - - if tts_queue_size == 0 and playback_queue_size == 0: - # 三重确认机制:所有缓冲区都空了,检查播放状态 - if not self.currently_playing: - print(f"📥 三重确认通过:TTS队列=0, 播放缓冲=0, 预加载=0, 播放状态=False") - print(f"📥 调用 _finish_playback() 前,completion_sent={self.completion_sent}") - self._finish_playback() - print(f"📥 调用 _finish_playback() 后,completion_sent={self.completion_sent}") - return - else: - # 播放工作线程可能还在播放最后一个块,等待一下 - print(f"📥 等待播放工作线程完成...") - time.sleep(0.3) # 增加等待时间,确保播放完成 - if not self.currently_playing: - print(f"📥 三重确认通过(等待后):TTS队列=0, 播放缓冲=0, 预加载=0, 播放状态=False") - print(f"📥 调用 _finish_playback() 前,completion_sent={self.completion_sent}") - self._finish_playback() - print(f"📥 调用 _finish_playback() 后,completion_sent={self.completion_sent}") - return - else: - print(f"📥 播放工作线程仍在播放,继续等待...") - elif tts_queue_size == 0 and playback_queue_size > 0: - # 还有数据要播放,继续等待 - print(f"📥 还有 {playback_queue_size} 个音频块待播放,等待播放完成") - time.sleep(0.2) # 增加等待时间 - elif tts_queue_size == 0 and len(self.playback_buffer) == 0 and len(self.preload_buffer) > 0: - # 关键修复:播放缓冲区为空但预加载缓冲区还有数据,需要转移数据 - print(f"📥 播放缓冲区为空但预加载缓冲区有 {len(self.preload_buffer)} 个数据块,转移数据到播放缓冲区") - transfer_count = min(len(self.preload_buffer), 3) # 一次转移最多3个块 - for _ in range(transfer_count): - if self.preload_buffer: - self.playback_buffer.append(self.preload_buffer.pop(0)) - print(f"📥 已转移 {transfer_count} 个数据块到播放缓冲区") - time.sleep(0.2) # 增加等待时间 + # 使用增强的播放完成检测 + if self._check_enhanced_playback_completion(): + print(f"📥 队列空时增强播放完成检测通过,处理结束信号") + self._finish_playback() + return + else: + print(f"📥 队列空时增强播放完成检测未通过,继续等待") + # 如果还有数据要播放,继续等待 + tts_queue_size = self.tts_task_queue.qsize() + playback_queue_size = len(self.playback_buffer) + len(self.preload_buffer) + if playback_queue_size > 0: + print(f"📥 还有 {playback_queue_size} 个音频块待播放,等待播放完成") + time.sleep(0.2) # 增加等待时间 + elif tts_queue_size == 0 and len(self.playback_buffer) == 0 and len(self.preload_buffer) > 0: + # 关键修复:播放缓冲区为空但预加载缓冲区还有数据,需要转移数据 + print(f"📥 播放缓冲区为空但预加载缓冲区有 {len(self.preload_buffer)} 个数据块,转移数据到播放缓冲区") + transfer_count = min(len(self.preload_buffer), 3) # 一次转移最多3个块 + for _ in range(transfer_count): + if self.preload_buffer: + self.playback_buffer.append(self.preload_buffer.pop(0)) + print(f"📥 已转移 {transfer_count} 个数据块到播放缓冲区") + time.sleep(0.2) # 增加等待时间 # 检查是否应该补充播放缓冲区的数据 if not self.is_playing and len(self.preload_buffer) >= self.min_buffer_size: @@ -1047,7 +1081,10 @@ class OutputProcess: else: # 关键修复:即使没有处理数据,也要检查播放完成 if self.end_signal_received: - self._check_playback_completion() + # 使用增强的播放完成检测 + if self._check_enhanced_playback_completion(): + print(f"📥 无数据处理时增强播放完成检测通过,处理结束信号") + self._finish_playback() time.sleep(0.01) @@ -1105,6 +1142,18 @@ class OutputProcess: self.preload_buffer.clear() self.last_playback_time = 0 + # 重置所有增强的状态标志 + self.llm_generation_complete = False + self.tts_generation_complete = False + self.all_audio_received = False + self.pre_buffer_empty = False + self.playback_buffer_empty = False + self.no_active_playback = False + self.end_signal_received = False + self.playback_completed = False + + print("📡 输出进程:已重置所有播放完成状态标志") + # 通知主进程播放完成 if self.event_queue: try: @@ -1128,6 +1177,79 @@ class OutputProcess: time.sleep(0.5) print("📡 输出进程:播放完成逻辑执行完毕") + def _check_enhanced_playback_completion(self): + """增强的播放完成检测 - 考虑LLM、TTS和缓冲区状态""" + if not self.end_signal_received: + return False + + # 更新状态变量 + self.pre_buffer_empty = (len(self.preload_buffer) == 0) + self.playback_buffer_empty = (len(self.playback_buffer) == 0) + self.no_active_playback = (not self.currently_playing) # 同步更新状态变量 + + tts_queue_size = self.tts_task_queue.qsize() + + print(f"🔍 增强播放完成检查:") + print(f" - LLM生成完成: {self.llm_generation_complete}") + print(f" - TTS生成完成: {self.tts_generation_complete}") + print(f" - 所有音频已接收: {self.all_audio_received}") + print(f" - 预缓冲区为空: {self.pre_buffer_empty}") + print(f" - 播放缓冲区为空: {self.playback_buffer_empty}") + print(f" - 无活跃播放: {self.no_active_playback}") + print(f" - TTS队列大小: {tts_queue_size}") + + # 检查是否所有条件都满足 + all_conditions_met = ( + self.llm_generation_complete and + self.tts_generation_complete and + self.all_audio_received and + self.pre_buffer_empty and + self.playback_buffer_empty and + not self.currently_playing and # 直接检查当前播放状态 + tts_queue_size == 0 + ) + + if all_conditions_met: + print(f"✅ 所有播放完成条件已满足,可以结束播放") + return True + + # 如果LLM和TTS都完成了,但还有音频数据,等待播放完成 + if (self.llm_generation_complete and + self.tts_generation_complete and + self.all_audio_received and + tts_queue_size == 0): + + if self.pre_buffer_empty and self.playback_buffer_empty: + if not self.currently_playing: # 直接检查当前播放状态 + print(f"✅ LLM和TTS完成,所有缓冲区已清空,播放器空闲") + return True + else: + print(f"⏳ 等待最后的音频播放完成...") + time.sleep(0.5) + if not self.currently_playing: + print(f"✅ 最后的音频播放完成") + return True + else: + print(f"⏳ 等待缓冲区数据播放完成 - 预缓冲: {len(self.preload_buffer)}, 播放缓冲: {len(self.playback_buffer)}") + return False + + # 如果LLM还未完成,但其他条件满足,等待LLM完成 + if not self.llm_generation_complete: + print(f"⏳ 等待LLM生成完成...") + return False + + # 如果TTS还未完成,但LLM已完成,等待TTS完成 + if not self.tts_generation_complete: + print(f"⏳ 等待TTS生成完成...") + return False + + # 如果音频还未完全接收,等待接收完成 + if not self.all_audio_received: + print(f"⏳ 等待所有音频接收完成...") + return False + + return False + def _check_playback_completion(self): """检查播放完成状态 - 独立的播放完成检测方法""" if not self.end_signal_received: @@ -1235,8 +1357,6 @@ class OutputProcess: # 生成音频数据并发送到统一播放队列 self._generate_tts_audio(content) - self.tts_task_queue.task_done() - except queue.Empty: continue except Exception as e: @@ -1370,6 +1490,10 @@ class OutputProcess: success_rate = (success_count / chunk_count * 100) if chunk_count > 0 else 0 self.logger.info(f"TTS音频生成完成: {chunk_count} 块, 成功率 {success_rate:.1f}% | 总大小: {total_audio_size / 1024:.1f} KB") + # 通知自己TTS生成已完成 + self.tts_generation_complete = True + print(f"🎵 OutputProcess TTS生成已完成") + # 等待播放完成 if success_count > 0: self.logger.info("等待TTS音频播放完成...") @@ -1406,9 +1530,170 @@ class OutputProcess: # 即使超时也要调用播放完成处理 self._finish_playback() + # ========== 智能句子缓冲系统 - 从 recorder.py 借鉴 ========== + + def _should_trigger_tts(self, sentence): + """智能判断是否应该触发TTS - 借鉴 recorder.py 的策略""" + current_time = time.time() + + # 检查缓冲区大小 + if len(self.tts_buffer) >= self.tts_buffer_max_size: + return True + + # 检查时间窗口 + time_since_last = current_time - self.tts_last_trigger_time + if time_since_last >= self.tts_accumulation_time and len(self.tts_buffer) >= self.tts_buffer_min_size: + return True + + # 检查是否为完整句子(使用严格的检测) + if self._is_complete_sentence(sentence): + return True + + # 检查句子特征 - 长句子优先(50字符以上) + if len(sentence) > 50: # 超过50字符的句子立即触发 + return True + + # 中等长度句子(30-50字符)如果有结束标点也触发 + if len(sentence) > 30: + end_punctuations = ['。', '!', '?', '.', '!', '?'] + if any(sentence.strip().endswith(p) for p in end_punctuations): + return True + + # 短句子只在缓冲区较多或时间窗口到期时触发 + return False + + def _process_tts_buffer(self): + """处理TTS缓冲区 - 发送累积的句子到TTS""" + if not self.tts_buffer: + return + + # 合并缓冲区的句子 + combined_text = ''.join(self.tts_buffer) + + # 添加到TTS任务队列 + if self._add_tts_task(combined_text): + print(f"🎵 触发TTS: {combined_text[:50]}...") + self.tts_last_trigger_time = time.time() + + # 清空缓冲区 + self.tts_buffer.clear() + + def _add_sentence_to_buffer(self, sentence): + """添加句子到智能缓冲区 - 核心方法""" + if not sentence.strip(): + return + + self.tts_buffer.append(sentence) + + # 检查是否应该触发TTS + if self._should_trigger_tts(sentence): + self._process_tts_buffer() + + def _flush_tts_buffer(self): + """强制刷新TTS缓冲区 - 确保所有内容都被处理""" + if self.tts_buffer: + self._process_tts_buffer() + + def _is_complete_sentence(self, text): + """检测是否为完整句子 - 从 recorder.py 完全移植""" + import re + + if not text or len(text.strip()) == 0: + return False + + # 增加最小长度要求 - 至少10个字符才考虑作为完整句子 + if len(text.strip()) < 10: + return False + + # 句子结束标点符号 + sentence_endings = r'[。!?.!?]' + + # 检查是否以句子结束符结尾 + if re.search(sentence_endings + r'\s*$', text): + # 对于以结束符结尾的句子,要求至少15个字符 + if len(text.strip()) >= 15: + return True + + # 检查是否包含句子结束符(可能在句子中间) + if re.search(sentence_endings, text): + # 如果后面有其他标点或字符,可能不是完整句子 + remaining_text = re.split(sentence_endings, text, 1)[-1] + if len(remaining_text.strip()) > 0: + return False + # 对于包含结束符的句子,要求至少20个字符 + if len(text.strip()) >= 20: + return True + + # 对于较长的文本(超过50字符),即使没有结束符也可以考虑 + if len(text.strip()) >= 50: + return True + + # 对于中等长度的文本,如果包含常见完整句式模式 + if len(text.strip()) >= 20: + common_patterns = [ + r'^[是的有没有来去在把被让叫请使].*[的得了吗呢吧啊呀]', + r'^(你好|谢谢|再见|是的|不是|好的|没问题)', + r'^[\u4e00-\u9fff]{4,8}[的得了]$' # 4-8个中文字+的/了/得 + ] + + for pattern in common_patterns: + if re.match(pattern, text): + return True + + return False + + def _filter_parentheses_content(self, text): + """过滤文本中的括号内容(包括中文和英文括号)- 从 recorder.py 移植""" + import re + + # 移除中文括号内容:(内容) + filtered_text = re.sub(r'([^)]*)', '', text) + # 移除英文括号内容:(content) + filtered_text = re.sub(r'\([^)]*\)', '', filtered_text) + # 移除方括号内容:【内容】 + filtered_text = re.sub(r'【[^】]*】', '', filtered_text) + # 移除方括号内容:[content] + filtered_text = re.sub(r'\[[^\]]*\]', '', filtered_text) + # 移除书名号内容:「内容」 + filtered_text = re.sub(r'「[^」]*」', '', filtered_text) + + # 清理多余空格 + filtered_text = re.sub(r'\s+', ' ', filtered_text).strip() + + return filtered_text + + def process_streaming_text(self, text): + """处理流式文本 - 新增的公共接口,用于与LLM流式输出集成""" + if not text.strip(): + return + + # 过滤括号内容 + filtered_text = self._filter_parentheses_content(text.strip()) + + if filtered_text: + # 使用智能句子缓冲系统 + self._add_sentence_to_buffer(filtered_text) + + def process_complete_text(self, text): + """处理完整文本 - 强制刷新缓冲区""" + if not text.strip(): + return + + # 过滤括号内容 + filtered_text = self._filter_parentheses_content(text.strip()) + + if filtered_text: + # 直接添加到缓冲区并强制处理 + self.tts_buffer.append(filtered_text) + self._process_tts_buffer() + + # ========== 原有方法保持不变 ========== + def process_tts_request(self, text): - """处理TTS请求的公共接口""" - return self._add_tts_task(text) + """处理TTS请求的公共接口 - 兼容原有接口""" + # 使用新的智能缓冲系统 + self.process_complete_text(text) + return True if __name__ == "__main__": # 测试代码 diff --git a/control_system.py b/control_system.py index e428c0b..f5def4b 100644 --- a/control_system.py +++ b/control_system.py @@ -222,7 +222,8 @@ class ControlSystem: output_config = { 'buffer_size': 1000, 'show_progress': True, - 'progress_interval': 100 + 'progress_interval': 100, + 'tts_speaker': self.api_config['tts']['speaker'] } self.output_process = mp.Process( @@ -397,29 +398,26 @@ class ControlSystem: else: text = "语音识别功能已禁用" - # 2. 大语言模型 (LLM) + # 2. 大语言模型 (LLM) - 使用流式处理支持智能缓冲 if self.config['processing']['enable_llm']: - response = self._call_llm(text) - if not response: + success = self._call_llm_streaming(text) + if not success: print("❌ 大语言模型调用失败") self._handle_processing_failure() return - - print(f"💬 AI回复: {response}") else: + # 如果禁用LLM,直接发送文本到TTS response = "大语言模型功能已禁用" - - # 3. 文本转语音 (TTS) - if self.config['processing']['enable_tts']: - success = self._text_to_speech_streaming(response) - if not success: - print("❌ 文本转语音失败") - self._handle_processing_failure() - return - else: - print("ℹ️ 文本转语音功能已禁用") - # 直接发送结束信号 - self.output_audio_queue.put(None) + if self.config['processing']['enable_tts']: + success = self._send_text_to_output_process(response) + if not success: + print("❌ 文本转语音失败") + self._handle_processing_failure() + return + else: + print("ℹ️ 文本转语音功能已禁用") + # 直接发送结束信号 + self.output_audio_queue.put(None) # 标记处理完成 self.processing_complete = True @@ -814,6 +812,287 @@ class ControlSystem: print(f"❌ 大语言模型调用失败: {e}") return None + def _call_llm_streaming(self, text: str) -> bool: + """流式调用大语言模型并实时处理响应""" + if not self.config['processing']['enable_llm']: + return False + + try: + print("🤖 调用大语言模型(流式输出)...") + + # 获取角色配置 + character_config = self._load_character_config(self.config['processing']['character']) + if character_config and "system_prompt" in character_config: + system_prompt = character_config["system_prompt"] + else: + system_prompt = "你是一个智能助手,请根据用户的语音输入提供有帮助的回答。保持回答简洁明了。" + + # 获取角色配置中的最大token数 + max_tokens = 50 + if character_config and "max_tokens" in character_config: + max_tokens = character_config["max_tokens"] + + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {self.api_config['llm']['api_key']}" + } + + # 构建消息列表 + messages = [ + { + "role": "system", + "content": system_prompt + }, + { + "role": "user", + "content": text + } + ] + + data = { + "model": self.api_config['llm']['model'], + "messages": messages, + "max_tokens": max_tokens, + "stream": True # 启用流式输出 + } + + # 使用流式请求 + response = requests.post( + self.api_config['llm']['api_url'], + headers=headers, + json=data, + stream=True, + timeout=30 + ) + + if response.status_code == 200: + print("🔄 开始接收流式响应...") + + # 处理流式响应 + accumulated_text = "" + sentence_buffer = "" + + # 使用行缓冲区处理多字节字符 + buffer = "" + for line_bytes in response.iter_lines(): + if not line_bytes: + continue + + # 解码为字符串,处理可能的编码问题 + try: + line = line_bytes.decode('utf-8') + except UnicodeDecodeError: + try: + line = line_bytes.decode('latin-1') + except Exception: + continue + + # 添加到缓冲区 + buffer += line + + # 检查是否包含完整的数据行 + while buffer: + # 查找完整的数据行 + if '\n' in buffer: + line, buffer = buffer.split('\n', 1) + else: + line = buffer + buffer = "" + + line = line.strip() + if not line: + continue + + try: + # 解析SSE格式的响应 + if line.startswith("data: "): + data_str = line[6:] # 移除 "data: " 前缀 + + if data_str == "[DONE]": + break + + # 尝试解析JSON + try: + chunk_data = json.loads(data_str) + except json.JSONDecodeError as e: + # 尝试修复JSON:检查是否是不完整的字符串 + if not data_str.endswith('}'): + # 这可能是一个不完整的JSON,等待下一行 + buffer = data_str + buffer + continue + + print(f"⚠️ JSON解析错误: {e}") + continue + + # 处理解析后的数据 + if "choices" in chunk_data and len(chunk_data["choices"]) > 0: + delta = chunk_data["choices"][0].get("delta", {}) + content = delta.get("content", "") + + if content: + accumulated_text += content + sentence_buffer += content + + # 检查是否有完整句子 + if self._is_complete_sentence(sentence_buffer): + print(f"📝 检测到完整句子: {sentence_buffer}") + + # 过滤括号内容 + filtered_sentence = self._filter_parentheses_content(sentence_buffer.strip()) + + if filtered_sentence: + print(f"🎵 发送到智能缓冲区: {filtered_sentence}") + # 发送到输出进程的智能缓冲系统 + self._send_streaming_text_to_output_process(filtered_sentence) + + # 重置句子缓冲区 + sentence_buffer = "" + + # 显示进度 + print(f"\r💬 已生成: {accumulated_text}", end='', flush=True) + + except Exception as e: + print(f"⚠️ 处理流式响应时出错: {e}") + continue + + print(f"\n✅ 流式响应完成: {accumulated_text}") + + # 强制刷新缓冲区,确保所有内容都被处理 + if sentence_buffer: + filtered_sentence = self._filter_parentheses_content(sentence_buffer.strip()) + if filtered_sentence: + print(f"🎵 发送剩余句子: {filtered_sentence}") + self._send_streaming_text_to_output_process(filtered_sentence) + + # 通知输出进程LLM生成已完成 + self._notify_llm_complete() + + # 通知输出进程所有文本已发送完成 + self._flush_output_process_tts_buffer() + + return accumulated_text != "" + + else: + print(f"❌ LLM API调用失败: {response.status_code}") + return False + + except Exception as e: + print(f"❌ 流式LLM调用失败: {e}") + return False + + def _send_streaming_text_to_output_process(self, text: str): + """发送流式文本到输出进程的智能缓冲系统""" + try: + # 发送特殊命令到输出进程,指示这是流式文本 + streaming_command = f"STREAMING_TEXT:{text}" + self.output_audio_queue.put(streaming_command) + return True + except Exception as e: + print(f"❌ 发送流式文本失败: {e}") + return False + + def _send_text_to_output_process(self, text: str) -> bool: + """发送完整文本到输出进程""" + try: + # 发送特殊命令到输出进程,指示这是完整文本 + complete_command = f"COMPLETE_TEXT:{text}" + self.output_audio_queue.put(complete_command) + return True + except Exception as e: + print(f"❌ 发送文本失败: {e}") + return False + + def _flush_output_process_tts_buffer(self): + """通知输出进程刷新TTS缓冲区""" + try: + # 发送刷新缓冲区命令 + flush_command = "FLUSH_TTS_BUFFER:" + self.output_audio_queue.put(flush_command) + return True + except Exception as e: + print(f"❌ 刷新缓冲区失败: {e}") + return False + + def _notify_llm_complete(self): + """通知输出进程LLM生成已完成""" + try: + # 发送LLM完成信号 + llm_complete_command = "LLM_COMPLETE:" + self.output_audio_queue.put(llm_complete_command) + print("📡 主控制:已发送LLM生成完成信号") + return True + except Exception as e: + print(f"❌ 发送LLM完成信号失败: {e}") + return False + + def _is_complete_sentence(self, text): + """检测是否为完整句子 - 从 recorder.py 移植""" + import re + + if not text or len(text.strip()) == 0: + return False + + # 增加最小长度要求 - 至少10个字符才考虑作为完整句子 + if len(text.strip()) < 10: + return False + + # 句子结束标点符号 + sentence_endings = r'[。!?.!?]' + + # 检查是否以句子结束符结尾 + if re.search(sentence_endings + r'\s*$', text): + # 对于以结束符结尾的句子,要求至少15个字符 + if len(text.strip()) >= 15: + return True + + # 检查是否包含句子结束符(可能在句子中间) + if re.search(sentence_endings, text): + # 如果后面有其他标点或字符,可能不是完整句子 + remaining_text = re.split(sentence_endings, text, 1)[-1] + if len(remaining_text.strip()) > 0: + return False + # 对于包含结束符的句子,要求至少20个字符 + if len(text.strip()) >= 20: + return True + + # 对于较长的文本(超过50字符),即使没有结束符也可以考虑 + if len(text.strip()) >= 50: + return True + + # 对于中等长度的文本,如果包含常见完整句式模式 + if len(text.strip()) >= 20: + common_patterns = [ + r'^[是的有没有来去在把被让叫请使].*[的得了吗呢吧啊呀]', + r'^(你好|谢谢|再见|是的|不是|好的|没问题)', + r'^[\u4e00-\u9fff]{4,8}[的得了]$' # 4-8个中文字+的/了/得 + ] + + for pattern in common_patterns: + if re.match(pattern, text): + return True + + return False + + def _filter_parentheses_content(self, text): + """过滤文本中的括号内容(包括中文和英文括号)- 从 recorder.py 移植""" + import re + + # 移除中文括号内容:(内容) + filtered_text = re.sub(r'([^)]*)', '', text) + # 移除英文括号内容:(content) + filtered_text = re.sub(r'\([^)]*\)', '', filtered_text) + # 移除方括号内容:【内容】 + filtered_text = re.sub(r'【[^】]*】', '', filtered_text) + # 移除方括号内容:[content] + filtered_text = re.sub(r'\[[^\]]*\]', '', filtered_text) + # 移除书名号内容:「内容」 + filtered_text = re.sub(r'「[^」]*」', '', filtered_text) + + # 清理多余空格 + filtered_text = re.sub(r'\s+', ' ', filtered_text).strip() + + return filtered_text + def _text_to_speech_streaming(self, text: str) -> bool: """文本转语音(流式)""" if not self.config['processing']['enable_tts']: @@ -949,6 +1228,15 @@ class ControlSystem: time.sleep(0.5) + # 发送TTS完成信号 + print(f"📦 发送TTS完成信号到输出进程") + try: + tts_complete_command = "TTS_COMPLETE:" + self.output_audio_queue.put(tts_complete_command) + print(f"📡 已发送TTS完成信号到输出进程") + except Exception as e: + print(f"❌ 发送TTS完成信号失败: {e}") + # 发送结束信号,通知输出进程所有音频已发送完成 print(f"📦 发送结束信号到输出进程") print(f"📊 音频队列当前大小: {self.output_audio_queue.qsize()}") diff --git a/logs/InputProcess_20250921_115617.log b/logs/InputProcess_20250921_115617.log new file mode 100644 index 0000000..361903e --- /dev/null +++ b/logs/InputProcess_20250921_115617.log @@ -0,0 +1,6 @@ +2025-09-21 11:56:17 - InputProcess_logger - INFO - 日志系统初始化完成 - 进程: InputProcess +2025-09-21 11:56:17 - InputProcess_logger - INFO - 日志文件: logs/InputProcess_20250921_115617.log +2025-09-21 11:56:17 - InputProcess_logger - INFO - [InputProcess] TTS工作线程已启动 +2025-09-21 11:56:17 - InputProcess_logger - INFO - 日志系统初始化完成 - 进程: InputProcess +2025-09-21 11:56:17 - InputProcess_logger - INFO - 日志文件: logs/InputProcess_20250921_115617.log +2025-09-21 11:56:17 - InputProcess_logger - INFO - [InputProcess] TTS工作线程已启动 diff --git a/logs/OutputProcess_20250921_115614.log b/logs/OutputProcess_20250921_115614.log new file mode 100644 index 0000000..d88b7a8 --- /dev/null +++ b/logs/OutputProcess_20250921_115614.log @@ -0,0 +1,8 @@ +2025-09-21 11:56:14 - OutputProcess_logger - INFO - 日志系统初始化完成 - 进程: OutputProcess +2025-09-21 11:56:14 - OutputProcess_logger - INFO - 日志文件: logs/OutputProcess_20250921_115614.log +2025-09-21 11:56:14 - OutputProcess_logger - INFO - [OutputProcess] 播放工作线程已启动 +2025-09-21 11:56:14 - OutputProcess_logger - INFO - [OutputProcess] TTS工作线程已启动 +2025-09-21 11:56:14 - OutputProcess_logger - INFO - 日志系统初始化完成 - 进程: OutputProcess +2025-09-21 11:56:14 - OutputProcess_logger - INFO - 日志文件: logs/OutputProcess_20250921_115614.log +2025-09-21 11:56:14 - OutputProcess_logger - INFO - [OutputProcess] 播放工作线程已启动 +2025-09-21 11:56:14 - OutputProcess_logger - INFO - [OutputProcess] TTS工作线程已启动