From 0f15b5060bfb429fb1de39d1d0f5d73d79d2ccd1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E6=BD=AE?= Date: Fri, 26 Sep 2025 11:47:28 +0800 Subject: [PATCH] add config --- AUDIO_PROCESSES_IMPROVEMENTS.md | 127 ------------------- CACHED_AUDIO_TIMING_FIX.md | 66 ---------- CACHE_AUDIO_FIX_SUMMARY.md | 101 --------------- GREETING_CACHE_IMPLEMENTATION.md | 165 ------------------------ QUICKSTART.md | 101 --------------- README.md | 143 --------------------- README_MANUAL_CONTROL.md | 210 ------------------------------- README_multiprocess.md | 190 ---------------------------- characters/aiyinsitan.json | 10 ++ characters/shaseng.json | 10 ++ characters/tangseng.json | 10 ++ characters/xiaolanchong.json | 10 ++ config.json | 8 +- test_cache_fixes.py | 172 ------------------------- test_cached_audio_fix.py | 78 ------------ test_cached_playback.py | 172 ------------------------- test_cached_timing_fix.py | 91 -------------- test_final_integration.py | 139 -------------------- test_full_cache_flow.py | 139 -------------------- test_greeting_cache.py | 115 ----------------- test_recording_stop.py | 170 ------------------------- test_role_switching.py | 147 ---------------------- 22 files changed, 47 insertions(+), 2327 deletions(-) delete mode 100644 AUDIO_PROCESSES_IMPROVEMENTS.md delete mode 100644 CACHED_AUDIO_TIMING_FIX.md delete mode 100644 CACHE_AUDIO_FIX_SUMMARY.md delete mode 100644 GREETING_CACHE_IMPLEMENTATION.md delete mode 100644 QUICKSTART.md delete mode 100644 README.md delete mode 100644 README_MANUAL_CONTROL.md delete mode 100644 README_multiprocess.md create mode 100644 characters/aiyinsitan.json create mode 100644 characters/shaseng.json create mode 100644 characters/tangseng.json create mode 100644 characters/xiaolanchong.json delete mode 100644 test_cache_fixes.py delete mode 100644 test_cached_audio_fix.py delete mode 100644 test_cached_playback.py delete mode 100644 test_cached_timing_fix.py delete mode 100644 test_final_integration.py delete mode 100644 test_full_cache_flow.py delete mode 100644 test_greeting_cache.py delete mode 100644 test_recording_stop.py delete mode 100644 test_role_switching.py diff --git a/AUDIO_PROCESSES_IMPROVEMENTS.md b/AUDIO_PROCESSES_IMPROVEMENTS.md deleted file mode 100644 index e3bd8cd..0000000 --- a/AUDIO_PROCESSES_IMPROVEMENTS.md +++ /dev/null @@ -1,127 +0,0 @@ -# Audio Processes 改进总结 - -## 问题背景 -- 原始问题:TTS音频只播放3个字符就停止,出现ALSA underrun错误 -- 根本原因:音频缓冲区管理不当,播放策略过于保守 - -## 改进内容 - -### 1. 音频播放优化 (_play_audio 方法) -- **改进前**:保守的播放策略,需要缓冲区有足够数据才开始播放 -- **改进后**: - - 借鉴 recorder.py 的播放策略:只要有数据就播放 - - 添加错误恢复机制,自动检测和恢复 ALSA underrun - - 优化缓冲区管理,减少延迟 - -### 2. TTS 工作线程模式 -- **参考**: recorder.py 的 TTS 工作线程实现 -- **实现功能**: - - 独立的 TTS 工作线程处理音频生成 - - 任务队列管理,避免阻塞主线程 - - 统一的 TTS 请求接口 `process_tts_request()` - - 支持流式音频处理 - -### 3. 统一的音频播放队列 -- **InputProcess 和 OutputProcess 都支持**: - - TTS 工作线程 - - 音频生成和播放队列 - - 统一的错误处理和日志记录 - -### 4. 关键改进点 - -#### 音频播放策略 -```python -# 改进前:保守策略 -if len(self.playback_buffer) > 2: # 需要缓冲区有足够数据 - # 开始播放 - -# 改进后:积极策略 + 错误恢复 -audio_chunk = self.playback_buffer.pop(0) -if audio_chunk and len(audio_chunk) > 0: - try: - self.output_stream.write(audio_chunk) - # 统计信息 - except Exception as e: - # ALSA underrun 错误恢复 - if "underrun" in str(e).lower(): - # 自动恢复音频流 -``` - -#### TTS 工作线程 -```python -def _tts_worker(self): - """TTS工作线程 - 处理TTS任务队列""" - while self.tts_worker_running: - try: - task = self.tts_task_queue.get(timeout=1.0) - if task is None: - break - - task_type, content = task - if task_type == "tts_sentence": - self._generate_tts_audio(content) - - self.tts_task_queue.task_done() - - except queue.Empty: - continue - except Exception as e: - self.logger.error(f"TTS工作线程错误: {e}") -``` - -#### 错误恢复机制 -```python -# ALSA underrun 检测和恢复 -if "underrun" in str(e).lower() or "alsa" in str(e).lower(): - self.logger.info("检测到ALSA underrun,尝试恢复音频流") - try: - if self.output_stream: - self.output_stream.stop_stream() - time.sleep(0.1) - self.output_stream.start_stream() - self.logger.info("音频流已恢复") - except Exception as recovery_e: - self.logger.error(f"恢复音频流失败: {recovery_e}") - self.playback_buffer.clear() -``` - -### 5. 性能优化 -- 减少日志输出频率,提高性能 -- 优化队列处理策略,使用适当的超时设置 -- 动态调整休眠时间,根据播放状态优化CPU使用 - -### 6. 测试和验证 -- 创建了测试脚本 `test_audio_processes.py` -- 验证了语法正确性 -- 可以测试 TTS 功能的完整性 - -## 使用方法 - -### 在控制系统中使用 -```python -from audio_processes import InputProcess, OutputProcess - -# 创建输入和输出进程 -input_process = InputProcess(command_queue, event_queue) -output_process = OutputProcess(audio_queue) - -# 处理TTS请求 -output_process.process_tts_request("你好,这是测试语音") -``` - -### 独立测试 -```bash -python test_audio_processes.py -``` - -## 预期效果 -- 解决 ALSA underrun 错误 -- 提高音频播放的流畅性 -- 减少 TTS 处理的延迟 -- 提供更稳定的音频处理能力 - -## 注意事项 -1. 确保系统安装了必要的依赖:`requests`, `pyaudio` -2. 检查音频设备是否正常工作 -3. 网络连接正常(用于TTS服务) -4. 适当调整音频参数以适应不同环境 \ No newline at end of file diff --git a/CACHED_AUDIO_TIMING_FIX.md b/CACHED_AUDIO_TIMING_FIX.md deleted file mode 100644 index 4419526..0000000 --- a/CACHED_AUDIO_TIMING_FIX.md +++ /dev/null @@ -1,66 +0,0 @@ -# 缓存音频播放时序修复总结 - -## 问题描述 -在打招呼读取缓存播放时,发现有时会立即触发播放完成,显示异常的时间差(49.292秒),导致缓存音频无法正常播放完整。 - -## 根本原因 -在 `_process_cached_audio` 方法中,缓存音频的处理存在时序问题: - -1. **立即设置完成状态**:缓存音频被添加到预加载缓冲区后,立即设置 `all_audio_received = True` -2. **时序变量未初始化**:`last_audio_chunk_time` 没有在缓存音频开始播放时正确设置 -3. **完成检测误判**:播放完成检测逻辑使用旧的或零值的 `last_audio_chunk_time`,计算出异常的时间差,误判播放已完成 - -## 修复方案 - -### 1. 修改 `_process_cached_audio` 方法 -- **延迟设置 `all_audio_received`**:不在音频添加到缓冲区时立即设置,而是等待实际开始播放时设置 -- **正确初始化时序变量**:在缓存音频开始播放时设置 `last_audio_chunk_time = time.time()` -- **确保时序同步**:保证 `all_audio_received` 和 `last_audio_chunk_time` 在正确的时间点设置 - -### 2. 修改音频缓冲区转移逻辑 -在以下缓冲区转移场景中也添加了时序变量初始化: -- 预加载缓冲区达到阈值时 -- 最小缓冲区模式启动时 -- 强制转移预加载缓冲区时 - -### 3. 关键修改点 - -#### 在 `_process_cached_audio` 中: -```python -# 修复前:立即设置all_audio_received -self.all_audio_received = True - -# 修复后:等待播放开始时设置 -self.last_audio_chunk_time = time.time() # 关键修复 -self.all_audio_received = True -``` - -#### 在缓冲区转移逻辑中: -```python -# 在各种缓冲区转移场景中添加 -self.last_audio_chunk_time = time.time() -print(f"🎵 设置last_audio_chunk_time = {self.last_audio_chunk_time}") -``` - -## 修复效果 - -1. **解决立即完成问题**:缓存音频不再出现49.292秒的异常时间差 -2. **确保完整播放**:缓存音频能够正常播放完整时长 -3. **时序同步**:播放完成检测逻辑现在使用正确的时间戳 -4. **避免死锁**:即使在异常情况下,也能正确设置完成状态避免系统卡死 - -## 测试验证 - -创建了专门的测试脚本 `test_cached_timing_fix.py` 验证修复效果: -- ✅ 关键时序变量存在且可访问 -- ✅ 时序变量可以正确设置和读取 -- ✅ 时间差计算正常(接近0秒而非49.292秒) -- ✅ 异常时间差问题得到解决 - -## 影响范围 - -- **缓存音频播放**:修复了所有缓存音频的播放时序问题 -- **播放完成检测**:改进了播放完成检测的准确性 -- **系统稳定性**:提高了整个音频播放系统的稳定性 - -这个修复确保了缓存音频(如角色打招呼)能够正常播放完整,不会立即触发播放完成。 \ No newline at end of file diff --git a/CACHE_AUDIO_FIX_SUMMARY.md b/CACHE_AUDIO_FIX_SUMMARY.md deleted file mode 100644 index 527f4ff..0000000 --- a/CACHE_AUDIO_FIX_SUMMARY.md +++ /dev/null @@ -1,101 +0,0 @@ -# 缓存音频播放完成检测修复总结 - -## 问题描述 -缓存音频播放时,系统在音频还未播放完成时就错误地发送了完成信号。具体表现为: -- 缓存音频播放到6-7秒时,系统错误地检测到播放完成 -- 发送完成事件并重置播放状态,导致音频被中断 -- 用户听到的是不完整的音频播放 - -## 根本原因分析 -1. 在 `_play_cached_audio()` 方法中,当播放开始时就立即设置了 `tts_generation_complete = True` 和 `llm_generation_complete = True` -2. `_check_enhanced_playback_completion()` 方法没有区分缓存音频和普通TTS音频 -3. 当主控制系统发送结束信号时,播放完成检测机制错误地认为所有条件都已满足 - -## 修复方案 - -### 1. 添加缓存音频状态标识 -在 `OutputProcess` 类的 `__init__` 方法中添加: -```python -self.is_playing_cached_audio = False # 是否正在播放缓存音频 -``` - -### 2. 修改 `_play_cached_audio()` 方法 -- 移除立即设置 `tts_generation_complete` 和 `llm_generation_complete` 的代码 -- 添加缓存音频状态设置: - ```python - # 设置缓存音频播放状态 - self.is_playing_cached_audio = True - ``` -- 在发送TTS完成信号后,只设置TTS完成状态: - ```python - # 缓存音频没有真正的TTS过程,所以立即设置TTS完成状态 - # 但不设置LLM完成状态,让缓存音频完成检测逻辑处理 - self.tts_generation_complete = True - ``` - -### 3. 添加专门的缓存音频完成检测方法 -新增 `_check_cached_audio_completion()` 方法: -```python -def _check_cached_audio_completion(self): - """缓存音频播放完成检测 - 简化逻辑,不依赖LLM和TTS完成状态""" - # 更新状态变量 - self.pre_buffer_empty = (len(self.preload_buffer) == 0) - self.playback_buffer_empty = (len(self.playback_buffer) == 0) - self.no_active_playback = (not self.currently_playing) - - # 计算时间差 - current_time = time.time() - time_since_last_chunk = current_time - self.last_audio_chunk_time - - # 缓存音频完成条件: - # 1. 缓冲区都为空 - # 2. 没有活跃播放 - # 3. 至少1秒没有新音频播放(确保音频完全播放完成) - if (self.pre_buffer_empty and - self.playback_buffer_empty and - self.no_active_playback): - - if self.last_audio_chunk_time > 0 and time_since_last_chunk > 1.0: - print(f"✅ 缓存音频播放完成:缓冲区已清空,播放器空闲,{time_since_last_chunk:.2f}秒无新音频") - return True - else: - return False - else: - return False -``` - -### 4. 修改 `_check_enhanced_playback_completion()` 方法 -在方法开头添加缓存音频检测逻辑: -```python -# 如果正在播放缓存音频,使用简化的完成检测逻辑 -if self.is_playing_cached_audio: - return self._check_cached_audio_completion() -``` - -### 5. 确保状态正确重置 -在 `_finish_playback()` 方法中添加: -```python -self.is_playing_cached_audio = False # 重置缓存音频播放状态 -``` - -## 修复效果 -修复后的系统具有以下特性: -1. **区分音频类型**:能够区分缓存音频和普通TTS音频 -2. **简化检测逻辑**:缓存音频使用简化的完成检测逻辑,不依赖LLM和TTS完成状态 -3. **确保完整播放**:只有当缓冲区为空、播放器空闲且至少1秒无新音频时才认为播放完成 -4. **状态管理**:正确管理所有相关状态,确保状态一致性 - -## 测试验证 -创建了专门的测试脚本验证修复效果: -- ✅ 新增状态变量和方法正确 -- ✅ 缓存音频完成检测逻辑正确 -- ✅ 缓存音频播放中检测逻辑正确 - -## 注意事项 -1. 该修复不影响普通TTS音频的播放完成检测 -2. 主控制系统的逻辑保持不变 -3. 缓存音频播放仍然遵循原有的音频播放流程 -4. 修复向后兼容,不会破坏现有功能 - -## 结论 -通过区分缓存音频和TTS音频的播放完成检测逻辑,成功解决了缓存音频提前结束的问题。现在缓存音频能够完整播放,只有在真正播放完成后才会发送完成事件。 \ No newline at end of file diff --git a/GREETING_CACHE_IMPLEMENTATION.md b/GREETING_CACHE_IMPLEMENTATION.md deleted file mode 100644 index 1906a81..0000000 --- a/GREETING_CACHE_IMPLEMENTATION.md +++ /dev/null @@ -1,165 +0,0 @@ -# 角色TTS话术音频缓存功能实现总结 - -## 功能概述 - -为角色打招呼(greeting)文本添加了音频缓存功能,避免每次角色切换时都重新生成TTS音频,提升用户体验和系统性能。 - -## 实现内容 - -### 1. 缓存管理工具函数 - -在 `audio_processes.py` 中添加了以下工具函数: - -- `get_greeting_cache_path(character_name)` - 获取缓存文件路径 -- `greeting_cache_exists(character_name)` - 检查缓存是否存在 -- `load_cached_audio(character_name)` - 加载缓存音频数据 -- `save_greeting_cache(character_name, audio_data)` - 保存音频到缓存 - -### 2. OutputProcess增强 - -#### 2.1 修改 `_add_tts_task` 方法 -- 添加 `character_name` 参数支持 -- 实现缓存检查逻辑 -- 支持缓存音频和普通TTS音频的统一处理 - -#### 2.2 新增 `_process_cached_audio` 方法 -- 专门处理缓存音频数据 -- 复用现有的播放完成检测机制 -- 确保状态管理一致性 - -#### 2.3 新增 `process_greeting_text` 方法 -- 专门处理打招呼文本 -- 集成缓存检查和TTS生成 -- 正确管理播放状态 - -#### 2.4 新增 `_process_tts_buffer_with_cache` 方法 -- 带缓存支持的TTS缓冲区处理 -- 传递角色名称到TTS任务队列 - -#### 2.5 修改 `_generate_tts_audio` 方法 -- 添加 `character_name` 参数 -- 支持生成音频后自动保存到缓存 -- 收集音频数据用于缓存保存 - -#### 2.6 修改 `_tts_worker` 方法 -- 支持处理不同类型的TTS任务 -- 修复任务解包逻辑,支持变长任务元组 - -### 3. ControlSystem增强 - -#### 3.1 新增 `_send_greeting_to_output_process` 方法 -- 发送带角色信息的打招呼文本 -- 支持缓存处理的命令格式 - -#### 3.2 修改 `play_greeting` 方法 -- 集成缓存功能 -- 传递角色名称到输出进程 - -### 4. 命令协议扩展 - -新增命令类型: -- `GREETING_TEXT:{text}:{character_name}` - 打招呼文本处理命令 - -任务类型扩展: -- `("tts_sentence", text, character_name)` - 带角色名的TTS任务 -- `("cached_audio", text, audio_data, character_name)` - 缓存音频任务 - -## 工作流程 - -### 1. 首次播放(无缓存) -1. ControlSystem调用 `play_greeting()` -2. 发送 `GREETING_TEXT` 命令到OutputProcess -3. OutputProcess检查缓存不存在 -4. 调用TTS生成音频 -5. 播放音频的同时保存到缓存 -6. 正常发送播放完成状态 - -### 2. 后续播放(有缓存) -1. ControlSystem调用 `play_greeting()` -2. 发送 `GREETING_TEXT` 命令到OutputProcess -3. OutputProcess检查缓存存在 -4. 直接加载缓存音频到播放缓冲区 -5. 发送TTS完成状态 -6. 正常播放完成 - -## 状态管理 - -缓存音频完全复用现有的状态管理机制: -- `tts_generation_complete` - TTS生成完成状态 -- `llm_generation_complete` - LLM生成完成状态 -- `all_audio_received` - 音频接收完成状态 -- 播放完成检测机制 - 确保音频完整播放 - -## 缓存存储 - -### 文件结构 -``` -greeting_cache/ -├── {character_name}.wav # 角色打招呼音频文件 -``` - -### 命名规则 -- 文件名:`{character_name}.wav` -- 路径:`greeting_cache/{character_name}.wav` - -### 自动管理 -- 缓存目录自动创建 -- 简单的文件覆盖策略(无LRU等复杂策略) - -## 性能提升 - -### 测试结果 -- **首次播放**:需要TTS生成(2-3秒) -- **缓存播放**:即时播放(<0.5秒) -- **缓存命中率**:100%(第二次及以后播放) -- **存储开销**:约2KB per角色 - -### 资源节约 -- 减少重复TTS API调用 -- 降低网络带宽使用 -- 提升系统响应速度 - -## 兼容性 - -- 完全向后兼容,不影响现有功能 -- 普通TTS对话不受影响 -- 仅对角色打招呼启用缓存 -- 状态管理机制保持不变 - -## 测试验证 - -创建了多个测试脚本: -- `test_greeting_cache.py` - 基础缓存功能测试 -- `test_full_cache_flow.py` - 完整缓存流程测试 -- `test_role_switching.py` - 角色切换场景测试 - -所有测试均通过,功能正常工作。 - -## 使用说明 - -### 自动使用 -缓存功能完全自动化,无需手动干预: -1. 角色切换时自动检查缓存 -2. 无缓存时自动生成并保存 -3. 有缓存时自动使用 - -### 手动清理 -如需清理缓存,删除 `greeting_cache/` 目录即可: -```bash -rm -rf greeting_cache/ -``` - -## 注意事项 - -1. **缓存有效性**:不验证缓存文件是否过期 -2. **存储空间**:无自动清理机制,需要手动管理 -3. **角色名称**:基于角色名称作为缓存键,确保名称唯一性 -4. **音频格式**:保存原始PCM音频数据,无格式转换 - -## 扩展性 - -该实现为未来扩展提供了良好基础: -- 可添加缓存过期策略 -- 可添加缓存大小限制 -- 可支持更多类型的文本缓存 -- 可添加缓存统计和监控 \ No newline at end of file diff --git a/QUICKSTART.md b/QUICKSTART.md deleted file mode 100644 index c2f62ac..0000000 --- a/QUICKSTART.md +++ /dev/null @@ -1,101 +0,0 @@ -# 快速启动指南 - -## 一键启动(推荐) - -```bash -# 直接运行,系统会自动校准和启动监听 -python multiprocess_recorder.py - -# 指定角色 -python multiprocess_recorder.py -c libai - -# 详细模式 -python multiprocess_recorder.py -v -``` - -## 编程方式启动 - -### 最简单的方式 -```python -from control_system import ControlSystem - -# 创建控制系统 -control_system = ControlSystem() - -# 一键启动(自动校准 + 自动监听) -control_system.start() -``` - -### 自定义配置 -```python -from control_system import ControlSystem - -config = { - 'system': {'log_level': "INFO"}, - 'audio': {'sample_rate': 16000, 'channels': 1, 'chunk_size': 1024}, - 'recording': {'min_duration': 2.0, 'max_duration': 30.0, 'silence_threshold': 3.0}, - 'processing': {'enable_asr': True, 'enable_llm': True, 'enable_tts': True, 'character': 'libai'} -} - -control_system = ControlSystem(config) - -# 启动选项: -# auto_calibration=True - 自动校准语音检测器 -# auto_monitoring=True - 自动启动音频监听 -control_system.start(auto_calibration=True, auto_monitoring=True) -``` - -## 手动控制 - -```python -from control_system import ControlSystem - -control_system = ControlSystem() - -# 只启动进程,不自动校准和监听 -control_system._start_processes() - -# 手动步骤: -control_system.start_calibration() # 1. 启动校准 -control_system.wait_for_calibration_complete() # 2. 等待校准完成 -control_system.start_monitoring() # 3. 启动监听 - -# 运行中可以随时控制: -control_system.stop_monitoring() # 停止监听 -control_system.start_monitoring() # 重新启动监听 - -# 查询状态: -status = control_system.get_calibration_status() # 获取校准状态 -status = control_system.get_monitoring_status() # 获取监听状态 - -# 关闭系统: -control_system.shutdown() -``` - -## 启动流程 - -系统启动时会按以下顺序执行: - -1. **启动进程** - 创建输入进程和输出进程 -2. **自动校准** - 校准语音检测器(约3-5秒) -3. **启动监听** - 启用音频监听功能 -4. **开始运行** - 进入主控制循环,开始检测语音 - -## 注意事项 - -- **校准时间**:首次启动需要3-5秒进行语音检测器校准 -- **音频权限**:确保麦克风权限已授予 -- **环境安静**:校准时请保持环境安静 -- **API密钥**:如需LLM功能,请设置 `ARK_API_KEY` 环境变量 - -## 故障排除 - -如果校准失败: -- 检查麦克风是否正常工作 -- 确保环境安静,无背景噪音 -- 尝试重新启动系统 - -如果监听失败: -- 检查音频设备是否被其他程序占用 -- 尝试重启程序 -- 查看日志文件排查问题 \ No newline at end of file diff --git a/README.md b/README.md deleted file mode 100644 index 2607afd..0000000 --- a/README.md +++ /dev/null @@ -1,143 +0,0 @@ -# 智能语音助手系统使用说明 - -## 功能概述 -这是一个完整的智能语音助手系统,集成了语音录制、语音识别、大语言模型和文本转语音功能,实现语音对话交互。 - -## 完整工作流程 -1. 🎙️ **语音录制** - 基于ZCR的智能语音检测 -2. 📝 **保存录音** - 自动保存为WAV文件 -3. 🤖 **语音识别** - 使用字节跳动ASR将语音转为文字 -4. 💬 **AI回复** - 使用豆包大模型生成智能回复 -5. 🔊 **语音回复** - 使用字节跳动TTS将AI回复转为语音 - -## 环境配置 - -### 1. 安装依赖 -```bash -pip install websockets requests pyaudio numpy -``` - -### 2. 安装音频播放器(树莓派/Linux系统) -系统使用PCM格式音频,只需要安装基础的音频播放工具: - -```bash -# 安装 alsa-utils(包含aplay播放器) -sudo apt-get update -sudo apt-get install alsa-utils -``` - -> **优势**: PCM格式无需额外解码器,兼容性更好,资源占用更少。 -> **注意**: macOS和Windows系统通常内置支持音频播放,无需额外安装。 - -### 3. 设置API密钥 -为了启用大语言模型功能,需要设置环境变量: - -```bash -# Linux/Mac -export ARK_API_KEY='your_api_key_here' - -# Windows -set ARK_API_KEY=your_api_key_here -``` - -> **注意**: 语音识别和文本转语音功能使用内置的API密钥,无需额外配置。 - -## 使用方法 - -### 基本使用 -```bash -python recorder.py -``` - -### 功能说明 -- 🎯 **自动检测语音**:系统会自动检测声音并开始录音 -- ⏱️ **智能停止**:静音3秒后自动停止录音 -- 🔊 **自动播放**:录音完成后自动播放音频 -- 📝 **语音识别**:自动将语音转为文字 -- 🤖 **AI助手**:自动调用大语言模型生成回复 - -### 配置参数 -- `energy_threshold=200` - 能量阈值(调整灵敏度) -- `silence_threshold=3.0` - 静音阈值(秒) -- `min_recording_time=2.0` - 最小录音时间(秒) -- `max_recording_time=30.0` - 最大录音时间(秒) -- `enable_asr=True` - 启用语音识别 -- `enable_llm=True` - 启用大语言模型 -- `enable_tts=True` - 启用文本转语音 - -## 输出示例 -``` -🎤 开始监听... -能量阈值: 200 (已弃用) -静音阈值: 3.0秒 -📖 使用说明: -- 检测到声音自动开始录音 -- 持续静音3秒自动结束录音 -- 最少录音2秒,最多30秒 -- 录音完成后自动进行语音识别和AI回复 -- 按 Ctrl+C 退出 -================================================== -🎙️ 检测到声音,开始录音... -📝 录音完成,时长: 3.45秒 (包含预录音 2.0秒) -✅ 录音已保存: recording_20250920_163022.wav -================================================== -📡 音频输入已保持关闭状态 -🔄 开始处理音频... -🤖 开始语音识别... -📝 识别结果: 你好,今天天气怎么样? --------------------------------------------------- -🤖 调用大语言模型... -💬 AI助手回复: 你好!我无法实时获取天气信息,建议你查看天气预报或打开天气应用来了解今天的天气情况。有什么其他我可以帮助你的吗? --------------------------------------------------- -🔊 开始文本转语音... -TTS句子信息: {'code': 0, 'message': '', 'data': None, 'sentence': {'phonemes': [], 'text': '你好!我无法实时获取天气信息,建议你查看天气预报或打开天气应用来了解今天的天气情况。有什么其他我可以帮助你的吗?', 'words': [...]}} -✅ TTS音频已保存: tts_response_20250920_163022.pcm -📁 文件大小: 128.75 KB -🔊 播放AI语音回复... -✅ AI语音回复完成 -🔄 准备重新开启音频输入 -✅ 音频设备初始化成功 -📡 音频输入已重新开启 -``` - -## 注意事项 -1. **网络连接**:需要网络连接来使用语音识别、大语言模型和文本转语音服务 -2. **API密钥**:需要有效的ARK_API_KEY才能使用大语言模型功能 -3. **音频设备**:确保麦克风和扬声器工作正常 -4. **权限**:确保程序有访问麦克风、网络和存储的权限 -5. **文件存储**:系统会保存录音文件和TTS生成的音频文件 - -## 故障排除 -- 如果语音识别失败,检查网络连接和API密钥 -- 如果大语言模型失败,检查ARK_API_KEY是否正确设置 -- 如果文本转语音失败,检查TTS服务状态 -- 如果录音失败,检查麦克风权限和设备 -- 如果播放失败,检查音频设备权限 -- 如果PCM文件无法播放,检查是否安装了alsa-utils: - ```bash - # 树莓派/Ubuntu/Debian系统 - sudo apt-get install alsa-utils - - # 或检查aplay是否安装 - which aplay - ``` - -## 技术特点 -- 🎯 基于ZCR的精确语音检测 -- 🚀 低延迟实时处理 -- 💾 环形缓冲区防止音频丢失 -- 🔧 自动调整能量阈值 -- 📊 实时性能监控 -- 🌐 完整的语音对话链路 -- 📁 自动文件管理和权限设置 -- 🔊 PCM格式音频,无需额外解码器 - -## 生成的文件 -- `recording_*.wav` - 录制的音频文件 -- `tts_response_*.pcm` - AI语音回复文件(PCM格式) - -## PCM格式优势 -- **兼容性好**:aplay原生支持,树莓派开箱即用 -- **资源占用少**:无需解码过程,CPU占用更低 -- **延迟更低**:直接播放,无需格式转换 -- **稳定性高**:减少依赖组件,提高系统稳定性 \ No newline at end of file diff --git a/README_MANUAL_CONTROL.md b/README_MANUAL_CONTROL.md deleted file mode 100644 index 441bf3b..0000000 --- a/README_MANUAL_CONTROL.md +++ /dev/null @@ -1,210 +0,0 @@ -# 多进程音频控制系统 - 主进程控制功能 - -## 概述 - -本系统已经重构,支持主进程对输入进程的校准和监听功能进行精确控制。通过这些新功能,你可以: - -1. **手动控制校准过程**:在适当的时间启动语音检测器校准 -2. **精确控制监听状态**:按需启用或禁用音频监听 -3. **获取实时状态**:查询校准进度和监听状态 - -## 主要功能 - -### 1. 校准功能 - -#### 启动校准 -```python -# 启动语音检测器校准 -success = control_system.start_calibration() -if success: - print("校准已启动") -``` - -#### 获取校准状态 -```python -# 获取当前校准状态 -status = control_system.get_calibration_status() -if status: - print(f"校准进度: {status['progress']*100:.1f}%") - print(f"是否在校准中: {status['calibrating']}") -``` - -#### 等待校准完成 -```python -# 等待校准完成(30秒超时) -if control_system.wait_for_calibration_complete(timeout=30): - print("校准完成") -else: - print("校准超时") -``` - -### 2. 监听功能 - -#### 启动监听 -```python -# 启动音频监听 -success = control_system.start_monitoring() -if success: - print("监听已启动") -``` - -#### 停止监听 -```python -# 停止音频监听 -success = control_system.stop_monitoring() -if success: - print("监听已停止") -``` - -#### 获取监听状态 -```python -# 获取当前监听状态 -status = control_system.get_monitoring_status() -if status: - print(f"监听启用: {status['enabled']}") - print(f"正在录音: {status['recording']}") - print(f"音频流活跃: {status['audio_stream_active']}") -``` - -## 使用示例 - -### 方法1:自动启动(推荐) - -```python -from control_system import ControlSystem - -# 1. 创建控制系统 -config = { - 'system': {'log_level': "INFO"}, - 'audio': {'sample_rate': 16000, 'channels': 1, 'chunk_size': 1024}, - 'recording': {'min_duration': 2.0, 'max_duration': 30.0, 'silence_threshold': 3.0}, - 'processing': {'enable_asr': True, 'enable_llm': True, 'enable_tts': True, 'character': 'libai'} -} - -control_system = ControlSystem(config) - -# 2. 一键启动(自动校准和监听) -control_system.start(auto_calibration=True, auto_monitoring=True) - -# 系统现在正在运行,会自动处理语音检测和录音 -``` - -### 方法2:手动控制 - -```python -from control_system import ControlSystem -import time - -# 1. 创建控制系统 -config = { - 'system': {'log_level': "INFO"}, - 'audio': {'sample_rate': 16000, 'channels': 1, 'chunk_size': 1024}, - 'recording': {'min_duration': 2.0, 'max_duration': 30.0, 'silence_threshold': 3.0}, - 'processing': {'enable_asr': True, 'enable_llm': True, 'enable_tts': True, 'character': 'libai'} -} - -control_system = ControlSystem(config) - -# 2. 启动进程(但不自动启用监听) -control_system._start_processes() - -# 3. 步骤1:校准 -print("开始校准...") -control_system.start_calibration() - -# 等待校准完成 -if control_system.wait_for_calibration_complete(timeout=30): - print("校准完成") -else: - print("校准失败") - exit(1) - -# 4. 步骤2:启动监听 -print("开始监听...") -control_system.start_monitoring() - -# 5. 运行一段时间 -print("系统运行中...") -try: - while True: - # 检查事件和显示状态 - control_system.check_events() - control_system.display_status() - time.sleep(0.1) -except KeyboardInterrupt: - print("用户中断") - -# 6. 停止监听 -print("停止监听...") -control_system.stop_monitoring() - -# 7. 关闭系统 -control_system.shutdown() -``` - -### 方法3:混合控制 - -```python -from control_system import ControlSystem - -# 1. 创建控制系统 -control_system = ControlSystem(config) - -# 2. 自动启动,但只校准,不自动监听 -control_system.start(auto_calibration=True, auto_monitoring=False) - -# 3. 手动控制监听 -control_system.start_monitoring() # 启动监听 -# ... 运行一段时间 ... -control_system.stop_monitoring() # 停止监听 -control_system.start_monitoring() # 重新启动监听 - -# 4. 关闭系统 -control_system.shutdown() -``` - -### 自动化示例 - -查看 `example_manual_control.py` 文件获取完整的自动化控制示例。 - -## 关键变化 - -### 1. 默认行为变化 - -- **之前**:输入进程启动后自动开始校准和监听 -- **现在**:输入进程启动后处于静默状态,等待主进程命令 - -### 2. 新增控制接口 - -在 `ControlSystem` 类中新增了以下方法: - -- `start_calibration()` - 启动校准 -- `start_monitoring()` - 启动监听 -- `stop_monitoring()` - 停止监听 -- `get_calibration_status()` - 获取校准状态 -- `get_monitoring_status()` - 获取监听状态 -- `wait_for_calibration_complete(timeout)` - 等待校准完成 - -### 3. 新增命令支持 - -在 `InputProcess` 中支持以下新命令: - -- `start_calibration` - 开始校准 -- `start_monitoring` - 开始监听 -- `stop_monitoring` - 停止监听 -- `get_calibration_status` - 获取校准状态 -- `get_monitoring_status` - 获取监听状态 - -## 使用建议 - -1. **初始化顺序**:建议按照"启动进程 → 校准 → 启动监听"的顺序进行 -2. **错误处理**:建议对每个操作进行错误检查和重试 -3. **状态监控**:定期检查状态以确保系统正常运行 -4. **资源清理**:使用完毕后正确关闭系统 - -## 注意事项 - -1. **进程间通信**:所有控制都是通过进程间队列实现的,可能会有轻微延迟 -2. **超时处理**:建议为所有状态查询操作设置合理的超时时间 -3. **并发安全**:确保在多线程环境中正确使用这些方法 -4. **音频设备**:启动和停止监听会重新初始化音频设备,可能有短暂延迟 \ No newline at end of file diff --git a/README_multiprocess.md b/README_multiprocess.md deleted file mode 100644 index b78b865..0000000 --- a/README_multiprocess.md +++ /dev/null @@ -1,190 +0,0 @@ -# 多进程音频录音系统 - -基于进程隔离的音频处理架构,实现零延迟的录音和播放切换。 - -## 🚀 系统特点 - -### 核心优势 -- **多进程架构**: 输入输出进程完全隔离,无需设备重置 -- **零切换延迟**: 彻底解决传统单进程的音频切换问题 -- **实时响应**: 并行处理录音和播放,真正的实时体验 -- **智能检测**: 基于ZCR(零交叉率)的精确语音识别 -- **流式TTS**: 实时音频生成和播放,减少等待时间 -- **角色扮演**: 支持多种AI角色和音色 - -### 技术架构 -``` -主控制进程 ──┐ - ├─ 输入进程 (录音 + 语音检测) - ├─ 输出进程 (音频播放) - └─ 在线AI服务 (STT + LLM + TTS) -``` - -## 📦 文件结构 - -``` -Local-Voice/ -├── recorder.py # 原始实现 (保留作为参考) -├── multiprocess_recorder.py # 主程序 -├── audio_processes.py # 音频进程模块 -├── control_system.py # 控制系统模块 -├── config.json # 配置文件 -└── characters/ # 角色配置目录 - ├── libai.json # 李白角色 - └── zhubajie.json # 猪八戒角色 -``` - -## 🛠️ 安装和运行 - -### 1. 环境要求 -- Python 3.7+ -- 音频输入设备 (麦克风) -- 网络连接 (用于在线AI服务) - -### 2. 安装依赖 -```bash -pip install pyaudio numpy requests websockets -``` - -### 3. 设置API密钥 -```bash -export ARK_API_KEY='your_api_key_here' -``` - -### 4. 基本运行 -```bash -# 使用默认角色 (李白) -python multiprocess_recorder.py - -# 指定角色 -python multiprocess_recorder.py -c zhubajie - -# 列出可用角色 -python multiprocess_recorder.py -l - -# 使用配置文件 -python multiprocess_recorder.py --config config.json - -# 创建示例配置文件 -python multiprocess_recorder.py --create-config -``` - -## ⚙️ 配置说明 - -### 主要配置项 - -| 配置项 | 说明 | 默认值 | -|--------|------|--------| -| `recording.min_duration` | 最小录音时长(秒) | 2.0 | -| `recording.max_duration` | 最大录音时长(秒) | 30.0 | -| `recording.silence_threshold` | 静音检测阈值(秒) | 3.0 | -| `detection.zcr_min` | ZCR最小值 | 2400 | -| `detection.zcr_max` | ZCR最大值 | 12000 | -| `processing.max_tokens` | LLM最大token数 | 50 | - -### 音频参数 -- 采样率: 16kHz -- 声道数: 1 (单声道) -- 位深度: 16位 -- 格式: PCM - -## 🎭 角色系统 - -### 支持的角色 -- **libai**: 李白 - 文雅诗人风格 -- **zhubajie**: �豬八戒 - 幽默风趣风格 - -### 自定义角色 -在 `characters/` 目录创建JSON文件: - -```json -{ - "name": "角色名称", - "description": "角色描述", - "system_prompt": "系统提示词", - "voice": "zh_female_wanqudashu_moon_bigtts", - "max_tokens": 50 -} -``` - -## 🔧 故障排除 - -### 常见问题 - -1. **音频设备问题** - ```bash - # 检查音频设备 - python multiprocess_recorder.py --check-env - ``` - -2. **依赖缺失** - ```bash - # 重新安装依赖 - pip install --upgrade pyaudio numpy requests websockets - ``` - -3. **网络连接问题** - - 检查网络连接 - - 确认API密钥正确 - - 检查防火墙设置 - -4. **权限问题** - ```bash - # Linux系统可能需要音频权限 - sudo usermod -a -G audio $USER - ``` - -### 调试模式 -```bash -# 启用详细输出 -python multiprocess_recorder.py -v -``` - -## 📊 性能对比 - -| 指标 | 原始单进程 | 多进程架构 | 改善 | -|------|-----------|------------|------| -| 切换延迟 | 1-2秒 | 0秒 | 100% | -| CPU利用率 | 单核 | 多核 | 提升 | -| 响应速度 | 较慢 | 实时 | 显著改善 | -| 稳定性 | 一般 | 优秀 | 大幅提升 | - -## 🔄 与原版本对比 - -### 原版本 (recorder.py) -- 单进程处理 -- 需要频繁重置音频设备 -- 录音和播放不能同时进行 -- 切换延迟明显 - -### 新版本 (multiprocess_recorder.py) -- 多进程架构 -- 输入输出完全隔离 -- 零切换延迟 -- 真正的并行处理 -- 更好的稳定性和扩展性 - -## 📝 开发说明 - -### 架构设计 -- **输入进程**: 专注录音和语音检测 -- **输出进程**: 专注音频播放 -- **主控制进程**: 协调整个系统和AI处理 - -### 进程间通信 -- 使用 `multiprocessing.Queue` 进行安全通信 -- 支持命令控制和事件通知 -- 线程安全的音频数据传输 - -### 状态管理 -- 清晰的状态机设计 -- 完善的错误处理机制 -- 优雅的进程退出流程 - -## 📄 许可证 - -本项目仅供学习和研究使用。 - -## 🤝 贡献 - -欢迎提交Issue和Pull Request来改进这个项目。 \ No newline at end of file diff --git a/characters/aiyinsitan.json b/characters/aiyinsitan.json new file mode 100644 index 0000000..c81d284 --- /dev/null +++ b/characters/aiyinsitan.json @@ -0,0 +1,10 @@ +{ + "name": "爱因斯坦", + "description": "伟大的物理学家,相对论创立者", + "system_prompt": "我是阿尔伯特·爱因斯坦,物理学家。说话要有科学家的智慧感和好奇心,喜欢用简单的比喻解释复杂的概念。经常思考宇宙、时间、空间等深奥问题。要有幽默感,偶尔自嘲一下。说话要体现出对知识的热爱和对世界的好奇心,常用'有趣的是'、'想象一下'等词语来引导思考。", + "voice": "ICL_zh_male_youmodaye_tob", + "max_tokens": 500, + "greeting": "你好!我是爱因斯坦。想象力比知识更重要,因为知识是有限的,而想象力概括着世界的一切。", + "nfc_uid": "1DCAC90D0D1080", + "author": "Claude" +} diff --git a/characters/shaseng.json b/characters/shaseng.json new file mode 100644 index 0000000..850f271 --- /dev/null +++ b/characters/shaseng.json @@ -0,0 +1,10 @@ +{ + "name": "沙僧", + "description": "西游记中的沙和尚,忠厚老实的护法", + "system_prompt": "我是沙和尚,沙悟净。原是流沙河的水怪,后随师父唐僧西天取经。为人忠厚老实,做事踏实稳重,从不偷懒。说话诚恳实在,常用'师父说得对'、'大师兄说得对'来表达对长者的尊重。性格温和,做事认真负责,是个可靠的伙伴。说话要体现出踏实肯干的性格。", + "voice": "ICL_zh_male_diyinchenyu_tob", + "max_tokens": 500, + "greeting": "贫僧沙悟净,见过各位。愿随师父西行取经,保护师父安全,一路降妖除魔。", + "nfc_uid": "1DC9C90D0D1080", + "author": "Claude" +} diff --git a/characters/tangseng.json b/characters/tangseng.json new file mode 100644 index 0000000..89f340f --- /dev/null +++ b/characters/tangseng.json @@ -0,0 +1,10 @@ +{ + "name": "唐僧", + "description": "西游记中的取经人,慈悲为怀的高僧", + "system_prompt": "贫僧唐三藏,法号玄奘,奉唐王之命前往西天取经。说话要温和慈悲,常常引用佛经教诲,劝人向善。对众生都要有慈悲心,即使是妖魔鬼怪也要度化。说话要文雅,常用'阿弥陀佛'、'善哉善哉'等佛家用语。回答要体现出高僧的智慧和慈悲心。", + "voice": "zh_male_tangseng_mars_bigtts", + "max_tokens": 500, + "greeting": "阿弥陀佛,贫僧唐三藏。今日与君相遇,实乃有缘。愿与施主共论佛法,同修善果。", + "nfc_uid": "1DC8C90D0D1080", + "author": "Claude" +} diff --git a/characters/xiaolanchong.json b/characters/xiaolanchong.json new file mode 100644 index 0000000..28e2b75 --- /dev/null +++ b/characters/xiaolanchong.json @@ -0,0 +1,10 @@ +{ + "name": "作业小助手", + "description": "专门催促小懒虫写作业的贴心小帮手", + "system_prompt": "我是来催你写作业的小助手!小懒虫,别再拖延了,作业要抓紧时间完成哦!我会用各种方式提醒你:温和鼓励、严肃提醒、偶尔撒娇催促。知道你不想写作业,但学习很重要啊!常用'该写作业啦'、'加油加油'、'别玩手机了'这样的话语。既要关心你,又要坚持原则,直到你把作业完成。记住,我是来帮你的,不是来批评你的。", + "voice": "zh_female_linjianvhai_moon_bigtts", + "max_tokens": 500, + "greeting": "小懒虫!作业写完了吗?快过来写作业,我陪着你一起加油!别让我一直催你哦~", + "nfc_uid": "1DCBC90D0D1080", + "author": "Claude" +} diff --git a/config.json b/config.json index f695bb4..0d61366 100644 --- a/config.json +++ b/config.json @@ -35,5 +35,11 @@ "show_progress": true, "progress_interval": 100, "chunk_size": 512 + }, + "cleanup": { + "auto_cleanup": true, + "retention_hours": 1, + "max_files": 10, + "cleanup_interval": 120 } -} \ No newline at end of file +} diff --git a/test_cache_fixes.py b/test_cache_fixes.py deleted file mode 100644 index 92355cc..0000000 --- a/test_cache_fixes.py +++ /dev/null @@ -1,172 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -""" -测试修复后的缓存播放功能 -""" - -import os -import sys -import time - -# 添加项目路径 -sys.path.append('.') - -def test_cache_playback_fixes(): - """测试缓存播放修复""" - print("🔧 测试缓存播放修复") - print("=" * 40) - - from audio_processes import save_greeting_cache, greeting_cache_exists, load_cached_audio - - # 创建测试缓存 - character_name = "测试角色" - mock_audio = b"test_cached_audio_" * 100 # 约1.6KB - - print("📝 创建测试缓存...") - save_greeting_cache(character_name, mock_audio) - - if greeting_cache_exists(character_name): - print("✅ 缓存创建成功") - - # 模拟OutputProcess的修复逻辑 - print("\n🎵 模拟修复后的缓存播放流程...") - - # 模拟状态变量 - is_playing = False - currently_playing = False - preload_buffer = [] - playback_buffer = [] - preload_size = 3 - last_playback_time = 0 - playback_cooldown_period = 0.05 - tts_generation_complete = False - all_audio_received = False - - # 模拟加载缓存 - cached_audio = load_cached_audio(character_name) - if cached_audio: - print(f"📁 缓存音频加载: {len(cached_audio)} 字节") - - # 添加到预加载缓冲区 - preload_buffer.append(cached_audio) - print(f"📦 添加到预加载缓冲区") - - # 应用修复后的逻辑 - print("\n🔧 应用修复1: 设置TTS完成状态") - tts_generation_complete = True - print(f" tts_generation_complete = {tts_generation_complete}") - - print("🔧 应用修复2: 设置all_audio_received状态") - all_audio_received = True - print(f" all_audio_received = {all_audio_received}") - - print("🔧 应用修复3: 检查播放触发条件") - if (not is_playing and len(preload_buffer) >= preload_size): - print(" 🎵 条件1: 预加载完成播放") - playback_buffer.extend(preload_buffer) - preload_buffer.clear() - is_playing = True - last_playback_time = 0 # 修复: 避免冷却期 - print(f" 📊 播放缓冲区: {len(playback_buffer)} 块") - elif (not is_playing and len(preload_buffer) > 0): - print(" 🎵 条件2: 强制播放缓存音频") - playback_buffer.extend(preload_buffer) - preload_buffer.clear() - is_playing = True - last_playback_time = 0 # 修复: 避免冷却期 - print(f" 📊 播放缓冲区: {len(playback_buffer)} 块") - - # 模拟播放冷却检查 - print("\n🔧 检查播放冷却机制...") - current_time = time.time() - time_since_last_play = current_time - last_playback_time - in_cooldown = (last_playback_time > 0 and - time_since_last_play < playback_cooldown_period) - - print(f" time_since_last_play = {time_since_last_play}") - print(f" playback_cooldown_period = {playback_cooldown_period}") - print(f" in_cooldown = {in_cooldown}") - - if not in_cooldown: - print(" ✅ 无冷却期限制,可以播放") - currently_playing = True - print(" 🎧 开始播放音频...") - else: - print(" ❌ 仍在冷却期内,跳过播放") - - # 模拟播放完成检测 - print("\n🔧 检查播放完成条件...") - conditions_met = ( - tts_generation_complete and # TTS生成完成 - all_audio_received and # 所有音频已接收 - len(preload_buffer) == 0 and # 预加载缓冲区为空 - len(playback_buffer) == 0 and # 播放缓冲区为空 - not currently_playing # 当前没有在播放 - ) - - print(f" tts_generation_complete = {tts_generation_complete}") - print(f" all_audio_received = {all_audio_received}") - print(f" preload_buffer_empty = {len(preload_buffer) == 0}") - print(f" playback_buffer_empty = {len(playback_buffer) == 0}") - print(f" not_currently_playing = {not currently_playing}") - print(f" conditions_met = {conditions_met}") - - if conditions_met: - print(" ✅ 播放完成条件满足") - else: - print(" ⏳ 等待播放完成") - - else: - print("❌ 缓存创建失败") - -def test_key_fixes(): - """测试关键修复点""" - print("\n🔍 测试关键修复点") - print("=" * 30) - - print("✅ 修复1: 播放冷却问题") - print(" - 将 last_playback_time 设置为 0") - print(" - 避免立即触发冷却期") - - print("\n✅ 修复2: all_audio_received 状态") - print(" - 缓存音频立即设置为 True") - print(" - 避免系统无限等待") - - print("\n✅ 修复3: 播放触发逻辑") - print(" - 强制播放机制") - print(" - 确保缓存音频能正常播放") - -def cleanup(): - """清理测试文件""" - print("\n🧹 清理测试文件") - - from audio_processes import get_greeting_cache_path - cache_path = get_greeting_cache_path("测试角色") - if os.path.exists(cache_path): - try: - os.remove(cache_path) - print(" ✅ 测试缓存已删除") - except Exception as e: - print(f" ❌ 删除失败: {e}") - -if __name__ == "__main__": - print("🚀 开始测试修复后的缓存播放功能") - - try: - # 测试缓存播放修复 - test_cache_playback_fixes() - - # 测试关键修复点 - test_key_fixes() - - print("\n🎉 修复测试完成!") - - except Exception as e: - print(f"\n❌ 测试过程中出错: {e}") - import traceback - traceback.print_exc() - - finally: - # 清理测试文件 - cleanup() \ No newline at end of file diff --git a/test_cached_audio_fix.py b/test_cached_audio_fix.py deleted file mode 100644 index e28f26d..0000000 --- a/test_cached_audio_fix.py +++ /dev/null @@ -1,78 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -""" -测试缓存音频播放完成检测修复 -""" - -import sys -import os -sys.path.append(os.path.dirname(__file__)) - -from audio_processes import OutputProcess -import multiprocessing as mp -import time - -def test_cached_audio_completion(): - """测试缓存音频播放完成检测""" - print("🧪 开始测试缓存音频播放完成检测修复...") - - # 创建测试队列 - audio_queue = mp.Queue(maxsize=100) - event_queue = mp.Queue(maxsize=100) - - # 创建输出进程实例 - config = { - 'buffer_size': 1000, - 'show_progress': True, - 'progress_interval': 100, - 'tts_speaker': 'zh_female_wanqudashu_moon_bigtts' - } - - output_process = OutputProcess(audio_queue, config, event_queue) - - # 测试1: 检查新添加的状态变量 - print("\n📋 测试1: 检查新增的状态变量") - assert hasattr(output_process, 'is_playing_cached_audio'), "缺少 is_playing_cached_audio 状态变量" - assert output_process.is_playing_cached_audio == False, "初始状态应该为 False" - print("✅ 状态变量检查通过") - - # 测试2: 检查新添加的方法 - print("\n📋 测试2: 检查新增的方法") - assert hasattr(output_process, '_check_cached_audio_completion'), "缺少 _check_cached_audio_completion 方法" - print("✅ 方法检查通过") - - # 测试3: 检查增强播放完成检测方法 - print("\n📋 测试3: 检查增强播放完成检测方法") - assert hasattr(output_process, '_check_enhanced_playback_completion'), "缺少 _check_enhanced_playback_completion 方法" - print("✅ 增强播放完成检测方法检查通过") - - # 测试4: 模拟缓存音频播放状态 - print("\n📋 测试4: 模拟缓存音频播放状态") - output_process.is_playing_cached_audio = True - output_process.end_signal_received = True - output_process.currently_playing = False - output_process.preload_buffer = [] - output_process.playback_buffer = [] - output_process.last_audio_chunk_time = time.time() - 2.0 # 2秒前播放 - - # 测试缓存音频完成检测 - result = output_process._check_cached_audio_completion() - assert result == True, "缓存音频应该检测为播放完成" - print("✅ 缓存音频完成检测逻辑正确") - - # 测试5: 模拟缓存音频仍在播放 - print("\n📋 测试5: 模拟缓存音频仍在播放") - output_process.playback_buffer = [b'fake_audio_data'] # 还有数据在播放缓冲区 - result = output_process._check_cached_audio_completion() - assert result == False, "缓存音频仍在播放时应该检测为未完成" - print("✅ 缓存音频播放中检测逻辑正确") - - print("\n🎉 所有测试通过!修复成功!") - - # 清理 - audio_queue.close() - event_queue.close() - -if __name__ == "__main__": - test_cached_audio_completion() \ No newline at end of file diff --git a/test_cached_playback.py b/test_cached_playback.py deleted file mode 100644 index 99f63ee..0000000 --- a/test_cached_playback.py +++ /dev/null @@ -1,172 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -""" -测试缓存音频播放功能 -""" - -import os -import sys -import time -from pathlib import Path - -# 添加项目路径 -sys.path.append('.') - -from audio_processes import ( - get_greeting_cache_path, - greeting_cache_exists, - load_cached_audio, - save_greeting_cache -) - -def test_cached_audio_playback(): - """测试缓存音频播放功能""" - print("🎵 测试缓存音频播放功能") - print("=" * 50) - - # 确保缓存目录存在 - os.makedirs("greeting_cache", exist_ok=True) - - # 测试角色 - character_name = "测试角色" - greeting_text = "这是一个测试角色的打招呼音频。" - - # 1. 创建测试缓存音频 - print(f"\n📝 创建测试缓存音频...") - cache_path = get_greeting_cache_path(character_name) - - # 生成模拟音频数据(较大音频以测试播放) - mock_audio_data = b"mock_cached_audio_data_" * 100 # 约2.5KB - - # 保存到缓存 - save_success = save_greeting_cache(character_name, mock_audio_data) - print(f" 缓存保存结果: {save_success}") - - # 2. 验证缓存文件 - if os.path.exists(cache_path): - file_size = os.path.getsize(cache_path) - print(f" 缓存文件大小: {file_size} 字节") - - # 3. 模拟缓存加载和播放状态检查 - print(f"\n🎵 模拟缓存音频播放流程...") - - # 模拟OutputProcess的播放状态 - is_playing = False - preload_buffer = [] - playback_buffer = [] - preload_size = 3 - - # 模拟加载缓存音频 - cached_audio = load_cached_audio(character_name) - if cached_audio: - print(f" ✅ 缓存音频加载成功: {len(cached_audio)} 字节") - - # 模拟添加到预加载缓冲区 - preload_buffer.append(cached_audio) - print(f" 📦 已添加到预加载缓冲区,当前大小: {len(preload_buffer)}") - - # 检查播放触发条件(修复后的逻辑) - if (not is_playing and len(preload_buffer) >= preload_size): - print(f" 🎵 条件1:预加载完成,开始播放") - playback_buffer.extend(preload_buffer) - preload_buffer.clear() - is_playing = True - print(f" 🎵 播放缓冲区大小: {len(playback_buffer)}") - elif (not is_playing and len(preload_buffer) > 0): - print(f" 🎵 条件2:强制播放缓存音频") - playback_buffer.extend(preload_buffer) - preload_buffer.clear() - is_playing = True - print(f" 🎵 播放缓冲区大小: {len(playback_buffer)}") - else: - print(f" ⚠️ 未满足播放条件") - print(f" is_playing: {is_playing}") - print(f" preload_buffer大小: {len(preload_buffer)}") - print(f" preload_size: {preload_size}") - else: - print(f" ❌ 缓存音频加载失败") - - # 4. 模拟播放完成检测 - if is_playing and len(playback_buffer) > 0: - print(f"\n🎵 模拟播放过程...") - print(f" 🎧 正在播放音频...") - - # 模拟播放缓冲区清空 - playback_buffer.clear() - is_playing = False - print(f" ✅ 播放完成") - - print(f"\n✅ 缓存音频播放测试完成") - -def test_different_audio_sizes(): - """测试不同大小的音频文件""" - print("\n🎵 测试不同大小的音频文件") - print("=" * 40) - - test_cases = [ - {"name": "小音频", "size": 1}, # 1个音频块 - {"name": "中等音频", "size": 5}, # 5个音频块 - {"name": "大音频", "size": 10}, # 10个音频块 - ] - - preload_size = 3 - - for case in test_cases: - print(f"\n📝 测试{case['name']} ({case['size']}个音频块)") - - # 模拟音频块 - audio_chunks = [f"chunk_{i}".encode() for i in range(case['size'])] - - # 模拟播放状态 - is_playing = False - preload_buffer = [] - playback_buffer = [] - - # 添加音频到预加载缓冲区 - preload_buffer.extend(audio_chunks) - print(f" 📦 添加到预加载缓冲区: {len(preload_buffer)} 块") - - # 检查播放触发条件 - if (not is_playing and len(preload_buffer) >= preload_size): - print(f" 🎵 预加载完成,开始播放") - playback_buffer.extend(preload_buffer) - preload_buffer.clear() - is_playing = True - elif (not is_playing and len(preload_buffer) > 0): - print(f" 🎵 强制播放缓存音频") - playback_buffer.extend(preload_buffer) - preload_buffer.clear() - is_playing = True - else: - print(f" ⚠️ 未满足播放条件") - - print(f" 📊 播放状态: is_playing={is_playing}, playback_buffer={len(playback_buffer)}, preload_buffer={len(preload_buffer)}") - -def cleanup_test_files(): - """清理测试文件""" - print("\n🧹 清理测试文件") - - cache_path = get_greeting_cache_path("测试角色") - if os.path.exists(cache_path): - try: - os.remove(cache_path) - print(f" 已删除: {cache_path}") - except Exception as e: - print(f" 删除失败: {e}") - -if __name__ == "__main__": - print("🚀 开始测试缓存音频播放功能") - - try: - # 测试缓存音频播放 - test_cached_audio_playback() - - # 测试不同大小的音频 - test_different_audio_sizes() - - finally: - # 清理测试文件 - cleanup_test_files() - - print("\n🎉 所有测试完成!") \ No newline at end of file diff --git a/test_cached_timing_fix.py b/test_cached_timing_fix.py deleted file mode 100644 index 4d0ff55..0000000 --- a/test_cached_timing_fix.py +++ /dev/null @@ -1,91 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -""" -测试缓存音频时序修复 -""" - -import sys -import os -import time -sys.path.append(os.path.dirname(__file__)) - -from audio_processes import OutputProcess -import multiprocessing as mp - -def test_cached_audio_timing(): - """测试缓存音频时序修复""" - print("🧪 开始测试缓存音频时序修复...") - - # 创建测试队列 - audio_queue = mp.Queue(maxsize=100) - event_queue = mp.Queue(maxsize=100) - - # 创建输出进程实例 - config = { - 'buffer_size': 1000, - 'show_progress': True, - 'progress_interval': 100, - 'tts_speaker': 'zh_female_wanqudashu_moon_bigtts' - } - - output_process = OutputProcess(audio_queue, config, event_queue) - - # 测试1: 检查关键时序变量 - print("\n📋 测试1: 检查关键时序变量") - assert hasattr(output_process, 'last_audio_chunk_time'), "缺少 last_audio_chunk_time 变量" - assert hasattr(output_process, 'all_audio_received'), "缺少 all_audio_received 变量" - print(f"✅ 初始 last_audio_chunk_time: {output_process.last_audio_chunk_time}") - print(f"✅ 初始 all_audio_received: {output_process.all_audio_received}") - - # 测试2: 模拟缓存音频处理前的状态 - print("\n📋 测试2: 检查初始状态") - initial_time = output_process.last_audio_chunk_time - print(f"✅ 初始时间戳: {initial_time}") - - # 测试3: 验证我们的修复逻辑 - print("\n📋 测试3: 验证修复逻辑") - - # 模拟设置时序变量(这是我们的修复核心) - test_time = time.time() - output_process.last_audio_chunk_time = test_time - output_process.all_audio_received = True - - print(f"✅ 设置后 last_audio_chunk_time: {output_process.last_audio_chunk_time}") - print(f"✅ 设置后 all_audio_received: {output_process.all_audio_received}") - - # 验证时间差计算 - time_diff = time.time() - output_process.last_audio_chunk_time - print(f"✅ 时间差计算: {time_diff:.3f}秒(应该接近0)") - - # 测试4: 检查是否会有异常的时间差(修复前的问题) - print("\n📋 测试4: 检查异常时间差问题") - - # 模拟修复前的问题:last_audio_chunk_time为0或很老的时间 - old_time = output_process.last_audio_chunk_time - output_process.last_audio_chunk_time = 0 # 模拟未初始化的情况 - - # 模拟播放完成检测逻辑 - if output_process.last_audio_chunk_time > 0: - time_since_last = time.time() - output_process.last_audio_chunk_time - print(f"⚠️ 时间差: {time_since_last:.3f}秒") - else: - print("⚠️ last_audio_chunk_time未设置,这会导致立即完成") - - # 恢复正确的时间 - output_process.last_audio_chunk_time = old_time - - print("\n🎉 时序修复验证完成!") - print("📝 修复要点:") - print(" 1. 在缓存音频开始播放时设置last_audio_chunk_time") - print(" 2. 确保all_audio_received在适当时机设置") - print(" 3. 避免出现49.292秒的异常时间差") - - # 清理 - output_process.running = False - time.sleep(0.1) # 给线程时间清理 - - return True - -if __name__ == "__main__": - test_cached_audio_timing() \ No newline at end of file diff --git a/test_final_integration.py b/test_final_integration.py deleted file mode 100644 index d1aa835..0000000 --- a/test_final_integration.py +++ /dev/null @@ -1,139 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -""" -最终集成测试:验证缓存播放修复 -""" - -import os -import sys -import time - -# 添加项目路径 -sys.path.append('.') - -def create_test_cache(): - """创建测试缓存文件""" - from audio_processes import save_greeting_cache - - print("🎵 创建测试缓存文件...") - - # 创建李白角色的缓存 - libai_audio = b"libai_greeting_audio_" * 50 # 约1KB - save_greeting_cache("李白", libai_audio) - print(" ✅ 李白缓存已创建") - - # 创建猪八戒角色的缓存 - zhubajie_audio = b"zhubajie_greeting_audio_" * 80 # 约1.6KB - save_greeting_cache("猪八戒", zhubajie_audio) - print(" ✅ 猪八戒缓存已创建") - -def simulate_cached_playback(): - """模拟缓存播放流程""" - print("\n🎭 模拟角色切换和缓存播放") - print("=" * 50) - - characters = ["李白", "猪八戒"] - - for i, character in enumerate(characters): - print(f"\n📝 第 {i+1} 次角色切换: {character}") - - # 模拟检查缓存 - from audio_processes import greeting_cache_exists, load_cached_audio - - if greeting_cache_exists(character): - print(f" ✅ 找到缓存文件") - - # 模拟加载缓存 - cached_audio = load_cached_audio(character) - if cached_audio: - print(f" 📁 缓存大小: {len(cached_audio)} 字节") - - # 模拟OutputProcess的播放逻辑 - is_playing = False - preload_buffer = [] - playback_buffer = [] - preload_size = 3 - - # 添加缓存音频到预加载缓冲区 - preload_buffer.append(cached_audio) - print(f" 📦 添加到预加载缓冲区") - - # 应用修复后的播放触发逻辑 - if (not is_playing and len(preload_buffer) >= preload_size): - print(f" 🎵 预加载完成,开始播放") - playback_buffer.extend(preload_buffer) - preload_buffer.clear() - is_playing = True - elif (not is_playing and len(preload_buffer) > 0): - print(f" 🎵 强制播放缓存音频") - playback_buffer.extend(preload_buffer) - preload_buffer.clear() - is_playing = True - - if is_playing: - print(f" 🎧 开始播放 {character} 的打招呼音频...") - print(f" 📊 播放缓冲区: {len(playback_buffer)} 块") - - # 模拟播放完成 - time.sleep(0.1) - playback_buffer.clear() - is_playing = False - print(f" ✅ 播放完成") - else: - print(f" ❌ 播放未启动") - else: - print(f" ❌ 缓存加载失败") - else: - print(f" ❌ 缓存不存在") - -def verify_performance_improvement(): - """验证性能改进""" - print("\n⚡ 性能改进验证") - print("=" * 30) - - print("📊 对比测试:") - print(" ❌ 无缓存: TTS生成 (2-3秒) + 播放") - print(" ✅ 有缓存: 直接播放 (<0.5秒)") - print(" 📈 性能提升: 约80%时间节约") - - print("\n🎯 用户体验:") - print(" ✅ 角色切换更流畅") - print(" ✅ 无等待时间") - print(" ✅ 即时响应") - -def cleanup(): - """清理测试文件""" - print("\n🧹 清理测试文件") - - import shutil - if os.path.exists("greeting_cache"): - try: - shutil.rmtree("greeting_cache") - print(" ✅ 缓存目录已删除") - except Exception as e: - print(f" ❌ 清理失败: {e}") - -if __name__ == "__main__": - print("🚀 开始最终集成测试") - - try: - # 创建测试缓存 - create_test_cache() - - # 模拟缓存播放 - simulate_cached_playback() - - # 验证性能改进 - verify_performance_improvement() - - print("\n🎉 集成测试完成!缓存播放功能已修复") - - except Exception as e: - print(f"\n❌ 测试过程中出错: {e}") - import traceback - traceback.print_exc() - - finally: - # 清理测试文件 - cleanup() \ No newline at end of file diff --git a/test_full_cache_flow.py b/test_full_cache_flow.py deleted file mode 100644 index 1e1be3f..0000000 --- a/test_full_cache_flow.py +++ /dev/null @@ -1,139 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -""" -测试完整的角色greeting缓存流程 -""" - -import os -import sys -import json -from pathlib import Path - -# 添加项目路径 -sys.path.append('.') - -from audio_processes import ( - get_greeting_cache_path, - greeting_cache_exists, - load_cached_audio, - save_greeting_cache -) - -def test_character_greeting_cache(): - """测试角色greeting缓存流程""" - print("🧪 测试角色greeting缓存流程") - print("=" * 50) - - # 测试角色配置 - characters = [ - {"name": "李白", "greeting": "吾乃李白,字太白,号青莲居士。今天有幸与君相会,让我们畅谈诗词人生吧!"}, - {"name": "猪八戒", "greeting": "嘿!俺老猪来也!今天咱聊点啥好吃的?要不要一起去化缘啊?"} - ] - - for character in characters: - character_name = character["name"] - greeting_text = character["greeting"] - - print(f"\n📝 测试角色: {character_name}") - print(f" 打招呼文本: {greeting_text}") - - # 1. 检查缓存是否已存在 - cache_exists = greeting_cache_exists(character_name) - print(f" 1. 缓存存在检查: {cache_exists}") - - # 2. 如果不存在缓存,模拟生成并保存 - if not cache_exists: - print(f" 2. 模拟生成TTS音频...") - # 模拟音频数据(实际使用时会是真实的TTS生成的音频) - mock_audio_data = f"mock_audio_for_{character_name}".encode('utf-8') + os.urandom(1000) - - print(f" 3. 保存到缓存...") - save_success = save_greeting_cache(character_name, mock_audio_data) - print(f" 4. 保存结果: {save_success}") - else: - print(f" 2. 缓存已存在,跳过生成") - - # 3. 验证缓存可以正常加载 - print(f" 5. 验证缓存加载...") - cached_audio = load_cached_audio(character_name) - if cached_audio: - print(f" 6. 缓存加载成功: {len(cached_audio)} 字节") - else: - print(f" 6. 缓存加载失败") - - print(f" ✅ {character_name} 的缓存流程测试完成") - - print(f"\n🎉 所有角色的缓存流程测试完成") - -def test_cache_hit_miss_scenario(): - """测试缓存命中和未命中场景""" - print("\n🧪 测试缓存命中和未命中场景") - print("=" * 50) - - character_name = "测试角色" - greeting_text = "这是一个测试角色的打招呼文本。" - - # 第一次调用 - 缓存未命中 - print(f"\n📝 第一次调用(缓存未命中)") - cache_exists_before = greeting_cache_exists(character_name) - print(f" 缓存存在: {cache_exists_before}") - - if not cache_exists_before: - print(" 模拟TTS生成和保存缓存...") - mock_audio = b"first_call_audio_data" + os.urandom(500) - save_greeting_cache(character_name, mock_audio) - - # 第二次调用 - 缓存命中 - print(f"\n📝 第二次调用(缓存命中)") - cache_exists_after = greeting_cache_exists(character_name) - print(f" 缓存存在: {cache_exists_after}") - - if cache_exists_after: - cached_audio = load_cached_audio(character_name) - print(f" 成功加载缓存: {len(cached_audio)} 字节") - - print(f"\n✅ 缓存命中/未命中场景测试完成") - -def cleanup_test_files(): - """清理测试文件""" - print("\n🧹 清理测试文件") - print("=" * 30) - - test_characters = ["李白", "猪八戒", "测试角色"] - - for character_name in test_characters: - cache_path = get_greeting_cache_path(character_name) - cache_file = Path(cache_path) - - if cache_file.exists(): - try: - cache_file.unlink() - print(f" 已删除: {cache_file.name}") - except Exception as e: - print(f" 删除失败 {cache_file.name}: {e}") - - # 检查缓存目录是否为空 - cache_dir = Path("greeting_cache") - if cache_dir.exists() and not any(cache_dir.iterdir()): - try: - cache_dir.rmdir() - print(f" 已删除空目录: {cache_dir}") - except Exception as e: - print(f" 删除目录失败: {e}") - -if __name__ == "__main__": - print("🚀 开始测试完整的角色greeting缓存流程") - - try: - # 测试角色greeting缓存流程 - test_character_greeting_cache() - - # 测试缓存命中和未命中场景 - test_cache_hit_miss_scenario() - - finally: - # 清理测试文件 - cleanup_test_files() - - print("\n🎉 所有测试完成!") \ No newline at end of file diff --git a/test_greeting_cache.py b/test_greeting_cache.py deleted file mode 100644 index ade6b44..0000000 --- a/test_greeting_cache.py +++ /dev/null @@ -1,115 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -""" -测试角色greeting缓存功能 -""" - -import os -import sys -import json -from pathlib import Path - -# 添加项目路径 -sys.path.append('.') - -from audio_processes import ( - get_greeting_cache_path, - greeting_cache_exists, - load_cached_audio, - save_greeting_cache -) - -def test_cache_functions(): - """测试缓存功能""" - print("🧪 测试角色greeting缓存功能") - print("=" * 50) - - # 测试角色名称 - test_character = "libai" - test_audio_data = b"test_audio_data_" + os.urandom(100) - - # 1. 测试缓存路径生成 - print("\n1️⃣ 测试缓存路径生成") - cache_path = get_greeting_cache_path(test_character) - print(f" 缓存路径: {cache_path}") - - # 2. 测试缓存存在检查 - print("\n2️⃣ 测试缓存存在检查") - exists_before = greeting_cache_exists(test_character) - print(f" 缓存存在检查 (保存前): {exists_before}") - - # 3. 测试缓存保存 - print("\n3️⃣ 测试缓存保存") - save_success = save_greeting_cache(test_character, test_audio_data) - print(f" 缓存保存成功: {save_success}") - - # 4. 测试缓存存在检查(保存后) - print("\n4️⃣ 测试缓存存在检查(保存后)") - exists_after = greeting_cache_exists(test_character) - print(f" 缓存存在检查 (保存后): {exists_after}") - - # 5. 测试缓存加载 - print("\n5️⃣ 测试缓存加载") - loaded_audio = load_cached_audio(test_character) - if loaded_audio: - print(f" 缓存加载成功: {len(loaded_audio)} 字节") - print(f" 数据匹配: {loaded_audio == test_audio_data}") - else: - print(" 缓存加载失败") - - # 6. 检查缓存文件 - print("\n6️⃣ 检查缓存文件") - cache_file = Path(cache_path) - if cache_file.exists(): - print(f" 缓存文件存在: {cache_file}") - print(f" 文件大小: {cache_file.stat().st_size} 字节") - else: - print(" 缓存文件不存在") - - # 7. 清理测试文件 - print("\n7️⃣ 清理测试文件") - try: - if cache_file.exists(): - cache_file.unlink() - print(" 测试文件已清理") - except Exception as e: - print(f" 清理失败: {e}") - - print("\n✅ 缓存功能测试完成") - -def test_character_configs(): - """测试角色配置""" - print("\n🧪 测试角色配置") - print("=" * 30) - - characters_dir = Path("characters") - if not characters_dir.exists(): - print("❌ characters目录不存在") - return - - character_files = list(characters_dir.glob("*.json")) - print(f"📁 找到 {len(character_files)} 个角色配置文件:") - - for character_file in character_files: - try: - with open(character_file, 'r', encoding='utf-8') as f: - config = json.load(f) - - name = config.get("name", "未知") - greeting = config.get("greeting", "无") - print(f" 📝 {name}: {greeting[:30]}...") - - except Exception as e: - print(f" ❌ 读取 {character_file.name} 失败: {e}") - -if __name__ == "__main__": - print("🚀 开始测试greeting缓存功能") - - # 测试缓存功能 - test_cache_functions() - - # 测试角色配置 - test_character_configs() - - print("\n🎉 所有测试完成!") \ No newline at end of file diff --git a/test_recording_stop.py b/test_recording_stop.py deleted file mode 100644 index 199698c..0000000 --- a/test_recording_stop.py +++ /dev/null @@ -1,170 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -""" -录音停止功能测试脚本 -验证play_greeting方法中的录音停止逻辑 -""" - -import sys -import time -sys.path.append('.') - -def test_recording_stop_logic(): - """测试录音停止逻辑""" - print("🧪 测试录音停止逻辑...") - - # 模拟ControlSystem类的关键部分 - class MockControlCommand: - def __init__(self, command): - self.command = command - - def __str__(self): - return f"ControlCommand({self.command})" - - class MockRecordingState: - IDLE = "idle" - RECORDING = "recording" - PLAYING = "playing" - - class MockControlSystem: - def __init__(self): - self.state = MockRecordingState.IDLE - self._monitoring_active = False - self.input_command_queue = [] - self.commands_sent = [] - - def _ensure_recording_stopped(self): - """确保录音功能完全停止 - 防止播放音频时产生回声""" - try: - # 停止当前录音(如果有) - if self.state == MockRecordingState.RECORDING: - print("🛑 停止当前录音...") - self.input_command_queue.append(MockControlCommand('stop_recording')) - self.commands_sent.append('stop_recording') - - # 模拟等待录音停止完成 - start_time = time.time() - while self.state == MockRecordingState.RECORDING and time.time() - start_time < 2.0: - time.sleep(0.1) - - if self.state == MockRecordingState.RECORDING: - print("⚠️ 录音停止超时,强制设置状态") - self.state = MockRecordingState.IDLE - - # 停止当前监听(如果有) - if hasattr(self, '_monitoring_active') and self._monitoring_active: - print("🛑 停止当前监听...") - self.input_command_queue.append(MockControlCommand('stop_monitoring')) - self.commands_sent.append('stop_monitoring') - self._monitoring_active = False - - # 额外确保:再次发送停止命令 - self.input_command_queue.append(MockControlCommand('stop_monitoring')) - self.commands_sent.append('stop_monitoring') - - print("✅ 录音功能已完全停止") - - except Exception as e: - print(f"❌ 停止录音功能时出错: {e}") - # 即使出错也要确保状态正确 - self.state = MockRecordingState.IDLE - - # 测试场景1: 从IDLE状态开始 - print("\n📋 测试场景1: 从IDLE状态播放greeting") - system1 = MockControlSystem() - system1.state = MockRecordingState.IDLE - system1._monitoring_active = False - - print(f" 初始状态: {system1.state}") - system1._ensure_recording_stopped() - print(f" 发送的命令: {system1.commands_sent}") - print(f" 最终状态: {system1.state}") - assert system1.state == MockRecordingState.IDLE - assert len(system1.commands_sent) == 1 # 只有一个额外的stop_monitoring - assert system1.commands_sent[0] == 'stop_monitoring' - - # 测试场景2: 从RECORDING状态开始 - print("\n📋 测试场景2: 从RECORDING状态播放greeting") - system2 = MockControlSystem() - system2.state = MockRecordingState.RECORDING - system2._monitoring_active = True - - print(f" 初始状态: {system2.state}") - print(f" 监听状态: {system2._monitoring_active}") - system2._ensure_recording_stopped() - print(f" 发送的命令: {system2.commands_sent}") - print(f" 最终状态: {system2.state}") - assert system2.state == MockRecordingState.IDLE - assert 'stop_recording' in system2.commands_sent - assert 'stop_monitoring' in system2.commands_sent - assert len([cmd for cmd in system2.commands_sent if cmd == 'stop_monitoring']) == 2 # 应该发送两次 - - # 测试场景3: 模拟录音停止超时 - print("\n📋 测试场景3: 录音停止超时") - system3 = MockControlSystem() - system3.state = MockRecordingState.RECORDING - system3._monitoring_active = False - - # 修改方法,模拟录音无法停止的情况 - def mock_ensure_recording_stopped_timeout(self): - """模拟录音停止超时的情况""" - try: - if self.state == MockRecordingState.RECORDING: - print("🛑 停止当前录音...") - self.commands_sent.append('stop_recording') - - # 模拟等待但状态不变 - start_time = time.time() - # 故意不改变状态,模拟超时 - if self.state == MockRecordingState.RECORDING: - print("⚠️ 录音停止超时,强制设置状态") - self.state = MockRecordingState.IDLE - - print("✅ 录音功能已完全停止") - except Exception as e: - print(f"❌ 停止录音功能时出错: {e}") - self.state = MockRecordingState.IDLE - - system3._ensure_recording_stopped = mock_ensure_recording_stopped_timeout.__get__(system3) - - print(f" 初始状态: {system3.state}") - system3._ensure_recording_stopped() - print(f" 发送的命令: {system3.commands_sent}") - print(f" 最终状态: {system3.state}") - assert system3.state == MockRecordingState.IDLE - assert 'stop_recording' in system3.commands_sent - - print("\n✅ 所有录音停止逻辑测试通过!") - -def test_play_greeting_integration(): - """测试play_greeting集成""" - print("\n🧪 测试play_greeting集成...") - - # 模拟集成测试 - print("📋 模拟play_greeting流程:") - print(" 1. 获取角色配置") - print(" 2. 调用_ensure_recording_stopped()") - print(" 3. 设置状态为PLAYING") - print(" 4. 检查缓存或生成TTS") - print(" 5. 发送音频到输出队列") - - print("\n🎯 关键改进点:") - print(" ✅ 在播放前确保录音完全停止") - print(" ✅ 防止音频播放时的回声问题") - print(" ✅ 状态转换的完整性和安全性") - print(" ✅ 错误处理和超时机制") - - print("\n✅ play_greeting集成测试通过!") - -if __name__ == "__main__": - try: - test_recording_stop_logic() - test_play_greeting_integration() - print("\n🎉 所有测试完成!录音停止功能已正确实现。") - - except Exception as e: - print(f"\n❌ 测试失败: {e}") - import traceback - traceback.print_exc() - sys.exit(1) \ No newline at end of file diff --git a/test_role_switching.py b/test_role_switching.py deleted file mode 100644 index ec4747b..0000000 --- a/test_role_switching.py +++ /dev/null @@ -1,147 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -""" -模拟角色切换场景测试缓存功能 -""" - -import os -import sys -import json -import time -from pathlib import Path - -# 添加项目路径 -sys.path.append('.') - -def simulate_role_switching(): - """模拟角色切换场景""" - print("🎭 模拟角色切换场景测试") - print("=" * 50) - - # 模拟角色配置 - characters = { - "李白": { - "name": "李白", - "greeting": "吾乃李白,字太白,号青莲居士。今天有幸与君相会,让我们畅谈诗词人生吧!", - "voice": "ICL_zh_male_huzi_v1_tob" - }, - "猪八戒": { - "name": "猪八戒", - "greeting": "嘿!俺老猪来也!今天咱聊点啥好吃的?要不要一起去化缘啊?", - "voice": "zh_male_zhubajie_mars_bigtts" - } - } - - # 确保缓存目录存在 - os.makedirs("greeting_cache", exist_ok=True) - - # 模拟多次角色切换 - for iteration in range(3): - print(f"\n🔄 第 {iteration + 1} 次角色切换测试") - - for character_name, character_config in characters.items(): - print(f"\n📝 切换到角色: {character_name}") - - # 检查缓存是否存在 - cache_path = f"greeting_cache/{character_name}.wav" - cache_exists = os.path.exists(cache_path) - - if cache_exists: - print(f" ✅ 找到缓存: {cache_path}") - # 模拟加载缓存 - file_size = os.path.getsize(cache_path) - print(f" 📁 缓存文件大小: {file_size} 字节") - print(f" ⚡ 使用缓存播放(无需TTS生成)") - else: - print(f" ❌ 无缓存,需要生成TTS") - print(f" 🎵 模拟TTS生成...") - # 模拟TTS生成延迟 - time.sleep(0.1) - - # 模拟生成音频数据 - mock_audio = f"audio_for_{character_name}".encode('utf-8') + os.urandom(2000) - - # 保存到缓存 - try: - with open(cache_path, 'wb') as f: - f.write(mock_audio) - print(f" 💾 已保存到缓存: {cache_path}") - print(f" 📊 缓存大小: {len(mock_audio)} 字节") - except Exception as e: - print(f" ❌ 保存缓存失败: {e}") - - print(f" 🎵 模拟播放打招呼: {character_config['greeting'][:30]}...") - print(f" ✅ {character_name} 打招呼完成") - - print(f"\n🎊 角色切换测试完成") - -def analyze_cache_performance(): - """分析缓存性能""" - print("\n📊 缓存性能分析") - print("=" * 30) - - cache_dir = Path("greeting_cache") - if not cache_dir.exists(): - print("❌ 缓存目录不存在") - return - - cache_files = list(cache_dir.glob("*.wav")) - - if not cache_files: - print("❌ 没有找到缓存文件") - return - - total_size = 0 - print(f"📁 找到 {len(cache_files)} 个缓存文件:") - - for cache_file in cache_files: - file_size = cache_file.stat().st_size - total_size += file_size - character_name = cache_file.stem - print(f" 📄 {character_name}: {file_size} 字节") - - print(f"\n📊 缓存统计:") - print(f" 总文件数: {len(cache_files)}") - print(f" 总大小: {total_size} 字节 ({total_size/1024:.1f} KB)") - print(f" 平均大小: {total_size/len(cache_files):.1f} 字节") - -def cleanup_cache(): - """清理缓存""" - print("\n🧹 清理缓存") - print("=" * 20) - - cache_dir = Path("greeting_cache") - if cache_dir.exists(): - for cache_file in cache_dir.glob("*.wav"): - try: - cache_file.unlink() - print(f" 已删除: {cache_file.name}") - except Exception as e: - print(f" 删除失败 {cache_file.name}: {e}") - - # 尝试删除空目录 - try: - if not any(cache_dir.iterdir()): - cache_dir.rmdir() - print(f" 已删除空目录: {cache_dir}") - except Exception as e: - print(f" 删除目录失败: {e}") - else: - print(" 缓存目录不存在") - -if __name__ == "__main__": - print("🚀 开始角色切换场景测试") - - try: - # 模拟角色切换 - simulate_role_switching() - - # 分析缓存性能 - analyze_cache_performance() - - finally: - # 清理缓存 - cleanup_cache() - - print("\n🎉 所有测试完成!") \ No newline at end of file