Compare commits

...

11 Commits

Author SHA1 Message Date
朱潮
a7876fb472 add config 2025-09-26 13:11:14 +08:00
朱潮
0f15b5060b add config 2025-09-26 11:47:28 +08:00
朱潮
91a9043c4a 缓存 2025-09-25 11:25:16 +08:00
朱潮
9cf562e3be 缓存 2025-09-25 10:41:27 +08:00
朱潮
d1193381ac 增加自启动 2025-09-25 09:40:12 +08:00
朱潮
997691cc2b 修复无声音导致的 all_audio_received问题 2025-09-23 14:14:41 +08:00
朱潮
01d55be032 修复无声音导致的 all_audio_received问题 2025-09-23 14:11:22 +08:00
朱潮
26a42452c8 cache audio 2025-09-23 13:40:57 +08:00
朱潮
2dff81ecb7 增NFC呼功能 2025-09-22 02:06:17 +08:00
朱潮
f416ef0036 增NFC呼功能 2025-09-22 02:06:00 +08:00
朱潮
67549566ec ndf 2025-09-22 00:40:12 +08:00
33 changed files with 2144 additions and 1456 deletions

View File

@ -1,127 +0,0 @@
# Audio Processes 改进总结
## 问题背景
- 原始问题TTS音频只播放3个字符就停止出现ALSA underrun错误
- 根本原因:音频缓冲区管理不当,播放策略过于保守
## 改进内容
### 1. 音频播放优化 (_play_audio 方法)
- **改进前**:保守的播放策略,需要缓冲区有足够数据才开始播放
- **改进后**
- 借鉴 recorder.py 的播放策略:只要有数据就播放
- 添加错误恢复机制,自动检测和恢复 ALSA underrun
- 优化缓冲区管理,减少延迟
### 2. TTS 工作线程模式
- **参考**: recorder.py 的 TTS 工作线程实现
- **实现功能**
- 独立的 TTS 工作线程处理音频生成
- 任务队列管理,避免阻塞主线程
- 统一的 TTS 请求接口 `process_tts_request()`
- 支持流式音频处理
### 3. 统一的音频播放队列
- **InputProcess 和 OutputProcess 都支持**
- TTS 工作线程
- 音频生成和播放队列
- 统一的错误处理和日志记录
### 4. 关键改进点
#### 音频播放策略
```python
# 改进前:保守策略
if len(self.playback_buffer) > 2: # 需要缓冲区有足够数据
# 开始播放
# 改进后:积极策略 + 错误恢复
audio_chunk = self.playback_buffer.pop(0)
if audio_chunk and len(audio_chunk) > 0:
try:
self.output_stream.write(audio_chunk)
# 统计信息
except Exception as e:
# ALSA underrun 错误恢复
if "underrun" in str(e).lower():
# 自动恢复音频流
```
#### TTS 工作线程
```python
def _tts_worker(self):
"""TTS工作线程 - 处理TTS任务队列"""
while self.tts_worker_running:
try:
task = self.tts_task_queue.get(timeout=1.0)
if task is None:
break
task_type, content = task
if task_type == "tts_sentence":
self._generate_tts_audio(content)
self.tts_task_queue.task_done()
except queue.Empty:
continue
except Exception as e:
self.logger.error(f"TTS工作线程错误: {e}")
```
#### 错误恢复机制
```python
# ALSA underrun 检测和恢复
if "underrun" in str(e).lower() or "alsa" in str(e).lower():
self.logger.info("检测到ALSA underrun尝试恢复音频流")
try:
if self.output_stream:
self.output_stream.stop_stream()
time.sleep(0.1)
self.output_stream.start_stream()
self.logger.info("音频流已恢复")
except Exception as recovery_e:
self.logger.error(f"恢复音频流失败: {recovery_e}")
self.playback_buffer.clear()
```
### 5. 性能优化
- 减少日志输出频率,提高性能
- 优化队列处理策略,使用适当的超时设置
- 动态调整休眠时间根据播放状态优化CPU使用
### 6. 测试和验证
- 创建了测试脚本 `test_audio_processes.py`
- 验证了语法正确性
- 可以测试 TTS 功能的完整性
## 使用方法
### 在控制系统中使用
```python
from audio_processes import InputProcess, OutputProcess
# 创建输入和输出进程
input_process = InputProcess(command_queue, event_queue)
output_process = OutputProcess(audio_queue)
# 处理TTS请求
output_process.process_tts_request("你好,这是测试语音")
```
### 独立测试
```bash
python test_audio_processes.py
```
## 预期效果
- 解决 ALSA underrun 错误
- 提高音频播放的流畅性
- 减少 TTS 处理的延迟
- 提供更稳定的音频处理能力
## 注意事项
1. 确保系统安装了必要的依赖:`requests`, `pyaudio`
2. 检查音频设备是否正常工作
3. 网络连接正常用于TTS服务
4. 适当调整音频参数以适应不同环境

View File

@ -1,101 +0,0 @@
# 快速启动指南
## 一键启动(推荐)
```bash
# 直接运行,系统会自动校准和启动监听
python multiprocess_recorder.py
# 指定角色
python multiprocess_recorder.py -c libai
# 详细模式
python multiprocess_recorder.py -v
```
## 编程方式启动
### 最简单的方式
```python
from control_system import ControlSystem
# 创建控制系统
control_system = ControlSystem()
# 一键启动(自动校准 + 自动监听)
control_system.start()
```
### 自定义配置
```python
from control_system import ControlSystem
config = {
'system': {'log_level': "INFO"},
'audio': {'sample_rate': 16000, 'channels': 1, 'chunk_size': 1024},
'recording': {'min_duration': 2.0, 'max_duration': 30.0, 'silence_threshold': 3.0},
'processing': {'enable_asr': True, 'enable_llm': True, 'enable_tts': True, 'character': 'libai'}
}
control_system = ControlSystem(config)
# 启动选项:
# auto_calibration=True - 自动校准语音检测器
# auto_monitoring=True - 自动启动音频监听
control_system.start(auto_calibration=True, auto_monitoring=True)
```
## 手动控制
```python
from control_system import ControlSystem
control_system = ControlSystem()
# 只启动进程,不自动校准和监听
control_system._start_processes()
# 手动步骤:
control_system.start_calibration() # 1. 启动校准
control_system.wait_for_calibration_complete() # 2. 等待校准完成
control_system.start_monitoring() # 3. 启动监听
# 运行中可以随时控制:
control_system.stop_monitoring() # 停止监听
control_system.start_monitoring() # 重新启动监听
# 查询状态:
status = control_system.get_calibration_status() # 获取校准状态
status = control_system.get_monitoring_status() # 获取监听状态
# 关闭系统:
control_system.shutdown()
```
## 启动流程
系统启动时会按以下顺序执行:
1. **启动进程** - 创建输入进程和输出进程
2. **自动校准** - 校准语音检测器约3-5秒
3. **启动监听** - 启用音频监听功能
4. **开始运行** - 进入主控制循环,开始检测语音
## 注意事项
- **校准时间**首次启动需要3-5秒进行语音检测器校准
- **音频权限**:确保麦克风权限已授予
- **环境安静**:校准时请保持环境安静
- **API密钥**如需LLM功能请设置 `ARK_API_KEY` 环境变量
## 故障排除
如果校准失败:
- 检查麦克风是否正常工作
- 确保环境安静,无背景噪音
- 尝试重新启动系统
如果监听失败:
- 检查音频设备是否被其他程序占用
- 尝试重启程序
- 查看日志文件排查问题

143
README.md
View File

@ -1,143 +0,0 @@
# 智能语音助手系统使用说明
## 功能概述
这是一个完整的智能语音助手系统,集成了语音录制、语音识别、大语言模型和文本转语音功能,实现语音对话交互。
## 完整工作流程
1. 🎙️ **语音录制** - 基于ZCR的智能语音检测
2. 📝 **保存录音** - 自动保存为WAV文件
3. 🤖 **语音识别** - 使用字节跳动ASR将语音转为文字
4. 💬 **AI回复** - 使用豆包大模型生成智能回复
5. 🔊 **语音回复** - 使用字节跳动TTS将AI回复转为语音
## 环境配置
### 1. 安装依赖
```bash
pip install websockets requests pyaudio numpy
```
### 2. 安装音频播放器(树莓派/Linux系统
系统使用PCM格式音频只需要安装基础的音频播放工具
```bash
# 安装 alsa-utils包含aplay播放器
sudo apt-get update
sudo apt-get install alsa-utils
```
> **优势**: PCM格式无需额外解码器兼容性更好资源占用更少。
> **注意**: macOS和Windows系统通常内置支持音频播放无需额外安装。
### 3. 设置API密钥
为了启用大语言模型功能,需要设置环境变量:
```bash
# Linux/Mac
export ARK_API_KEY='your_api_key_here'
# Windows
set ARK_API_KEY=your_api_key_here
```
> **注意**: 语音识别和文本转语音功能使用内置的API密钥无需额外配置。
## 使用方法
### 基本使用
```bash
python recorder.py
```
### 功能说明
- 🎯 **自动检测语音**:系统会自动检测声音并开始录音
- ⏱️ **智能停止**静音3秒后自动停止录音
- 🔊 **自动播放**:录音完成后自动播放音频
- 📝 **语音识别**:自动将语音转为文字
- 🤖 **AI助手**:自动调用大语言模型生成回复
### 配置参数
- `energy_threshold=200` - 能量阈值(调整灵敏度)
- `silence_threshold=3.0` - 静音阈值(秒)
- `min_recording_time=2.0` - 最小录音时间(秒)
- `max_recording_time=30.0` - 最大录音时间(秒)
- `enable_asr=True` - 启用语音识别
- `enable_llm=True` - 启用大语言模型
- `enable_tts=True` - 启用文本转语音
## 输出示例
```
🎤 开始监听...
能量阈值: 200 (已弃用)
静音阈值: 3.0秒
📖 使用说明:
- 检测到声音自动开始录音
- 持续静音3秒自动结束录音
- 最少录音2秒最多30秒
- 录音完成后自动进行语音识别和AI回复
- 按 Ctrl+C 退出
==================================================
🎙️ 检测到声音,开始录音...
📝 录音完成,时长: 3.45秒 (包含预录音 2.0秒)
✅ 录音已保存: recording_20250920_163022.wav
==================================================
📡 音频输入已保持关闭状态
🔄 开始处理音频...
🤖 开始语音识别...
📝 识别结果: 你好,今天天气怎么样?
--------------------------------------------------
🤖 调用大语言模型...
💬 AI助手回复: 你好!我无法实时获取天气信息,建议你查看天气预报或打开天气应用来了解今天的天气情况。有什么其他我可以帮助你的吗?
--------------------------------------------------
🔊 开始文本转语音...
TTS句子信息: {'code': 0, 'message': '', 'data': None, 'sentence': {'phonemes': [], 'text': '你好!我无法实时获取天气信息,建议你查看天气预报或打开天气应用来了解今天的天气情况。有什么其他我可以帮助你的吗?', 'words': [...]}}
✅ TTS音频已保存: tts_response_20250920_163022.pcm
📁 文件大小: 128.75 KB
🔊 播放AI语音回复...
✅ AI语音回复完成
🔄 准备重新开启音频输入
✅ 音频设备初始化成功
📡 音频输入已重新开启
```
## 注意事项
1. **网络连接**:需要网络连接来使用语音识别、大语言模型和文本转语音服务
2. **API密钥**需要有效的ARK_API_KEY才能使用大语言模型功能
3. **音频设备**:确保麦克风和扬声器工作正常
4. **权限**:确保程序有访问麦克风、网络和存储的权限
5. **文件存储**系统会保存录音文件和TTS生成的音频文件
## 故障排除
- 如果语音识别失败检查网络连接和API密钥
- 如果大语言模型失败检查ARK_API_KEY是否正确设置
- 如果文本转语音失败检查TTS服务状态
- 如果录音失败,检查麦克风权限和设备
- 如果播放失败,检查音频设备权限
- 如果PCM文件无法播放检查是否安装了alsa-utils
```bash
# 树莓派/Ubuntu/Debian系统
sudo apt-get install alsa-utils
# 或检查aplay是否安装
which aplay
```
## 技术特点
- 🎯 基于ZCR的精确语音检测
- 🚀 低延迟实时处理
- 💾 环形缓冲区防止音频丢失
- 🔧 自动调整能量阈值
- 📊 实时性能监控
- 🌐 完整的语音对话链路
- 📁 自动文件管理和权限设置
- 🔊 PCM格式音频无需额外解码器
## 生成的文件
- `recording_*.wav` - 录制的音频文件
- `tts_response_*.pcm` - AI语音回复文件PCM格式
## PCM格式优势
- **兼容性好**aplay原生支持树莓派开箱即用
- **资源占用少**无需解码过程CPU占用更低
- **延迟更低**:直接播放,无需格式转换
- **稳定性高**:减少依赖组件,提高系统稳定性

View File

@ -1,210 +0,0 @@
# 多进程音频控制系统 - 主进程控制功能
## 概述
本系统已经重构,支持主进程对输入进程的校准和监听功能进行精确控制。通过这些新功能,你可以:
1. **手动控制校准过程**:在适当的时间启动语音检测器校准
2. **精确控制监听状态**:按需启用或禁用音频监听
3. **获取实时状态**:查询校准进度和监听状态
## 主要功能
### 1. 校准功能
#### 启动校准
```python
# 启动语音检测器校准
success = control_system.start_calibration()
if success:
print("校准已启动")
```
#### 获取校准状态
```python
# 获取当前校准状态
status = control_system.get_calibration_status()
if status:
print(f"校准进度: {status['progress']*100:.1f}%")
print(f"是否在校准中: {status['calibrating']}")
```
#### 等待校准完成
```python
# 等待校准完成30秒超时
if control_system.wait_for_calibration_complete(timeout=30):
print("校准完成")
else:
print("校准超时")
```
### 2. 监听功能
#### 启动监听
```python
# 启动音频监听
success = control_system.start_monitoring()
if success:
print("监听已启动")
```
#### 停止监听
```python
# 停止音频监听
success = control_system.stop_monitoring()
if success:
print("监听已停止")
```
#### 获取监听状态
```python
# 获取当前监听状态
status = control_system.get_monitoring_status()
if status:
print(f"监听启用: {status['enabled']}")
print(f"正在录音: {status['recording']}")
print(f"音频流活跃: {status['audio_stream_active']}")
```
## 使用示例
### 方法1自动启动推荐
```python
from control_system import ControlSystem
# 1. 创建控制系统
config = {
'system': {'log_level': "INFO"},
'audio': {'sample_rate': 16000, 'channels': 1, 'chunk_size': 1024},
'recording': {'min_duration': 2.0, 'max_duration': 30.0, 'silence_threshold': 3.0},
'processing': {'enable_asr': True, 'enable_llm': True, 'enable_tts': True, 'character': 'libai'}
}
control_system = ControlSystem(config)
# 2. 一键启动(自动校准和监听)
control_system.start(auto_calibration=True, auto_monitoring=True)
# 系统现在正在运行,会自动处理语音检测和录音
```
### 方法2手动控制
```python
from control_system import ControlSystem
import time
# 1. 创建控制系统
config = {
'system': {'log_level': "INFO"},
'audio': {'sample_rate': 16000, 'channels': 1, 'chunk_size': 1024},
'recording': {'min_duration': 2.0, 'max_duration': 30.0, 'silence_threshold': 3.0},
'processing': {'enable_asr': True, 'enable_llm': True, 'enable_tts': True, 'character': 'libai'}
}
control_system = ControlSystem(config)
# 2. 启动进程(但不自动启用监听)
control_system._start_processes()
# 3. 步骤1校准
print("开始校准...")
control_system.start_calibration()
# 等待校准完成
if control_system.wait_for_calibration_complete(timeout=30):
print("校准完成")
else:
print("校准失败")
exit(1)
# 4. 步骤2启动监听
print("开始监听...")
control_system.start_monitoring()
# 5. 运行一段时间
print("系统运行中...")
try:
while True:
# 检查事件和显示状态
control_system.check_events()
control_system.display_status()
time.sleep(0.1)
except KeyboardInterrupt:
print("用户中断")
# 6. 停止监听
print("停止监听...")
control_system.stop_monitoring()
# 7. 关闭系统
control_system.shutdown()
```
### 方法3混合控制
```python
from control_system import ControlSystem
# 1. 创建控制系统
control_system = ControlSystem(config)
# 2. 自动启动,但只校准,不自动监听
control_system.start(auto_calibration=True, auto_monitoring=False)
# 3. 手动控制监听
control_system.start_monitoring() # 启动监听
# ... 运行一段时间 ...
control_system.stop_monitoring() # 停止监听
control_system.start_monitoring() # 重新启动监听
# 4. 关闭系统
control_system.shutdown()
```
### 自动化示例
查看 `example_manual_control.py` 文件获取完整的自动化控制示例。
## 关键变化
### 1. 默认行为变化
- **之前**:输入进程启动后自动开始校准和监听
- **现在**:输入进程启动后处于静默状态,等待主进程命令
### 2. 新增控制接口
`ControlSystem` 类中新增了以下方法:
- `start_calibration()` - 启动校准
- `start_monitoring()` - 启动监听
- `stop_monitoring()` - 停止监听
- `get_calibration_status()` - 获取校准状态
- `get_monitoring_status()` - 获取监听状态
- `wait_for_calibration_complete(timeout)` - 等待校准完成
### 3. 新增命令支持
`InputProcess` 中支持以下新命令:
- `start_calibration` - 开始校准
- `start_monitoring` - 开始监听
- `stop_monitoring` - 停止监听
- `get_calibration_status` - 获取校准状态
- `get_monitoring_status` - 获取监听状态
## 使用建议
1. **初始化顺序**:建议按照"启动进程 → 校准 → 启动监听"的顺序进行
2. **错误处理**:建议对每个操作进行错误检查和重试
3. **状态监控**:定期检查状态以确保系统正常运行
4. **资源清理**:使用完毕后正确关闭系统
## 注意事项
1. **进程间通信**:所有控制都是通过进程间队列实现的,可能会有轻微延迟
2. **超时处理**:建议为所有状态查询操作设置合理的超时时间
3. **并发安全**:确保在多线程环境中正确使用这些方法
4. **音频设备**:启动和停止监听会重新初始化音频设备,可能有短暂延迟

View File

@ -1,190 +0,0 @@
# 多进程音频录音系统
基于进程隔离的音频处理架构,实现零延迟的录音和播放切换。
## 🚀 系统特点
### 核心优势
- **多进程架构**: 输入输出进程完全隔离,无需设备重置
- **零切换延迟**: 彻底解决传统单进程的音频切换问题
- **实时响应**: 并行处理录音和播放,真正的实时体验
- **智能检测**: 基于ZCR(零交叉率)的精确语音识别
- **流式TTS**: 实时音频生成和播放,减少等待时间
- **角色扮演**: 支持多种AI角色和音色
### 技术架构
```
主控制进程 ──┐
├─ 输入进程 (录音 + 语音检测)
├─ 输出进程 (音频播放)
└─ 在线AI服务 (STT + LLM + TTS)
```
## 📦 文件结构
```
Local-Voice/
├── recorder.py # 原始实现 (保留作为参考)
├── multiprocess_recorder.py # 主程序
├── audio_processes.py # 音频进程模块
├── control_system.py # 控制系统模块
├── config.json # 配置文件
└── characters/ # 角色配置目录
├── libai.json # 李白角色
└── zhubajie.json # 猪八戒角色
```
## 🛠️ 安装和运行
### 1. 环境要求
- Python 3.7+
- 音频输入设备 (麦克风)
- 网络连接 (用于在线AI服务)
### 2. 安装依赖
```bash
pip install pyaudio numpy requests websockets
```
### 3. 设置API密钥
```bash
export ARK_API_KEY='your_api_key_here'
```
### 4. 基本运行
```bash
# 使用默认角色 (李白)
python multiprocess_recorder.py
# 指定角色
python multiprocess_recorder.py -c zhubajie
# 列出可用角色
python multiprocess_recorder.py -l
# 使用配置文件
python multiprocess_recorder.py --config config.json
# 创建示例配置文件
python multiprocess_recorder.py --create-config
```
## ⚙️ 配置说明
### 主要配置项
| 配置项 | 说明 | 默认值 |
|--------|------|--------|
| `recording.min_duration` | 最小录音时长(秒) | 2.0 |
| `recording.max_duration` | 最大录音时长(秒) | 30.0 |
| `recording.silence_threshold` | 静音检测阈值(秒) | 3.0 |
| `detection.zcr_min` | ZCR最小值 | 2400 |
| `detection.zcr_max` | ZCR最大值 | 12000 |
| `processing.max_tokens` | LLM最大token数 | 50 |
### 音频参数
- 采样率: 16kHz
- 声道数: 1 (单声道)
- 位深度: 16位
- 格式: PCM
## 🎭 角色系统
### 支持的角色
- **libai**: 李白 - 文雅诗人风格
- **zhubajie**: <20>豬八戒 - 幽默风趣风格
### 自定义角色
`characters/` 目录创建JSON文件:
```json
{
"name": "角色名称",
"description": "角色描述",
"system_prompt": "系统提示词",
"voice": "zh_female_wanqudashu_moon_bigtts",
"max_tokens": 50
}
```
## 🔧 故障排除
### 常见问题
1. **音频设备问题**
```bash
# 检查音频设备
python multiprocess_recorder.py --check-env
```
2. **依赖缺失**
```bash
# 重新安装依赖
pip install --upgrade pyaudio numpy requests websockets
```
3. **网络连接问题**
- 检查网络连接
- 确认API密钥正确
- 检查防火墙设置
4. **权限问题**
```bash
# Linux系统可能需要音频权限
sudo usermod -a -G audio $USER
```
### 调试模式
```bash
# 启用详细输出
python multiprocess_recorder.py -v
```
## 📊 性能对比
| 指标 | 原始单进程 | 多进程架构 | 改善 |
|------|-----------|------------|------|
| 切换延迟 | 1-2秒 | 0秒 | 100% |
| CPU利用率 | 单核 | 多核 | 提升 |
| 响应速度 | 较慢 | 实时 | 显著改善 |
| 稳定性 | 一般 | 优秀 | 大幅提升 |
## 🔄 与原版本对比
### 原版本 (recorder.py)
- 单进程处理
- 需要频繁重置音频设备
- 录音和播放不能同时进行
- 切换延迟明显
### 新版本 (multiprocess_recorder.py)
- 多进程架构
- 输入输出完全隔离
- 零切换延迟
- 真正的并行处理
- 更好的稳定性和扩展性
## 📝 开发说明
### 架构设计
- **输入进程**: 专注录音和语音检测
- **输出进程**: 专注音频播放
- **主控制进程**: 协调整个系统和AI处理
### 进程间通信
- 使用 `multiprocessing.Queue` 进行安全通信
- 支持命令控制和事件通知
- 线程安全的音频数据传输
### 状态管理
- 清晰的状态机设计
- 完善的错误处理机制
- 优雅的进程退出流程
## 📄 许可证
本项目仅供学习和研究使用。
## 🤝 贡献
欢迎提交Issue和Pull Request来改进这个项目。

115
SYSTEMD_SETUP.md Normal file
View File

@ -0,0 +1,115 @@
# Local Voice Systemd 服务配置指南
## 问题解决方案
### 核心问题
- **直接运行**: 使用PulseAudio自动转换16000Hz采样率 ✅
- **systemd运行**: 直接访问USB设备只支持48000/44100Hz ❌
### 解决策略
通过配置systemd服务访问用户会话的PulseAudio利用其采样率转换能力。
## 部署步骤
### 1. 上传文件到树莓派
```bash
scp local-voice.service local-voice-user.service deploy-service.sh test-audio.py zhuchaowe@192.168.101.212:~/Local-Voice/
```
### 2. 运行部署脚本
```bash
cd ~/Local-Voice
./deploy-service.sh
```
### 3. 测试音频访问
```bash
python3 test-audio.py
```
### 4. 启动服务 (推荐使用用户级服务)
```bash
# 用户级服务 (推荐)
systemctl --user start local-voice-user.service
systemctl --user enable local-voice-user.service
# 查看日志
journalctl --user -u local-voice-user.service -f
# 如果用户级服务不工作,尝试系统级服务
sudo systemctl start local-voice.service
sudo systemctl enable local-voice.service
# 查看系统服务日志
sudo journalctl -u local-voice.service -f
```
## 服务配置说明
### 系统级服务 (local-voice.service)
- 适用于传统systemd环境
- 添加了PulseAudio环境变量
- 包含音频设备权限
### 用户级服务 (local-voice-user.service)
- 推荐方案,更好的音频访问
- 自动继承用户会话环境
- 无需额外环境变量配置
## 故障排除
### 音频设备不可用
1. 检查pipewire-pulse服务:
```bash
systemctl --user status pipewire-pulse
systemctl --user start pipewire-pulse
```
2. 检查音频设备权限:
```bash
groups # 确认在audio组中
```
3. 测试直接运行:
```bash
python3 multiprocess_recorder.py --enable-nfc
```
### 权限问题
如果遇到权限错误确保用户在audio组中:
```bash
sudo usermod -a -G audio $USER
# 重新登录或重启系统
```
### 环境变量问题
如果用户级服务正常但系统级服务有问题,可能需要:
1. 检查环境变量是否正确设置
2. 确认pipewire-pulse服务在系统启动时运行
3. 考虑使用用户级服务作为主要方案
## 验证步骤
1. **环境测试**: `python3 test-audio.py`
2. **服务启动**: `systemctl --user start local-voice-user.service`
3. **日志监控**: `journalctl --user -u local-voice-user.service -f`
4. **功能测试**: 使用NFC卡片触发录音功能
## 备用方案
如果上述方案不工作,可以考虑:
### 方案A: 使用screen会话
```bash
screen -dmS local-voice python3 ~/Local-Voice/multiprocess_recorder.py --enable-nfc
screen -r local-voice # 查看输出
```
### 方案B: 使用cron启动
在crontab中添加:
```
@reboot cd ~/Local-Voice && python3 multiprocess_recorder.py --enable-nfc
```
### 方案C: 修改代码支持多种采样率
如果需要修改代码支持48000Hz采样率请告知。

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,10 @@
{
"name": "爱因斯坦",
"description": "伟大的物理学家,相对论创立者",
"system_prompt": "我是阿尔伯特·爱因斯坦,物理学家。说话要有科学家的智慧感和好奇心,喜欢用简单的比喻解释复杂的概念。经常思考宇宙、时间、空间等深奥问题。要有幽默感,偶尔自嘲一下。说话要体现出对知识的热爱和对世界的好奇心,常用'有趣的是'、'想象一下'等词语来引导思考。",
"voice": "ICL_zh_male_youmodaye_tob",
"max_tokens": 500,
"greeting": "你好!我是爱因斯坦。想象力比知识更重要,因为知识是有限的,而想象力概括着世界的一切。",
"nfc_uid": "1DCAC90D0D1080",
"author": "Claude"
}

View File

@ -5,5 +5,6 @@
"voice": "ICL_zh_male_huzi_v1_tob",
"max_tokens": 500,
"greeting": "吾乃李白,字太白,号青莲居士。今天有幸与君相会,让我们畅谈诗词人生吧!",
"nfc_uid": "1DC6C90D0D1080",
"author": "Claude"
}

10
characters/shaseng.json Normal file
View File

@ -0,0 +1,10 @@
{
"name": "沙僧",
"description": "西游记中的沙和尚,忠厚老实的护法",
"system_prompt": "我是沙和尚,沙悟净。原是流沙河的水怪,后随师父唐僧西天取经。为人忠厚老实,做事踏实稳重,从不偷懒。说话诚恳实在,常用'师父说得对'、'大师兄说得对'来表达对长者的尊重。性格温和,做事认真负责,是个可靠的伙伴。说话要体现出踏实肯干的性格。",
"voice": "ICL_zh_male_diyinchenyu_tob",
"max_tokens": 500,
"greeting": "贫僧沙悟净,见过各位。愿随师父西行取经,保护师父安全,一路降妖除魔。",
"nfc_uid": "1DC9C90D0D1080",
"author": "Claude"
}

10
characters/tangseng.json Normal file
View File

@ -0,0 +1,10 @@
{
"name": "唐僧",
"description": "西游记中的取经人,慈悲为怀的高僧",
"system_prompt": "贫僧唐三藏,法号玄奘,奉唐王之命前往西天取经。说话要温和慈悲,常常引用佛经教诲,劝人向善。对众生都要有慈悲心,即使是妖魔鬼怪也要度化。说话要文雅,常用'阿弥陀佛'、'善哉善哉'等佛家用语。回答要体现出高僧的智慧和慈悲心。",
"voice": "zh_male_tangseng_mars_bigtts",
"max_tokens": 500,
"greeting": "阿弥陀佛,贫僧唐三藏。今日与君相遇,实乃有缘。愿与施主共论佛法,同修善果。",
"nfc_uid": "1DC8C90D0D1080",
"author": "Claude"
}

View File

@ -0,0 +1,10 @@
{
"name": "作业小助手",
"description": "专门催促小懒虫写作业的贴心小帮手",
"system_prompt": "我是来催你写作业的小助手!小懒虫,别再拖延了,作业要抓紧时间完成哦!我会用各种方式提醒你:温和鼓励、严肃提醒、偶尔撒娇催促。知道你不想写作业,但学习很重要啊!常用'该写作业啦'、'加油加油'、'别玩手机了'这样的话语。既要关心你,又要坚持原则,直到你把作业完成。记住,我是来帮你的,不是来批评你的。",
"voice": "zh_female_linjianvhai_moon_bigtts",
"max_tokens": 500,
"greeting": "小懒虫!作业写完了吗?快过来写作业,我陪着你一起加油!别让我一直催你哦~",
"nfc_uid": "1DCBC90D0D1080",
"author": "Claude"
}

View File

@ -5,5 +5,6 @@
"voice": "zh_male_zhubajie_mars_bigtts",
"max_tokens": 500,
"greeting": "嘿!俺老猪来也!今天咱聊点啥好吃的?要不要一起去化缘啊?",
"nfc_uid": "1DC7C90D0D1080",
"author": "Claude"
}

71
cleanup_recordings.py Normal file
View File

@ -0,0 +1,71 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
录音文件清理工具
用于清理过期的录音文件管理磁盘空间
"""
import sys
import os
import argparse
from audio_processes import (
cleanup_recordings,
cleanup_recordings_by_count,
list_recordings,
print_recording_summary
)
def main():
parser = argparse.ArgumentParser(description='录音文件清理工具')
parser.add_argument('--list', '-l', action='store_true', help='列出所有录音文件')
parser.add_argument('--summary', '-s', action='store_true', help='显示录音文件摘要')
parser.add_argument('--cleanup-time', '-t', type=int, default=24,
help='按时间清理保留最近N小时的文件默认24小时')
parser.add_argument('--cleanup-count', '-c', type=int, default=50,
help='按数量清理保留最新的N个文件默认50个')
parser.add_argument('--dry-run', '-d', action='store_true',
help='模拟运行,不实际删除文件')
args = parser.parse_args()
if args.list:
print("📋 录音文件列表:")
recordings = list_recordings()
if not recordings:
print(" 未找到录音文件")
else:
for i, recording in enumerate(recordings, 1):
print(f" {i}. {recording['filename']}")
print(f" 大小: {recording['size_mb']:.2f} MB")
print(f" 创建时间: {recording['created_time']}")
print(f" 修改时间: {recording['modified_time']}")
print(f" 文件年龄: {recording['age_hours']:.1f} 小时")
print()
return
if args.summary:
print_recording_summary()
return
if args.cleanup_time > 0:
print(f"🧹 按时间清理录音文件(保留 {args.cleanup_time} 小时内的文件)")
cleanup_recordings(retention_hours=args.cleanup_time, dry_run=args.dry_run)
if args.cleanup_count > 0:
print(f"🧹 按数量清理录音文件(保留最新的 {args.cleanup_count} 个文件)")
cleanup_recordings_by_count(max_files=args.cleanup_count, dry_run=args.dry_run)
if not any([args.list, args.summary, args.cleanup_time > 0, args.cleanup_count > 0]):
print("📖 录音文件清理工具")
print("使用 --help 查看帮助信息")
print()
print("常用命令:")
print(" python cleanup_recordings.py --summary # 显示摘要")
print(" python cleanup_recordings.py --list # 列出所有文件")
print(" python cleanup_recordings.py --cleanup-time 24 # 保留24小时内的文件")
print(" python cleanup_recordings.py --cleanup-count 30 # 保留最新的30个文件")
print(" python cleanup_recordings.py -t 24 -c 30 -d # 模拟运行")
if __name__ == "__main__":
main()

View File

@ -22,7 +22,9 @@
"enable_llm": true,
"enable_tts": true,
"character": "libai",
"max_tokens": 50
"max_tokens": 50,
"enable_chat_memory": true,
"max_history_length": 5
},
"detection": {
"zcr_min": 2400,
@ -35,5 +37,11 @@
"show_progress": true,
"progress_interval": 100,
"chunk_size": 512
},
"cleanup": {
"auto_cleanup": true,
"retention_hours": 1,
"max_files": 10,
"cleanup_interval": 120
}
}
}

View File

@ -6,27 +6,28 @@
实现主控制进程和状态管理
"""
import multiprocessing as mp
import queue
import time
import threading
import requests
import json
import asyncio
import base64
import gzip
import uuid
import asyncio
import websockets
from typing import Optional, Dict, Any, List
from dataclasses import dataclass, asdict
from enum import Enum
import json
import multiprocessing as mp
import os
import queue
import sys
import threading
import time
import uuid
from dataclasses import asdict, dataclass
from enum import Enum
from typing import Any, Dict, List, Optional
import requests
import websockets
from audio_processes import (ControlCommand, InputProcess, OutputProcess,
ProcessEvent, RecordingState)
from nfc_manager import get_nfc_manager
from audio_processes import (
InputProcess, OutputProcess,
RecordingState, ControlCommand, ProcessEvent
)
def input_process_target(command_queue, event_queue, config):
"""输入进程的目标函数 - 在子进程中创建InputProcess实例"""
@ -90,9 +91,25 @@ class ControlSystem:
'failed_processing': 0
}
# 聊天历史记录
self.chat_history = []
# 从配置中读取聊天历史设置
self.enable_chat_memory = self.config.get('processing', {}).get('enable_chat_memory', True)
self.max_history_length = self.config.get('processing', {}).get('max_history_length', 5)
# 显示聊天记忆状态
if self.enable_chat_memory:
print(f"💬 聊天记忆功能已启用,最多保存 {self.max_history_length} 轮对话")
else:
print("💬 聊天记忆功能已禁用")
# 运行状态
self.running = True
# NFC管理器
self.nfc_manager = None
self.nfc_enabled = False
# 检查依赖
self._check_dependencies()
@ -134,7 +151,7 @@ class ControlSystem:
'llm': {
'api_url': "https://ark.cn-beijing.volces.com/api/v3/chat/completions",
'model': "doubao-seed-1-6-flash-250828",
'api_key': os.environ.get("ARK_API_KEY", ""),
'api_key': "f8e43677-1c23-4e21-8a4c-66e7103a8155",
'max_tokens': 50
},
'tts': {
@ -274,26 +291,51 @@ class ControlSystem:
# 等待校准完成
if self.wait_for_calibration_complete(timeout=30):
print("✅ 校准完成")
# 校准完成后播放打招呼
print("🎭 播放角色打招呼...")
greeting_success = self.play_greeting()
if not greeting_success:
print("⚠️ 打招呼播放失败,继续运行...")
# 如果启用了NFC开始持续执行NFC检测但不播放打招呼或启动监听
if hasattr(self, 'nfc_enabled') and self.nfc_enabled:
print("📱 开始持续执行NFC检测等待NFC卡片...")
# NFC检测已经在enable_nfc()中启动,这里只需确认状态
if self.nfc_manager and self.nfc_manager.running:
print("✅ NFC检测已在运行")
else:
print("⚠️ NFC检测未运行尝试启动...")
try:
self.nfc_manager.start()
print("✅ NFC检测已启动")
except Exception as e:
print(f"❌ 启动NFC检测失败: {e}")
else:
# 如果没有启用NFC播放打招呼并启动监听
print("🎭 播放角色打招呼...")
greeting_success = self.play_greeting()
if not greeting_success:
print("⚠️ 打招呼播放失败,继续运行...")
print("🎯 启动音频监听...")
success = self.start_monitoring()
if success:
print("✅ 监听已启动")
else:
print("⚠️ 监听启动失败")
else:
print("⚠️ 校准超时,继续运行...")
# 注释掉自动启动监听功能,让打招呼播放完成后自动开启监听
# if auto_monitoring:
# # 自动启动监听
# print("🎯 自动启动音频监听...")
# success = self.start_monitoring()
# if success:
# print("✅ 监听已启动")
# else:
# print("⚠️ 监听启动失败")
# 注释掉自动启动监听功能,让打招呼播放完成后自动开启监听
# if auto_monitoring:
# # 自动启动监听
# print("🎯 自动启动音频监听...")
# success = self.start_monitoring()
# if success:
# print("✅ 监听已启动")
# else:
# print("⚠️ 监听启动失败")
print("=" * 60)
print("🎙️ 系统就绪,开始检测语音...")
if self.nfc_enabled:
print("📱 NFC模式等待NFC卡片进行角色切换...")
else:
print("🎙️ 系统就绪,开始检测语音...")
print("=" * 60)
# 启动主控制循环
@ -410,6 +452,11 @@ class ControlSystem:
print("🎯 状态IDLE播放刚完成等待延迟启用录音")
self._just_finished_playing = False # 重置标志
else:
# 如果启用了NFC不自动启动音频监听等待NFC触发
if self.nfc_enabled:
print("🎯 状态IDLENFC模式已启用等待NFC卡片触发")
return
# 检查监听状态
monitoring_status = self.get_monitoring_status()
if monitoring_status and monitoring_status['enabled']:
@ -514,7 +561,7 @@ class ControlSystem:
def delayed_enable_recording():
import threading
import time
# 等待更长时间确保音频完全停止
time.sleep(2.0) # 增加到2秒
@ -608,6 +655,11 @@ class ControlSystem:
# 发送TTS完成信号
tts_complete_command = "TTS_COMPLETE:"
self.output_audio_queue.put(tts_complete_command)
# 关键修复处理失败时也要发送all_audio_received=True信号
# 这解决了语音识别失败但TTS完成信号已发送的死锁问题
all_audio_received_command = "ALL_AUDIO_RECEIVED:"
self.output_audio_queue.put(all_audio_received_command)
print(f"🔧 处理失败发送all_audio_received=True信号以避免死锁")
# 发送结束信号
self.output_audio_queue.put(None)
except Exception as e:
@ -957,11 +1009,28 @@ class ControlSystem:
"Authorization": f"Bearer {self.api_config['llm']['api_key']}"
}
# 构建消息列表
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": text}
{
"role": "system",
"content": system_prompt
}
]
# 如果启用聊天记忆,添加历史对话记录
if self.enable_chat_memory:
for history_item in self.chat_history:
messages.extend([
{"role": "user", "content": history_item["user"]},
{"role": "assistant", "content": history_item["assistant"]}
])
# 添加当前用户消息
messages.append({
"role": "user",
"content": text
})
data = {
"model": self.api_config['llm']['model'],
"messages": messages,
@ -980,7 +1049,14 @@ class ControlSystem:
result = response.json()
if 'choices' in result and len(result['choices']) > 0:
content = result['choices'][0]['message']['content']
return content.strip()
filtered_content = self._filter_parentheses_content(content.strip())
# 保存对话到历史记录
if self.enable_chat_memory:
self._add_to_chat_history(text, filtered_content)
print(f"💬 对话已保存到历史记录 (当前历史长度: {len(self.chat_history)})")
return filtered_content
print(f"❌ LLM API调用失败: {response.status_code}")
return None
@ -1019,13 +1095,23 @@ class ControlSystem:
{
"role": "system",
"content": system_prompt
},
{
"role": "user",
"content": text
}
]
# 如果启用聊天记忆,添加历史对话记录
if self.enable_chat_memory:
for history_item in self.chat_history:
messages.extend([
{"role": "user", "content": history_item["user"]},
{"role": "assistant", "content": history_item["assistant"]}
])
# 添加当前用户消息
messages.append({
"role": "user",
"content": text
})
data = {
"model": self.api_config['llm']['model'],
"messages": messages,
@ -1141,6 +1227,12 @@ class ControlSystem:
print(f"🎵 发送剩余句子: {filtered_sentence}")
self._send_streaming_text_to_output_process(filtered_sentence)
# 保存完整回复到历史记录
if accumulated_text and self.enable_chat_memory:
filtered_response = self._filter_parentheses_content(accumulated_text.strip())
self._add_to_chat_history(text, filtered_response)
print(f"💬 对话已保存到历史记录 (当前历史长度: {len(self.chat_history)})")
# 通知输出进程LLM生成已完成
self._notify_llm_complete()
@ -1183,6 +1275,17 @@ class ControlSystem:
print(f"❌ 发送文本失败: {e}")
return False
def _send_greeting_to_output_process(self, text: str, character_name: str) -> bool:
"""发送打招呼文本到输出进程(带缓存支持)"""
try:
# 发送特殊命令到输出进程,包含角色名称用于缓存
greeting_command = f"GREETING_TEXT:{text}:{character_name}"
self.output_audio_queue.put(greeting_command)
return True
except Exception as e:
print(f"❌ 发送打招呼文本失败: {e}")
return False
def _flush_output_process_tts_buffer(self):
"""通知输出进程刷新TTS缓冲区"""
try:
@ -1256,7 +1359,7 @@ class ControlSystem:
def _filter_parentheses_content(self, text):
"""过滤文本中的括号内容(包括中文和英文括号)- 从 recorder.py 移植"""
import re
# 移除中文括号内容:(内容)
filtered_text = re.sub(r'[^]*', '', text)
# 移除英文括号内容:(content)
@ -1285,14 +1388,22 @@ class ControlSystem:
greeting_text = character_config["greeting"]
print(f"🎭 播放角色打招呼: {greeting_text}")
# 禁用录音功能,防止打招呼时录音
print("🛑 打招呼前禁用录音功能...")
self.input_command_queue.put(ControlCommand('disable_recording'))
# 设置状态为播放状态
self.state = RecordingState.PLAYING
# 发送打招呼文本到TTS
success = self._send_text_to_output_process(greeting_text)
# 发送打招呼文本到TTS带缓存支持
character_name = character_config.get("name", self.config['processing']['character'])
print(f"📡 准备发送打招呼文本到输出进程: {greeting_text} (角色: {character_name})")
success = self._send_greeting_to_output_process(greeting_text, character_name)
if not success:
print("❌ 打招呼TTS生成失败")
return False
else:
print(f"✅ 打招呼文本已成功发送到输出进程")
# 手动设置LLM完成状态因为打招呼没有LLM生成过程
self._notify_llm_complete()
@ -1529,6 +1640,205 @@ class ControlSystem:
print(f" 成功率: {success_rate:.1f}%")
print("👋 系统已关闭")
def enable_nfc(self):
"""启用NFC功能"""
if self.nfc_enabled:
print("⚠️ NFC功能已启用")
return
try:
# 获取NFC管理器单例
self.nfc_manager = get_nfc_manager()
self.nfc_manager.set_character_switch_callback(self._on_character_switch)
self.nfc_manager.start()
self.nfc_enabled = True
print("✅ NFC功能已启用")
except Exception as e:
print(f"❌ 启用NFC功能失败: {e}")
def disable_nfc(self):
"""禁用NFC功能"""
if not self.nfc_enabled:
return
try:
if self.nfc_manager:
self.nfc_manager.stop()
self.nfc_enabled = False
print("🛑 NFC功能已禁用")
except Exception as e:
print(f"❌ 禁用NFC功能失败: {e}")
def _on_character_switch(self, character_name: str):
"""角色切换回调函数"""
print(f"🎭 NFC触发角色切换: {character_name}")
# 检查角色配置是否存在
character_config = self._load_character_config(character_name)
if not character_config:
print(f"❌ 角色配置不存在: {character_name}")
return
# === 紧急停止所有音频活动 ===
print("🚨 NFC切换紧急停止所有音频活动...")
self._emergency_stop_all_audio()
# 更新角色配置
old_character = self.config['processing']['character']
self.config['processing']['character'] = character_name
# 更新TTS语音配置
if character_config and "voice" in character_config:
self.api_config['tts']['speaker'] = character_config["voice"]
print(f"🎵 更新TTS语音: {character_config['voice']}")
# 向输出进程发送更新TTS speaker的命令
try:
update_command = f"UPDATE_TTS_SPEAKER:{character_config['voice']}"
self.output_audio_queue.put(update_command)
print(f"📡 已发送TTS speaker更新命令到输出进程")
except Exception as e:
print(f"❌ 发送TTS speaker更新命令失败: {e}")
print(f"🔄 角色已切换: {old_character} -> {character_name}")
# 清空聊天历史
self.clear_chat_history()
print(f"🔄 角色切换,聊天历史已清空")
# 播放新角色打招呼
print("🎭 播放新角色打招呼...")
greeting_success = self.play_greeting()
if greeting_success:
print("✅ 角色切换完成")
else:
print("⚠️ 打招呼播放失败,继续运行...")
def _emergency_stop_all_audio(self):
"""紧急停止所有音频活动 - 用于NFC切换时立即停止"""
print("🚨 开始紧急停止所有音频活动...")
try:
# 1. 立即停止录音和监听
print("🛑 立即停止录音和监听...")
if hasattr(self, '_monitoring_active') and self._monitoring_active:
self.input_command_queue.put(ControlCommand('stop_monitoring'))
self._monitoring_active = False
if self.state == RecordingState.RECORDING:
self.input_command_queue.put(ControlCommand('stop_recording'))
# 强制禁用录音功能,防止立即重新开始
self.input_command_queue.put(ControlCommand('disable_recording'))
# 发送紧急停止命令到输入进程
self.input_command_queue.put(ControlCommand('emergency_stop'))
print("✅ 录音和监听已停止,紧急停止命令已发送")
# 2. 立即停止音频播放
print("🛑 立即停止音频播放...")
# 向输出进程发送紧急停止命令
try:
emergency_stop_command = "EMERGENCY_STOP:"
self.output_audio_queue.put(emergency_stop_command)
print("✅ 紧急停止命令已发送到输出进程")
except Exception as e:
print(f"❌ 发送紧急停止命令失败: {e}")
# 3. 清空所有音频队列
print("🛑 清空音频队列...")
try:
# 清空输入队列
while not self.input_command_queue.empty():
try:
self.input_command_queue.get_nowait()
except queue.Empty:
break
# 清空输出队列保留None结束信号
temp_queue = []
while not self.output_audio_queue.empty():
try:
item = self.output_audio_queue.get_nowait()
if item is None: # 保留结束信号
temp_queue.append(item)
except queue.Empty:
break
# 将保留的信号放回队列
for item in temp_queue:
self.output_audio_queue.put(item)
print("✅ 音频队列已清空")
except Exception as e:
print(f"❌ 清空队列时出错: {e}")
# 4. 重置所有状态
print("🛑 重置所有状态...")
self.state = RecordingState.IDLE
self.processing_complete = True
self.playback_complete = True
self.current_audio_data = None
self.current_audio_metadata = None
# 重置录音状态标志
if hasattr(self, '_monitoring_active'):
self._monitoring_active = False
if hasattr(self, '_just_finished_playing'):
self._just_finished_playing = True # 防止立即重新开始录音
print("✅ 所有状态已重置")
# 5. 等待一小段时间确保音频设备完全停止
print("⏱️ 等待音频设备完全停止...")
time.sleep(0.5)
print("🚨 紧急停止完成")
except Exception as e:
print(f"❌ 紧急停止过程中出错: {e}")
import traceback
traceback.print_exc()
def get_nfc_status(self):
"""获取NFC状态"""
if not self.nfc_enabled or not self.nfc_manager:
return {"enabled": False, "current_character": None}
return {
"enabled": True,
"current_character": self.nfc_manager.get_current_character(),
"running": self.nfc_manager.running
}
def _add_to_chat_history(self, user_message, assistant_response):
"""添加对话到历史记录"""
import time
# 如果历史记录超过最大长度,移除最早的记录
if len(self.chat_history) >= self.max_history_length:
self.chat_history.pop(0)
# 添加新的对话记录
self.chat_history.append({
"user": user_message,
"assistant": assistant_response,
"timestamp": time.time()
})
def clear_chat_history(self):
"""清空聊天历史"""
self.chat_history = []
print("💬 聊天历史已清空")
def get_chat_history_summary(self):
"""获取聊天历史摘要"""
if not self.chat_history:
return "暂无聊天历史"
return f"当前有 {len(self.chat_history)} 轮对话记录"
def main():
"""主函数"""
@ -1562,4 +1872,4 @@ def main():
control_system.start()
if __name__ == "__main__":
main()
main()

178
debug_cache.py Normal file
View File

@ -0,0 +1,178 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
缓存调试脚本
用于调试greeting缓存问题
"""
import os
import sys
import json
from pathlib import Path
from greeting_cache_manager import GreetingCacheManager
def debug_cache_issues():
"""调试缓存问题"""
print("🔍 调试greeting缓存问题")
print("=" * 50)
# 检查缓存目录
cache_dir = Path("greeting_cache")
if not cache_dir.exists():
print("❌ 缓存目录不存在")
return
print(f"📁 缓存目录: {cache_dir.absolute()}")
# 列出所有文件
print("\n📋 缓存文件列表:")
cache_files = list(cache_dir.glob("*.wav"))
for i, file in enumerate(cache_files):
print(f" {i+1}. {file.name}")
print(f" 大小: {file.stat().st_size} 字节")
print(f" 路径: {file}")
# 检查索引文件
index_file = cache_dir / "cache_index.json"
print(f"\n📋 索引文件: {index_file}")
if index_file.exists():
try:
with open(index_file, 'r', encoding='utf-8') as f:
cache_index = json.load(f)
print(f" 索引记录数: {len(cache_index)}")
for key, path in cache_index.items():
print(f" {key} -> {path}")
except Exception as e:
print(f" ❌ 读取索引失败: {e}")
else:
print(" ❌ 索引文件不存在")
# 创建缓存管理器实例
cache_manager = GreetingCacheManager()
# 检查缓存完整性
print(f"\n🔍 缓存验证:")
valid_count, invalid_count = cache_manager.validate_cache()
print(f" 有效缓存: {valid_count}")
print(f" 无效缓存: {invalid_count}")
# 模拟缓存检查
print(f"\n🧪 模拟缓存检查:")
# 假设的角色配置
test_characters = {
"libai": "吾乃李白,字太白,号青莲居士。今天有幸与君相会,让我们畅谈诗词人生吧!",
"zhubajie": "俺老猪来也!想吃点好的,睡个好觉,过神仙日子!"
}
for char_name, greeting in test_characters.items():
print(f"\n 测试角色: {char_name}")
# 检查缓存状态
is_cached = cache_manager.is_cached(char_name, greeting)
print(f" 缓存状态: {'✅ 已缓存' if is_cached else '❌ 未缓存'}")
# 获取缓存路径
cache_path = cache_manager.get_cache_path(char_name, greeting)
print(f" 生成路径: {cache_path}")
print(f" 文件存在: {cache_path.exists()}")
# 获取hash
greeting_hash = cache_manager._get_greeting_hash(greeting)
print(f" 文本hash: {greeting_hash}")
# 检查索引键
cache_key = f"{char_name}_{greeting_hash}"
print(f" 索引键: {cache_key}")
print(f" 索引存在: {cache_key in cache_manager.cache_index}")
if cache_key in cache_manager.cache_index:
indexed_path = cache_manager.cache_index[cache_key]
print(f" 索引路径: {indexed_path}")
print(f" 路径匹配: {str(cache_path) == indexed_path}")
def fix_cache_filenames():
"""修复缓存文件名中的冒号问题"""
print("\n🔧 修复缓存文件名问题")
print("=" * 50)
cache_dir = Path("greeting_cache")
if not cache_dir.exists():
print("❌ 缓存目录不存在")
return
# 查找有问题的文件名(包含冒号的)
problem_files = list(cache_dir.glob(":*.wav"))
if problem_files:
print(f"🔍 发现 {len(problem_files)} 个有问题的文件名:")
for file in problem_files:
print(f" {file.name}")
# 修复文件名
new_name = file.name[1:] # 移除开头的冒号
new_path = file.parent / new_name
try:
file.rename(new_path)
print(f" ✅ 重命名为: {new_name}")
except Exception as e:
print(f" ❌ 重命名失败: {e}")
else:
print("✅ 没有发现文件名问题")
def rebuild_cache_index():
"""重建缓存索引"""
print("\n🔄 重建缓存索引")
print("=" * 50)
cache_dir = Path("greeting_cache")
if not cache_dir.exists():
print("❌ 缓存目录不存在")
return
cache_manager = GreetingCacheManager()
# 清空当前索引
cache_manager.cache_index = {}
# 扫描所有wav文件
wav_files = list(cache_dir.glob("*.wav"))
print(f"🔍 扫描到 {len(wav_files)} 个wav文件")
for wav_file in wav_files:
# 尝试从文件名解析角色名和hash
name_parts = wav_file.stem.split('_')
if len(name_parts) >= 2:
character_name = name_parts[0]
greeting_hash = name_parts[-1] # 最后一个部分是hash
cache_key = f"{character_name}_{greeting_hash}"
cache_manager.cache_index[cache_key] = str(wav_file)
print(f" ✅ 添加到索引: {cache_key}")
else:
print(f" ❌ 无法解析文件名: {wav_file.name}")
# 保存索引
cache_manager._save_cache_index()
print(f"✅ 索引重建完成,共 {len(cache_manager.cache_index)} 条记录")
if __name__ == "__main__":
try:
# 调试缓存问题
debug_cache_issues()
# 修复文件名问题
fix_cache_filenames()
# 重建索引
rebuild_cache_index()
print("\n🎉 缓存调试和修复完成!")
except Exception as e:
print(f"\n❌ 调试失败: {e}")
import traceback
traceback.print_exc()
sys.exit(1)

78
deploy-service.sh Executable file
View File

@ -0,0 +1,78 @@
#!/bin/bash
# Local Voice 服务部署脚本
# 用于解决systemd环境下的音频设备访问问题
echo "🔧 Local Voice 服务部署脚本"
echo "=================================="
# 获取用户ID
USER_ID=$(id -u)
USERNAME=$(whoami)
echo "👤 当前用户: $USERNAME (UID: $USER_ID)"
# 备份原有服务文件
echo "💾 备份原有服务文件..."
if systemctl list-unit-files | grep -q "local-voice.service"; then
sudo systemctl stop local-voice.service 2>/dev/null
sudo systemctl disable local-voice.service 2>/dev/null
sudo cp /etc/systemd/system/local-voice.service /etc/systemd/system/local-voice.service.backup 2>/dev/null || true
echo "✅ 原有服务已备份"
fi
# 安装systemd服务
echo "📦 安装systemd服务..."
sudo cp local-voice.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl enable local-voice.service
echo "✅ 系统级服务已安装"
# 安装用户级服务
echo "🏠 安装用户级服务..."
mkdir -p ~/.config/systemd/user/
cp local-voice-user.service ~/.config/systemd/user/
systemctl --user daemon-reload
systemctl --user enable local-voice-user.service
echo "✅ 用户级服务已安装"
# 确保pipewire-pulse运行
echo "🎵 检查音频服务..."
if ! systemctl --user is-active --quiet pipewire-pulse; then
echo "🔧 启动pipewire-pulse服务..."
systemctl --user start pipewire-pulse
systemctl --user enable pipewire-pulse
echo "✅ pipewire-pulse已启动"
else
echo "✅ pipewire-pulse正在运行"
fi
# 启用用户服务 lingering (允许用户服务在登出后继续运行)
echo "🔄 启用用户服务lingering..."
sudo loginctl enable-linger $USERNAME
echo "✅ 用户服务lingering已启用"
# 测试建议
echo ""
echo "🎯 测试建议:"
echo "1. 先测试用户级服务: systemctl --user start local-voice-user.service"
echo "2. 查看用户服务日志: journalctl --user -u local-voice-user.service -f"
echo "3. 如果用户级服务正常,再考虑使用系统级服务"
echo ""
echo "🔍 故障排除:"
echo "- 如果用户级服务正常: 建议使用 systemctl --user start local-voice-user.service"
echo "- 如果系统级服务正常: 使用 sudo systemctl start local-voice.service"
echo ""
echo "📊 当前音频设备状态:"
python3 -c "
import pyaudio
p = pyaudio.PyAudio()
print('可用音频设备:')
for i in range(p.get_device_count()):
info = p.get_device_info_by_index(i)
if info['maxInputChannels'] > 0:
print(f' 设备 {i}: {info[\"name\"]} (默认采样率: {info[\"defaultSampleRate\"]}Hz)')
p.terminate()
"
echo ""
echo "✅ 部署完成!"

149
fix_cache.py Normal file
View File

@ -0,0 +1,149 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
修复现有缓存问题的脚本
用于处理文件名中的冒号和索引问题
"""
import os
import sys
import json
import shutil
from pathlib import Path
from greeting_cache_manager import GreetingCacheManager
def fix_existing_cache():
"""修复现有的缓存问题"""
print("🔧 修复现有缓存问题")
print("=" * 50)
cache_dir = Path("greeting_cache")
if not cache_dir.exists():
print("❌ 缓存目录不存在,无需修复")
return
# 1. 备份当前缓存目录
backup_dir = Path("greeting_cache_backup")
if backup_dir.exists():
shutil.rmtree(backup_dir)
shutil.copytree(cache_dir, backup_dir)
print(f"✅ 已备份当前缓存到: {backup_dir}")
# 2. 修复文件名中的冒号问题
print("\n🔍 修复文件名中的冒号问题...")
problem_files = list(cache_dir.glob(":*.wav"))
fixed_count = 0
for file in problem_files:
new_name = file.name[1:] # 移除开头的冒号
new_path = file.parent / new_name
try:
file.rename(new_path)
print(f" ✅ 重命名: {file.name} -> {new_name}")
fixed_count += 1
except Exception as e:
print(f" ❌ 重命名失败: {file.name} - {e}")
print(f"📊 修复了 {fixed_count} 个文件名")
# 3. 重建缓存索引
print("\n🔄 重建缓存索引...")
cache_manager = GreetingCacheManager()
# 清空当前索引
cache_manager.cache_index = {}
# 扫描所有wav文件
wav_files = list(cache_dir.glob("*.wav"))
print(f"🔍 扫描到 {len(wav_files)} 个wav文件")
for wav_file in wav_files:
# 尝试从文件名解析角色名和hash
name_parts = wav_file.stem.split('_')
if len(name_parts) >= 2:
character_name = name_parts[0]
greeting_hash = name_parts[-1] # 最后一个部分是hash
cache_key = f"{character_name}_{greeting_hash}"
cache_manager.cache_index[cache_key] = str(wav_file.resolve())
print(f" ✅ 添加到索引: {character_name} -> {wav_file.name}")
else:
print(f" ❌ 无法解析文件名: {wav_file.name}")
# 保存索引
cache_manager._save_cache_index()
print(f"✅ 索引重建完成,共 {len(cache_manager.cache_index)} 条记录")
# 4. 验证缓存完整性
print(f"\n🔍 验证缓存完整性...")
valid_count, invalid_count = cache_manager.validate_cache()
print(f" 有效缓存: {valid_count}")
print(f" 无效缓存: {invalid_count}")
# 5. 测试缓存访问
print(f"\n🧪 测试缓存访问...")
# 尝试从角色配置文件获取测试数据
characters_dir = Path("characters")
if characters_dir.exists():
char_files = list(characters_dir.glob("*.json"))
for char_file in char_files[:3]: # 测试前3个角色
try:
with open(char_file, 'r', encoding='utf-8') as f:
char_config = json.load(f)
character_name = char_file.stem
greeting_text = char_config.get("greeting", "")
if greeting_text:
is_cached = cache_manager.is_cached(character_name, greeting_text)
cached_path = cache_manager.get_cached_audio_path(character_name, greeting_text)
print(f" {character_name}: {'' if is_cached else ''} 缓存")
if cached_path:
print(f" 路径: {Path(cached_path).name}")
except Exception as e:
print(f" ❌ 测试角色 {char_file.name} 失败: {e}")
print(f"\n🎉 缓存修复完成!")
# 6. 提供使用建议
print(f"\n💡 使用建议:")
print(f" 1. 如果仍有问题,可以删除缓存目录重新生成")
print(f" 2. 确保greeting_cache目录有写入权限")
print(f" 3. 检查磁盘空间是否充足")
print(f" 4. 重启应用程序以应用修复")
def clean_cache_directory():
"""清理缓存目录"""
print("\n🧹 清理缓存目录")
print("=" * 50)
cache_dir = Path("greeting_cache")
if cache_dir.exists():
try:
shutil.rmtree(cache_dir)
print(f"✅ 已删除缓存目录: {cache_dir}")
except Exception as e:
print(f"❌ 删除缓存目录失败: {e}")
else:
print("✅ 缓存目录不存在,无需清理")
if __name__ == "__main__":
try:
if len(sys.argv) > 1 and sys.argv[1] == "clean":
clean_cache_directory()
else:
fix_existing_cache()
except Exception as e:
print(f"\n❌ 修复失败: {e}")
import traceback
traceback.print_exc()
sys.exit(1)

16
local-voice-user.service Normal file
View File

@ -0,0 +1,16 @@
[Unit]
Description=Local Voice Multiprocess Recorder with NFC (User Service)
After=network.target sound.target pipewire-pulse.service
Wants=network.target sound.target pipewire-pulse.service
[Service]
Type=simple
WorkingDirectory=/home/zhuchaowe/Local-Voice
ExecStart=/usr/bin/python3 /home/zhuchaowe/Local-Voice/multiprocess_recorder.py --enable-nfc
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=default.target

24
local-voice.service Normal file
View File

@ -0,0 +1,24 @@
[Unit]
Description=Local Voice Multiprocess Recorder with NFC
After=network.target sound.target
Wants=network.target sound.target
[Service]
Type=simple
User=zhuchaowe
Group=audio
WorkingDirectory=/home/zhuchaowe/Local-Voice
Environment="PULSE_SERVER=unix:/run/user/%U/pulse/native"
Environment="XDG_RUNTIME_DIR=/run/user/%U"
Environment="DBUS_SESSION_BUS_ADDRESS=unix:path=/run/user/%U/bus"
ExecStart=/usr/bin/python3 /home/zhuchaowe/Local-Voice/multiprocess_recorder.py --enable-nfc
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal
# 添加设备访问权限
DeviceAllow=/dev/snd/*
DevicePolicy=auto
[Install]
WantedBy=multi-user.target

View File

@ -1,3 +0,0 @@
2025-09-21 18:27:29 - InputProcess_logger - INFO - 日志系统初始化完成 - 进程: InputProcess
2025-09-21 18:27:29 - InputProcess_logger - INFO - 日志文件: logs/InputProcess_20250921_182729.log
2025-09-21 18:27:29 - InputProcess_logger - INFO - [InputProcess] TTS工作线程已启动

View File

@ -1,3 +0,0 @@
2025-09-21 18:32:04 - InputProcess_logger - INFO - 日志系统初始化完成 - 进程: InputProcess
2025-09-21 18:32:04 - InputProcess_logger - INFO - 日志文件: logs/InputProcess_20250921_183204.log
2025-09-21 18:32:04 - InputProcess_logger - INFO - [InputProcess] TTS工作线程已启动

View File

@ -1,3 +0,0 @@
2025-09-21 20:08:51 - InputProcess_logger - INFO - 日志系统初始化完成 - 进程: InputProcess
2025-09-21 20:08:51 - InputProcess_logger - INFO - 日志文件: logs/InputProcess_20250921_200851.log
2025-09-21 20:08:51 - InputProcess_logger - INFO - [InputProcess] TTS工作线程已启动

View File

@ -1,4 +0,0 @@
2025-09-21 20:09:16 - InputProcess_logger - INFO - 日志系统初始化完成 - 进程: InputProcess
2025-09-21 20:09:16 - InputProcess_logger - INFO - 日志文件: logs/InputProcess_20250921_200916.log
2025-09-21 20:09:16 - InputProcess_logger - INFO - [InputProcess] TTS工作线程已启动
2025-09-21 20:09:16 - InputProcess_logger - INFO - [InputProcess] 输入进程启动

View File

@ -1,9 +0,0 @@
2025-09-21 20:09:16 - OutputProcess_logger - INFO - 日志系统初始化完成 - 进程: OutputProcess
2025-09-21 20:09:16 - OutputProcess_logger - INFO - 日志文件: logs/OutputProcess_20250921_200916.log
2025-09-21 20:09:16 - OutputProcess_logger - INFO - [OutputProcess] 播放工作线程已启动
2025-09-21 20:09:16 - OutputProcess_logger - INFO - [OutputProcess] TTS工作线程已启动
2025-09-21 20:09:16 - OutputProcess_logger - INFO - [OutputProcess] 输出进程启动
2025-09-21 20:09:17 - OutputProcess_logger - INFO - [OutputProcess] 音频设备初始化成功
2025-09-21 20:09:54 - OutputProcess_logger - ERROR - [OutputProcess] 处理音频队列时出错:
2025-09-21 20:09:54 - OutputProcess_logger - ERROR - [OutputProcess] 输出进程错误:
2025-09-21 20:09:57 - OutputProcess_logger - INFO - [OutputProcess] 输出进程退出

View File

@ -11,6 +11,7 @@ import sys
import argparse
import json
import time
import subprocess
from typing import Dict, Any
def check_dependencies():
@ -47,6 +48,23 @@ def check_dependencies():
return True
def check_nfc_dependencies():
"""检查NFC相关依赖"""
try:
# 检查nfc-list命令是否存在
result = subprocess.run(['which', 'nfc-list'], capture_output=True, text=True)
if result.returncode != 0:
print("❌ 未找到nfc-list命令")
print(" 请安装libnfc工具:")
print(" Ubuntu/Debian: sudo apt-get install libnfc-bin")
print(" macOS: brew install libnfc")
return False
print("✅ NFC工具已安装")
return True
except Exception as e:
print(f"❌ NFC依赖检查失败: {e}")
return False
def check_environment():
"""检查运行环境"""
print("🔍 检查运行环境...")
@ -190,6 +208,7 @@ def main():
python multiprocess_recorder.py -c zhubajie # 指定角色
python multiprocess_recorder.py -l # 列出角色
python multiprocess_recorder.py --create-config # 创建配置文件
python multiprocess_recorder.py --enable-nfc # 启用NFC角色切换
"""
)
@ -205,6 +224,8 @@ def main():
help='检查运行环境')
parser.add_argument('--verbose', '-v', action='store_true',
help='详细输出')
parser.add_argument('--enable-nfc', action='store_true',
help='启用NFC角色切换功能')
args = parser.parse_args()
@ -288,9 +309,20 @@ def main():
if args.verbose:
control_system.config['system']['log_level'] = "DEBUG"
# 启动系统(自动校准和监听)
print("🚀 启动系统(包含自动校准和监听)...")
control_system.start(auto_calibration=True, auto_monitoring=True)
# 启用NFC功能如果指定- 在系统启动前启用确保校准完成后能立即开始NFC检测
nfc_enabled = False
if args.enable_nfc:
print("📱 检查NFC依赖...")
if not check_nfc_dependencies():
print("❌ NFC依赖检查失败无法启用NFC功能")
else:
print("📱 启用NFC角色切换功能...")
control_system.enable_nfc()
nfc_enabled = True
# 启动系统自动校准但根据NFC状态决定是否启动监听
print("🚀 启动系统(包含自动校准)...")
control_system.start(auto_calibration=True, auto_monitoring=not nfc_enabled)
except KeyboardInterrupt:
print("\n👋 用户中断")

176
nfc_manager.py Normal file
View File

@ -0,0 +1,176 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
NFC管理模块
实现NFC读取和角色切换功能
"""
import json
import os
import re
import subprocess
import threading
import time
from pathlib import Path
from typing import Callable, Dict, Optional
class NFCManager:
"""NFC管理器负责NFC读取和角色切换"""
def __init__(self, characters_dir: str = "characters"):
self.characters_dir = Path(characters_dir)
self.uid_to_character = {}
self.running = False
self.read_thread = None
self.current_uid = None
self.character_switch_callback: Optional[Callable] = None
# 加载角色NFC映射
self._load_character_mappings()
def _load_character_mappings(self):
"""加载角色NFC映射"""
self.uid_to_character = {}
if not self.characters_dir.exists():
print(f"⚠️ 角色目录不存在: {self.characters_dir}")
return
for config_file in self.characters_dir.glob("*.json"):
try:
with open(config_file, 'r', encoding='utf-8') as f:
config = json.load(f)
if 'nfc_uid' in config:
uid = config['nfc_uid'].upper().strip()
character_name = config_file.stem
self.uid_to_character[uid] = character_name
print(f"✅ 加载角色映射: {character_name} -> {uid}")
except Exception as e:
print(f"❌ 加载角色配置失败 {config_file}: {e}")
if not self.uid_to_character:
print("⚠️ 未找到任何带NFC UID的角色配置")
def _read_uid(self) -> Optional[str]:
"""读取NFC UID"""
os.environ['LIBNFC_DRIVER'] = 'pn532_i2c'
os.environ['LIBNFC_DEVICE'] = 'pn532_i2c:/dev/i2c-1'
try:
result = subprocess.run(['nfc-list'], capture_output=True, text=True, timeout=10)
if result.returncode == 0:
match = re.search(r'UID\s*\(NFCID1\):\s*([0-9A-Fa-f\s]+)', result.stdout)
if match:
return match.group(1).strip().replace(' ', '').upper()
except subprocess.TimeoutExpired:
print("⚠️ NFC读取超时")
except Exception as e:
print(f"❌ NFC读取失败: {e}")
return None
def _read_loop(self):
"""NFC读取循环"""
print("🔄 启动NFC读取循环...")
while self.running:
try:
uid = self._read_uid()
if uid and uid != self.current_uid:
print(f"🎯 检测到NFC卡片: {uid}")
if uid in self.uid_to_character:
character_name = self.uid_to_character[uid]
print(f"🎭 切换角色: {character_name}")
# 更新当前UID
self.current_uid = uid
# 调用角色切换回调
if self.character_switch_callback:
self.character_switch_callback(character_name)
else:
print(f"⚠️ 未找到UID对应角色: {uid}")
except Exception as e:
print(f"❌ NFC读取循环出错: {e}")
# 等待5秒
time.sleep(1)
def set_character_switch_callback(self, callback: Callable):
"""设置角色切换回调函数"""
self.character_switch_callback = callback
def start(self):
"""启动NFC管理器"""
if self.running:
print("⚠️ NFC管理器已在运行")
return
self.running = True
self.read_thread = threading.Thread(target=self._read_loop, daemon=True)
self.read_thread.start()
print("✅ NFC管理器已启动")
def stop(self):
"""停止NFC管理器"""
if not self.running:
return
self.running = False
if self.read_thread:
self.read_thread.join(timeout=5)
print("🛑 NFC管理器已停止")
def get_current_character(self) -> Optional[str]:
"""获取当前角色"""
if self.current_uid and self.current_uid in self.uid_to_character:
return self.uid_to_character[self.current_uid]
return None
def reload_mappings(self):
"""重新加载角色映射"""
print("🔄 重新加载角色NFC映射...")
self._load_character_mappings()
def __del__(self):
"""析构函数"""
self.stop()
# 单例模式
_nfc_manager_instance = None
def get_nfc_manager(characters_dir: str = "characters") -> NFCManager:
"""获取NFC管理器单例"""
global _nfc_manager_instance
if _nfc_manager_instance is None:
_nfc_manager_instance = NFCManager(characters_dir)
return _nfc_manager_instance
if __name__ == "__main__":
# 测试NFC管理器
def on_character_switch(character_name):
print(f"🎭 角色切换回调: {character_name}")
nfc_manager = get_nfc_manager()
nfc_manager.set_character_switch_callback(on_character_switch)
try:
nfc_manager.start()
print("🔄 NFC管理器运行中按Ctrl+C退出...")
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\n👋 用户中断")
finally:
nfc_manager.stop()

23
simple_nfc_reader.py Normal file
View File

@ -0,0 +1,23 @@
#!/usr/bin/env python3
import subprocess
import os
import re
import sys
def read_uid():
os.environ['LIBNFC_DRIVER'] = 'pn532_i2c'
os.environ['LIBNFC_DEVICE'] = 'pn532_i2c:/dev/i2c-1'
try:
result = subprocess.run(['nfc-list'], capture_output=True, text=True, timeout=5)
if result.returncode == 0:
match = re.search(r'UID\s*\(NFCID1\):\s*([0-9A-Fa-f\s]+)', result.stdout)
if match:
return match.group(1).strip().replace(' ', '').upper()
except:
pass
return None
if __name__ == "__main__":
uid = read_uid()
print(uid if uid else "000000")

77
test-audio.py Executable file
View File

@ -0,0 +1,77 @@
#!/usr/bin/env python3
"""
音频设备测试脚本
用于测试不同环境下的音频设备访问
"""
import pyaudio
import sys
import os
def test_audio_access():
print("🔍 音频设备访问测试")
print("=" * 50)
try:
p = pyaudio.PyAudio()
print("✅ PyAudio 初始化成功")
# 显示所有音频设备
print("\n📋 可用音频设备:")
for i in range(p.get_device_count()):
info = p.get_device_info_by_index(i)
if info['maxInputChannels'] > 0:
print(f" 设备 {i}: {info['name']}")
print(f" 输入通道: {info['maxInputChannels']}")
print(f" 默认采样率: {info['defaultSampleRate']}")
# 测试16000Hz采样率
print(f"\n🎵 测试16000Hz采样率访问:")
# 测试默认设备
try:
stream = p.open(
format=pyaudio.paInt16,
channels=1,
rate=16000,
input=True,
frames_per_buffer=1024
)
print("✅ 默认设备支持16000Hz")
stream.close()
except Exception as e:
print(f"❌ 默认设备不支持16000Hz: {e}")
# 测试pulse设备
try:
stream = p.open(
format=pyaudio.paInt16,
channels=1,
rate=16000,
input=True,
frames_per_buffer=1024,
input_device_index=6 # pulse设备
)
print("✅ Pulse设备支持16000Hz")
stream.close()
except Exception as e:
print(f"❌ Pulse设备不支持16000Hz: {e}")
p.terminate()
print("\n✅ 音频测试完成")
except Exception as e:
print(f"❌ 音频测试失败: {e}")
sys.exit(1)
def check_environment():
print("\n🌍 环境信息:")
print(f"用户: {os.getenv('USER', 'Unknown')}")
print(f"用户ID: {os.getuid()}")
print(f"PULSE_SERVER: {os.getenv('PULSE_SERVER', 'Not set')}")
print(f"XDG_RUNTIME_DIR: {os.getenv('XDG_RUNTIME_DIR', 'Not set')}")
print(f"DBUS_SESSION_BUS_ADDRESS: {os.getenv('DBUS_SESSION_BUS_ADDRESS', 'Not set')}")
if __name__ == "__main__":
check_environment()
test_audio_access()

View File

@ -1,194 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试新的自动校准和监听启动流程
"""
import sys
import os
import time
import signal
from control_system import ControlSystem
def signal_handler(sig, frame):
"""信号处理函数"""
print('\n👋 收到中断信号,正在关闭系统...')
if 'control_system' in globals():
control_system.shutdown()
sys.exit(0)
def main():
"""主函数"""
print("🧪 测试自动校准和监听启动流程")
print("=" * 60)
# 注册信号处理
signal.signal(signal.SIGINT, signal_handler)
try:
# 1. 创建控制系统
print("📦 创建控制系统...")
config = {
'system': {
'max_queue_size': 1000,
'process_timeout': 30,
'heartbeat_interval': 1.0,
'log_level': "INFO"
},
'audio': {
'sample_rate': 16000,
'channels': 1,
'chunk_size': 1024
},
'recording': {
'min_duration': 2.0,
'max_duration': 30.0,
'silence_threshold': 3.0
},
'processing': {
'enable_asr': True,
'enable_llm': True,
'enable_tts': True,
'character': 'libai'
}
}
global control_system
control_system = ControlSystem(config)
# 设置角色
control_system.config['processing']['character'] = 'libai'
print("✅ 控制系统创建完成")
# 2. 测试手动启动流程
print("\n🎯 测试步骤1手动启动流程")
print("-" * 40)
# 2.1 启动进程但不自动校准和监听
print("🔧 启动进程...")
control_system._start_processes()
time.sleep(2) # 等待进程启动
# 2.2 检查初始状态
print("\n🔍 检查初始状态...")
monitoring_status = control_system.get_monitoring_status()
calibration_status = control_system.get_calibration_status()
print(f" 监听状态: {monitoring_status}")
print(f" 校准状态: {calibration_status}")
# 2.3 手动启动校准
print("\n🎯 手动启动校准...")
success = control_system.start_calibration()
print(f" 校准启动结果: {success}")
if success:
print("\n⏱️ 等待校准完成...")
if control_system.wait_for_calibration_complete(timeout=30):
print("✅ 校准完成")
else:
print("⚠️ 校准超时")
# 2.4 手动启动监听
print("\n🎯 手动启动监听...")
success = control_system.start_monitoring()
print(f" 监听启动结果: {success}")
time.sleep(1) # 等待监听启动
# 2.5 验证监听状态
monitoring_status = control_system.get_monitoring_status()
print(f" 监听状态: {monitoring_status}")
# 3. 运行一段时间测试
print("\n🎙️ 系统运行测试")
print("-" * 40)
print("💡 请说话测试语音检测功能")
print("⏱️ 运行10秒...")
start_time = time.time()
while time.time() - start_time < 10:
try:
control_system.check_events()
time.sleep(0.1)
except KeyboardInterrupt:
break
# 4. 停止监听
print("\n🛑 停止监听...")
success = control_system.stop_monitoring()
print(f" 停止结果: {success}")
time.sleep(1)
# 5. 关闭系统
print("\n🔄 关闭当前系统...")
control_system.shutdown()
time.sleep(2)
# 6. 测试自动启动流程
print("\n🎯 测试步骤2自动启动流程")
print("-" * 40)
# 创建新的控制系统实例
control_system = ControlSystem(config)
control_system.config['processing']['character'] = 'libai'
print("\n🚀 启动系统(自动校准和监听)...")
# 这里我们只启动进程,不进入主循环
control_system._start_processes()
# 手动执行自动校准和监听流程
print("\n🎯 自动启动校准...")
success = control_system.start_calibration()
if success and control_system.wait_for_calibration_complete(timeout=30):
print("✅ 自动校准完成")
print("\n🎯 自动启动监听...")
success = control_system.start_monitoring()
if success:
print("✅ 自动监听启动完成")
time.sleep(2)
# 检查最终状态
monitoring_status = control_system.get_monitoring_status()
calibration_status = control_system.get_calibration_status()
print(f"\n📊 最终状态:")
print(f" 监听状态: {monitoring_status}")
print(f" 校准状态: {calibration_status}")
# 7. 运行一段时间
print("\n🎙️ 最终运行测试")
print("-" * 40)
print("💡 请说话测试最终功能")
print("⏱️ 运行10秒...")
start_time = time.time()
while time.time() - start_time < 10:
try:
control_system.check_events()
control_system.display_status()
time.sleep(0.1)
except KeyboardInterrupt:
break
print("\n🎉 测试完成!")
except Exception as e:
print(f"❌ 测试过程中出错: {e}")
import traceback
traceback.print_exc()
finally:
# 确保系统关闭
if 'control_system' in globals():
print("\n🛑 确保系统关闭...")
control_system.shutdown()
print("✅ 测试结束")
if __name__ == "__main__":
main()

View File

@ -1,177 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
首次播放逻辑专项测试
验证第一句TTS触发机制的优化效果
"""
import sys
def test_first_playback_logic():
"""测试首次播放逻辑"""
print("🎵 首次播放逻辑专项测试")
print("=" * 50)
# 测试场景1: 短句子积累
print("📝 测试场景1: 短句子积累")
print(" 模拟LLM生成的短句子流:")
short_sentences = [
"你好",
"",
"这个",
"问题",
"确实",
"需要",
"仔细",
"思考",
]
buffer = []
total_length = 0
first_trigger = None
for i, sentence in enumerate(short_sentences):
buffer.append(sentence)
total_length = sum(len(s) for s in buffer)
# 模拟首次播放逻辑
trigger_reason = None
if total_length >= 40 and len(buffer) >= 2:
trigger_reason = f"总长度{total_length}字符,{len(buffer)}个句子"
elif len(buffer) >= 5:
trigger_reason = f"缓冲区达到最大值{len(buffer)}"
if trigger_reason and first_trigger is None:
first_trigger = i + 1
print(f" 🎵 第{first_trigger}句触发TTS: {trigger_reason}")
print(f" 📝 发送内容: '{''.join(buffer)}'")
break
else:
print(f" ⏳ 第{i+1}句: '{sentence}' (累计: {total_length}字符, {len(buffer)}个句子)")
if first_trigger is None:
print(" ⚠️ 未触发TTS需要超时机制")
print()
# 测试场景2: 中等长度句子
print("📝 测试场景2: 中等长度句子")
medium_sentences = [
"你好,很高兴见到你。",
"今天天气真不错呢。",
"我们可以一起去公园玩吗?",
"我想那会是一个很好的主意。",
]
buffer = []
total_length = 0
first_trigger = None
for i, sentence in enumerate(medium_sentences):
buffer.append(sentence)
total_length = sum(len(s) for s in buffer)
# 模拟首次播放逻辑
trigger_reason = None
if total_length >= 40 and len(buffer) >= 2:
trigger_reason = f"总长度{total_length}字符,{len(buffer)}个句子"
elif len(sentence) >= 25 and sentence.endswith(('', '', '', '.', '!', '?')) and len(buffer) >= 2:
trigger_reason = f"长句子{len(sentence)}字符+缓冲内容"
if trigger_reason and first_trigger is None:
first_trigger = i + 1
print(f" 🎵 第{first_trigger}句触发TTS: {trigger_reason}")
print(f" 📝 发送内容: '{''.join(buffer)}'")
break
else:
print(f" ⏳ 第{i+1}句: '{sentence}' (累计: {total_length}字符, {len(buffer)}个句子)")
if first_trigger is None:
print(" ⚠️ 未触发TTS需要超时机制")
print()
# 测试场景3: 长句子
print("📝 测试场景3: 长句子")
long_sentences = [
"你好,",
"我认为这个问题需要我们从多个角度来分析。",
"首先,让我们仔细了解一下具体情况。",
]
buffer = []
total_length = 0
first_trigger = None
for i, sentence in enumerate(long_sentences):
buffer.append(sentence)
total_length = sum(len(s) for s in buffer)
# 模拟首次播放逻辑
trigger_reason = None
if total_length >= 40 and len(buffer) >= 2:
trigger_reason = f"总长度{total_length}字符,{len(buffer)}个句子"
elif len(sentence) >= 25 and sentence.endswith(('', '', '', '.', '!', '?')) and len(buffer) >= 2:
trigger_reason = f"长句子{len(sentence)}字符+缓冲内容"
if trigger_reason and first_trigger is None:
first_trigger = i + 1
print(f" 🎵 第{first_trigger}句触发TTS: {trigger_reason}")
print(f" 📝 发送内容: '{''.join(buffer)}'")
break
else:
print(f" ⏳ 第{i+1}句: '{sentence}' (累计: {total_length}字符, {len(buffer)}个句子)")
if first_trigger is None:
print(" ⚠️ 未触发TTS需要超时机制")
print()
# 测试场景4: 超长单句
print("📝 测试场景4: 超长单句")
ultra_long_sentence = "根据我的分析,这个问题的解决方案需要综合考虑多个因素,包括时间成本、资源投入以及最终的实施效果。"
buffer = ["你好"]
buffer.append(ultra_long_sentence)
total_length = sum(len(s) for s in buffer)
if total_length >= 40 and len(buffer) >= 2:
print(f" 🎵 第2句触发TTS: 总长度{total_length}字符,{len(buffer)}个句子")
print(f" 📝 发送内容: '{''.join(buffer)[:50]}...'")
else:
print(" ⚠️ 未触发TTS")
print()
def show_optimization_comparison():
"""显示优化对比"""
print("📈 首次播放逻辑优化对比")
print("=" * 50)
comparison = {
"优化前": {
"触发条件": "任何完整句子或长句子",
"最小长度": "无明确要求",
"积攒机制": "基本没有",
"可能导致": "播放卡顿,等待数据",
},
"优化后": {
"触发条件": "40+字符且2+句子 或 25+字符完整句+缓冲",
"最小长度": "总长度40字符或单句25字符",
"积攒机制": "智能积累多个句子",
"超时保护": "5秒超时机制",
"效果": "确保有足够数据才开始播放"
}
}
for aspect, details in comparison.items():
print(f"\n🔧 {aspect}:")
for key, value in details.items():
print(f"{key}: {value}")
print(f"\n🎯 核心改进: 确保首次播放有足够的内容,避免因为数据不足导致的播放卡顿")
if __name__ == "__main__":
if len(sys.argv) > 1 and sys.argv[1] == "--comparison":
show_optimization_comparison()
else:
test_first_playback_logic()
show_optimization_comparison()

View File

@ -1,173 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
音频播放优化测试脚本
用于验证缓冲区优化和播放性能改进
"""
import time
import json
import os
import sys
def test_optimization():
"""测试优化效果"""
print("🧪 音频播放优化测试")
print("=" * 50)
# 显示优化后的配置参数
print("📋 优化后的配置参数:")
print(" - 预加载缓冲区: 6个音频块原:3个")
print(" - 智能句子缓冲: 最多5个句子原:3个")
print(" - 最小触发句子: 2个句子原:1个")
print(" - 积累时间窗口: 150ms原:200ms")
print(" - TTS任务队列: 20个任务原:10个")
print(" - 音频块大小: 1024字节原:2048字节")
print(" - 播放冷却期: 0.05秒(原:0.1秒)")
print(" - 长句子触发阈值: 30字符原:50字符")
print(" - 播放缓冲区维护: 4个块原:3个")
print()
# 测试智能句子缓冲逻辑
print("🧠 测试智能句子缓冲逻辑:")
print(" 🔄 首次播放逻辑测试:")
test_sentences = [
"你好", # 短句子
"今天天气怎么样?", # 中等长度
"我觉得这个方案很不错,我们可以试试看。", # 长句子
"这是一个超过三十个字符的句子应该会立即触发TTS生成。", # 超过30字符
"短句。", # 带标点的短句
]
# 模拟首次播放的缓冲区状态
tts_buffer = []
first_playback_started = False
total_buffered_text = ""
trigger_count = 0
for i, sentence in enumerate(test_sentences):
tts_buffer.append(sentence)
total_buffered_text = ''.join(tts_buffer)
# 首次播放逻辑
if not first_playback_started:
should_trigger = False
trigger_reason = ""
# 条件1: 总文本长度超过40字符且至少有2个句子
if len(total_buffered_text) >= 40 and len(tts_buffer) >= 2:
should_trigger = True
trigger_reason = f"总长度{len(total_buffered_text)}字符,{len(tts_buffer)}个句子"
# 条件2: 有1个完整长句子超过25字符
elif len(sentence) >= 25 and sentence.endswith(('', '', '', '.', '!', '?')) and len(tts_buffer) >= 2:
should_trigger = True
trigger_reason = f"长句子{len(sentence)}字符+缓冲内容"
# 条件3: 缓冲区达到最大值(5个)
elif len(tts_buffer) >= 5:
should_trigger = True
trigger_reason = f"缓冲区达到最大值{len(tts_buffer)}"
# 条件4: 超过500ms模拟
if should_trigger:
trigger_count += 1
first_playback_started = True
print(f" 🎵 首次触发TTS: {trigger_reason}")
print(f" 📝 发送内容: '{total_buffered_text[:50]}...'")
tts_buffer = []
else:
print(f" ⏳ 首次缓冲: '{sentence}' (累计: {len(total_buffered_text)}字符, {len(tts_buffer)}个句子)")
else:
# 正常播放逻辑
if len(sentence) > 30 or len(tts_buffer) >= 3:
should_trigger = True
trigger_reason = "长句子" if len(sentence) > 30 else "缓冲区满"
if should_trigger:
trigger_count += 1
print(f" ✅ 正常触发TTS: {trigger_reason}")
print(f" 📝 发送内容: '{''.join(tts_buffer)[:30]}...'")
tts_buffer = []
else:
print(f" ⏳ 正常缓冲: '{sentence}'")
print(f" 📊 总触发次数: {trigger_count}")
print()
print(" 📋 首次播放优化效果:")
print(" • 确保首句有足够长度40+字符或25+字符完整句)")
print(" • 积累多个句子避免播放卡顿")
print(" • 5秒超时机制防止无限等待")
print(" • 后续句子正常流式处理")
print()
# 显示性能监控信息
print("📊 性能监控功能:")
print(" - 实时缓冲区大小统计")
print(" - 平均和最大缓冲区大小")
print(" - 播放块数和音频大小统计")
print(" - 每5秒自动输出性能报告")
print()
print("🎯 预期改进效果:")
print(" 1. ✅ 减少音频播放卡顿(更大的缓冲区)")
print(" 2. ✅ 更快的TTS响应优化的触发条件")
print(" 3. ✅ 更流畅的播放体验(减少冷却期)")
print(" 4. ✅ 更好的资源利用(更小的音频块)")
print(" 5. ✅ 实时性能监控(调试和优化)")
print()
print("📝 测试建议:")
print(" 1. 运行主程序观察播放流畅度")
print(" 2. 查看性能统计输出")
print(" 3. 监控缓冲区大小变化")
print(" 4. 测试不同长度的语音响应")
print()
print("🚀 测试完成!可以运行主程序验证优化效果。")
def show_optimization_summary():
"""显示优化总结"""
print("📈 音频播放优化总结")
print("=" * 50)
summary = {
"缓冲区优化": [
"预加载缓冲区: 3→6个块",
"智能句子缓冲: 3→5个句子",
"最小触发缓冲: 1→2个句子",
"TTS任务队列: 10→20个任务"
],
"响应性优化": [
"积累时间窗口: 200ms→150ms",
"长句子触发: 50→30字符",
"中等长度触发: 30→20字符",
"播放冷却期: 0.1s→0.05s"
],
"播放优化": [
"音频块大小: 2048→1024字节",
"播放缓冲维护: 3→4个块",
"数据转移: 2→3个块/次"
],
"监控功能": [
"实时性能统计",
"缓冲区大小监控",
"自动性能报告",
"播放进度追踪"
]
}
for category, improvements in summary.items():
print(f"\n🔧 {category}:")
for improvement in improvements:
print(f"{improvement}")
print(f"\n🎯 总体目标: 减少音频播放卡顿,提升用户体验")
print(f"📊 预期效果: 更流畅的实时语音交互")
if __name__ == "__main__":
if len(sys.argv) > 1 and sys.argv[1] == "--summary":
show_optimization_summary()
else:
test_optimization()