#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 测试缓存音频播放完成检测修复 """ import sys import os sys.path.append(os.path.dirname(__file__)) from audio_processes import OutputProcess import multiprocessing as mp import time def test_cached_audio_completion(): """测试缓存音频播放完成检测""" print("🧪 开始测试缓存音频播放完成检测修复...") # 创建测试队列 audio_queue = mp.Queue(maxsize=100) event_queue = mp.Queue(maxsize=100) # 创建输出进程实例 config = { 'buffer_size': 1000, 'show_progress': True, 'progress_interval': 100, 'tts_speaker': 'zh_female_wanqudashu_moon_bigtts' } output_process = OutputProcess(audio_queue, config, event_queue) # 测试1: 检查新添加的状态变量 print("\n📋 测试1: 检查新增的状态变量") assert hasattr(output_process, 'is_playing_cached_audio'), "缺少 is_playing_cached_audio 状态变量" assert output_process.is_playing_cached_audio == False, "初始状态应该为 False" print("✅ 状态变量检查通过") # 测试2: 检查新添加的方法 print("\n📋 测试2: 检查新增的方法") assert hasattr(output_process, '_check_cached_audio_completion'), "缺少 _check_cached_audio_completion 方法" print("✅ 方法检查通过") # 测试3: 检查增强播放完成检测方法 print("\n📋 测试3: 检查增强播放完成检测方法") assert hasattr(output_process, '_check_enhanced_playback_completion'), "缺少 _check_enhanced_playback_completion 方法" print("✅ 增强播放完成检测方法检查通过") # 测试4: 模拟缓存音频播放状态 print("\n📋 测试4: 模拟缓存音频播放状态") output_process.is_playing_cached_audio = True output_process.end_signal_received = True output_process.currently_playing = False output_process.preload_buffer = [] output_process.playback_buffer = [] output_process.last_audio_chunk_time = time.time() - 2.0 # 2秒前播放 # 测试缓存音频完成检测 result = output_process._check_cached_audio_completion() assert result == True, "缓存音频应该检测为播放完成" print("✅ 缓存音频完成检测逻辑正确") # 测试5: 模拟缓存音频仍在播放 print("\n📋 测试5: 模拟缓存音频仍在播放") output_process.playback_buffer = [b'fake_audio_data'] # 还有数据在播放缓冲区 result = output_process._check_cached_audio_completion() assert result == False, "缓存音频仍在播放时应该检测为未完成" print("✅ 缓存音频播放中检测逻辑正确") print("\n🎉 所有测试通过!修复成功!") # 清理 audio_queue.close() event_queue.close() if __name__ == "__main__": test_cached_audio_completion()