回声待处理

This commit is contained in:
朱潮 2025-09-20 20:13:55 +08:00
parent 8003ca3799
commit 5a7e39f5b5
2 changed files with 724 additions and 89 deletions

View File

@ -132,8 +132,318 @@ class EnergyBasedRecorder:
self.frame_count = 0
self.start_time = time.time()
# 流式TTS优化系统
self.tts_task_queue = queue.Queue(maxsize=10) # TTS任务队列
self.tts_buffer = [] # 智能缓冲区
self.tts_buffer_max_size = 3 # 最多缓冲3个句子
self.tts_buffer_min_size = 1 # 最少1个句子
self.tts_accumulation_time = 0.2 # 200ms积累窗口
self.tts_last_trigger_time = 0
self.tts_worker_running = True
self.tts_worker_thread = None
self.tts_pending_sentences = [] # 待处理的句子
# 统一音频播放系统
self.audio_playback_queue = queue.Queue(maxsize=1000) # 统一音频播放队列,大队列确保不会截断
self.playback_worker_running = True
self.playback_worker_thread = None
self.currently_playing = False # 当前是否正在播放
self.last_playback_time = 0 # 最后一次播放时间戳
self.playback_cooldown_period = 1.5 # 播放冷却时间(秒)
# 启动工作线程
self._start_tts_worker()
self._start_playback_worker()
self._setup_audio()
def _start_tts_worker(self):
    """Spawn the daemon thread that runs ``self._tts_worker``.

    The thread handle is stored on ``self.tts_worker_thread`` before the
    thread is started.
    """
    worker = threading.Thread(target=self._tts_worker, daemon=True)
    self.tts_worker_thread = worker
    worker.start()
    print("🎵 TTS工作线程已启动")
def _start_playback_worker(self):
    """Spawn the daemon thread that runs ``self._playback_worker``.

    The thread handle is stored on ``self.playback_worker_thread`` before
    the thread is started.
    """
    worker = threading.Thread(target=self._playback_worker, daemon=True)
    self.playback_worker_thread = worker
    worker.start()
    print("🔊 音频播放工作线程已启动")
def _tts_worker(self):
    """Consume ``self.tts_task_queue`` until stopped.

    Each queue item is either ``None`` (shutdown sentinel) or a
    ``(task_type, content)`` tuple. Only ``"tts_sentence"`` tasks are
    handled; they are passed to ``self._generate_tts_audio``.

    The loop ends when ``self.tts_worker_running`` becomes false or the
    sentinel is received. Every item taken from the queue is acknowledged
    with ``task_done()`` -- including the sentinel and tasks whose
    processing raised -- so the queue's unfinished-task counter stays
    accurate and ``Queue.join()`` cannot hang.
    """
    while self.tts_worker_running:
        try:
            # Time out periodically so the running flag is re-checked.
            task = self.tts_task_queue.get(timeout=1.0)
        except queue.Empty:
            continue

        try:
            if task is None:  # shutdown sentinel
                break
            task_type, content = task
            if task_type == "tts_sentence":
                # Synthesize audio and push it to the unified playback queue.
                self._generate_tts_audio(content)
        except Exception as e:
            print(f"❌ TTS工作线程错误: {e}")
            time.sleep(0.1)
        finally:
            # Runs for normal completion, errors and the sentinel alike.
            self.tts_task_queue.task_done()
def _playback_worker(self):
    """Playback thread body: drain ``self.audio_playback_queue`` into one output stream.

    Queue items are handled as follows:
      * ``None`` -- end signal, leaves the loop;
      * ``str`` starting with ``"METADATA:"`` -- printed, not played;
      * any other non-empty item -- written to the output stream.

    While writing, ``self.currently_playing`` is set to True and
    ``self.last_playback_time`` is refreshed. ``currently_playing`` is only
    reset when a ``get`` times out (queue empty for 1 s), on error, or on
    exit. The output stream is always stopped and closed on exit.

    NOTE(review): the nesting below was reconstructed from a listing whose
    indentation was lost -- confirm the placement of the final
    ``task_done()`` against the real file.
    """
    print("🔊 播放工作线程开始运行...")
    # Wait for the audio device to become ready
    time.sleep(0.5)
    # Create the audio output stream
    playback_stream = None
    try:
        playback_stream = self.audio.open(
            format=self.FORMAT,
            channels=self.CHANNELS,
            rate=self.RATE,
            output=True,
            frames_per_buffer=512
        )
        print("🔊 音频播放流已创建")
    except Exception as e:
        # Without an output stream the worker cannot do anything; give up.
        print(f"❌ 创建音频播放流失败: {e}")
        return
    chunks_played = 0
    total_size = 0
    try:
        while self.playback_worker_running:
            try:
                # Fetch the next item; the timeout lets the loop re-check the running flag
                audio_chunk = self.audio_playback_queue.get(timeout=1.0)
                if audio_chunk is None:  # end signal
                    print("🔊 收到播放结束信号")
                    break
                if isinstance(audio_chunk, str) and audio_chunk.startswith("METADATA:"):
                    # Metadata item (e.g. the text being spoken): log it, do not play it
                    metadata = audio_chunk[9:]  # strip the "METADATA:" prefix
                    print(f"📝 播放元数据: {metadata}")
                    self.audio_playback_queue.task_done()
                    continue
                # Play the audio chunk
                if audio_chunk and len(audio_chunk) > 0:
                    self.currently_playing = True
                    self.last_playback_time = time.time()  # refresh the last-playback timestamp
                    playback_stream.write(audio_chunk)
                    chunks_played += 1
                    total_size += len(audio_chunk)
                    # Print progress only every 10 chunks
                    if chunks_played % 10 == 0:
                        print(f"\r🔊 统一播放: {chunks_played} 块 | {total_size / 1024:.1f} KB", end='', flush=True)
                self.audio_playback_queue.task_done()
            except queue.Empty:
                # Nothing arrived within the timeout: mark playback as idle
                self.currently_playing = False
                continue
            except Exception as e:
                print(f"❌ 播放工作线程错误: {e}")
                self.currently_playing = False
                time.sleep(0.1)
        print(f"\n✅ 播放工作线程结束: 总计 {chunks_played} 块, {total_size / 1024:.1f} KB")
    finally:
        self.currently_playing = False
        if playback_stream:
            playback_stream.stop_stream()
            playback_stream.close()
            print("🔊 音频播放流已关闭")
def _add_tts_task(self, content):
    """Enqueue *content* as a ``"tts_sentence"`` task without blocking.

    Returns:
        True if the task was queued, False if the queue was full (the task
        is dropped in that case).
    """
    job = ("tts_sentence", content)
    try:
        self.tts_task_queue.put_nowait(job)
    except queue.Full:
        print("⚠️ TTS任务队列已满丢弃任务")
        return False
    return True
def _generate_tts_audio(self, text):
    """Synthesize *text* via the streaming TTS endpoint and queue the audio.

    A ``"METADATA:..."`` string is queued first (best effort), then each
    decoded audio chunk is pushed to ``self.audio_playback_queue``. If the
    queue is full the chunk is retried once after 10 ms and then dropped.

    Changes relative to the previous implementation:
      * the session and response are managed by ``with`` blocks, so a
        failing ``post`` no longer raises ``NameError`` from a ``finally``
        that referenced an unbound ``response``;
      * ``additions`` is built with ``json.dumps`` (the hand-written literal
        ended in a stray ``"}`` and was not valid JSON);
      * the request has a timeout so a stalled server cannot block the TTS
        worker thread forever.

    Args:
        text: The text to synthesize.

    Returns:
        True if at least one audio chunk was queued; False when TTS is
        disabled, the request fails, or nothing could be queued.
    """
    if not self.enable_tts:
        return False
    try:
        print(f"🎵 生成TTS音频: {text[:50]}...")
        # Announce the upcoming text to the playback worker (best effort).
        try:
            self.audio_playback_queue.put_nowait(f"METADATA:{text[:30]}...")
        except queue.Full:
            print("⚠️ 播放队列满,跳过元数据")

        headers = {
            "X-Api-App-Id": self.tts_app_id,
            "X-Api-Access-Key": self.tts_access_key,
            "X-Api-Resource-Id": self.tts_resource_id,
            "X-Api-App-Key": self.tts_app_key,
            "Content-Type": "application/json",
            "Connection": "keep-alive",
        }
        payload = {
            "user": {
                "uid": "recorder_tts_generator",
            },
            "req_params": {
                "text": text,
                "speaker": self.tts_speaker,
                "audio_params": {
                    "format": "pcm",
                    "sample_rate": 16000,
                    "enable_timestamp": True,
                },
                # The API takes this field as a JSON-encoded string.
                "additions": json.dumps({
                    "explicit_language": "zh",
                    "disable_markdown_filter": True,
                    "enable_timestamp": True,
                }),
            },
        }

        total_audio_size = 0
        chunk_count = 0
        success_count = 0
        # (connect timeout, read timeout between streamed bytes) in seconds.
        with requests.Session() as session, session.post(
            self.tts_url,
            headers=headers,
            json=payload,
            stream=True,
            timeout=(10, 30),
        ) as response:
            if response.status_code != 200:
                print(f"❌ TTS请求失败: {response.status_code}")
                return False

            print("🔄 开始接收TTS音频流...")
            for chunk in response.iter_lines(decode_unicode=True):
                if not chunk:
                    continue
                try:
                    data = json.loads(chunk)
                except json.JSONDecodeError:
                    # Skip lines that are not JSON.
                    continue

                code = data.get("code", 0)
                if code == 0 and "data" in data and data["data"]:
                    chunk_audio = base64.b64decode(data["data"])
                    total_audio_size += len(chunk_audio)
                    chunk_count += 1
                    # Hand the chunk to the unified playback queue.
                    try:
                        self.audio_playback_queue.put_nowait(chunk_audio)
                        success_count += 1
                    except queue.Full:
                        # Queue full: wait briefly and retry once.
                        time.sleep(0.01)
                        try:
                            self.audio_playback_queue.put_nowait(chunk_audio)
                            success_count += 1
                        except queue.Full:
                            # Still full: drop this chunk.
                            continue
                    # Print progress only every 5 chunks.
                    if chunk_count % 5 == 0:
                        print(f"\r📥 生成音频: {chunk_count} 块 | 成功: {success_count} | {total_audio_size / 1024:.1f} KB", end='', flush=True)
                    continue
                if code == 20000000:
                    # This code ends the stream.
                    break
                if code > 0:
                    print(f"\n❌ TTS错误响应: {data}")
                    break

        success_rate = (success_count / chunk_count * 100) if chunk_count > 0 else 0
        print(f"\n✅ TTS音频生成完成: {chunk_count} 块, 成功率 {success_rate:.1f}% | 总大小: {total_audio_size / 1024:.1f} KB")
        return success_count > 0
    except Exception as e:
        print(f"❌ TTS音频生成失败: {e}")
        return False
def _should_trigger_tts(self, sentence):
    """Decide whether the buffered sentences should be sent to TTS now.

    Returns True when any of the following holds:
      1. the buffer holds at least ``tts_buffer_max_size`` sentences;
      2. at least ``tts_accumulation_time`` seconds have passed since the
         last trigger and the buffer holds at least ``tts_buffer_min_size``
         sentences;
      3. *sentence* is longer than 100 characters;
      4. *sentence* is longer than 80 characters and ends with a
         sentence-final punctuation mark.

    The punctuation list previously contained empty strings, and
    ``str.endswith('')`` is always true, which made rule 4 fire for every
    sentence over 80 characters. The full-width marks are restored here.

    Args:
        sentence: The sentence that was just added to the buffer.
    """
    buffered = len(self.tts_buffer)

    # Rule 1: buffer is full.
    if buffered >= self.tts_buffer_max_size:
        return True

    # Rule 2: accumulation window has elapsed and there is something to say.
    elapsed = time.time() - self.tts_last_trigger_time
    if elapsed >= self.tts_accumulation_time and buffered >= self.tts_buffer_min_size:
        return True

    # Rule 3: very long sentences are flushed immediately.
    if len(sentence) > 100:
        return True

    # Rule 4: medium-length sentences need a real sentence ending.
    end_punctuations = ('。', '！', '？', '.', '!', '?')
    if len(sentence) > 80 and sentence.strip().endswith(end_punctuations):
        return True

    # Short sentences wait for rule 1 or rule 2.
    return False
def _process_tts_buffer(self):
    """Join the buffered sentences into one TTS task and empty the buffer.

    Does nothing when the buffer is empty. When the task is accepted by
    ``_add_tts_task`` the last-trigger timestamp is refreshed.
    """
    if len(self.tts_buffer) == 0:
        return

    # Merge all pending sentences into a single piece of text.
    merged = ''.join(self.tts_buffer)

    queued = self._add_tts_task(merged)
    if queued:
        print(f"🎵 触发TTS: {merged[:50]}...")
        self.tts_last_trigger_time = time.time()

    # Empty the buffer.
    self.tts_buffer.clear()
def _add_sentence_to_buffer(self, sentence):
    """Append *sentence* to the TTS buffer and flush it if a trigger fires.

    Whitespace-only sentences are ignored.
    """
    if sentence.strip():
        self.tts_buffer.append(sentence)
        # Ask the trigger heuristics whether the buffer should go out now.
        trigger = self._should_trigger_tts(sentence)
        if trigger:
            self._process_tts_buffer()
def _flush_tts_buffer(self):
    """Send whatever is left in the TTS buffer, regardless of the trigger rules."""
    if not self.tts_buffer:
        return
    self._process_tts_buffer()
def _load_available_characters(self):
"""加载可用角色列表"""
characters = []
@ -583,88 +893,56 @@ class EnergyBasedRecorder:
time.sleep(0.3)
def play_audio_realtime(self, audio_queue):
"""真正的实时流式播放:从队列中获取音频并立即播放"""
"""音频队列转发器:将音频数据转发到统一播放队列"""
try:
print("🔊 启动实时音频播放器...")
print("🔊 启动音频队列转发器...")
# 确保音频输入已停止
if self.recording:
self.recording = False
self.recorded_frames = []
self.recording_start_time = None
self.last_sound_time = None
# 清空缓冲区
self.pre_record_buffer = []
self.energy_history = []
self.zcr_history = []
# 关闭输入流
if self.stream:
self.stream.stop_stream()
self.stream.close()
self.stream = None
self.is_playing = True
time.sleep(0.3) # 等待音频设备切换
# 创建播放流
playback_stream = self.audio.open(
format=self.FORMAT,
channels=self.CHANNELS,
rate=self.RATE,
output=True,
frames_per_buffer=1024 # 较小的缓冲区以实现更快的响应
)
print("🚫 音频输入已关闭,实时播放器就绪")
chunks_played = 0
chunks_forwarded = 0
total_size = 0
# 持续从队列中获取音频数据并播放
# 持续从队列中获取音频数据并转发到统一播放队列
while True:
try:
# 设置超时以避免无限等待
chunk = audio_queue.get(timeout=1.0)
chunk = audio_queue.get(timeout=0.5)
if chunk is None: # 结束信号
print("📥 收到播放结束信号")
print("\n📥 收到转发结束信号")
break
if chunk: # 确保chunk不为空
playback_stream.write(chunk)
chunks_played += 1
total_size += len(chunk)
# 显示播放进度
print(f"\r🔊 实时播放: {chunks_played} 块 | {total_size / 1024:.1f} KB", end='', flush=True)
# 转发到统一播放队列
try:
self.audio_playback_queue.put_nowait(chunk)
chunks_forwarded += 1
total_size += len(chunk)
# 减少进度显示频率以提高性能
if chunks_forwarded % 10 == 0:
print(f"\r📤 转发音频: {chunks_forwarded} 块 | {total_size / 1024:.1f} KB", end='', flush=True)
except queue.Full:
print("⚠️ 统一播放队列满,丢弃音频块")
continue
audio_queue.task_done()
except queue.Empty:
# 队列为空,检查是否还在接收数据
if not hasattr(self, '_receiving_audio') or not self._receiving_audio:
print("\n📡 音频接收完成,播放器结束")
print("\n📡 音频接收完成,转发器结束")
break
# 继续等待,不要显示太多调试信息
continue
except Exception as e:
print(f"\n播放过程中出错: {e}")
print(f"\n转发过程中出错: {e}")
break
# 确保播放流正确关闭
playback_stream.stop_stream()
playback_stream.close()
print(f"\n✅ 实时播放完成: {chunks_played} 块, {total_size / 1024:.1f} KB")
print(f"\n✅ 音频转发完成: {chunks_forwarded} 块, {total_size / 1024:.1f} KB")
return True
except Exception as e:
print(f"\n实时播放失败: {e}")
print(f"\n❌ 音频转发失败: {e}")
return False
finally:
self.is_playing = False
time.sleep(0.3)
def play_audio_hybrid(self, audio_chunks):
"""混合模式播放:智能选择流式或传统播放"""
@ -834,13 +1112,21 @@ class EnergyBasedRecorder:
# 调用文本转语音
if self.enable_tts:
print("-" * 50)
tts_file = self.text_to_speech(llm_response)
if tts_file:
print("✅ AI语音回复完成")
# 检查是否已经在流式处理中播放了语音
if not hasattr(self, '_streaming_tts_completed') or not self._streaming_tts_completed:
tts_file = self.text_to_speech(llm_response)
if tts_file:
print("✅ AI语音回复完成")
# 删除录音文件
self._safe_delete_file(filename, "录音文件")
else:
print("❌ 文本转语音失败")
else:
print("✅ AI语音回复已完成流式播放")
# 删除录音文件
self._safe_delete_file(filename, "录音文件")
else:
print("❌ 文本转语音失败")
# 重置流式标记
self._streaming_tts_completed = False
else:
print(" 文本转语音功能已禁用")
# 如果不启用TTS直接删除录音文件
@ -862,7 +1148,19 @@ class EnergyBasedRecorder:
elif websockets is None:
print(" 请安装 websockets 库以启用语音识别功能")
print("🔄 准备重新开启音频输入")
# 等待音频播放完成后再重新开启音频输入
print("⏳ 等待音频播放完成...")
while not self.audio_playback_queue.empty() or self.currently_playing:
queue_size = self.audio_playback_queue.qsize()
playing_status = "播放中" if self.currently_playing else "等待播放"
print(f"\r🔊 {playing_status}... 队列: {queue_size}", end='', flush=True)
time.sleep(0.1)
# 额外等待1秒确保音频设备完全停止
print("\n⏳ 等待音频设备完全停止...")
time.sleep(1.0)
print("🔄 音频播放完成,准备重新开启音频输入")
self.recording = False
self.recorded_frames = []
@ -950,10 +1248,21 @@ class EnergyBasedRecorder:
if len(data) == 0:
continue
# 如果正在播放,完全跳过音频处理
if self.is_playing:
# 检查播放冷却期 - 防止回声
current_time = time.time()
time_since_last_play = current_time - self.last_playback_time
in_cooldown = time_since_last_play < self.playback_cooldown_period
# 如果正在播放、播放队列不为空、或在冷却期内,完全跳过音频处理
if self.is_playing or self.currently_playing or not self.audio_playback_queue.empty() or in_cooldown:
# 显示播放状态
status = "🔊 播放中... 跳过录音处理"
queue_size = self.audio_playback_queue.qsize()
if in_cooldown:
cooldown_time = self.playback_cooldown_period - time_since_last_play
status = f"🔊 播放冷却中... {cooldown_time:.1f}s 队列: {queue_size}"
else:
playing_status = "播放中" if self.currently_playing else "等待播放"
status = f"🔊 {playing_status}... 队列: {queue_size}"
print(f"\r{status}", end='', flush=True)
time.sleep(0.1) # 播放时增加延迟减少CPU使用
continue
@ -1058,6 +1367,18 @@ class EnergyBasedRecorder:
if self.recording:
self.stop_recording()
# 停止TTS工作线程
self.tts_worker_running = False
if self.tts_worker_thread:
self.tts_task_queue.put(None) # 发送结束信号
self.tts_worker_thread.join(timeout=2.0)
# 停止播放工作线程
self.playback_worker_running = False
if self.playback_worker_thread:
self.audio_playback_queue.put(None) # 发送结束信号
self.playback_worker_thread.join(timeout=3.0)
if self.stream:
self.stream.stop_stream()
self.stream.close()
@ -1219,12 +1540,12 @@ class EnergyBasedRecorder:
return None
def call_llm(self, user_message):
"""调用大语言模型API"""
"""调用大语言模型API - 支持流式输出"""
if not self.enable_llm:
return None
try:
print("🤖 调用大语言模型...")
print("🤖 调用大语言模型(流式输出)...")
# 获取角色配置中的系统提示词
if self.character_config and "system_prompt" in self.character_config:
@ -1266,25 +1587,123 @@ class EnergyBasedRecorder:
data = {
"model": self.llm_model,
"messages": messages,
"max_tokens": max_tokens
"max_tokens": max_tokens,
"stream": True # 启用流式输出
}
response = requests.post(self.llm_api_url, headers=headers, json=data, timeout=30)
# 使用流式请求
response = requests.post(self.llm_api_url, headers=headers, json=data, stream=True, timeout=30)
if response.status_code == 200:
result = response.json()
if "choices" in result and len(result["choices"]) > 0:
llm_response = result["choices"][0]["message"]["content"]
# 过滤括号中的旁白内容
filtered_response = self._filter_parentheses_content(llm_response.strip())
print("🔄 开始接收流式响应...")
# 处理流式响应
accumulated_text = ""
sentence_buffer = ""
print("🔍 开始接收SSE流...")
# 使用行缓冲区处理多字节字符
buffer = ""
for line_bytes in response.iter_lines():
if not line_bytes:
continue
# 保存到历史记录
# 解码为字符串,处理可能的编码问题
try:
line = line_bytes.decode('utf-8')
except UnicodeDecodeError:
try:
line = line_bytes.decode('latin-1')
except Exception:
continue
# 添加到缓冲区
buffer += line
# 检查是否包含完整的数据行
while buffer:
# 查找完整的数据行
if '\n' in buffer:
line, buffer = buffer.split('\n', 1)
else:
line = buffer
buffer = ""
line = line.strip()
if not line:
continue
try:
# 解析SSE格式的响应
if line.startswith("data: "):
data_str = line[6:] # 移除 "data: " 前缀
if data_str == "[DONE]":
break
# 尝试解析JSON增加错误处理
try:
chunk_data = json.loads(data_str)
except json.JSONDecodeError as e:
# 尝试修复JSON检查是否是不完整的字符串
if not data_str.endswith('}'):
# 这可能是一个不完整的JSON等待下一行
buffer = data_str + buffer
continue
print(f"\n🔍 调试 - 原始行: {repr(line)}")
print(f"🔍 调试 - 数据字符串: {repr(data_str)}")
print(f"🔍 调试 - JSON错误: {e}")
continue
# 处理解析后的数据
if "choices" in chunk_data and len(chunk_data["choices"]) > 0:
delta = chunk_data["choices"][0].get("delta", {})
content = delta.get("content", "")
if content:
accumulated_text += content
sentence_buffer += content
# 检查是否有完整句子
if self._is_complete_sentence(sentence_buffer):
print(f"📝 检测到完整句子: {sentence_buffer}")
# 过滤括号内容
filtered_sentence = self._filter_parentheses_content(sentence_buffer.strip())
if filtered_sentence:
print(f"🎵 添加到智能缓冲区: {filtered_sentence}")
# 使用智能缓冲系统替代立即调用TTS
self._add_sentence_to_buffer(filtered_sentence)
# 标记已经进行了流式TTS播放
self._streaming_tts_completed = True
# 重置句子缓冲区
sentence_buffer = ""
# 显示进度
print(f"\r💬 已生成: {accumulated_text}", end='', flush=True)
except Exception as e:
print(f"\n⚠️ 处理流式响应时出错: {e}")
print(f"🔍 调试 - 问题行: {repr(line)}")
continue
print(f"\n✅ 流式响应完成: {accumulated_text}")
# 强制刷新TTS缓冲区确保所有内容都被处理
self._flush_tts_buffer()
# 保存完整回复到历史记录
if accumulated_text:
filtered_response = self._filter_parentheses_content(accumulated_text.strip())
self._add_to_chat_history(user_message, filtered_response)
return filtered_response
else:
print("❌ LLM API响应格式错误")
return None
return accumulated_text
else:
print(f"❌ LLM API调用失败: {response.status_code}")
print(f"响应内容: {response.text}")
@ -1317,6 +1736,86 @@ class EnergyBasedRecorder:
return filtered_text
def _is_complete_sentence(self, text):
    """Return True if *text* looks like a complete sentence.

    Rules, in order:
      1. empty or whitespace-only text is never complete;
      2. text ending in a sentence-final mark (optionally followed by
         whitespace) is complete;
      3. text with a sentence-final mark in the middle and more text after
         it is not complete;
      4. otherwise the text is complete only if it matches one of a few
         common short Chinese sentence patterns.

    The ending-mark character class previously listed ASCII ``!?`` twice
    and had no full-width ``！？``, so Chinese exclamations and questions
    were never recognized. Both forms are accepted now.

    Args:
        text: The text accumulated so far.
    """
    import re

    if not text or not text.strip():
        return False

    # Sentence-final punctuation, full-width and ASCII.
    sentence_endings = r'[。！？.!?]'

    # Rule 2: ends with a sentence-final mark.
    if re.search(sentence_endings + r'\s*$', text):
        return True

    # Rule 3: a mark exists but, since rule 2 failed, text follows it.
    if re.search(sentence_endings, text):
        return False

    # Rule 4: short utterances with no punctuation at all.
    common_patterns = [
        r'^[是的有没有来去在把被让叫请使].*[的得了吗呢吧啊呀]',
        r'^(你好|谢谢|再见|是的|不是|好的|没问题)',
        r'^[\u4e00-\u9fff]{2,4}[的得了]$',  # 2-4 Chinese characters followed by 的/得/了
    ]
    return any(re.match(pattern, text) for pattern in common_patterns)
def _filter_parentheses_content(self, text):
    """Remove bracketed asides from *text* and normalize whitespace.

    Removes, in order, content enclosed in full-width parentheses ``（）``,
    ASCII parentheses ``()``, lenticular brackets ``【】``, square brackets
    ``[]`` and corner brackets ``「」``. Runs of whitespace are then
    collapsed to a single space and the result is stripped.

    The first pattern had lost its full-width parentheses and read
    ``[^]*``, an unterminated character set that makes ``re.sub`` raise
    ``re.error`` on every call. It is restored here.

    Args:
        text: The text to clean.

    Returns:
        The text without bracketed content.
    """
    import re

    bracket_patterns = (
        r'（[^）]*）',    # full-width parentheses
        r'\([^)]*\)',     # ASCII parentheses
        r'【[^】]*】',    # lenticular brackets
        r'\[[^\]]*\]',    # square brackets
        r'「[^」]*」',    # corner brackets
    )
    filtered_text = text
    for pattern in bracket_patterns:
        filtered_text = re.sub(pattern, '', filtered_text)

    # Collapse the gaps left behind by removed spans.
    return re.sub(r'\s+', ' ', filtered_text).strip()
def _add_to_chat_history(self, user_message, assistant_response):
    """Record one user/assistant exchange in ``self.chat_history``.

    When the history already holds ``self.max_history_length`` entries (or
    more), the oldest entry is removed before the new one is appended.
    """
    history = self.chat_history
    if not len(history) < self.max_history_length:
        # At capacity: remove the oldest exchange.
        history.pop(0)

    entry = {
        "user": user_message,
        "assistant": assistant_response,
        "timestamp": time.time(),
    }
    history.append(entry)
def clear_chat_history(self):
    """Replace the chat history with a new empty list and report it."""
    # Rebind rather than clear in place: existing references to the old
    # list keep their contents.
    self.chat_history = list()
    print("💬 聊天历史已清空")
def get_chat_history_summary(self):
    """Return a one-line summary of how many exchanges are stored."""
    if self.chat_history:
        rounds = len(self.chat_history)
        return f"当前有 {rounds} 轮对话记录"
    return "暂无聊天历史"
def _add_to_chat_history(self, user_message, assistant_response):
"""添加对话到历史记录"""
# 如果历史记录超过最大长度,移除最早的记录
@ -1358,13 +1857,24 @@ class EnergyBasedRecorder:
timestamp = time.strftime("%Y%m%d_%H%M%S")
return f"tts_response_{timestamp}.pcm"
def text_to_speech(self, text):
"""文本转语音 - 真正实时流式播放"""
def text_to_speech(self, text, mode='normal', use_worker=True):
"""文本转语音 - 真正实时流式播放
mode: 'normal' 普通模式, 'buffered' 智能缓冲模式
use_worker: 是否使用工作线程模式
"""
if not self.enable_tts:
return None
try:
print("🔊 开始文本转语音(实时流式播放)...")
if mode == 'buffered':
print("🎵 开始智能缓冲TTS处理...")
else:
print("🔊 开始文本转语音(实时流式播放)...")
# 如果是在工作线程中,不启动播放器
if use_worker and hasattr(self, 'is_playing') and self.is_playing:
print("⚠️ 工作线程模式,跳过播放器启动")
return None
# 构建请求头
headers = {
@ -1393,8 +1903,8 @@ class EnergyBasedRecorder:
}
}
# 创建音频队列
audio_queue = queue.Queue()
# 创建音频队列 - 设置更大的队列大小以减少中断
audio_queue = queue.Queue(maxsize=50) # 增加队列大小
# 启动实时播放线程
self._receiving_audio = True
@ -1403,6 +1913,9 @@ class EnergyBasedRecorder:
player_thread.start()
print("🎵 实时播放器已启动")
# 给播放器一点时间启动
time.sleep(0.1)
# 发送请求
session = requests.Session()
try:
@ -1415,11 +1928,14 @@ class EnergyBasedRecorder:
audio_queue.put(None)
return None
# 处理流式响应 - 实时播放模式
# 处理流式响应 - 根据模式优化
total_audio_size = 0
chunk_count = 0
print("🔄 开始接收TTS音频流实时播放...")
if mode == 'buffered':
print("🔄 开始接收TTS音频流智能缓冲模式...")
else:
print("🔄 开始接收TTS音频流实时播放...")
for chunk in response.iter_lines(decode_unicode=True):
if not chunk:
@ -1434,11 +1950,22 @@ class EnergyBasedRecorder:
total_audio_size += audio_size
chunk_count += 1
# 将音频块放入队列进行实时播放
audio_queue.put(chunk_audio)
# 将音频块放入队列进行实时播放 - 使用非阻塞方式
try:
audio_queue.put_nowait(chunk_audio)
except queue.Full:
# 如果队列满了,稍微等待一下
time.sleep(0.01)
try:
audio_queue.put_nowait(chunk_audio)
except queue.Full:
# 如果还是满的,跳过这个块以避免阻塞
print(f"\n⚠️ 音频队列已满,跳过块 {chunk_count}")
continue
# 显示接收进度
print(f"\r📥 接收并播放: {chunk_count} 块 | {total_audio_size / 1024:.1f} KB", end='', flush=True)
# 减少进度显示频率以提高性能
if chunk_count % 5 == 0:
print(f"\r📥 接收并播放: {chunk_count} 块 | {total_audio_size / 1024:.1f} KB", end='', flush=True)
continue
if data.get("code", 0) == 0 and "sentence" in data and data["sentence"]:

108
test_streaming.py Normal file
View File

@ -0,0 +1,108 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试流式响应解析的脚本
"""
import json
import requests
import os
def test_streaming_response():
    """Manually exercise SSE parsing against the chat-completions endpoint.

    Reads the API key from the ``ARK_API_KEY`` environment variable, sends
    one streaming chat request and prints every received line together with
    the text accumulated from ``choices[0].delta.content``.

    Notes:
      * The response encoding is forced to UTF-8. ``iter_lines`` with
        ``decode_unicode=True`` decodes using ``response.encoding``, which
        for an event stream without a charset is not UTF-8 and would garble
        Chinese text (or yield bytes).
      * The response is used as a context manager so the streamed
        connection is always released.
      * Both ``data: `` and ``data:`` field prefixes are accepted.

    Returns:
        None. All results are printed.
    """
    # The API key is required.
    api_key = os.environ.get("ARK_API_KEY")
    if not api_key:
        print("❌ 请设置 ARK_API_KEY 环境变量")
        return

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}",
    }
    data = {
        "messages": [
            {
                "content": "你是一个智能助手,请回答问题。",
                "role": "system",
            },
            {
                "content": "你好,请简单介绍一下自己",
                "role": "user",
            },
        ],
        "model": "doubao-1-5-pro-32k-250115",
        "stream": True,
    }

    print("🚀 开始测试流式响应...")
    try:
        with requests.post(
            "https://ark.cn-beijing.volces.com/api/v3/chat/completions",
            headers=headers,
            json=data,
            stream=True,
            timeout=30,
        ) as response:
            # Decode the event stream (and any error body) as UTF-8.
            response.encoding = "utf-8"
            print(f"📊 响应状态: {response.status_code}")
            if response.status_code != 200:
                print(f"❌ 请求失败: {response.text}")
                return

            print("🔍 开始解析流式响应...")
            accumulated_text = ""
            line_count = 0
            for line in response.iter_lines(decode_unicode=True):
                line_count += 1
                if not line or not line.strip():
                    continue

                line = line.strip()
                print(f"\n--- 第{line_count}行 ---")
                print(f"原始内容: {repr(line)}")

                if not line.startswith("data:"):
                    # Only "data" fields are handled here.
                    continue

                # Drop the field name and the optional space after the colon.
                data_str = line[len("data:"):].lstrip()
                print(f"处理后: {repr(data_str)}")
                if data_str == "[DONE]":
                    print("✅ 流结束")
                    break

                try:
                    chunk_data = json.loads(data_str)
                    print(f"✅ JSON解析成功: {chunk_data}")
                    if "choices" in chunk_data and len(chunk_data["choices"]) > 0:
                        delta = chunk_data["choices"][0].get("delta", {})
                        content = delta.get("content", "")
                        if content:
                            accumulated_text += content
                            print(f"💬 累计内容: {accumulated_text}")
                except json.JSONDecodeError as e:
                    print(f"❌ JSON解析失败: {e}")
                    print(f"🔍 问题数据: {repr(data_str)}")
                except Exception as e:
                    print(f"❌ 其他错误: {e}")

            print(f"\n✅ 测试完成,总共处理了 {line_count}")
            print(f"📝 最终内容: {accumulated_text}")
    except Exception as e:
        print(f"❌ 测试失败: {e}")


if __name__ == "__main__":
    test_streaming_response()