From 5a7e39f5b5f2fec0b568b11f386b5b55e97d84e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E6=BD=AE?= Date: Sat, 20 Sep 2025 20:13:55 +0800 Subject: [PATCH] =?UTF-8?q?=E5=9B=9E=E5=A3=B0=E5=BE=85=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- recorder.py | 705 ++++++++++++++++++++++++++++++++++++++++------ test_streaming.py | 108 +++++++ 2 files changed, 724 insertions(+), 89 deletions(-) create mode 100644 test_streaming.py diff --git a/recorder.py b/recorder.py index 28c46fd..77aee99 100644 --- a/recorder.py +++ b/recorder.py @@ -132,8 +132,318 @@ class EnergyBasedRecorder: self.frame_count = 0 self.start_time = time.time() + # 流式TTS优化系统 + self.tts_task_queue = queue.Queue(maxsize=10) # TTS任务队列 + self.tts_buffer = [] # 智能缓冲区 + self.tts_buffer_max_size = 3 # 最多缓冲3个句子 + self.tts_buffer_min_size = 1 # 最少1个句子 + self.tts_accumulation_time = 0.2 # 200ms积累窗口 + self.tts_last_trigger_time = 0 + self.tts_worker_running = True + self.tts_worker_thread = None + self.tts_pending_sentences = [] # 待处理的句子 + + # 统一音频播放系统 + self.audio_playback_queue = queue.Queue(maxsize=1000) # 统一音频播放队列,大队列确保不会截断 + self.playback_worker_running = True + self.playback_worker_thread = None + self.currently_playing = False # 当前是否正在播放 + self.last_playback_time = 0 # 最后一次播放时间戳 + self.playback_cooldown_period = 1.5 # 播放冷却时间(秒) + + # 启动工作线程 + self._start_tts_worker() + self._start_playback_worker() + self._setup_audio() + def _start_tts_worker(self): + """启动TTS工作线程""" + self.tts_worker_thread = threading.Thread(target=self._tts_worker, daemon=True) + self.tts_worker_thread.start() + print("🎵 TTS工作线程已启动") + + def _start_playback_worker(self): + """启动音频播放工作线程""" + self.playback_worker_thread = threading.Thread(target=self._playback_worker, daemon=True) + self.playback_worker_thread.start() + print("🔊 音频播放工作线程已启动") + + def _tts_worker(self): + """TTS工作线程 - 处理TTS任务队列""" + while self.tts_worker_running: + try: + # 从队列获取任务 + task = self.tts_task_queue.get(timeout=1.0) + if task is None: # 结束信号 + break + + task_type, content = task + if task_type == "tts_sentence": + # 生成音频数据并发送到统一播放队列 + self._generate_tts_audio(content) + + self.tts_task_queue.task_done() + + except queue.Empty: + continue + except Exception as e: + print(f"❌ TTS工作线程错误: {e}") + time.sleep(0.1) + + def _playback_worker(self): + """音频播放工作线程 - 处理统一音频播放队列""" + print("🔊 播放工作线程开始运行...") + + # 等待音频设备就绪 + time.sleep(0.5) + + # 创建音频播放流 + playback_stream = None + try: + playback_stream = self.audio.open( + format=self.FORMAT, + channels=self.CHANNELS, + rate=self.RATE, + output=True, + frames_per_buffer=512 + ) + print("🔊 音频播放流已创建") + except Exception as e: + print(f"❌ 创建音频播放流失败: {e}") + return + + chunks_played = 0 + total_size = 0 + + try: + while self.playback_worker_running: + try: + # 从播放队列获取音频数据 + audio_chunk = self.audio_playback_queue.get(timeout=1.0) + + if audio_chunk is None: # 结束信号 + print("🔊 收到播放结束信号") + break + + if isinstance(audio_chunk, str) and audio_chunk.startswith("METADATA:"): + # 处理元数据(如文本信息) + metadata = audio_chunk[9:] # 移除 "METADATA:" 前缀 + print(f"📝 播放元数据: {metadata}") + self.audio_playback_queue.task_done() + continue + + # 播放音频块 + if audio_chunk and len(audio_chunk) > 0: + self.currently_playing = True + self.last_playback_time = time.time() # 更新最后播放时间 + playback_stream.write(audio_chunk) + chunks_played += 1 + total_size += len(audio_chunk) + + # 减少进度显示频率 + if chunks_played % 10 == 0: + print(f"\r🔊 统一播放: {chunks_played} 块 | {total_size / 1024:.1f} KB", end='', flush=True) + + self.audio_playback_queue.task_done() + + except queue.Empty: + # 队列空时,检查是否正在播放 + self.currently_playing = False + continue + except Exception as e: + print(f"❌ 播放工作线程错误: {e}") + self.currently_playing = False + time.sleep(0.1) + + print(f"\n✅ 播放工作线程结束: 总计 {chunks_played} 块, {total_size / 1024:.1f} KB") + + finally: + self.currently_playing = False + if playback_stream: + playback_stream.stop_stream() + playback_stream.close() + print("🔊 音频播放流已关闭") + + def _add_tts_task(self, content): + """添加TTS任务到队列""" + try: + self.tts_task_queue.put_nowait(("tts_sentence", content)) + return True + except queue.Full: + print("⚠️ TTS任务队列已满,丢弃任务") + return False + + def _generate_tts_audio(self, text): + """生成TTS音频数据并发送到统一播放队列""" + if not self.enable_tts: + return False + + try: + print(f"🎵 生成TTS音频: {text[:50]}...") + + # 添加元数据到播放队列 + try: + self.audio_playback_queue.put_nowait(f"METADATA:{text[:30]}...") + except queue.Full: + print("⚠️ 播放队列满,跳过元数据") + + # 构建请求头 + headers = { + "X-Api-App-Id": self.tts_app_id, + "X-Api-Access-Key": self.tts_access_key, + "X-Api-Resource-Id": self.tts_resource_id, + "X-Api-App-Key": self.tts_app_key, + "Content-Type": "application/json", + "Connection": "keep-alive" + } + + # 构建请求参数 + payload = { + "user": { + "uid": "recorder_tts_generator" + }, + "req_params": { + "text": text, + "speaker": self.tts_speaker, + "audio_params": { + "format": "pcm", + "sample_rate": 16000, + "enable_timestamp": True + }, + "additions": "{\"explicit_language\":\"zh\",\"disable_markdown_filter\":true, \"enable_timestamp\":true}\"}" + } + } + + # 发送请求 + session = requests.Session() + try: + response = session.post(self.tts_url, headers=headers, json=payload, stream=True) + + if response.status_code != 200: + print(f"❌ TTS请求失败: {response.status_code}") + return False + + # 处理流式响应 + total_audio_size = 0 + chunk_count = 0 + success_count = 0 + + print("🔄 开始接收TTS音频流...") + + for chunk in response.iter_lines(decode_unicode=True): + if not chunk: + continue + + try: + data = json.loads(chunk) + + if data.get("code", 0) == 0 and "data" in data and data["data"]: + chunk_audio = base64.b64decode(data["data"]) + audio_size = len(chunk_audio) + total_audio_size += audio_size + chunk_count += 1 + + # 将音频块发送到统一播放队列 + try: + self.audio_playback_queue.put_nowait(chunk_audio) + success_count += 1 + except queue.Full: + # 队列满时稍微等待 + time.sleep(0.01) + try: + self.audio_playback_queue.put_nowait(chunk_audio) + success_count += 1 + except queue.Full: + # 如果还是满的,跳过这个块 + continue + + # 减少进度显示频率 + if chunk_count % 5 == 0: + print(f"\r📥 生成音频: {chunk_count} 块 | 成功: {success_count} | {total_audio_size / 1024:.1f} KB", end='', flush=True) + continue + + if data.get("code", 0) == 20000000: + break + + if data.get("code", 0) > 0: + print(f"\n❌ TTS错误响应: {data}") + break + + except json.JSONDecodeError: + continue + + success_rate = (success_count / chunk_count * 100) if chunk_count > 0 else 0 + print(f"\n✅ TTS音频生成完成: {chunk_count} 块, 成功率 {success_rate:.1f}% | 总大小: {total_audio_size / 1024:.1f} KB") + + return success_count > 0 + + finally: + response.close() + session.close() + + except Exception as e: + print(f"❌ TTS音频生成失败: {e}") + return False + + def _should_trigger_tts(self, sentence): + """智能判断是否应该触发TTS""" + current_time = time.time() + + # 检查缓冲区大小 + if len(self.tts_buffer) >= self.tts_buffer_max_size: + return True + + # 检查时间窗口 + time_since_last = current_time - self.tts_last_trigger_time + if time_since_last >= self.tts_accumulation_time and len(self.tts_buffer) >= self.tts_buffer_min_size: + return True + + # 检查句子特征 - 长句子优先(调整为100字符,防止回声) + if len(sentence) > 100: # 超过100字符的长句子立即触发 + return True + + # 中等长度句子(80-100字符)如果有结束标点也触发 + if len(sentence) > 80: + end_punctuations = ['。', '!', '?', '.', '!', '?'] + if any(sentence.strip().endswith(p) for p in end_punctuations): + return True + + # 短句子只在缓冲区较多或时间窗口到期时触发 + return False + + def _process_tts_buffer(self): + """处理TTS缓冲区""" + if not self.tts_buffer: + return + + # 合并缓冲区的句子 + combined_text = ''.join(self.tts_buffer) + + # 添加到TTS任务队列 + if self._add_tts_task(combined_text): + print(f"🎵 触发TTS: {combined_text[:50]}...") + self.tts_last_trigger_time = time.time() + + # 清空缓冲区 + self.tts_buffer.clear() + + def _add_sentence_to_buffer(self, sentence): + """添加句子到智能缓冲区""" + if not sentence.strip(): + return + + self.tts_buffer.append(sentence) + + # 检查是否应该触发TTS + if self._should_trigger_tts(sentence): + self._process_tts_buffer() + + def _flush_tts_buffer(self): + """强制刷新TTS缓冲区""" + if self.tts_buffer: + self._process_tts_buffer() + def _load_available_characters(self): """加载可用角色列表""" characters = [] @@ -583,88 +893,56 @@ class EnergyBasedRecorder: time.sleep(0.3) def play_audio_realtime(self, audio_queue): - """真正的实时流式播放:从队列中获取音频并立即播放""" + """音频队列转发器:将音频数据转发到统一播放队列""" try: - print("🔊 启动实时音频播放器...") + print("🔊 启动音频队列转发器...") - # 确保音频输入已停止 - if self.recording: - self.recording = False - self.recorded_frames = [] - self.recording_start_time = None - self.last_sound_time = None - - # 清空缓冲区 - self.pre_record_buffer = [] - self.energy_history = [] - self.zcr_history = [] - - # 关闭输入流 - if self.stream: - self.stream.stop_stream() - self.stream.close() - self.stream = None - - self.is_playing = True - time.sleep(0.3) # 等待音频设备切换 - - # 创建播放流 - playback_stream = self.audio.open( - format=self.FORMAT, - channels=self.CHANNELS, - rate=self.RATE, - output=True, - frames_per_buffer=1024 # 较小的缓冲区以实现更快的响应 - ) - - print("🚫 音频输入已关闭,实时播放器就绪") - - chunks_played = 0 + chunks_forwarded = 0 total_size = 0 - # 持续从队列中获取音频数据并播放 + # 持续从队列中获取音频数据并转发到统一播放队列 while True: try: # 设置超时以避免无限等待 - chunk = audio_queue.get(timeout=1.0) + chunk = audio_queue.get(timeout=0.5) if chunk is None: # 结束信号 - print("📥 收到播放结束信号") + print("\n📥 收到转发结束信号") break if chunk: # 确保chunk不为空 - playback_stream.write(chunk) - chunks_played += 1 - total_size += len(chunk) - - # 显示播放进度 - print(f"\r🔊 实时播放: {chunks_played} 块 | {total_size / 1024:.1f} KB", end='', flush=True) + # 转发到统一播放队列 + try: + self.audio_playback_queue.put_nowait(chunk) + chunks_forwarded += 1 + total_size += len(chunk) + + # 减少进度显示频率以提高性能 + if chunks_forwarded % 10 == 0: + print(f"\r📤 转发音频: {chunks_forwarded} 块 | {total_size / 1024:.1f} KB", end='', flush=True) + except queue.Full: + print("⚠️ 统一播放队列满,丢弃音频块") + continue audio_queue.task_done() except queue.Empty: # 队列为空,检查是否还在接收数据 if not hasattr(self, '_receiving_audio') or not self._receiving_audio: - print("\n📡 音频接收完成,播放器结束") + print("\n📡 音频接收完成,转发器结束") break + # 继续等待,不要显示太多调试信息 continue except Exception as e: - print(f"\n❌ 播放过程中出错: {e}") + print(f"\n❌ 转发过程中出错: {e}") break - # 确保播放流正确关闭 - playback_stream.stop_stream() - playback_stream.close() - - print(f"\n✅ 实时播放完成: {chunks_played} 块, {total_size / 1024:.1f} KB") + print(f"\n✅ 音频转发完成: {chunks_forwarded} 块, {total_size / 1024:.1f} KB") return True except Exception as e: - print(f"\n❌ 实时播放失败: {e}") + print(f"\n❌ 音频转发失败: {e}") return False - finally: - self.is_playing = False - time.sleep(0.3) def play_audio_hybrid(self, audio_chunks): """混合模式播放:智能选择流式或传统播放""" @@ -834,13 +1112,21 @@ class EnergyBasedRecorder: # 调用文本转语音 if self.enable_tts: print("-" * 50) - tts_file = self.text_to_speech(llm_response) - if tts_file: - print("✅ AI语音回复完成") + # 检查是否已经在流式处理中播放了语音 + if not hasattr(self, '_streaming_tts_completed') or not self._streaming_tts_completed: + tts_file = self.text_to_speech(llm_response) + if tts_file: + print("✅ AI语音回复完成") + # 删除录音文件 + self._safe_delete_file(filename, "录音文件") + else: + print("❌ 文本转语音失败") + else: + print("✅ AI语音回复已完成(流式播放)") # 删除录音文件 self._safe_delete_file(filename, "录音文件") - else: - print("❌ 文本转语音失败") + # 重置流式标记 + self._streaming_tts_completed = False else: print("ℹ️ 文本转语音功能已禁用") # 如果不启用TTS,直接删除录音文件 @@ -862,7 +1148,19 @@ class EnergyBasedRecorder: elif websockets is None: print("ℹ️ 请安装 websockets 库以启用语音识别功能") - print("🔄 准备重新开启音频输入") + # 等待音频播放完成后再重新开启音频输入 + print("⏳ 等待音频播放完成...") + while not self.audio_playback_queue.empty() or self.currently_playing: + queue_size = self.audio_playback_queue.qsize() + playing_status = "播放中" if self.currently_playing else "等待播放" + print(f"\r🔊 {playing_status}... 队列: {queue_size}", end='', flush=True) + time.sleep(0.1) + + # 额外等待1秒,确保音频设备完全停止 + print("\n⏳ 等待音频设备完全停止...") + time.sleep(1.0) + + print("🔄 音频播放完成,准备重新开启音频输入") self.recording = False self.recorded_frames = [] @@ -950,10 +1248,21 @@ class EnergyBasedRecorder: if len(data) == 0: continue - # 如果正在播放,完全跳过音频处理 - if self.is_playing: + # 检查播放冷却期 - 防止回声 + current_time = time.time() + time_since_last_play = current_time - self.last_playback_time + in_cooldown = time_since_last_play < self.playback_cooldown_period + + # 如果正在播放、播放队列不为空、或在冷却期内,完全跳过音频处理 + if self.is_playing or self.currently_playing or not self.audio_playback_queue.empty() or in_cooldown: # 显示播放状态 - status = "🔊 播放中... 跳过录音处理" + queue_size = self.audio_playback_queue.qsize() + if in_cooldown: + cooldown_time = self.playback_cooldown_period - time_since_last_play + status = f"🔊 播放冷却中... {cooldown_time:.1f}s 队列: {queue_size}" + else: + playing_status = "播放中" if self.currently_playing else "等待播放" + status = f"🔊 {playing_status}... 队列: {queue_size}" print(f"\r{status}", end='', flush=True) time.sleep(0.1) # 播放时增加延迟减少CPU使用 continue @@ -1058,6 +1367,18 @@ class EnergyBasedRecorder: if self.recording: self.stop_recording() + # 停止TTS工作线程 + self.tts_worker_running = False + if self.tts_worker_thread: + self.tts_task_queue.put(None) # 发送结束信号 + self.tts_worker_thread.join(timeout=2.0) + + # 停止播放工作线程 + self.playback_worker_running = False + if self.playback_worker_thread: + self.audio_playback_queue.put(None) # 发送结束信号 + self.playback_worker_thread.join(timeout=3.0) + if self.stream: self.stream.stop_stream() self.stream.close() @@ -1219,12 +1540,12 @@ class EnergyBasedRecorder: return None def call_llm(self, user_message): - """调用大语言模型API""" + """调用大语言模型API - 支持流式输出""" if not self.enable_llm: return None try: - print("🤖 调用大语言模型...") + print("🤖 调用大语言模型(流式输出)...") # 获取角色配置中的系统提示词 if self.character_config and "system_prompt" in self.character_config: @@ -1266,25 +1587,123 @@ class EnergyBasedRecorder: data = { "model": self.llm_model, "messages": messages, - "max_tokens": max_tokens + "max_tokens": max_tokens, + "stream": True # 启用流式输出 } - response = requests.post(self.llm_api_url, headers=headers, json=data, timeout=30) + # 使用流式请求 + response = requests.post(self.llm_api_url, headers=headers, json=data, stream=True, timeout=30) if response.status_code == 200: - result = response.json() - if "choices" in result and len(result["choices"]) > 0: - llm_response = result["choices"][0]["message"]["content"] - # 过滤括号中的旁白内容 - filtered_response = self._filter_parentheses_content(llm_response.strip()) + print("🔄 开始接收流式响应...") + + # 处理流式响应 + accumulated_text = "" + sentence_buffer = "" + + print("🔍 开始接收SSE流...") + + # 使用行缓冲区处理多字节字符 + buffer = "" + for line_bytes in response.iter_lines(): + if not line_bytes: + continue - # 保存到历史记录 + # 解码为字符串,处理可能的编码问题 + try: + line = line_bytes.decode('utf-8') + except UnicodeDecodeError: + try: + line = line_bytes.decode('latin-1') + except Exception: + continue + + # 添加到缓冲区 + buffer += line + + # 检查是否包含完整的数据行 + while buffer: + # 查找完整的数据行 + if '\n' in buffer: + line, buffer = buffer.split('\n', 1) + else: + line = buffer + buffer = "" + + line = line.strip() + if not line: + continue + + try: + # 解析SSE格式的响应 + if line.startswith("data: "): + data_str = line[6:] # 移除 "data: " 前缀 + + if data_str == "[DONE]": + break + + # 尝试解析JSON,增加错误处理 + try: + chunk_data = json.loads(data_str) + except json.JSONDecodeError as e: + # 尝试修复JSON:检查是否是不完整的字符串 + if not data_str.endswith('}'): + # 这可能是一个不完整的JSON,等待下一行 + buffer = data_str + buffer + continue + + print(f"\n🔍 调试 - 原始行: {repr(line)}") + print(f"🔍 调试 - 数据字符串: {repr(data_str)}") + print(f"🔍 调试 - JSON错误: {e}") + continue + + # 处理解析后的数据 + if "choices" in chunk_data and len(chunk_data["choices"]) > 0: + delta = chunk_data["choices"][0].get("delta", {}) + content = delta.get("content", "") + + if content: + accumulated_text += content + sentence_buffer += content + + # 检查是否有完整句子 + if self._is_complete_sentence(sentence_buffer): + print(f"📝 检测到完整句子: {sentence_buffer}") + + # 过滤括号内容 + filtered_sentence = self._filter_parentheses_content(sentence_buffer.strip()) + + if filtered_sentence: + print(f"🎵 添加到智能缓冲区: {filtered_sentence}") + # 使用智能缓冲系统替代立即调用TTS + self._add_sentence_to_buffer(filtered_sentence) + + # 标记已经进行了流式TTS播放 + self._streaming_tts_completed = True + + # 重置句子缓冲区 + sentence_buffer = "" + + # 显示进度 + print(f"\r💬 已生成: {accumulated_text}", end='', flush=True) + + except Exception as e: + print(f"\n⚠️ 处理流式响应时出错: {e}") + print(f"🔍 调试 - 问题行: {repr(line)}") + continue + + print(f"\n✅ 流式响应完成: {accumulated_text}") + + # 强制刷新TTS缓冲区,确保所有内容都被处理 + self._flush_tts_buffer() + + # 保存完整回复到历史记录 + if accumulated_text: + filtered_response = self._filter_parentheses_content(accumulated_text.strip()) self._add_to_chat_history(user_message, filtered_response) - return filtered_response - else: - print("❌ LLM API响应格式错误") - return None + return accumulated_text + else: print(f"❌ LLM API调用失败: {response.status_code}") print(f"响应内容: {response.text}") @@ -1317,6 +1736,86 @@ class EnergyBasedRecorder: return filtered_text + def _is_complete_sentence(self, text): + """检测是否为完整句子""" + import re + + if not text or len(text.strip()) == 0: + return False + + # 句子结束标点符号 + sentence_endings = r'[。!?.!?]' + + # 检查是否以句子结束符结尾 + if re.search(sentence_endings + r'\s*$', text): + return True + + # 检查是否包含句子结束符(可能在句子中间) + if re.search(sentence_endings, text): + # 如果后面有其他标点或字符,可能不是完整句子 + remaining_text = re.split(sentence_endings, text, 1)[-1] + if len(remaining_text.strip()) > 0: + return False + return True + + # 对于较短的文本,如果包含常见完整句式模式 + common_patterns = [ + r'^[是的有没有来去在把被让叫请使].*[的得了吗呢吧啊呀]', + r'^(你好|谢谢|再见|是的|不是|好的|没问题)', + r'^[\u4e00-\u9fff]{2,4}[的得了]$' # 2-4个中文字+的/了/得 + ] + + for pattern in common_patterns: + if re.match(pattern, text): + return True + + return False + + def _filter_parentheses_content(self, text): + """过滤文本中的括号内容(包括中文和英文括号)""" + import re + + # 移除中文括号内容:(内容) + filtered_text = re.sub(r'([^)]*)', '', text) + # 移除英文括号内容:(content) + filtered_text = re.sub(r'\([^)]*\)', '', filtered_text) + # 移除方括号内容:【内容】 + filtered_text = re.sub(r'【[^】]*】', '', filtered_text) + # 移除方括号内容:[content] + filtered_text = re.sub(r'\[[^\]]*\]', '', filtered_text) + # 移除书名号内容:「内容」 + filtered_text = re.sub(r'「[^」]*」', '', filtered_text) + + # 清理多余空格 + filtered_text = re.sub(r'\s+', ' ', filtered_text).strip() + + return filtered_text + + def _add_to_chat_history(self, user_message, assistant_response): + """添加对话到历史记录""" + # 如果历史记录超过最大长度,移除最早的记录 + if len(self.chat_history) >= self.max_history_length: + self.chat_history.pop(0) + + # 添加新的对话记录 + self.chat_history.append({ + "user": user_message, + "assistant": assistant_response, + "timestamp": time.time() + }) + + def clear_chat_history(self): + """清空聊天历史""" + self.chat_history = [] + print("💬 聊天历史已清空") + + def get_chat_history_summary(self): + """获取聊天历史摘要""" + if not self.chat_history: + return "暂无聊天历史" + + return f"当前有 {len(self.chat_history)} 轮对话记录" + def _add_to_chat_history(self, user_message, assistant_response): """添加对话到历史记录""" # 如果历史记录超过最大长度,移除最早的记录 @@ -1358,13 +1857,24 @@ class EnergyBasedRecorder: timestamp = time.strftime("%Y%m%d_%H%M%S") return f"tts_response_{timestamp}.pcm" - def text_to_speech(self, text): - """文本转语音 - 真正实时流式播放""" + def text_to_speech(self, text, mode='normal', use_worker=True): + """文本转语音 - 真正实时流式播放 + mode: 'normal' 普通模式, 'buffered' 智能缓冲模式 + use_worker: 是否使用工作线程模式 + """ if not self.enable_tts: return None try: - print("🔊 开始文本转语音(实时流式播放)...") + if mode == 'buffered': + print("🎵 开始智能缓冲TTS处理...") + else: + print("🔊 开始文本转语音(实时流式播放)...") + + # 如果是在工作线程中,不启动播放器 + if use_worker and hasattr(self, 'is_playing') and self.is_playing: + print("⚠️ 工作线程模式,跳过播放器启动") + return None # 构建请求头 headers = { @@ -1393,8 +1903,8 @@ class EnergyBasedRecorder: } } - # 创建音频队列 - audio_queue = queue.Queue() + # 创建音频队列 - 设置更大的队列大小以减少中断 + audio_queue = queue.Queue(maxsize=50) # 增加队列大小 # 启动实时播放线程 self._receiving_audio = True @@ -1403,6 +1913,9 @@ class EnergyBasedRecorder: player_thread.start() print("🎵 实时播放器已启动") + # 给播放器一点时间启动 + time.sleep(0.1) + # 发送请求 session = requests.Session() try: @@ -1415,11 +1928,14 @@ class EnergyBasedRecorder: audio_queue.put(None) return None - # 处理流式响应 - 实时播放模式 + # 处理流式响应 - 根据模式优化 total_audio_size = 0 chunk_count = 0 - print("🔄 开始接收TTS音频流(实时播放)...") + if mode == 'buffered': + print("🔄 开始接收TTS音频流(智能缓冲模式)...") + else: + print("🔄 开始接收TTS音频流(实时播放)...") for chunk in response.iter_lines(decode_unicode=True): if not chunk: @@ -1434,11 +1950,22 @@ class EnergyBasedRecorder: total_audio_size += audio_size chunk_count += 1 - # 将音频块放入队列进行实时播放 - audio_queue.put(chunk_audio) + # 将音频块放入队列进行实时播放 - 使用非阻塞方式 + try: + audio_queue.put_nowait(chunk_audio) + except queue.Full: + # 如果队列满了,稍微等待一下 + time.sleep(0.01) + try: + audio_queue.put_nowait(chunk_audio) + except queue.Full: + # 如果还是满的,跳过这个块以避免阻塞 + print(f"\n⚠️ 音频队列已满,跳过块 {chunk_count}") + continue - # 显示接收进度 - print(f"\r📥 接收并播放: {chunk_count} 块 | {total_audio_size / 1024:.1f} KB", end='', flush=True) + # 减少进度显示频率以提高性能 + if chunk_count % 5 == 0: + print(f"\r📥 接收并播放: {chunk_count} 块 | {total_audio_size / 1024:.1f} KB", end='', flush=True) continue if data.get("code", 0) == 0 and "sentence" in data and data["sentence"]: diff --git a/test_streaming.py b/test_streaming.py new file mode 100644 index 0000000..7ae3c3e --- /dev/null +++ b/test_streaming.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +测试流式响应解析的脚本 +""" + +import json +import requests +import os + +def test_streaming_response(): + """测试流式响应解析""" + + # 检查API密钥 + api_key = os.environ.get("ARK_API_KEY") + if not api_key: + print("❌ 请设置 ARK_API_KEY 环境变量") + return + + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {api_key}" + } + + data = { + "messages": [ + { + "content": "你是一个智能助手,请回答问题。", + "role": "system" + }, + { + "content": "你好,请简单介绍一下自己", + "role": "user" + } + ], + "model": "doubao-1-5-pro-32k-250115", + "stream": True + } + + print("🚀 开始测试流式响应...") + + try: + response = requests.post( + "https://ark.cn-beijing.volces.com/api/v3/chat/completions", + headers=headers, + json=data, + stream=True, + timeout=30 + ) + + print(f"📊 响应状态: {response.status_code}") + + if response.status_code != 200: + print(f"❌ 请求失败: {response.text}") + return + + print("🔍 开始解析流式响应...") + + accumulated_text = "" + line_count = 0 + + for line in response.iter_lines(decode_unicode=True): + line_count += 1 + + if not line or not line.strip(): + continue + + # 预处理 + line = line.strip() + + print(f"\n--- 第{line_count}行 ---") + print(f"原始内容: {repr(line)}") + + if line.startswith("data: "): + data_str = line[6:] # 移除 "data: " 前缀 + print(f"处理后: {repr(data_str)}") + + if data_str == "[DONE]": + print("✅ 流结束") + break + + try: + chunk_data = json.loads(data_str) + print(f"✅ JSON解析成功: {chunk_data}") + + if "choices" in chunk_data and len(chunk_data["choices"]) > 0: + delta = chunk_data["choices"][0].get("delta", {}) + content = delta.get("content", "") + + if content: + accumulated_text += content + print(f"💬 累计内容: {accumulated_text}") + + except json.JSONDecodeError as e: + print(f"❌ JSON解析失败: {e}") + print(f"🔍 问题数据: {repr(data_str)}") + except Exception as e: + print(f"❌ 其他错误: {e}") + + print(f"\n✅ 测试完成,总共处理了 {line_count} 行") + print(f"📝 最终内容: {accumulated_text}") + + except Exception as e: + print(f"❌ 测试失败: {e}") + +if __name__ == "__main__": + test_streaming_response() \ No newline at end of file