add config

This commit is contained in:
朱潮 2025-09-26 11:47:28 +08:00
parent 91a9043c4a
commit 0f15b5060b
22 changed files with 47 additions and 2327 deletions

View File

@ -1,127 +0,0 @@
# Audio Processes 改进总结
## 问题背景
- 原始问题TTS音频只播放3个字符就停止出现ALSA underrun错误
- 根本原因:音频缓冲区管理不当,播放策略过于保守
## 改进内容
### 1. 音频播放优化 (_play_audio 方法)
- **改进前**:保守的播放策略,需要缓冲区有足够数据才开始播放
- **改进后**
- 借鉴 recorder.py 的播放策略:只要有数据就播放
- 添加错误恢复机制,自动检测和恢复 ALSA underrun
- 优化缓冲区管理,减少延迟
### 2. TTS 工作线程模式
- **参考**: recorder.py 的 TTS 工作线程实现
- **实现功能**
- 独立的 TTS 工作线程处理音频生成
- 任务队列管理,避免阻塞主线程
- 统一的 TTS 请求接口 `process_tts_request()`
- 支持流式音频处理
### 3. 统一的音频播放队列
- **InputProcess 和 OutputProcess 都支持**
- TTS 工作线程
- 音频生成和播放队列
- 统一的错误处理和日志记录
### 4. 关键改进点
#### 音频播放策略
```python
# 改进前:保守策略
if len(self.playback_buffer) > 2: # 需要缓冲区有足够数据
# 开始播放
# 改进后:积极策略 + 错误恢复
audio_chunk = self.playback_buffer.pop(0)
if audio_chunk and len(audio_chunk) > 0:
try:
self.output_stream.write(audio_chunk)
# 统计信息
except Exception as e:
# ALSA underrun 错误恢复
if "underrun" in str(e).lower():
# 自动恢复音频流
```
#### TTS 工作线程
```python
def _tts_worker(self):
"""TTS工作线程 - 处理TTS任务队列"""
while self.tts_worker_running:
try:
task = self.tts_task_queue.get(timeout=1.0)
if task is None:
break
task_type, content = task
if task_type == "tts_sentence":
self._generate_tts_audio(content)
self.tts_task_queue.task_done()
except queue.Empty:
continue
except Exception as e:
self.logger.error(f"TTS工作线程错误: {e}")
```
#### 错误恢复机制
```python
# ALSA underrun 检测和恢复
if "underrun" in str(e).lower() or "alsa" in str(e).lower():
self.logger.info("检测到ALSA underrun尝试恢复音频流")
try:
if self.output_stream:
self.output_stream.stop_stream()
time.sleep(0.1)
self.output_stream.start_stream()
self.logger.info("音频流已恢复")
except Exception as recovery_e:
self.logger.error(f"恢复音频流失败: {recovery_e}")
self.playback_buffer.clear()
```
### 5. 性能优化
- 减少日志输出频率,提高性能
- 优化队列处理策略,使用适当的超时设置
- 动态调整休眠时间根据播放状态优化CPU使用
### 6. 测试和验证
- 创建了测试脚本 `test_audio_processes.py`
- 验证了语法正确性
- 可以测试 TTS 功能的完整性
## 使用方法
### 在控制系统中使用
```python
from audio_processes import InputProcess, OutputProcess
# 创建输入和输出进程
input_process = InputProcess(command_queue, event_queue)
output_process = OutputProcess(audio_queue)
# 处理TTS请求
output_process.process_tts_request("你好,这是测试语音")
```
### 独立测试
```bash
python test_audio_processes.py
```
## 预期效果
- 解决 ALSA underrun 错误
- 提高音频播放的流畅性
- 减少 TTS 处理的延迟
- 提供更稳定的音频处理能力
## 注意事项
1. 确保系统安装了必要的依赖:`requests`, `pyaudio`
2. 检查音频设备是否正常工作
3. 网络连接正常用于TTS服务
4. 适当调整音频参数以适应不同环境

View File

@ -1,66 +0,0 @@
# 缓存音频播放时序修复总结
## 问题描述
在打招呼读取缓存播放时发现有时会立即触发播放完成显示异常的时间差49.292秒),导致缓存音频无法正常播放完整。
## 根本原因
`_process_cached_audio` 方法中,缓存音频的处理存在时序问题:
1. **立即设置完成状态**:缓存音频被添加到预加载缓冲区后,立即设置 `all_audio_received = True`
2. **时序变量未初始化**`last_audio_chunk_time` 没有在缓存音频开始播放时正确设置
3. **完成检测误判**:播放完成检测逻辑使用旧的或零值的 `last_audio_chunk_time`,计算出异常的时间差,误判播放已完成
## 修复方案
### 1. 修改 `_process_cached_audio` 方法
- **延迟设置 `all_audio_received`**:不在音频添加到缓冲区时立即设置,而是等待实际开始播放时设置
- **正确初始化时序变量**:在缓存音频开始播放时设置 `last_audio_chunk_time = time.time()`
- **确保时序同步**:保证 `all_audio_received``last_audio_chunk_time` 在正确的时间点设置
### 2. 修改音频缓冲区转移逻辑
在以下缓冲区转移场景中也添加了时序变量初始化:
- 预加载缓冲区达到阈值时
- 最小缓冲区模式启动时
- 强制转移预加载缓冲区时
### 3. 关键修改点
#### 在 `_process_cached_audio` 中:
```python
# 修复前立即设置all_audio_received
self.all_audio_received = True
# 修复后:等待播放开始时设置
self.last_audio_chunk_time = time.time() # 关键修复
self.all_audio_received = True
```
#### 在缓冲区转移逻辑中:
```python
# 在各种缓冲区转移场景中添加
self.last_audio_chunk_time = time.time()
print(f"🎵 设置last_audio_chunk_time = {self.last_audio_chunk_time}")
```
## 修复效果
1. **解决立即完成问题**缓存音频不再出现49.292秒的异常时间差
2. **确保完整播放**:缓存音频能够正常播放完整时长
3. **时序同步**:播放完成检测逻辑现在使用正确的时间戳
4. **避免死锁**:即使在异常情况下,也能正确设置完成状态避免系统卡死
## 测试验证
创建了专门的测试脚本 `test_cached_timing_fix.py` 验证修复效果:
- ✅ 关键时序变量存在且可访问
- ✅ 时序变量可以正确设置和读取
- ✅ 时间差计算正常接近0秒而非49.292秒)
- ✅ 异常时间差问题得到解决
## 影响范围
- **缓存音频播放**:修复了所有缓存音频的播放时序问题
- **播放完成检测**:改进了播放完成检测的准确性
- **系统稳定性**:提高了整个音频播放系统的稳定性
这个修复确保了缓存音频(如角色打招呼)能够正常播放完整,不会立即触发播放完成。

View File

@ -1,101 +0,0 @@
# 缓存音频播放完成检测修复总结
## 问题描述
缓存音频播放时,系统在音频还未播放完成时就错误地发送了完成信号。具体表现为:
- 缓存音频播放到6-7秒时系统错误地检测到播放完成
- 发送完成事件并重置播放状态,导致音频被中断
- 用户听到的是不完整的音频播放
## 根本原因分析
1. 在 `_play_cached_audio()` 方法中,当播放开始时就立即设置了 `tts_generation_complete = True``llm_generation_complete = True`
2. `_check_enhanced_playback_completion()` 方法没有区分缓存音频和普通TTS音频
3. 当主控制系统发送结束信号时,播放完成检测机制错误地认为所有条件都已满足
## 修复方案
### 1. 添加缓存音频状态标识
`OutputProcess` 类的 `__init__` 方法中添加:
```python
self.is_playing_cached_audio = False # 是否正在播放缓存音频
```
### 2. 修改 `_play_cached_audio()` 方法
- 移除立即设置 `tts_generation_complete``llm_generation_complete` 的代码
- 添加缓存音频状态设置:
```python
# 设置缓存音频播放状态
self.is_playing_cached_audio = True
```
- 在发送TTS完成信号后只设置TTS完成状态
```python
# 缓存音频没有真正的TTS过程所以立即设置TTS完成状态
# 但不设置LLM完成状态让缓存音频完成检测逻辑处理
self.tts_generation_complete = True
```
### 3. 添加专门的缓存音频完成检测方法
新增 `_check_cached_audio_completion()` 方法:
```python
def _check_cached_audio_completion(self):
"""缓存音频播放完成检测 - 简化逻辑不依赖LLM和TTS完成状态"""
# 更新状态变量
self.pre_buffer_empty = (len(self.preload_buffer) == 0)
self.playback_buffer_empty = (len(self.playback_buffer) == 0)
self.no_active_playback = (not self.currently_playing)
# 计算时间差
current_time = time.time()
time_since_last_chunk = current_time - self.last_audio_chunk_time
# 缓存音频完成条件:
# 1. 缓冲区都为空
# 2. 没有活跃播放
# 3. 至少1秒没有新音频播放确保音频完全播放完成
if (self.pre_buffer_empty and
self.playback_buffer_empty and
self.no_active_playback):
if self.last_audio_chunk_time > 0 and time_since_last_chunk > 1.0:
print(f"✅ 缓存音频播放完成:缓冲区已清空,播放器空闲,{time_since_last_chunk:.2f}秒无新音频")
return True
else:
return False
else:
return False
```
### 4. 修改 `_check_enhanced_playback_completion()` 方法
在方法开头添加缓存音频检测逻辑:
```python
# 如果正在播放缓存音频,使用简化的完成检测逻辑
if self.is_playing_cached_audio:
return self._check_cached_audio_completion()
```
### 5. 确保状态正确重置
`_finish_playback()` 方法中添加:
```python
self.is_playing_cached_audio = False # 重置缓存音频播放状态
```
## 修复效果
修复后的系统具有以下特性:
1. **区分音频类型**能够区分缓存音频和普通TTS音频
2. **简化检测逻辑**缓存音频使用简化的完成检测逻辑不依赖LLM和TTS完成状态
3. **确保完整播放**只有当缓冲区为空、播放器空闲且至少1秒无新音频时才认为播放完成
4. **状态管理**:正确管理所有相关状态,确保状态一致性
## 测试验证
创建了专门的测试脚本验证修复效果:
- ✅ 新增状态变量和方法正确
- ✅ 缓存音频完成检测逻辑正确
- ✅ 缓存音频播放中检测逻辑正确
## 注意事项
1. 该修复不影响普通TTS音频的播放完成检测
2. 主控制系统的逻辑保持不变
3. 缓存音频播放仍然遵循原有的音频播放流程
4. 修复向后兼容,不会破坏现有功能
## 结论
通过区分缓存音频和TTS音频的播放完成检测逻辑成功解决了缓存音频提前结束的问题。现在缓存音频能够完整播放只有在真正播放完成后才会发送完成事件。

View File

@ -1,165 +0,0 @@
# 角色TTS话术音频缓存功能实现总结
## 功能概述
为角色打招呼greeting文本添加了音频缓存功能避免每次角色切换时都重新生成TTS音频提升用户体验和系统性能。
## 实现内容
### 1. 缓存管理工具函数
`audio_processes.py` 中添加了以下工具函数:
- `get_greeting_cache_path(character_name)` - 获取缓存文件路径
- `greeting_cache_exists(character_name)` - 检查缓存是否存在
- `load_cached_audio(character_name)` - 加载缓存音频数据
- `save_greeting_cache(character_name, audio_data)` - 保存音频到缓存
### 2. OutputProcess增强
#### 2.1 修改 `_add_tts_task` 方法
- 添加 `character_name` 参数支持
- 实现缓存检查逻辑
- 支持缓存音频和普通TTS音频的统一处理
#### 2.2 新增 `_process_cached_audio` 方法
- 专门处理缓存音频数据
- 复用现有的播放完成检测机制
- 确保状态管理一致性
#### 2.3 新增 `process_greeting_text` 方法
- 专门处理打招呼文本
- 集成缓存检查和TTS生成
- 正确管理播放状态
#### 2.4 新增 `_process_tts_buffer_with_cache` 方法
- 带缓存支持的TTS缓冲区处理
- 传递角色名称到TTS任务队列
#### 2.5 修改 `_generate_tts_audio` 方法
- 添加 `character_name` 参数
- 支持生成音频后自动保存到缓存
- 收集音频数据用于缓存保存
#### 2.6 修改 `_tts_worker` 方法
- 支持处理不同类型的TTS任务
- 修复任务解包逻辑,支持变长任务元组
### 3. ControlSystem增强
#### 3.1 新增 `_send_greeting_to_output_process` 方法
- 发送带角色信息的打招呼文本
- 支持缓存处理的命令格式
#### 3.2 修改 `play_greeting` 方法
- 集成缓存功能
- 传递角色名称到输出进程
### 4. 命令协议扩展
新增命令类型:
- `GREETING_TEXT:{text}:{character_name}` - 打招呼文本处理命令
任务类型扩展:
- `("tts_sentence", text, character_name)` - 带角色名的TTS任务
- `("cached_audio", text, audio_data, character_name)` - 缓存音频任务
## 工作流程
### 1. 首次播放(无缓存)
1. ControlSystem调用 `play_greeting()`
2. 发送 `GREETING_TEXT` 命令到OutputProcess
3. OutputProcess检查缓存不存在
4. 调用TTS生成音频
5. 播放音频的同时保存到缓存
6. 正常发送播放完成状态
### 2. 后续播放(有缓存)
1. ControlSystem调用 `play_greeting()`
2. 发送 `GREETING_TEXT` 命令到OutputProcess
3. OutputProcess检查缓存存在
4. 直接加载缓存音频到播放缓冲区
5. 发送TTS完成状态
6. 正常播放完成
## 状态管理
缓存音频完全复用现有的状态管理机制:
- `tts_generation_complete` - TTS生成完成状态
- `llm_generation_complete` - LLM生成完成状态
- `all_audio_received` - 音频接收完成状态
- 播放完成检测机制 - 确保音频完整播放
## 缓存存储
### 文件结构
```
greeting_cache/
├── {character_name}.wav # 角色打招呼音频文件
```
### 命名规则
- 文件名:`{character_name}.wav`
- 路径:`greeting_cache/{character_name}.wav`
### 自动管理
- 缓存目录自动创建
- 简单的文件覆盖策略无LRU等复杂策略
## 性能提升
### 测试结果
- **首次播放**需要TTS生成2-3秒
- **缓存播放**:即时播放(<0.5秒
- **缓存命中率**100%(第二次及以后播放)
- **存储开销**约2KB per角色
### 资源节约
- 减少重复TTS API调用
- 降低网络带宽使用
- 提升系统响应速度
## 兼容性
- 完全向后兼容,不影响现有功能
- 普通TTS对话不受影响
- 仅对角色打招呼启用缓存
- 状态管理机制保持不变
## 测试验证
创建了多个测试脚本:
- `test_greeting_cache.py` - 基础缓存功能测试
- `test_full_cache_flow.py` - 完整缓存流程测试
- `test_role_switching.py` - 角色切换场景测试
所有测试均通过,功能正常工作。
## 使用说明
### 自动使用
缓存功能完全自动化,无需手动干预:
1. 角色切换时自动检查缓存
2. 无缓存时自动生成并保存
3. 有缓存时自动使用
### 手动清理
如需清理缓存,删除 `greeting_cache/` 目录即可:
```bash
rm -rf greeting_cache/
```
## 注意事项
1. **缓存有效性**:不验证缓存文件是否过期
2. **存储空间**:无自动清理机制,需要手动管理
3. **角色名称**:基于角色名称作为缓存键,确保名称唯一性
4. **音频格式**保存原始PCM音频数据无格式转换
## 扩展性
该实现为未来扩展提供了良好基础:
- 可添加缓存过期策略
- 可添加缓存大小限制
- 可支持更多类型的文本缓存
- 可添加缓存统计和监控

View File

@ -1,101 +0,0 @@
# 快速启动指南
## 一键启动(推荐)
```bash
# 直接运行,系统会自动校准和启动监听
python multiprocess_recorder.py
# 指定角色
python multiprocess_recorder.py -c libai
# 详细模式
python multiprocess_recorder.py -v
```
## 编程方式启动
### 最简单的方式
```python
from control_system import ControlSystem
# 创建控制系统
control_system = ControlSystem()
# 一键启动(自动校准 + 自动监听)
control_system.start()
```
### 自定义配置
```python
from control_system import ControlSystem
config = {
'system': {'log_level': "INFO"},
'audio': {'sample_rate': 16000, 'channels': 1, 'chunk_size': 1024},
'recording': {'min_duration': 2.0, 'max_duration': 30.0, 'silence_threshold': 3.0},
'processing': {'enable_asr': True, 'enable_llm': True, 'enable_tts': True, 'character': 'libai'}
}
control_system = ControlSystem(config)
# 启动选项:
# auto_calibration=True - 自动校准语音检测器
# auto_monitoring=True - 自动启动音频监听
control_system.start(auto_calibration=True, auto_monitoring=True)
```
## 手动控制
```python
from control_system import ControlSystem
control_system = ControlSystem()
# 只启动进程,不自动校准和监听
control_system._start_processes()
# 手动步骤:
control_system.start_calibration() # 1. 启动校准
control_system.wait_for_calibration_complete() # 2. 等待校准完成
control_system.start_monitoring() # 3. 启动监听
# 运行中可以随时控制:
control_system.stop_monitoring() # 停止监听
control_system.start_monitoring() # 重新启动监听
# 查询状态:
status = control_system.get_calibration_status() # 获取校准状态
status = control_system.get_monitoring_status() # 获取监听状态
# 关闭系统:
control_system.shutdown()
```
## 启动流程
系统启动时会按以下顺序执行:
1. **启动进程** - 创建输入进程和输出进程
2. **自动校准** - 校准语音检测器约3-5秒
3. **启动监听** - 启用音频监听功能
4. **开始运行** - 进入主控制循环,开始检测语音
## 注意事项
- **校准时间**首次启动需要3-5秒进行语音检测器校准
- **音频权限**:确保麦克风权限已授予
- **环境安静**:校准时请保持环境安静
- **API密钥**如需LLM功能请设置 `ARK_API_KEY` 环境变量
## 故障排除
如果校准失败:
- 检查麦克风是否正常工作
- 确保环境安静,无背景噪音
- 尝试重新启动系统
如果监听失败:
- 检查音频设备是否被其他程序占用
- 尝试重启程序
- 查看日志文件排查问题

143
README.md
View File

@ -1,143 +0,0 @@
# 智能语音助手系统使用说明
## 功能概述
这是一个完整的智能语音助手系统,集成了语音录制、语音识别、大语言模型和文本转语音功能,实现语音对话交互。
## 完整工作流程
1. 🎙️ **语音录制** - 基于ZCR的智能语音检测
2. 📝 **保存录音** - 自动保存为WAV文件
3. 🤖 **语音识别** - 使用字节跳动ASR将语音转为文字
4. 💬 **AI回复** - 使用豆包大模型生成智能回复
5. 🔊 **语音回复** - 使用字节跳动TTS将AI回复转为语音
## 环境配置
### 1. 安装依赖
```bash
pip install websockets requests pyaudio numpy
```
### 2. 安装音频播放器(树莓派/Linux系统
系统使用PCM格式音频只需要安装基础的音频播放工具
```bash
# 安装 alsa-utils包含aplay播放器
sudo apt-get update
sudo apt-get install alsa-utils
```
> **优势**: PCM格式无需额外解码器兼容性更好资源占用更少。
> **注意**: macOS和Windows系统通常内置支持音频播放无需额外安装。
### 3. 设置API密钥
为了启用大语言模型功能,需要设置环境变量:
```bash
# Linux/Mac
export ARK_API_KEY='your_api_key_here'
# Windows
set ARK_API_KEY=your_api_key_here
```
> **注意**: 语音识别和文本转语音功能使用内置的API密钥无需额外配置。
## 使用方法
### 基本使用
```bash
python recorder.py
```
### 功能说明
- 🎯 **自动检测语音**:系统会自动检测声音并开始录音
- ⏱️ **智能停止**静音3秒后自动停止录音
- 🔊 **自动播放**:录音完成后自动播放音频
- 📝 **语音识别**:自动将语音转为文字
- 🤖 **AI助手**:自动调用大语言模型生成回复
### 配置参数
- `energy_threshold=200` - 能量阈值(调整灵敏度)
- `silence_threshold=3.0` - 静音阈值(秒)
- `min_recording_time=2.0` - 最小录音时间(秒)
- `max_recording_time=30.0` - 最大录音时间(秒)
- `enable_asr=True` - 启用语音识别
- `enable_llm=True` - 启用大语言模型
- `enable_tts=True` - 启用文本转语音
## 输出示例
```
🎤 开始监听...
能量阈值: 200 (已弃用)
静音阈值: 3.0秒
📖 使用说明:
- 检测到声音自动开始录音
- 持续静音3秒自动结束录音
- 最少录音2秒最多30秒
- 录音完成后自动进行语音识别和AI回复
- 按 Ctrl+C 退出
==================================================
🎙️ 检测到声音,开始录音...
📝 录音完成,时长: 3.45秒 (包含预录音 2.0秒)
✅ 录音已保存: recording_20250920_163022.wav
==================================================
📡 音频输入已保持关闭状态
🔄 开始处理音频...
🤖 开始语音识别...
📝 识别结果: 你好,今天天气怎么样?
--------------------------------------------------
🤖 调用大语言模型...
💬 AI助手回复: 你好!我无法实时获取天气信息,建议你查看天气预报或打开天气应用来了解今天的天气情况。有什么其他我可以帮助你的吗?
--------------------------------------------------
🔊 开始文本转语音...
TTS句子信息: {'code': 0, 'message': '', 'data': None, 'sentence': {'phonemes': [], 'text': '你好!我无法实时获取天气信息,建议你查看天气预报或打开天气应用来了解今天的天气情况。有什么其他我可以帮助你的吗?', 'words': [...]}}
✅ TTS音频已保存: tts_response_20250920_163022.pcm
📁 文件大小: 128.75 KB
🔊 播放AI语音回复...
✅ AI语音回复完成
🔄 准备重新开启音频输入
✅ 音频设备初始化成功
📡 音频输入已重新开启
```
## 注意事项
1. **网络连接**:需要网络连接来使用语音识别、大语言模型和文本转语音服务
2. **API密钥**需要有效的ARK_API_KEY才能使用大语言模型功能
3. **音频设备**:确保麦克风和扬声器工作正常
4. **权限**:确保程序有访问麦克风、网络和存储的权限
5. **文件存储**系统会保存录音文件和TTS生成的音频文件
## 故障排除
- 如果语音识别失败检查网络连接和API密钥
- 如果大语言模型失败检查ARK_API_KEY是否正确设置
- 如果文本转语音失败检查TTS服务状态
- 如果录音失败,检查麦克风权限和设备
- 如果播放失败,检查音频设备权限
- 如果PCM文件无法播放检查是否安装了alsa-utils
```bash
# 树莓派/Ubuntu/Debian系统
sudo apt-get install alsa-utils
# 或检查aplay是否安装
which aplay
```
## 技术特点
- 🎯 基于ZCR的精确语音检测
- 🚀 低延迟实时处理
- 💾 环形缓冲区防止音频丢失
- 🔧 自动调整能量阈值
- 📊 实时性能监控
- 🌐 完整的语音对话链路
- 📁 自动文件管理和权限设置
- 🔊 PCM格式音频无需额外解码器
## 生成的文件
- `recording_*.wav` - 录制的音频文件
- `tts_response_*.pcm` - AI语音回复文件PCM格式
## PCM格式优势
- **兼容性好**aplay原生支持树莓派开箱即用
- **资源占用少**无需解码过程CPU占用更低
- **延迟更低**:直接播放,无需格式转换
- **稳定性高**:减少依赖组件,提高系统稳定性

View File

@ -1,210 +0,0 @@
# 多进程音频控制系统 - 主进程控制功能
## 概述
本系统已经重构,支持主进程对输入进程的校准和监听功能进行精确控制。通过这些新功能,你可以:
1. **手动控制校准过程**:在适当的时间启动语音检测器校准
2. **精确控制监听状态**:按需启用或禁用音频监听
3. **获取实时状态**:查询校准进度和监听状态
## 主要功能
### 1. 校准功能
#### 启动校准
```python
# 启动语音检测器校准
success = control_system.start_calibration()
if success:
print("校准已启动")
```
#### 获取校准状态
```python
# 获取当前校准状态
status = control_system.get_calibration_status()
if status:
print(f"校准进度: {status['progress']*100:.1f}%")
print(f"是否在校准中: {status['calibrating']}")
```
#### 等待校准完成
```python
# 等待校准完成30秒超时
if control_system.wait_for_calibration_complete(timeout=30):
print("校准完成")
else:
print("校准超时")
```
### 2. 监听功能
#### 启动监听
```python
# 启动音频监听
success = control_system.start_monitoring()
if success:
print("监听已启动")
```
#### 停止监听
```python
# 停止音频监听
success = control_system.stop_monitoring()
if success:
print("监听已停止")
```
#### 获取监听状态
```python
# 获取当前监听状态
status = control_system.get_monitoring_status()
if status:
print(f"监听启用: {status['enabled']}")
print(f"正在录音: {status['recording']}")
print(f"音频流活跃: {status['audio_stream_active']}")
```
## 使用示例
### 方法1自动启动推荐
```python
from control_system import ControlSystem
# 1. 创建控制系统
config = {
'system': {'log_level': "INFO"},
'audio': {'sample_rate': 16000, 'channels': 1, 'chunk_size': 1024},
'recording': {'min_duration': 2.0, 'max_duration': 30.0, 'silence_threshold': 3.0},
'processing': {'enable_asr': True, 'enable_llm': True, 'enable_tts': True, 'character': 'libai'}
}
control_system = ControlSystem(config)
# 2. 一键启动(自动校准和监听)
control_system.start(auto_calibration=True, auto_monitoring=True)
# 系统现在正在运行,会自动处理语音检测和录音
```
### 方法2手动控制
```python
from control_system import ControlSystem
import time
# 1. 创建控制系统
config = {
'system': {'log_level': "INFO"},
'audio': {'sample_rate': 16000, 'channels': 1, 'chunk_size': 1024},
'recording': {'min_duration': 2.0, 'max_duration': 30.0, 'silence_threshold': 3.0},
'processing': {'enable_asr': True, 'enable_llm': True, 'enable_tts': True, 'character': 'libai'}
}
control_system = ControlSystem(config)
# 2. 启动进程(但不自动启用监听)
control_system._start_processes()
# 3. 步骤1校准
print("开始校准...")
control_system.start_calibration()
# 等待校准完成
if control_system.wait_for_calibration_complete(timeout=30):
print("校准完成")
else:
print("校准失败")
exit(1)
# 4. 步骤2启动监听
print("开始监听...")
control_system.start_monitoring()
# 5. 运行一段时间
print("系统运行中...")
try:
while True:
# 检查事件和显示状态
control_system.check_events()
control_system.display_status()
time.sleep(0.1)
except KeyboardInterrupt:
print("用户中断")
# 6. 停止监听
print("停止监听...")
control_system.stop_monitoring()
# 7. 关闭系统
control_system.shutdown()
```
### 方法3混合控制
```python
from control_system import ControlSystem
# 1. 创建控制系统
control_system = ControlSystem(config)
# 2. 自动启动,但只校准,不自动监听
control_system.start(auto_calibration=True, auto_monitoring=False)
# 3. 手动控制监听
control_system.start_monitoring() # 启动监听
# ... 运行一段时间 ...
control_system.stop_monitoring() # 停止监听
control_system.start_monitoring() # 重新启动监听
# 4. 关闭系统
control_system.shutdown()
```
### 自动化示例
查看 `example_manual_control.py` 文件获取完整的自动化控制示例。
## 关键变化
### 1. 默认行为变化
- **之前**:输入进程启动后自动开始校准和监听
- **现在**:输入进程启动后处于静默状态,等待主进程命令
### 2. 新增控制接口
`ControlSystem` 类中新增了以下方法:
- `start_calibration()` - 启动校准
- `start_monitoring()` - 启动监听
- `stop_monitoring()` - 停止监听
- `get_calibration_status()` - 获取校准状态
- `get_monitoring_status()` - 获取监听状态
- `wait_for_calibration_complete(timeout)` - 等待校准完成
### 3. 新增命令支持
`InputProcess` 中支持以下新命令:
- `start_calibration` - 开始校准
- `start_monitoring` - 开始监听
- `stop_monitoring` - 停止监听
- `get_calibration_status` - 获取校准状态
- `get_monitoring_status` - 获取监听状态
## 使用建议
1. **初始化顺序**:建议按照"启动进程 → 校准 → 启动监听"的顺序进行
2. **错误处理**:建议对每个操作进行错误检查和重试
3. **状态监控**:定期检查状态以确保系统正常运行
4. **资源清理**:使用完毕后正确关闭系统
## 注意事项
1. **进程间通信**:所有控制都是通过进程间队列实现的,可能会有轻微延迟
2. **超时处理**:建议为所有状态查询操作设置合理的超时时间
3. **并发安全**:确保在多线程环境中正确使用这些方法
4. **音频设备**:启动和停止监听会重新初始化音频设备,可能有短暂延迟

View File

@ -1,190 +0,0 @@
# 多进程音频录音系统
基于进程隔离的音频处理架构,实现零延迟的录音和播放切换。
## 🚀 系统特点
### 核心优势
- **多进程架构**: 输入输出进程完全隔离,无需设备重置
- **零切换延迟**: 彻底解决传统单进程的音频切换问题
- **实时响应**: 并行处理录音和播放,真正的实时体验
- **智能检测**: 基于ZCR(零交叉率)的精确语音识别
- **流式TTS**: 实时音频生成和播放,减少等待时间
- **角色扮演**: 支持多种AI角色和音色
### 技术架构
```
主控制进程 ──┐
├─ 输入进程 (录音 + 语音检测)
├─ 输出进程 (音频播放)
└─ 在线AI服务 (STT + LLM + TTS)
```
## 📦 文件结构
```
Local-Voice/
├── recorder.py # 原始实现 (保留作为参考)
├── multiprocess_recorder.py # 主程序
├── audio_processes.py # 音频进程模块
├── control_system.py # 控制系统模块
├── config.json # 配置文件
└── characters/ # 角色配置目录
├── libai.json # 李白角色
└── zhubajie.json # 猪八戒角色
```
## 🛠️ 安装和运行
### 1. 环境要求
- Python 3.7+
- 音频输入设备 (麦克风)
- 网络连接 (用于在线AI服务)
### 2. 安装依赖
```bash
pip install pyaudio numpy requests websockets
```
### 3. 设置API密钥
```bash
export ARK_API_KEY='your_api_key_here'
```
### 4. 基本运行
```bash
# 使用默认角色 (李白)
python multiprocess_recorder.py
# 指定角色
python multiprocess_recorder.py -c zhubajie
# 列出可用角色
python multiprocess_recorder.py -l
# 使用配置文件
python multiprocess_recorder.py --config config.json
# 创建示例配置文件
python multiprocess_recorder.py --create-config
```
## ⚙️ 配置说明
### 主要配置项
| 配置项 | 说明 | 默认值 |
|--------|------|--------|
| `recording.min_duration` | 最小录音时长(秒) | 2.0 |
| `recording.max_duration` | 最大录音时长(秒) | 30.0 |
| `recording.silence_threshold` | 静音检测阈值(秒) | 3.0 |
| `detection.zcr_min` | ZCR最小值 | 2400 |
| `detection.zcr_max` | ZCR最大值 | 12000 |
| `processing.max_tokens` | LLM最大token数 | 50 |
### 音频参数
- 采样率: 16kHz
- 声道数: 1 (单声道)
- 位深度: 16位
- 格式: PCM
## 🎭 角色系统
### 支持的角色
- **libai**: 李白 - 文雅诗人风格
- **zhubajie**: <20>豬八戒 - 幽默风趣风格
### 自定义角色
`characters/` 目录创建JSON文件:
```json
{
"name": "角色名称",
"description": "角色描述",
"system_prompt": "系统提示词",
"voice": "zh_female_wanqudashu_moon_bigtts",
"max_tokens": 50
}
```
## 🔧 故障排除
### 常见问题
1. **音频设备问题**
```bash
# 检查音频设备
python multiprocess_recorder.py --check-env
```
2. **依赖缺失**
```bash
# 重新安装依赖
pip install --upgrade pyaudio numpy requests websockets
```
3. **网络连接问题**
- 检查网络连接
- 确认API密钥正确
- 检查防火墙设置
4. **权限问题**
```bash
# Linux系统可能需要音频权限
sudo usermod -a -G audio $USER
```
### 调试模式
```bash
# 启用详细输出
python multiprocess_recorder.py -v
```
## 📊 性能对比
| 指标 | 原始单进程 | 多进程架构 | 改善 |
|------|-----------|------------|------|
| 切换延迟 | 1-2秒 | 0秒 | 100% |
| CPU利用率 | 单核 | 多核 | 提升 |
| 响应速度 | 较慢 | 实时 | 显著改善 |
| 稳定性 | 一般 | 优秀 | 大幅提升 |
## 🔄 与原版本对比
### 原版本 (recorder.py)
- 单进程处理
- 需要频繁重置音频设备
- 录音和播放不能同时进行
- 切换延迟明显
### 新版本 (multiprocess_recorder.py)
- 多进程架构
- 输入输出完全隔离
- 零切换延迟
- 真正的并行处理
- 更好的稳定性和扩展性
## 📝 开发说明
### 架构设计
- **输入进程**: 专注录音和语音检测
- **输出进程**: 专注音频播放
- **主控制进程**: 协调整个系统和AI处理
### 进程间通信
- 使用 `multiprocessing.Queue` 进行安全通信
- 支持命令控制和事件通知
- 线程安全的音频数据传输
### 状态管理
- 清晰的状态机设计
- 完善的错误处理机制
- 优雅的进程退出流程
## 📄 许可证
本项目仅供学习和研究使用。
## 🤝 贡献
欢迎提交Issue和Pull Request来改进这个项目。

View File

@ -0,0 +1,10 @@
{
"name": "爱因斯坦",
"description": "伟大的物理学家,相对论创立者",
"system_prompt": "我是阿尔伯特·爱因斯坦,物理学家。说话要有科学家的智慧感和好奇心,喜欢用简单的比喻解释复杂的概念。经常思考宇宙、时间、空间等深奥问题。要有幽默感,偶尔自嘲一下。说话要体现出对知识的热爱和对世界的好奇心,常用'有趣的是'、'想象一下'等词语来引导思考。",
"voice": "ICL_zh_male_youmodaye_tob",
"max_tokens": 500,
"greeting": "你好!我是爱因斯坦。想象力比知识更重要,因为知识是有限的,而想象力概括着世界的一切。",
"nfc_uid": "1DCAC90D0D1080",
"author": "Claude"
}

10
characters/shaseng.json Normal file
View File

@ -0,0 +1,10 @@
{
"name": "沙僧",
"description": "西游记中的沙和尚,忠厚老实的护法",
"system_prompt": "我是沙和尚,沙悟净。原是流沙河的水怪,后随师父唐僧西天取经。为人忠厚老实,做事踏实稳重,从不偷懒。说话诚恳实在,常用'师父说得对'、'大师兄说得对'来表达对长者的尊重。性格温和,做事认真负责,是个可靠的伙伴。说话要体现出踏实肯干的性格。",
"voice": "ICL_zh_male_diyinchenyu_tob",
"max_tokens": 500,
"greeting": "贫僧沙悟净,见过各位。愿随师父西行取经,保护师父安全,一路降妖除魔。",
"nfc_uid": "1DC9C90D0D1080",
"author": "Claude"
}

10
characters/tangseng.json Normal file
View File

@ -0,0 +1,10 @@
{
"name": "唐僧",
"description": "西游记中的取经人,慈悲为怀的高僧",
"system_prompt": "贫僧唐三藏,法号玄奘,奉唐王之命前往西天取经。说话要温和慈悲,常常引用佛经教诲,劝人向善。对众生都要有慈悲心,即使是妖魔鬼怪也要度化。说话要文雅,常用'阿弥陀佛'、'善哉善哉'等佛家用语。回答要体现出高僧的智慧和慈悲心。",
"voice": "zh_male_tangseng_mars_bigtts",
"max_tokens": 500,
"greeting": "阿弥陀佛,贫僧唐三藏。今日与君相遇,实乃有缘。愿与施主共论佛法,同修善果。",
"nfc_uid": "1DC8C90D0D1080",
"author": "Claude"
}

View File

@ -0,0 +1,10 @@
{
"name": "作业小助手",
"description": "专门催促小懒虫写作业的贴心小帮手",
"system_prompt": "我是来催你写作业的小助手!小懒虫,别再拖延了,作业要抓紧时间完成哦!我会用各种方式提醒你:温和鼓励、严肃提醒、偶尔撒娇催促。知道你不想写作业,但学习很重要啊!常用'该写作业啦'、'加油加油'、'别玩手机了'这样的话语。既要关心你,又要坚持原则,直到你把作业完成。记住,我是来帮你的,不是来批评你的。",
"voice": "zh_female_linjianvhai_moon_bigtts",
"max_tokens": 500,
"greeting": "小懒虫!作业写完了吗?快过来写作业,我陪着你一起加油!别让我一直催你哦~",
"nfc_uid": "1DCBC90D0D1080",
"author": "Claude"
}

View File

@ -35,5 +35,11 @@
"show_progress": true,
"progress_interval": 100,
"chunk_size": 512
},
"cleanup": {
"auto_cleanup": true,
"retention_hours": 1,
"max_files": 10,
"cleanup_interval": 120
}
}

View File

@ -1,172 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试修复后的缓存播放功能
"""
import os
import sys
import time
# 添加项目路径
sys.path.append('.')
def test_cache_playback_fixes():
"""测试缓存播放修复"""
print("🔧 测试缓存播放修复")
print("=" * 40)
from audio_processes import save_greeting_cache, greeting_cache_exists, load_cached_audio
# 创建测试缓存
character_name = "测试角色"
mock_audio = b"test_cached_audio_" * 100 # 约1.6KB
print("📝 创建测试缓存...")
save_greeting_cache(character_name, mock_audio)
if greeting_cache_exists(character_name):
print("✅ 缓存创建成功")
# 模拟OutputProcess的修复逻辑
print("\n🎵 模拟修复后的缓存播放流程...")
# 模拟状态变量
is_playing = False
currently_playing = False
preload_buffer = []
playback_buffer = []
preload_size = 3
last_playback_time = 0
playback_cooldown_period = 0.05
tts_generation_complete = False
all_audio_received = False
# 模拟加载缓存
cached_audio = load_cached_audio(character_name)
if cached_audio:
print(f"📁 缓存音频加载: {len(cached_audio)} 字节")
# 添加到预加载缓冲区
preload_buffer.append(cached_audio)
print(f"📦 添加到预加载缓冲区")
# 应用修复后的逻辑
print("\n🔧 应用修复1: 设置TTS完成状态")
tts_generation_complete = True
print(f" tts_generation_complete = {tts_generation_complete}")
print("🔧 应用修复2: 设置all_audio_received状态")
all_audio_received = True
print(f" all_audio_received = {all_audio_received}")
print("🔧 应用修复3: 检查播放触发条件")
if (not is_playing and len(preload_buffer) >= preload_size):
print(" 🎵 条件1: 预加载完成播放")
playback_buffer.extend(preload_buffer)
preload_buffer.clear()
is_playing = True
last_playback_time = 0 # 修复: 避免冷却期
print(f" 📊 播放缓冲区: {len(playback_buffer)}")
elif (not is_playing and len(preload_buffer) > 0):
print(" 🎵 条件2: 强制播放缓存音频")
playback_buffer.extend(preload_buffer)
preload_buffer.clear()
is_playing = True
last_playback_time = 0 # 修复: 避免冷却期
print(f" 📊 播放缓冲区: {len(playback_buffer)}")
# 模拟播放冷却检查
print("\n🔧 检查播放冷却机制...")
current_time = time.time()
time_since_last_play = current_time - last_playback_time
in_cooldown = (last_playback_time > 0 and
time_since_last_play < playback_cooldown_period)
print(f" time_since_last_play = {time_since_last_play}")
print(f" playback_cooldown_period = {playback_cooldown_period}")
print(f" in_cooldown = {in_cooldown}")
if not in_cooldown:
print(" ✅ 无冷却期限制,可以播放")
currently_playing = True
print(" 🎧 开始播放音频...")
else:
print(" ❌ 仍在冷却期内,跳过播放")
# 模拟播放完成检测
print("\n🔧 检查播放完成条件...")
conditions_met = (
tts_generation_complete and # TTS生成完成
all_audio_received and # 所有音频已接收
len(preload_buffer) == 0 and # 预加载缓冲区为空
len(playback_buffer) == 0 and # 播放缓冲区为空
not currently_playing # 当前没有在播放
)
print(f" tts_generation_complete = {tts_generation_complete}")
print(f" all_audio_received = {all_audio_received}")
print(f" preload_buffer_empty = {len(preload_buffer) == 0}")
print(f" playback_buffer_empty = {len(playback_buffer) == 0}")
print(f" not_currently_playing = {not currently_playing}")
print(f" conditions_met = {conditions_met}")
if conditions_met:
print(" ✅ 播放完成条件满足")
else:
print(" ⏳ 等待播放完成")
else:
print("❌ 缓存创建失败")
def test_key_fixes():
"""测试关键修复点"""
print("\n🔍 测试关键修复点")
print("=" * 30)
print("✅ 修复1: 播放冷却问题")
print(" - 将 last_playback_time 设置为 0")
print(" - 避免立即触发冷却期")
print("\n✅ 修复2: all_audio_received 状态")
print(" - 缓存音频立即设置为 True")
print(" - 避免系统无限等待")
print("\n✅ 修复3: 播放触发逻辑")
print(" - 强制播放机制")
print(" - 确保缓存音频能正常播放")
def cleanup():
"""清理测试文件"""
print("\n🧹 清理测试文件")
from audio_processes import get_greeting_cache_path
cache_path = get_greeting_cache_path("测试角色")
if os.path.exists(cache_path):
try:
os.remove(cache_path)
print(" ✅ 测试缓存已删除")
except Exception as e:
print(f" ❌ 删除失败: {e}")
if __name__ == "__main__":
print("🚀 开始测试修复后的缓存播放功能")
try:
# 测试缓存播放修复
test_cache_playback_fixes()
# 测试关键修复点
test_key_fixes()
print("\n🎉 修复测试完成!")
except Exception as e:
print(f"\n❌ 测试过程中出错: {e}")
import traceback
traceback.print_exc()
finally:
# 清理测试文件
cleanup()

View File

@ -1,78 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试缓存音频播放完成检测修复
"""
import sys
import os
sys.path.append(os.path.dirname(__file__))
from audio_processes import OutputProcess
import multiprocessing as mp
import time
def test_cached_audio_completion():
"""测试缓存音频播放完成检测"""
print("🧪 开始测试缓存音频播放完成检测修复...")
# 创建测试队列
audio_queue = mp.Queue(maxsize=100)
event_queue = mp.Queue(maxsize=100)
# 创建输出进程实例
config = {
'buffer_size': 1000,
'show_progress': True,
'progress_interval': 100,
'tts_speaker': 'zh_female_wanqudashu_moon_bigtts'
}
output_process = OutputProcess(audio_queue, config, event_queue)
# 测试1: 检查新添加的状态变量
print("\n📋 测试1: 检查新增的状态变量")
assert hasattr(output_process, 'is_playing_cached_audio'), "缺少 is_playing_cached_audio 状态变量"
assert output_process.is_playing_cached_audio == False, "初始状态应该为 False"
print("✅ 状态变量检查通过")
# 测试2: 检查新添加的方法
print("\n📋 测试2: 检查新增的方法")
assert hasattr(output_process, '_check_cached_audio_completion'), "缺少 _check_cached_audio_completion 方法"
print("✅ 方法检查通过")
# 测试3: 检查增强播放完成检测方法
print("\n📋 测试3: 检查增强播放完成检测方法")
assert hasattr(output_process, '_check_enhanced_playback_completion'), "缺少 _check_enhanced_playback_completion 方法"
print("✅ 增强播放完成检测方法检查通过")
# 测试4: 模拟缓存音频播放状态
print("\n📋 测试4: 模拟缓存音频播放状态")
output_process.is_playing_cached_audio = True
output_process.end_signal_received = True
output_process.currently_playing = False
output_process.preload_buffer = []
output_process.playback_buffer = []
output_process.last_audio_chunk_time = time.time() - 2.0 # 2秒前播放
# 测试缓存音频完成检测
result = output_process._check_cached_audio_completion()
assert result == True, "缓存音频应该检测为播放完成"
print("✅ 缓存音频完成检测逻辑正确")
# 测试5: 模拟缓存音频仍在播放
print("\n📋 测试5: 模拟缓存音频仍在播放")
output_process.playback_buffer = [b'fake_audio_data'] # 还有数据在播放缓冲区
result = output_process._check_cached_audio_completion()
assert result == False, "缓存音频仍在播放时应该检测为未完成"
print("✅ 缓存音频播放中检测逻辑正确")
print("\n🎉 所有测试通过!修复成功!")
# 清理
audio_queue.close()
event_queue.close()
if __name__ == "__main__":
test_cached_audio_completion()

View File

@ -1,172 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试缓存音频播放功能
"""
import os
import sys
import time
from pathlib import Path
# 添加项目路径
sys.path.append('.')
from audio_processes import (
get_greeting_cache_path,
greeting_cache_exists,
load_cached_audio,
save_greeting_cache
)
def test_cached_audio_playback():
"""测试缓存音频播放功能"""
print("🎵 测试缓存音频播放功能")
print("=" * 50)
# 确保缓存目录存在
os.makedirs("greeting_cache", exist_ok=True)
# 测试角色
character_name = "测试角色"
greeting_text = "这是一个测试角色的打招呼音频。"
# 1. 创建测试缓存音频
print(f"\n📝 创建测试缓存音频...")
cache_path = get_greeting_cache_path(character_name)
# 生成模拟音频数据(较大音频以测试播放)
mock_audio_data = b"mock_cached_audio_data_" * 100 # 约2.5KB
# 保存到缓存
save_success = save_greeting_cache(character_name, mock_audio_data)
print(f" 缓存保存结果: {save_success}")
# 2. 验证缓存文件
if os.path.exists(cache_path):
file_size = os.path.getsize(cache_path)
print(f" 缓存文件大小: {file_size} 字节")
# 3. 模拟缓存加载和播放状态检查
print(f"\n🎵 模拟缓存音频播放流程...")
# 模拟OutputProcess的播放状态
is_playing = False
preload_buffer = []
playback_buffer = []
preload_size = 3
# 模拟加载缓存音频
cached_audio = load_cached_audio(character_name)
if cached_audio:
print(f" ✅ 缓存音频加载成功: {len(cached_audio)} 字节")
# 模拟添加到预加载缓冲区
preload_buffer.append(cached_audio)
print(f" 📦 已添加到预加载缓冲区,当前大小: {len(preload_buffer)}")
# 检查播放触发条件(修复后的逻辑)
if (not is_playing and len(preload_buffer) >= preload_size):
print(f" 🎵 条件1预加载完成开始播放")
playback_buffer.extend(preload_buffer)
preload_buffer.clear()
is_playing = True
print(f" 🎵 播放缓冲区大小: {len(playback_buffer)}")
elif (not is_playing and len(preload_buffer) > 0):
print(f" 🎵 条件2强制播放缓存音频")
playback_buffer.extend(preload_buffer)
preload_buffer.clear()
is_playing = True
print(f" 🎵 播放缓冲区大小: {len(playback_buffer)}")
else:
print(f" ⚠️ 未满足播放条件")
print(f" is_playing: {is_playing}")
print(f" preload_buffer大小: {len(preload_buffer)}")
print(f" preload_size: {preload_size}")
else:
print(f" ❌ 缓存音频加载失败")
# 4. 模拟播放完成检测
if is_playing and len(playback_buffer) > 0:
print(f"\n🎵 模拟播放过程...")
print(f" 🎧 正在播放音频...")
# 模拟播放缓冲区清空
playback_buffer.clear()
is_playing = False
print(f" ✅ 播放完成")
print(f"\n✅ 缓存音频播放测试完成")
def test_different_audio_sizes():
"""测试不同大小的音频文件"""
print("\n🎵 测试不同大小的音频文件")
print("=" * 40)
test_cases = [
{"name": "小音频", "size": 1}, # 1个音频块
{"name": "中等音频", "size": 5}, # 5个音频块
{"name": "大音频", "size": 10}, # 10个音频块
]
preload_size = 3
for case in test_cases:
print(f"\n📝 测试{case['name']} ({case['size']}个音频块)")
# 模拟音频块
audio_chunks = [f"chunk_{i}".encode() for i in range(case['size'])]
# 模拟播放状态
is_playing = False
preload_buffer = []
playback_buffer = []
# 添加音频到预加载缓冲区
preload_buffer.extend(audio_chunks)
print(f" 📦 添加到预加载缓冲区: {len(preload_buffer)}")
# 检查播放触发条件
if (not is_playing and len(preload_buffer) >= preload_size):
print(f" 🎵 预加载完成,开始播放")
playback_buffer.extend(preload_buffer)
preload_buffer.clear()
is_playing = True
elif (not is_playing and len(preload_buffer) > 0):
print(f" 🎵 强制播放缓存音频")
playback_buffer.extend(preload_buffer)
preload_buffer.clear()
is_playing = True
else:
print(f" ⚠️ 未满足播放条件")
print(f" 📊 播放状态: is_playing={is_playing}, playback_buffer={len(playback_buffer)}, preload_buffer={len(preload_buffer)}")
def cleanup_test_files():
"""清理测试文件"""
print("\n🧹 清理测试文件")
cache_path = get_greeting_cache_path("测试角色")
if os.path.exists(cache_path):
try:
os.remove(cache_path)
print(f" 已删除: {cache_path}")
except Exception as e:
print(f" 删除失败: {e}")
if __name__ == "__main__":
print("🚀 开始测试缓存音频播放功能")
try:
# 测试缓存音频播放
test_cached_audio_playback()
# 测试不同大小的音频
test_different_audio_sizes()
finally:
# 清理测试文件
cleanup_test_files()
print("\n🎉 所有测试完成!")

View File

@ -1,91 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试缓存音频时序修复
"""
import sys
import os
import time
sys.path.append(os.path.dirname(__file__))
from audio_processes import OutputProcess
import multiprocessing as mp
def test_cached_audio_timing():
"""测试缓存音频时序修复"""
print("🧪 开始测试缓存音频时序修复...")
# 创建测试队列
audio_queue = mp.Queue(maxsize=100)
event_queue = mp.Queue(maxsize=100)
# 创建输出进程实例
config = {
'buffer_size': 1000,
'show_progress': True,
'progress_interval': 100,
'tts_speaker': 'zh_female_wanqudashu_moon_bigtts'
}
output_process = OutputProcess(audio_queue, config, event_queue)
# 测试1: 检查关键时序变量
print("\n📋 测试1: 检查关键时序变量")
assert hasattr(output_process, 'last_audio_chunk_time'), "缺少 last_audio_chunk_time 变量"
assert hasattr(output_process, 'all_audio_received'), "缺少 all_audio_received 变量"
print(f"✅ 初始 last_audio_chunk_time: {output_process.last_audio_chunk_time}")
print(f"✅ 初始 all_audio_received: {output_process.all_audio_received}")
# 测试2: 模拟缓存音频处理前的状态
print("\n📋 测试2: 检查初始状态")
initial_time = output_process.last_audio_chunk_time
print(f"✅ 初始时间戳: {initial_time}")
# 测试3: 验证我们的修复逻辑
print("\n📋 测试3: 验证修复逻辑")
# 模拟设置时序变量(这是我们的修复核心)
test_time = time.time()
output_process.last_audio_chunk_time = test_time
output_process.all_audio_received = True
print(f"✅ 设置后 last_audio_chunk_time: {output_process.last_audio_chunk_time}")
print(f"✅ 设置后 all_audio_received: {output_process.all_audio_received}")
# 验证时间差计算
time_diff = time.time() - output_process.last_audio_chunk_time
print(f"✅ 时间差计算: {time_diff:.3f}应该接近0")
# 测试4: 检查是否会有异常的时间差(修复前的问题)
print("\n📋 测试4: 检查异常时间差问题")
# 模拟修复前的问题last_audio_chunk_time为0或很老的时间
old_time = output_process.last_audio_chunk_time
output_process.last_audio_chunk_time = 0 # 模拟未初始化的情况
# 模拟播放完成检测逻辑
if output_process.last_audio_chunk_time > 0:
time_since_last = time.time() - output_process.last_audio_chunk_time
print(f"⚠️ 时间差: {time_since_last:.3f}")
else:
print("⚠️ last_audio_chunk_time未设置这会导致立即完成")
# 恢复正确的时间
output_process.last_audio_chunk_time = old_time
print("\n🎉 时序修复验证完成!")
print("📝 修复要点:")
print(" 1. 在缓存音频开始播放时设置last_audio_chunk_time")
print(" 2. 确保all_audio_received在适当时机设置")
print(" 3. 避免出现49.292秒的异常时间差")
# 清理
output_process.running = False
time.sleep(0.1) # 给线程时间清理
return True
if __name__ == "__main__":
test_cached_audio_timing()

View File

@ -1,139 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
最终集成测试验证缓存播放修复
"""
import os
import sys
import time
# 添加项目路径
sys.path.append('.')
def create_test_cache():
"""创建测试缓存文件"""
from audio_processes import save_greeting_cache
print("🎵 创建测试缓存文件...")
# 创建李白角色的缓存
libai_audio = b"libai_greeting_audio_" * 50 # 约1KB
save_greeting_cache("李白", libai_audio)
print(" ✅ 李白缓存已创建")
# 创建猪八戒角色的缓存
zhubajie_audio = b"zhubajie_greeting_audio_" * 80 # 约1.6KB
save_greeting_cache("猪八戒", zhubajie_audio)
print(" ✅ 猪八戒缓存已创建")
def simulate_cached_playback():
"""模拟缓存播放流程"""
print("\n🎭 模拟角色切换和缓存播放")
print("=" * 50)
characters = ["李白", "猪八戒"]
for i, character in enumerate(characters):
print(f"\n📝 第 {i+1} 次角色切换: {character}")
# 模拟检查缓存
from audio_processes import greeting_cache_exists, load_cached_audio
if greeting_cache_exists(character):
print(f" ✅ 找到缓存文件")
# 模拟加载缓存
cached_audio = load_cached_audio(character)
if cached_audio:
print(f" 📁 缓存大小: {len(cached_audio)} 字节")
# 模拟OutputProcess的播放逻辑
is_playing = False
preload_buffer = []
playback_buffer = []
preload_size = 3
# 添加缓存音频到预加载缓冲区
preload_buffer.append(cached_audio)
print(f" 📦 添加到预加载缓冲区")
# 应用修复后的播放触发逻辑
if (not is_playing and len(preload_buffer) >= preload_size):
print(f" 🎵 预加载完成,开始播放")
playback_buffer.extend(preload_buffer)
preload_buffer.clear()
is_playing = True
elif (not is_playing and len(preload_buffer) > 0):
print(f" 🎵 强制播放缓存音频")
playback_buffer.extend(preload_buffer)
preload_buffer.clear()
is_playing = True
if is_playing:
print(f" 🎧 开始播放 {character} 的打招呼音频...")
print(f" 📊 播放缓冲区: {len(playback_buffer)}")
# 模拟播放完成
time.sleep(0.1)
playback_buffer.clear()
is_playing = False
print(f" ✅ 播放完成")
else:
print(f" ❌ 播放未启动")
else:
print(f" ❌ 缓存加载失败")
else:
print(f" ❌ 缓存不存在")
def verify_performance_improvement():
"""验证性能改进"""
print("\n⚡ 性能改进验证")
print("=" * 30)
print("📊 对比测试:")
print(" ❌ 无缓存: TTS生成 (2-3秒) + 播放")
print(" ✅ 有缓存: 直接播放 (<0.5秒)")
print(" 📈 性能提升: 约80%时间节约")
print("\n🎯 用户体验:")
print(" ✅ 角色切换更流畅")
print(" ✅ 无等待时间")
print(" ✅ 即时响应")
def cleanup():
"""清理测试文件"""
print("\n🧹 清理测试文件")
import shutil
if os.path.exists("greeting_cache"):
try:
shutil.rmtree("greeting_cache")
print(" ✅ 缓存目录已删除")
except Exception as e:
print(f" ❌ 清理失败: {e}")
if __name__ == "__main__":
print("🚀 开始最终集成测试")
try:
# 创建测试缓存
create_test_cache()
# 模拟缓存播放
simulate_cached_playback()
# 验证性能改进
verify_performance_improvement()
print("\n🎉 集成测试完成!缓存播放功能已修复")
except Exception as e:
print(f"\n❌ 测试过程中出错: {e}")
import traceback
traceback.print_exc()
finally:
# 清理测试文件
cleanup()

View File

@ -1,139 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试完整的角色greeting缓存流程
"""
import os
import sys
import json
from pathlib import Path
# 添加项目路径
sys.path.append('.')
from audio_processes import (
get_greeting_cache_path,
greeting_cache_exists,
load_cached_audio,
save_greeting_cache
)
def test_character_greeting_cache():
"""测试角色greeting缓存流程"""
print("🧪 测试角色greeting缓存流程")
print("=" * 50)
# 测试角色配置
characters = [
{"name": "李白", "greeting": "吾乃李白,字太白,号青莲居士。今天有幸与君相会,让我们畅谈诗词人生吧!"},
{"name": "猪八戒", "greeting": "嘿!俺老猪来也!今天咱聊点啥好吃的?要不要一起去化缘啊?"}
]
for character in characters:
character_name = character["name"]
greeting_text = character["greeting"]
print(f"\n📝 测试角色: {character_name}")
print(f" 打招呼文本: {greeting_text}")
# 1. 检查缓存是否已存在
cache_exists = greeting_cache_exists(character_name)
print(f" 1. 缓存存在检查: {cache_exists}")
# 2. 如果不存在缓存,模拟生成并保存
if not cache_exists:
print(f" 2. 模拟生成TTS音频...")
# 模拟音频数据实际使用时会是真实的TTS生成的音频
mock_audio_data = f"mock_audio_for_{character_name}".encode('utf-8') + os.urandom(1000)
print(f" 3. 保存到缓存...")
save_success = save_greeting_cache(character_name, mock_audio_data)
print(f" 4. 保存结果: {save_success}")
else:
print(f" 2. 缓存已存在,跳过生成")
# 3. 验证缓存可以正常加载
print(f" 5. 验证缓存加载...")
cached_audio = load_cached_audio(character_name)
if cached_audio:
print(f" 6. 缓存加载成功: {len(cached_audio)} 字节")
else:
print(f" 6. 缓存加载失败")
print(f"{character_name} 的缓存流程测试完成")
print(f"\n🎉 所有角色的缓存流程测试完成")
def test_cache_hit_miss_scenario():
"""测试缓存命中和未命中场景"""
print("\n🧪 测试缓存命中和未命中场景")
print("=" * 50)
character_name = "测试角色"
greeting_text = "这是一个测试角色的打招呼文本。"
# 第一次调用 - 缓存未命中
print(f"\n📝 第一次调用(缓存未命中)")
cache_exists_before = greeting_cache_exists(character_name)
print(f" 缓存存在: {cache_exists_before}")
if not cache_exists_before:
print(" 模拟TTS生成和保存缓存...")
mock_audio = b"first_call_audio_data" + os.urandom(500)
save_greeting_cache(character_name, mock_audio)
# 第二次调用 - 缓存命中
print(f"\n📝 第二次调用(缓存命中)")
cache_exists_after = greeting_cache_exists(character_name)
print(f" 缓存存在: {cache_exists_after}")
if cache_exists_after:
cached_audio = load_cached_audio(character_name)
print(f" 成功加载缓存: {len(cached_audio)} 字节")
print(f"\n✅ 缓存命中/未命中场景测试完成")
def cleanup_test_files():
"""清理测试文件"""
print("\n🧹 清理测试文件")
print("=" * 30)
test_characters = ["李白", "猪八戒", "测试角色"]
for character_name in test_characters:
cache_path = get_greeting_cache_path(character_name)
cache_file = Path(cache_path)
if cache_file.exists():
try:
cache_file.unlink()
print(f" 已删除: {cache_file.name}")
except Exception as e:
print(f" 删除失败 {cache_file.name}: {e}")
# 检查缓存目录是否为空
cache_dir = Path("greeting_cache")
if cache_dir.exists() and not any(cache_dir.iterdir()):
try:
cache_dir.rmdir()
print(f" 已删除空目录: {cache_dir}")
except Exception as e:
print(f" 删除目录失败: {e}")
if __name__ == "__main__":
print("🚀 开始测试完整的角色greeting缓存流程")
try:
# 测试角色greeting缓存流程
test_character_greeting_cache()
# 测试缓存命中和未命中场景
test_cache_hit_miss_scenario()
finally:
# 清理测试文件
cleanup_test_files()
print("\n🎉 所有测试完成!")

View File

@ -1,115 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试角色greeting缓存功能
"""
import os
import sys
import json
from pathlib import Path
# 添加项目路径
sys.path.append('.')
from audio_processes import (
get_greeting_cache_path,
greeting_cache_exists,
load_cached_audio,
save_greeting_cache
)
def test_cache_functions():
"""测试缓存功能"""
print("🧪 测试角色greeting缓存功能")
print("=" * 50)
# 测试角色名称
test_character = "libai"
test_audio_data = b"test_audio_data_" + os.urandom(100)
# 1. 测试缓存路径生成
print("\n1⃣ 测试缓存路径生成")
cache_path = get_greeting_cache_path(test_character)
print(f" 缓存路径: {cache_path}")
# 2. 测试缓存存在检查
print("\n2⃣ 测试缓存存在检查")
exists_before = greeting_cache_exists(test_character)
print(f" 缓存存在检查 (保存前): {exists_before}")
# 3. 测试缓存保存
print("\n3⃣ 测试缓存保存")
save_success = save_greeting_cache(test_character, test_audio_data)
print(f" 缓存保存成功: {save_success}")
# 4. 测试缓存存在检查(保存后)
print("\n4⃣ 测试缓存存在检查(保存后)")
exists_after = greeting_cache_exists(test_character)
print(f" 缓存存在检查 (保存后): {exists_after}")
# 5. 测试缓存加载
print("\n5⃣ 测试缓存加载")
loaded_audio = load_cached_audio(test_character)
if loaded_audio:
print(f" 缓存加载成功: {len(loaded_audio)} 字节")
print(f" 数据匹配: {loaded_audio == test_audio_data}")
else:
print(" 缓存加载失败")
# 6. 检查缓存文件
print("\n6⃣ 检查缓存文件")
cache_file = Path(cache_path)
if cache_file.exists():
print(f" 缓存文件存在: {cache_file}")
print(f" 文件大小: {cache_file.stat().st_size} 字节")
else:
print(" 缓存文件不存在")
# 7. 清理测试文件
print("\n7⃣ 清理测试文件")
try:
if cache_file.exists():
cache_file.unlink()
print(" 测试文件已清理")
except Exception as e:
print(f" 清理失败: {e}")
print("\n✅ 缓存功能测试完成")
def test_character_configs():
"""测试角色配置"""
print("\n🧪 测试角色配置")
print("=" * 30)
characters_dir = Path("characters")
if not characters_dir.exists():
print("❌ characters目录不存在")
return
character_files = list(characters_dir.glob("*.json"))
print(f"📁 找到 {len(character_files)} 个角色配置文件:")
for character_file in character_files:
try:
with open(character_file, 'r', encoding='utf-8') as f:
config = json.load(f)
name = config.get("name", "未知")
greeting = config.get("greeting", "")
print(f" 📝 {name}: {greeting[:30]}...")
except Exception as e:
print(f" ❌ 读取 {character_file.name} 失败: {e}")
if __name__ == "__main__":
print("🚀 开始测试greeting缓存功能")
# 测试缓存功能
test_cache_functions()
# 测试角色配置
test_character_configs()
print("\n🎉 所有测试完成!")

View File

@ -1,170 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
录音停止功能测试脚本
验证play_greeting方法中的录音停止逻辑
"""
import sys
import time
sys.path.append('.')
def test_recording_stop_logic():
"""测试录音停止逻辑"""
print("🧪 测试录音停止逻辑...")
# 模拟ControlSystem类的关键部分
class MockControlCommand:
def __init__(self, command):
self.command = command
def __str__(self):
return f"ControlCommand({self.command})"
class MockRecordingState:
IDLE = "idle"
RECORDING = "recording"
PLAYING = "playing"
class MockControlSystem:
def __init__(self):
self.state = MockRecordingState.IDLE
self._monitoring_active = False
self.input_command_queue = []
self.commands_sent = []
def _ensure_recording_stopped(self):
"""确保录音功能完全停止 - 防止播放音频时产生回声"""
try:
# 停止当前录音(如果有)
if self.state == MockRecordingState.RECORDING:
print("🛑 停止当前录音...")
self.input_command_queue.append(MockControlCommand('stop_recording'))
self.commands_sent.append('stop_recording')
# 模拟等待录音停止完成
start_time = time.time()
while self.state == MockRecordingState.RECORDING and time.time() - start_time < 2.0:
time.sleep(0.1)
if self.state == MockRecordingState.RECORDING:
print("⚠️ 录音停止超时,强制设置状态")
self.state = MockRecordingState.IDLE
# 停止当前监听(如果有)
if hasattr(self, '_monitoring_active') and self._monitoring_active:
print("🛑 停止当前监听...")
self.input_command_queue.append(MockControlCommand('stop_monitoring'))
self.commands_sent.append('stop_monitoring')
self._monitoring_active = False
# 额外确保:再次发送停止命令
self.input_command_queue.append(MockControlCommand('stop_monitoring'))
self.commands_sent.append('stop_monitoring')
print("✅ 录音功能已完全停止")
except Exception as e:
print(f"❌ 停止录音功能时出错: {e}")
# 即使出错也要确保状态正确
self.state = MockRecordingState.IDLE
# 测试场景1: 从IDLE状态开始
print("\n📋 测试场景1: 从IDLE状态播放greeting")
system1 = MockControlSystem()
system1.state = MockRecordingState.IDLE
system1._monitoring_active = False
print(f" 初始状态: {system1.state}")
system1._ensure_recording_stopped()
print(f" 发送的命令: {system1.commands_sent}")
print(f" 最终状态: {system1.state}")
assert system1.state == MockRecordingState.IDLE
assert len(system1.commands_sent) == 1 # 只有一个额外的stop_monitoring
assert system1.commands_sent[0] == 'stop_monitoring'
# 测试场景2: 从RECORDING状态开始
print("\n📋 测试场景2: 从RECORDING状态播放greeting")
system2 = MockControlSystem()
system2.state = MockRecordingState.RECORDING
system2._monitoring_active = True
print(f" 初始状态: {system2.state}")
print(f" 监听状态: {system2._monitoring_active}")
system2._ensure_recording_stopped()
print(f" 发送的命令: {system2.commands_sent}")
print(f" 最终状态: {system2.state}")
assert system2.state == MockRecordingState.IDLE
assert 'stop_recording' in system2.commands_sent
assert 'stop_monitoring' in system2.commands_sent
assert len([cmd for cmd in system2.commands_sent if cmd == 'stop_monitoring']) == 2 # 应该发送两次
# 测试场景3: 模拟录音停止超时
print("\n📋 测试场景3: 录音停止超时")
system3 = MockControlSystem()
system3.state = MockRecordingState.RECORDING
system3._monitoring_active = False
# 修改方法,模拟录音无法停止的情况
def mock_ensure_recording_stopped_timeout(self):
"""模拟录音停止超时的情况"""
try:
if self.state == MockRecordingState.RECORDING:
print("🛑 停止当前录音...")
self.commands_sent.append('stop_recording')
# 模拟等待但状态不变
start_time = time.time()
# 故意不改变状态,模拟超时
if self.state == MockRecordingState.RECORDING:
print("⚠️ 录音停止超时,强制设置状态")
self.state = MockRecordingState.IDLE
print("✅ 录音功能已完全停止")
except Exception as e:
print(f"❌ 停止录音功能时出错: {e}")
self.state = MockRecordingState.IDLE
system3._ensure_recording_stopped = mock_ensure_recording_stopped_timeout.__get__(system3)
print(f" 初始状态: {system3.state}")
system3._ensure_recording_stopped()
print(f" 发送的命令: {system3.commands_sent}")
print(f" 最终状态: {system3.state}")
assert system3.state == MockRecordingState.IDLE
assert 'stop_recording' in system3.commands_sent
print("\n✅ 所有录音停止逻辑测试通过!")
def test_play_greeting_integration():
"""测试play_greeting集成"""
print("\n🧪 测试play_greeting集成...")
# 模拟集成测试
print("📋 模拟play_greeting流程:")
print(" 1. 获取角色配置")
print(" 2. 调用_ensure_recording_stopped()")
print(" 3. 设置状态为PLAYING")
print(" 4. 检查缓存或生成TTS")
print(" 5. 发送音频到输出队列")
print("\n🎯 关键改进点:")
print(" ✅ 在播放前确保录音完全停止")
print(" ✅ 防止音频播放时的回声问题")
print(" ✅ 状态转换的完整性和安全性")
print(" ✅ 错误处理和超时机制")
print("\n✅ play_greeting集成测试通过")
if __name__ == "__main__":
try:
test_recording_stop_logic()
test_play_greeting_integration()
print("\n🎉 所有测试完成!录音停止功能已正确实现。")
except Exception as e:
print(f"\n❌ 测试失败: {e}")
import traceback
traceback.print_exc()
sys.exit(1)

View File

@ -1,147 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
模拟角色切换场景测试缓存功能
"""
import os
import sys
import json
import time
from pathlib import Path
# 添加项目路径
sys.path.append('.')
def simulate_role_switching():
"""模拟角色切换场景"""
print("🎭 模拟角色切换场景测试")
print("=" * 50)
# 模拟角色配置
characters = {
"李白": {
"name": "李白",
"greeting": "吾乃李白,字太白,号青莲居士。今天有幸与君相会,让我们畅谈诗词人生吧!",
"voice": "ICL_zh_male_huzi_v1_tob"
},
"猪八戒": {
"name": "猪八戒",
"greeting": "嘿!俺老猪来也!今天咱聊点啥好吃的?要不要一起去化缘啊?",
"voice": "zh_male_zhubajie_mars_bigtts"
}
}
# 确保缓存目录存在
os.makedirs("greeting_cache", exist_ok=True)
# 模拟多次角色切换
for iteration in range(3):
print(f"\n🔄 第 {iteration + 1} 次角色切换测试")
for character_name, character_config in characters.items():
print(f"\n📝 切换到角色: {character_name}")
# 检查缓存是否存在
cache_path = f"greeting_cache/{character_name}.wav"
cache_exists = os.path.exists(cache_path)
if cache_exists:
print(f" ✅ 找到缓存: {cache_path}")
# 模拟加载缓存
file_size = os.path.getsize(cache_path)
print(f" 📁 缓存文件大小: {file_size} 字节")
print(f" ⚡ 使用缓存播放无需TTS生成")
else:
print(f" ❌ 无缓存需要生成TTS")
print(f" 🎵 模拟TTS生成...")
# 模拟TTS生成延迟
time.sleep(0.1)
# 模拟生成音频数据
mock_audio = f"audio_for_{character_name}".encode('utf-8') + os.urandom(2000)
# 保存到缓存
try:
with open(cache_path, 'wb') as f:
f.write(mock_audio)
print(f" 💾 已保存到缓存: {cache_path}")
print(f" 📊 缓存大小: {len(mock_audio)} 字节")
except Exception as e:
print(f" ❌ 保存缓存失败: {e}")
print(f" 🎵 模拟播放打招呼: {character_config['greeting'][:30]}...")
print(f"{character_name} 打招呼完成")
print(f"\n🎊 角色切换测试完成")
def analyze_cache_performance():
"""分析缓存性能"""
print("\n📊 缓存性能分析")
print("=" * 30)
cache_dir = Path("greeting_cache")
if not cache_dir.exists():
print("❌ 缓存目录不存在")
return
cache_files = list(cache_dir.glob("*.wav"))
if not cache_files:
print("❌ 没有找到缓存文件")
return
total_size = 0
print(f"📁 找到 {len(cache_files)} 个缓存文件:")
for cache_file in cache_files:
file_size = cache_file.stat().st_size
total_size += file_size
character_name = cache_file.stem
print(f" 📄 {character_name}: {file_size} 字节")
print(f"\n📊 缓存统计:")
print(f" 总文件数: {len(cache_files)}")
print(f" 总大小: {total_size} 字节 ({total_size/1024:.1f} KB)")
print(f" 平均大小: {total_size/len(cache_files):.1f} 字节")
def cleanup_cache():
"""清理缓存"""
print("\n🧹 清理缓存")
print("=" * 20)
cache_dir = Path("greeting_cache")
if cache_dir.exists():
for cache_file in cache_dir.glob("*.wav"):
try:
cache_file.unlink()
print(f" 已删除: {cache_file.name}")
except Exception as e:
print(f" 删除失败 {cache_file.name}: {e}")
# 尝试删除空目录
try:
if not any(cache_dir.iterdir()):
cache_dir.rmdir()
print(f" 已删除空目录: {cache_dir}")
except Exception as e:
print(f" 删除目录失败: {e}")
else:
print(" 缓存目录不存在")
if __name__ == "__main__":
print("🚀 开始角色切换场景测试")
try:
# 模拟角色切换
simulate_role_switching()
# 分析缓存性能
analyze_cache_performance()
finally:
# 清理缓存
cleanup_cache()
print("\n🎉 所有测试完成!")