#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 音频播放优化测试脚本 用于验证缓冲区优化和播放性能改进 """ import time import json import os import sys def test_optimization(): """测试优化效果""" print("🧪 音频播放优化测试") print("=" * 50) # 显示优化后的配置参数 print("📋 优化后的配置参数:") print(" - 预加载缓冲区: 6个音频块(原:3个)") print(" - 智能句子缓冲: 最多5个句子(原:3个)") print(" - 最小触发句子: 2个句子(原:1个)") print(" - 积累时间窗口: 150ms(原:200ms)") print(" - TTS任务队列: 20个任务(原:10个)") print(" - 音频块大小: 1024字节(原:2048字节)") print(" - 播放冷却期: 0.05秒(原:0.1秒)") print(" - 长句子触发阈值: 30字符(原:50字符)") print(" - 播放缓冲区维护: 4个块(原:3个)") print() # 测试智能句子缓冲逻辑 print("🧠 测试智能句子缓冲逻辑:") print(" 🔄 首次播放逻辑测试:") test_sentences = [ "你好", # 短句子 "今天天气怎么样?", # 中等长度 "我觉得这个方案很不错,我们可以试试看。", # 长句子 "这是一个超过三十个字符的句子,应该会立即触发TTS生成。", # 超过30字符 "短句。", # 带标点的短句 ] # 模拟首次播放的缓冲区状态 tts_buffer = [] first_playback_started = False total_buffered_text = "" trigger_count = 0 for i, sentence in enumerate(test_sentences): tts_buffer.append(sentence) total_buffered_text = ''.join(tts_buffer) # 首次播放逻辑 if not first_playback_started: should_trigger = False trigger_reason = "" # 条件1: 总文本长度超过40字符且至少有2个句子 if len(total_buffered_text) >= 40 and len(tts_buffer) >= 2: should_trigger = True trigger_reason = f"总长度{len(total_buffered_text)}字符,{len(tts_buffer)}个句子" # 条件2: 有1个完整长句子(超过25字符) elif len(sentence) >= 25 and sentence.endswith(('。', '!', '?', '.', '!', '?')) and len(tts_buffer) >= 2: should_trigger = True trigger_reason = f"长句子{len(sentence)}字符+缓冲内容" # 条件3: 缓冲区达到最大值(5个) elif len(tts_buffer) >= 5: should_trigger = True trigger_reason = f"缓冲区达到最大值{len(tts_buffer)}" # 条件4: 超过500ms(模拟) if should_trigger: trigger_count += 1 first_playback_started = True print(f" 🎵 首次触发TTS: {trigger_reason}") print(f" 📝 发送内容: '{total_buffered_text[:50]}...'") tts_buffer = [] else: print(f" ⏳ 首次缓冲: '{sentence}' (累计: {len(total_buffered_text)}字符, {len(tts_buffer)}个句子)") else: # 正常播放逻辑 if len(sentence) > 30 or len(tts_buffer) >= 3: should_trigger = True trigger_reason = "长句子" if len(sentence) > 30 else "缓冲区满" if should_trigger: trigger_count += 1 print(f" ✅ 正常触发TTS: {trigger_reason}") print(f" 📝 发送内容: '{''.join(tts_buffer)[:30]}...'") tts_buffer = [] else: print(f" ⏳ 正常缓冲: '{sentence}'") print(f" 📊 总触发次数: {trigger_count}") print() print(" 📋 首次播放优化效果:") print(" • 确保首句有足够长度(40+字符或25+字符完整句)") print(" • 积累多个句子避免播放卡顿") print(" • 5秒超时机制防止无限等待") print(" • 后续句子正常流式处理") print() # 显示性能监控信息 print("📊 性能监控功能:") print(" - 实时缓冲区大小统计") print(" - 平均和最大缓冲区大小") print(" - 播放块数和音频大小统计") print(" - 每5秒自动输出性能报告") print() print("🎯 预期改进效果:") print(" 1. ✅ 减少音频播放卡顿(更大的缓冲区)") print(" 2. ✅ 更快的TTS响应(优化的触发条件)") print(" 3. ✅ 更流畅的播放体验(减少冷却期)") print(" 4. ✅ 更好的资源利用(更小的音频块)") print(" 5. ✅ 实时性能监控(调试和优化)") print() print("📝 测试建议:") print(" 1. 运行主程序观察播放流畅度") print(" 2. 查看性能统计输出") print(" 3. 监控缓冲区大小变化") print(" 4. 测试不同长度的语音响应") print() print("🚀 测试完成!可以运行主程序验证优化效果。") def show_optimization_summary(): """显示优化总结""" print("📈 音频播放优化总结") print("=" * 50) summary = { "缓冲区优化": [ "预加载缓冲区: 3→6个块", "智能句子缓冲: 3→5个句子", "最小触发缓冲: 1→2个句子", "TTS任务队列: 10→20个任务" ], "响应性优化": [ "积累时间窗口: 200ms→150ms", "长句子触发: 50→30字符", "中等长度触发: 30→20字符", "播放冷却期: 0.1s→0.05s" ], "播放优化": [ "音频块大小: 2048→1024字节", "播放缓冲维护: 3→4个块", "数据转移: 2→3个块/次" ], "监控功能": [ "实时性能统计", "缓冲区大小监控", "自动性能报告", "播放进度追踪" ] } for category, improvements in summary.items(): print(f"\n🔧 {category}:") for improvement in improvements: print(f" • {improvement}") print(f"\n🎯 总体目标: 减少音频播放卡顿,提升用户体验") print(f"📊 预期效果: 更流畅的实时语音交互") if __name__ == "__main__": if len(sys.argv) > 1 and sys.argv[1] == "--summary": show_optimization_summary() else: test_optimization()