#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 首次播放逻辑专项测试 验证第一句TTS触发机制的优化效果 """ import sys def test_first_playback_logic(): """测试首次播放逻辑""" print("🎵 首次播放逻辑专项测试") print("=" * 50) # 测试场景1: 短句子积累 print("📝 测试场景1: 短句子积累") print(" 模拟LLM生成的短句子流:") short_sentences = [ "你好", "嗯", "这个", "问题", "确实", "需要", "仔细", "思考", ] buffer = [] total_length = 0 first_trigger = None for i, sentence in enumerate(short_sentences): buffer.append(sentence) total_length = sum(len(s) for s in buffer) # 模拟首次播放逻辑 trigger_reason = None if total_length >= 40 and len(buffer) >= 2: trigger_reason = f"总长度{total_length}字符,{len(buffer)}个句子" elif len(buffer) >= 5: trigger_reason = f"缓冲区达到最大值{len(buffer)}" if trigger_reason and first_trigger is None: first_trigger = i + 1 print(f" 🎵 第{first_trigger}句触发TTS: {trigger_reason}") print(f" 📝 发送内容: '{''.join(buffer)}'") break else: print(f" ⏳ 第{i+1}句: '{sentence}' (累计: {total_length}字符, {len(buffer)}个句子)") if first_trigger is None: print(" ⚠️ 未触发TTS(需要超时机制)") print() # 测试场景2: 中等长度句子 print("📝 测试场景2: 中等长度句子") medium_sentences = [ "你好,很高兴见到你。", "今天天气真不错呢。", "我们可以一起去公园玩吗?", "我想那会是一个很好的主意。", ] buffer = [] total_length = 0 first_trigger = None for i, sentence in enumerate(medium_sentences): buffer.append(sentence) total_length = sum(len(s) for s in buffer) # 模拟首次播放逻辑 trigger_reason = None if total_length >= 40 and len(buffer) >= 2: trigger_reason = f"总长度{total_length}字符,{len(buffer)}个句子" elif len(sentence) >= 25 and sentence.endswith(('。', '!', '?', '.', '!', '?')) and len(buffer) >= 2: trigger_reason = f"长句子{len(sentence)}字符+缓冲内容" if trigger_reason and first_trigger is None: first_trigger = i + 1 print(f" 🎵 第{first_trigger}句触发TTS: {trigger_reason}") print(f" 📝 发送内容: '{''.join(buffer)}'") break else: print(f" ⏳ 第{i+1}句: '{sentence}' (累计: {total_length}字符, {len(buffer)}个句子)") if first_trigger is None: print(" ⚠️ 未触发TTS(需要超时机制)") print() # 测试场景3: 长句子 print("📝 测试场景3: 长句子") long_sentences = [ "你好,", "我认为这个问题需要我们从多个角度来分析。", "首先,让我们仔细了解一下具体情况。", ] buffer = [] total_length = 0 first_trigger = None for i, sentence in enumerate(long_sentences): buffer.append(sentence) total_length = sum(len(s) for s in buffer) # 模拟首次播放逻辑 trigger_reason = None if total_length >= 40 and len(buffer) >= 2: trigger_reason = f"总长度{total_length}字符,{len(buffer)}个句子" elif len(sentence) >= 25 and sentence.endswith(('。', '!', '?', '.', '!', '?')) and len(buffer) >= 2: trigger_reason = f"长句子{len(sentence)}字符+缓冲内容" if trigger_reason and first_trigger is None: first_trigger = i + 1 print(f" 🎵 第{first_trigger}句触发TTS: {trigger_reason}") print(f" 📝 发送内容: '{''.join(buffer)}'") break else: print(f" ⏳ 第{i+1}句: '{sentence}' (累计: {total_length}字符, {len(buffer)}个句子)") if first_trigger is None: print(" ⚠️ 未触发TTS(需要超时机制)") print() # 测试场景4: 超长单句 print("📝 测试场景4: 超长单句") ultra_long_sentence = "根据我的分析,这个问题的解决方案需要综合考虑多个因素,包括时间成本、资源投入以及最终的实施效果。" buffer = ["你好"] buffer.append(ultra_long_sentence) total_length = sum(len(s) for s in buffer) if total_length >= 40 and len(buffer) >= 2: print(f" 🎵 第2句触发TTS: 总长度{total_length}字符,{len(buffer)}个句子") print(f" 📝 发送内容: '{''.join(buffer)[:50]}...'") else: print(" ⚠️ 未触发TTS") print() def show_optimization_comparison(): """显示优化对比""" print("📈 首次播放逻辑优化对比") print("=" * 50) comparison = { "优化前": { "触发条件": "任何完整句子或长句子", "最小长度": "无明确要求", "积攒机制": "基本没有", "可能导致": "播放卡顿,等待数据", }, "优化后": { "触发条件": "40+字符且2+句子 或 25+字符完整句+缓冲", "最小长度": "总长度40字符或单句25字符", "积攒机制": "智能积累多个句子", "超时保护": "5秒超时机制", "效果": "确保有足够数据才开始播放" } } for aspect, details in comparison.items(): print(f"\n🔧 {aspect}:") for key, value in details.items(): print(f" • {key}: {value}") print(f"\n🎯 核心改进: 确保首次播放有足够的内容,避免因为数据不足导致的播放卡顿") if __name__ == "__main__": if len(sys.argv) > 1 and sys.argv[1] == "--comparison": show_optimization_comparison() else: test_first_playback_logic() show_optimization_comparison()