Compare commits

...

5 Commits

Author	SHA1	Message	Date
朱潮	9871b445f0	Echo handling pending	2025-09-21 12:09:03 +08:00
朱潮	3f70062c2b	Echo handling pending	2025-09-21 11:40:54 +08:00
朱潮	e1d634af1f	Echo handling pending	2025-09-21 10:48:51 +08:00
朱潮	9523176d60	Echo handling pending	2025-09-21 03:00:11 +08:00
朱潮	aed69e9c54	Echo handling pending	2025-09-20 23:29:47 +08:00
22 changed files with 4843 additions and 1133 deletions


@@ -0,0 +1,127 @@
# Audio Processes Improvement Summary

## Background
- Original problem: TTS audio stopped after playing only three characters, with ALSA underrun errors
- Root cause: poor audio buffer management and an overly conservative playback strategy

## Improvements

### 1. Audio playback optimization (`_play_audio` method)
- **Before**: a conservative playback strategy that waited for enough buffered data before starting
- **After**:
  - Adopt the playback strategy from recorder.py: play as soon as any data is available
  - Add an error-recovery mechanism that detects and recovers from ALSA underruns automatically
  - Optimize buffer management to reduce latency

### 2. TTS worker-thread pattern
- **Reference**: the TTS worker-thread implementation in recorder.py
- **Features**:
  - A dedicated TTS worker thread handles audio generation
  - A task queue keeps the main thread from blocking
  - A unified TTS request interface, `process_tts_request()`
  - Streaming audio processing supported

### 3. Unified audio playback queue
- **Both InputProcess and OutputProcess support**:
  - A TTS worker thread
  - Audio generation and playback queues
  - Unified error handling and logging

### 4. Key changes

#### Playback strategy
```python
# Before: conservative strategy
if len(self.playback_buffer) > 2:  # wait until the buffer has enough data
    # start playback

# After: eager strategy + error recovery
audio_chunk = self.playback_buffer.pop(0)
if audio_chunk and len(audio_chunk) > 0:
    try:
        self.output_stream.write(audio_chunk)
        # update statistics
    except Exception as e:
        # recover from ALSA underrun
        if "underrun" in str(e).lower():
            # restart the audio stream
```

#### TTS worker thread
```python
def _tts_worker(self):
    """TTS worker thread - drains the TTS task queue"""
    while self.tts_worker_running:
        try:
            task = self.tts_task_queue.get(timeout=1.0)
            if task is None:
                break
            task_type, content = task
            if task_type == "tts_sentence":
                self._generate_tts_audio(content)
            self.tts_task_queue.task_done()
        except queue.Empty:
            continue
        except Exception as e:
            self.logger.error(f"TTS worker thread error: {e}")
```

#### Error recovery
```python
# ALSA underrun detection and recovery
if "underrun" in str(e).lower() or "alsa" in str(e).lower():
    self.logger.info("ALSA underrun detected; trying to recover the audio stream")
    try:
        if self.output_stream:
            self.output_stream.stop_stream()
            time.sleep(0.1)
            self.output_stream.start_stream()
            self.logger.info("Audio stream recovered")
    except Exception as recovery_e:
        self.logger.error(f"Failed to recover the audio stream: {recovery_e}")
        self.playback_buffer.clear()
```

### 5. Performance optimizations
- Reduce logging frequency to improve throughput
- Tune the queue-handling strategy with appropriate timeouts
- Adjust sleep intervals dynamically, optimizing CPU usage based on playback state
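The dynamic-sleep point above can be sketched as follows; `playback_sleep` and the interval values are illustrative, not taken from audio_processes.py:

```python
import time  # the playback loop would call time.sleep() with the chosen interval

def playback_sleep(buffered_chunks: int, busy_sleep: float = 0.005, idle_sleep: float = 0.05) -> float:
    """Pick the playback loop's sleep interval: poll quickly while audio
    is queued, back off when the buffer is empty to save CPU."""
    return busy_sleep if buffered_chunks > 0 else idle_sleep

# In the playback loop: time.sleep(playback_sleep(len(playback_buffer)))
```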
### 6. Testing and verification
- Added the test script `test_audio_processes.py`
- Verified syntax correctness
- Can exercise the full TTS pipeline

## Usage

### Inside the control system
```python
from audio_processes import InputProcess, OutputProcess

# Create the input and output processes
input_process = InputProcess(command_queue, event_queue)
output_process = OutputProcess(audio_queue)

# Issue a TTS request
output_process.process_tts_request("你好,这是测试语音")
```

### Standalone test
```bash
python test_audio_processes.py
```

## Expected results
- ALSA underrun errors are resolved
- Smoother audio playback
- Lower TTS latency
- More stable audio processing overall

## Notes
1. Make sure the required dependencies are installed: `requests`, `pyaudio`
2. Check that the audio device works
3. A working network connection is needed for the TTS service
4. Tune the audio parameters to suit your environment

190
README_multiprocess.md Normal file

@@ -0,0 +1,190 @@
# Multi-process Audio Recording System

An audio-processing architecture based on process isolation, enabling zero-latency switching between recording and playback.

## 🚀 System Highlights

### Core advantages
- **Multi-process architecture**: input and output processes are fully isolated; no device resets needed
- **Zero switching latency**: eliminates the audio-switching stalls of the traditional single-process design
- **Real-time response**: recording and playback run in parallel for a truly real-time experience
- **Smart detection**: precise voice detection based on ZCR (zero-crossing rate)
- **Streaming TTS**: real-time audio generation and playback, reducing wait time
- **Role play**: multiple AI characters and voices supported

### Architecture
```
Main control process ──┐
                       ├─ Input process (recording + voice detection)
                       ├─ Output process (audio playback)
                       └─ Online AI services (STT + LLM + TTS)
```

## 📦 File Layout
```
Local-Voice/
├── recorder.py               # original implementation (kept for reference)
├── multiprocess_recorder.py  # main program
├── audio_processes.py        # audio process module
├── control_system.py         # control system module
├── config.json               # configuration file
└── characters/               # character configuration directory
    ├── libai.json            # Li Bai character
    └── zhubajie.json         # Zhu Bajie character
```

## 🛠️ Installation and Running

### 1. Requirements
- Python 3.7+
- An audio input device (microphone)
- A network connection (for the online AI services)

### 2. Install dependencies
```bash
pip install pyaudio numpy requests websockets
```

### 3. Set the API key
```bash
export ARK_API_KEY='your_api_key_here'
```

### 4. Basic usage
```bash
# Use the default character (Li Bai)
python multiprocess_recorder.py

# Pick a character
python multiprocess_recorder.py -c zhubajie

# List available characters
python multiprocess_recorder.py -l

# Use a configuration file
python multiprocess_recorder.py --config config.json

# Create a sample configuration file
python multiprocess_recorder.py --create-config
```

## ⚙️ Configuration

### Main options
| Option | Description | Default |
|--------|-------------|---------|
| `recording.min_duration` | Minimum recording length (s) | 2.0 |
| `recording.max_duration` | Maximum recording length (s) | 30.0 |
| `recording.silence_threshold` | Silence detection threshold (s) | 3.0 |
| `detection.zcr_min` | Minimum ZCR | 2400 |
| `detection.zcr_max` | Maximum ZCR | 12000 |
| `processing.max_tokens` | Maximum LLM tokens | 50 |

### Audio parameters
- Sample rate: 16 kHz
- Channels: 1 (mono)
- Bit depth: 16-bit
- Format: PCM
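As a quick sanity check of the stream parameters above, the raw data rate and per-chunk duration work out as follows (the chunk size comes from `chunk_size` in config.json):

```python
SAMPLE_RATE = 16000   # Hz
CHANNELS = 1          # mono
SAMPLE_WIDTH = 2      # bytes per sample (16-bit PCM)
CHUNK_FRAMES = 1024   # frames per buffer ("chunk_size" in config.json)

# Raw data rate of the stream and the duration covered by one chunk
BYTES_PER_SECOND = SAMPLE_RATE * CHANNELS * SAMPLE_WIDTH   # 32000 B/s
CHUNK_DURATION_MS = CHUNK_FRAMES / SAMPLE_RATE * 1000      # 64.0 ms
```

So each 1024-frame buffer carries 64 ms of audio, which bounds how quickly the detector can react to speech onsets.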
## 🎭 Character System

### Built-in characters
- **libai**: Li Bai (李白) - refined poet persona
- **zhubajie**: Zhu Bajie (猪八戒) - humorous persona

### Custom characters
Create a JSON file under the `characters/` directory:
```json
{
  "name": "Character name",
  "description": "Character description",
  "system_prompt": "System prompt",
  "voice": "zh_female_wanqudashu_moon_bigtts",
  "max_tokens": 50
}
```
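A minimal loader sketch for such a character file could look like this; the required-key set and the defaulting rule are assumptions based on the sample above, not the project's actual loader:

```python
import json

# Keys the runtime plausibly needs; illustrative, not the project's real rule
REQUIRED_KEYS = {"name", "system_prompt", "voice"}

def load_character(text: str) -> dict:
    """Parse a character JSON document and check the keys listed above."""
    config = json.loads(text)
    missing = REQUIRED_KEYS - config.keys()
    if missing:
        raise ValueError(f"character config missing keys: {sorted(missing)}")
    config.setdefault("max_tokens", 50)  # default mirrors the sample config
    return config
```

Validating at load time surfaces a broken character file immediately instead of failing later inside the TTS or LLM call.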
## 🔧 Troubleshooting

### Common issues
1. **Audio device problems**
   ```bash
   # Check audio devices
   python multiprocess_recorder.py --check-env
   ```
2. **Missing dependencies**
   ```bash
   # Reinstall dependencies
   pip install --upgrade pyaudio numpy requests websockets
   ```
3. **Network problems**
   - Check the network connection
   - Confirm the API key is correct
   - Check firewall settings
4. **Permission problems**
   ```bash
   # Linux systems may need audio-group membership
   sudo usermod -a -G audio $USER
   ```

### Debug mode
```bash
# Enable verbose output
python multiprocess_recorder.py -v
```

## 📊 Performance Comparison
| Metric | Single-process | Multi-process | Improvement |
|--------|----------------|---------------|-------------|
| Switching latency | 1-2 s | 0 s | 100% |
| CPU utilization | single core | multi-core | better |
| Responsiveness | slow | real-time | much better |
| Stability | fair | excellent | much better |

## 🔄 Comparison with the Original Version

### Original (recorder.py)
- Single-process design
- Frequent audio-device resets
- Recording and playback cannot run simultaneously
- Noticeable switching latency

### New (multiprocess_recorder.py)
- Multi-process architecture
- Fully isolated input and output
- Zero switching latency
- True parallel processing
- Better stability and extensibility

## 📝 Development Notes

### Architecture
- **Input process**: dedicated to recording and voice detection
- **Output process**: dedicated to audio playback
- **Main control process**: coordinates the system and the AI pipeline

### Inter-process communication
- `multiprocessing.Queue` for safe communication
- Command control and event notification supported
- Thread-safe audio data transfer
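The command/event pattern described above can be sketched with standard-library queues. This snippet uses `queue.Queue` and a thread so it is self-contained; the real system would use `multiprocessing.Queue` and `Process` with the same `put`/`get` interface. Command and event names here are illustrative:

```python
import queue
import threading

def input_worker(command_q: "queue.Queue", event_q: "queue.Queue") -> None:
    """Stand-in for the input process: obey commands, emit events."""
    while True:
        cmd = command_q.get()
        if cmd == "stop":
            break
        if cmd == "start_recording":
            # A real input process would record here; we emit a fake payload.
            event_q.put(("recording_done", b"\x00" * 4))

command_q, event_q = queue.Queue(), queue.Queue()
t = threading.Thread(target=input_worker, args=(command_q, event_q), daemon=True)
t.start()
command_q.put("start_recording")          # main process issues a command
event, payload = event_q.get(timeout=5)   # and waits for the event
command_q.put("stop")
t.join(timeout=5)
```

Because both queue types share the same interface, the control flow stays identical when the thread is replaced by a real child process.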
### State management
- A clear state-machine design
- Thorough error handling
- Graceful process shutdown

## 📄 License
This project is for learning and research use only.

## 🤝 Contributing
Issues and pull requests to improve the project are welcome.

287
asr_diagnostic.py Normal file

@@ -0,0 +1,287 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ASR (speech recognition) diagnostic tool
Used to test and diagnose concrete problems with the speech-recognition pipeline
"""
import asyncio
import json
import gzip
import uuid
import numpy as np
import wave
import os
from typing import Optional


class ASRDiagnostic:
    """ASR diagnostic tool"""

    def __init__(self):
        self.api_config = {
            'asr': {
                'appid': "8718217928",
                'cluster': "volcano_tts",
                'token': "ynJMX-5ix1FsJvswC9KTNlGUdubcchqc",
                'ws_url': "wss://openspeech.bytedance.com/api/v2/asr"
            }
        }

    def generate_asr_header(self, message_type=1, message_type_specific_flags=0):
        """Build the binary ASR protocol header"""
        PROTOCOL_VERSION = 0b0001
        DEFAULT_HEADER_SIZE = 0b0001
        JSON = 0b0001
        GZIP = 0b0001
        header = bytearray()
        header.append((PROTOCOL_VERSION << 4) | DEFAULT_HEADER_SIZE)
        header.append((message_type << 4) | message_type_specific_flags)
        header.append((JSON << 4) | GZIP)
        header.append(0x00)  # reserved
        return header

    def parse_asr_response(self, res):
        """Parse an ASR response frame"""
        print(f"🔍 Parsing response, raw size: {len(res)} bytes")
        if len(res) < 8:
            print("❌ Response too short to parse")
            return {}
        try:
            message_type = res[1] >> 4
            payload_size = int.from_bytes(res[4:8], "big", signed=False)
            payload_msg = res[8:8 + payload_size]
            print(f"📋 Message type: {message_type}, payload size: {payload_size}")
            if message_type == 0b1001:  # SERVER_FULL_RESPONSE
                try:
                    if payload_msg.startswith(b'{'):
                        result = json.loads(payload_msg.decode('utf-8'))
                        print("✅ JSON response parsed successfully")
                        return result
                    else:
                        print("❌ Response is not JSON")
                except Exception as e:
                    print(f"❌ JSON parsing failed: {e}")
        except Exception as e:
            print(f"❌ Exception while parsing response: {e}")
        return {}

    async def test_asr_with_audio_file(self, audio_file_path: str):
        """Test ASR with an audio file"""
        print(f"🎵 Testing ASR - audio file: {audio_file_path}")
        if not os.path.exists(audio_file_path):
            print(f"❌ Audio file does not exist: {audio_file_path}")
            return
        try:
            # Read the audio file
            with wave.open(audio_file_path, 'rb') as wf:
                channels = wf.getnchannels()
                width = wf.getsampwidth()
                rate = wf.getframerate()
                frames = wf.readframes(wf.getnframes())
            print(f"📊 Audio info: rate={rate}Hz, channels={channels}, depth={width*8}bits")
            print(f"📊 Audio size: {len(frames)} bytes")
            # Down-mix stereo to mono if needed
            if channels > 1:
                audio_array = np.frombuffer(frames, dtype=np.int16)
                audio_array = audio_array.reshape(-1, channels)
                audio_array = np.mean(audio_array, axis=1).astype(np.int16)
                frames = audio_array.tobytes()
                print("🔄 Converted to mono")
            return await self._test_asr_connection(frames)
        except Exception as e:
            print(f"❌ Audio file processing failed: {e}")
            return None

    async def test_asr_with_silence(self):
        """Test with silent audio"""
        print("🔇 Testing ASR - silence")
        # Generate 3 seconds of silence (16 kHz, 16-bit, mono)
        duration = 3  # seconds
        sample_rate = 16000
        silence_data = bytes(duration * sample_rate * 2)  # 2 bytes per sample
        return await self._test_asr_connection(silence_data)

    async def test_asr_with_noise(self):
        """Test with noise audio"""
        print("📢 Testing ASR - random noise")
        # Generate 3 seconds of random noise
        duration = 3  # seconds
        sample_rate = 16000
        noise_data = np.random.randint(-32768, 32767, duration * sample_rate, dtype=np.int16)
        noise_data = noise_data.tobytes()
        return await self._test_asr_connection(noise_data)

    async def _test_asr_connection(self, audio_data: bytes):
        """Open a WebSocket session and stream the audio through ASR"""
        try:
            import websockets
            # Build request parameters
            reqid = str(uuid.uuid4())
            request_params = {
                'app': {
                    'appid': self.api_config['asr']['appid'],
                    'cluster': self.api_config['asr']['cluster'],
                    'token': self.api_config['asr']['token'],
                },
                'user': {
                    'uid': 'asr_diagnostic'
                },
                'request': {
                    'reqid': reqid,
                    'nbest': 1,
                    'workflow': 'audio_in,resample,partition,vad,fe,decode,itn,nlu_punctuate',
                    'show_language': False,
                    'show_utterances': False,
                    'result_type': 'full',
                    "sequence": 1
                },
                'audio': {
                    'format': 'wav',
                    'rate': 16000,
                    'language': 'zh-CN',
                    'bits': 16,
                    'channel': 1,
                    'codec': 'raw'
                }
            }
            print("📋 ASR request parameters:")
            print(f"  - AppID: {request_params['app']['appid']}")
            print(f"  - Cluster: {request_params['app']['cluster']}")
            print(f"  - Token: {request_params['app']['token'][:20]}...")
            print(f"  - RequestID: {reqid}")
            # Build the full client request frame
            payload_bytes = str.encode(json.dumps(request_params))
            payload_bytes = gzip.compress(payload_bytes)
            full_client_request = bytearray(self.generate_asr_header())
            full_client_request.extend((len(payload_bytes)).to_bytes(4, 'big'))
            full_client_request.extend(payload_bytes)
            # Authentication header
            additional_headers = {'Authorization': 'Bearer; {}'.format(self.api_config['asr']['token'])}
            print("📡 Connecting WebSocket...")
            async with websockets.connect(
                self.api_config['asr']['ws_url'],
                additional_headers=additional_headers,
                max_size=1000000000
            ) as ws:
                print("✅ WebSocket connected")
                # Send the configuration request
                print("📤 Sending ASR configuration request...")
                await ws.send(full_client_request)
                res = await ws.recv()
                result = self.parse_asr_response(res)
                print(f"📥 Configuration response: {result}")
                # Send audio data in chunks
                # NOTE: this is a 15-second chunk (2 bytes x 16000 Hz x 15000 ms / 1000),
                # not 1 second as the original comment claimed
                chunk_size = int(1 * 2 * 16000 * 15000 / 1000)
                total_chunks = 0
                for offset in range(0, len(audio_data), chunk_size):
                    chunk = audio_data[offset:offset + chunk_size]
                    last = (offset + chunk_size) >= len(audio_data)
                    payload_bytes = gzip.compress(chunk)
                    audio_only_request = bytearray(
                        self.generate_asr_header(
                            message_type=0b0010,
                            message_type_specific_flags=0b0010 if last else 0
                        )
                    )
                    audio_only_request.extend((len(payload_bytes)).to_bytes(4, 'big'))
                    audio_only_request.extend(payload_bytes)
                    await ws.send(audio_only_request)
                    res = await ws.recv()
                    result = self.parse_asr_response(res)
                    total_chunks += 1
                    if last:
                        print(f"📨 Sent final audio chunk ({total_chunks} chunks total)")
                # Inspect the final result
                print("🎯 Waiting for the final recognition result...")
                if 'payload_msg' in result and 'result' in result['payload_msg']:
                    results = result['payload_msg']['result']
                    print(f"📝 Number of ASR results: {len(results)}")
                    if results:
                        text = results[0].get('text', 'recognition failed')
                        print(f"✅ Recognition result: {text}")
                        return text
                    else:
                        print("❌ ASR result is empty")
                else:
                    print(f"❌ Unexpected ASR response shape: {result.keys()}")
                    print(f"📋 Full response: {result}")
                return None
        except Exception as e:
            print(f"❌ ASR connection error: {e}")
            import traceback
            print(f"❌ Details:\n{traceback.format_exc()}")
            return None

    async def run_diagnostic(self):
        """Run the full diagnostic suite"""
        print("🔧 ASR diagnostic tool")
        print("=" * 50)
        # 1. Silence
        print("\n1⃣ Testing silence recognition...")
        await self.test_asr_with_silence()
        # 2. Noise
        print("\n2⃣ Testing noise recognition...")
        await self.test_asr_with_noise()
        # 3. Recorded files, if any exist
        recording_files = [f for f in os.listdir('.') if f.startswith('recording_') and f.endswith('.wav')]
        if recording_files:
            print("\n3⃣ Testing recorded files...")
            for file in recording_files[:3]:  # test at most 3 files
                await self.test_asr_with_audio_file(file)
        else:
            print("\n3⃣ Skipping recorded-file tests (no recordings found)")
        print("\n✅ Diagnostic finished")


def main():
    """Entry point"""
    diagnostic = ASRDiagnostic()
    try:
        asyncio.run(diagnostic.run_diagnostic())
    except KeyboardInterrupt:
        print("\n🛑 Diagnostic interrupted by user")
    except Exception as e:
        print(f"❌ Diagnostic tool error: {e}")


if __name__ == "__main__":
    main()

1701
audio_processes.py Normal file

File diff suppressed because it is too large

39
config.json Normal file

@@ -0,0 +1,39 @@
{
  "system": {
    "max_queue_size": 1000,
    "process_timeout": 30,
    "heartbeat_interval": 1.0,
    "log_level": "INFO"
  },
  "audio": {
    "sample_rate": 16000,
    "channels": 1,
    "chunk_size": 1024,
    "format": "paInt16"
  },
  "recording": {
    "min_duration": 3.0,
    "max_duration": 30.0,
    "silence_threshold": 3.0,
    "pre_record_duration": 2.0
  },
  "processing": {
    "enable_asr": true,
    "enable_llm": true,
    "enable_tts": true,
    "character": "libai",
    "max_tokens": 50
  },
  "detection": {
    "zcr_min": 2400,
    "zcr_max": 12000,
    "consecutive_silence_count": 30,
    "max_zcr_history": 50
  },
  "playback": {
    "buffer_size": 1000,
    "show_progress": true,
    "progress_interval": 100,
    "chunk_size": 512
  }
}

1362
control_system.py Normal file

File diff suppressed because it is too large

377
enhanced_voice_detector.py Normal file

@@ -0,0 +1,377 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Enhanced voice detector
Adaptive voice-activity detection combining energy and ZCR (zero-crossing rate),
tuned for a 16000 Hz sample rate
"""
import numpy as np
import time
from collections import deque
from typing import Dict, Any, Optional
import pyaudio


class EnhancedVoiceDetector:
    """Enhanced voice-activity detector"""

    def __init__(self, sample_rate=16000, chunk_size=1024):
        self.sample_rate = sample_rate
        self.chunk_size = chunk_size
        # Sliding history windows
        self.energy_window = deque(maxlen=100)
        self.zcr_window = deque(maxlen=100)
        # Statistics
        self.energy_stats = {
            'mean': 0, 'std': 0, 'min': float('inf'), 'max': 0,
            'median': 0, 'q75': 0, 'q25': 0
        }
        self.zcr_stats = {
            'mean': 0, 'std': 0, 'min': float('inf'), 'max': 0,
            'median': 0, 'q75': 0, 'q25': 0
        }
        # Detection parameters
        self.calibration_mode = True
        self.calibration_samples = 0
        self.required_calibration = 100  # samples needed for calibration
        # Adaptive parameters - tuned to be more sensitive
        self.energy_multiplier = 1.0             # energy threshold multiplier (lowered)
        self.zcr_std_multiplier = 1.0            # ZCR std-dev multiplier (lowered)
        self.min_energy_threshold = 80           # minimum energy threshold (lowered)
        self.consecutive_voice_threshold = 2     # consecutive voice frames required (lowered)
        self.consecutive_silence_threshold = 15  # consecutive silence frames required (raised)
        # State tracking
        self.consecutive_voice_count = 0
        self.consecutive_silence_count = 0
        self.last_voice_time = 0
        # Debug info
        self.debug_mode = True
        self.voice_count = 0
        self.total_samples = 0
        self._last_voice_state = False

    def calculate_energy(self, audio_data: bytes) -> float:
        """Compute audio energy (RMS)"""
        if len(audio_data) == 0:
            return 0
        audio_array = np.frombuffer(audio_data, dtype=np.int16)
        # RMS energy
        rms = np.sqrt(np.mean(audio_array.astype(float) ** 2))
        return rms

    def calculate_zcr(self, audio_data: bytes) -> float:
        """Compute the zero-crossing rate"""
        if len(audio_data) == 0:
            return 0
        audio_array = np.frombuffer(audio_data, dtype=np.int16)
        zero_crossings = np.sum(np.diff(np.sign(audio_array)) != 0)
        zcr = zero_crossings / len(audio_array) * self.sample_rate
        return zcr

    def update_statistics(self, energy: float, zcr: float):
        """Update the running statistics"""
        self.energy_window.append(energy)
        self.zcr_window.append(zcr)
        if len(self.energy_window) >= 20:
            # Compute detailed statistics
            energy_array = np.array(self.energy_window)
            zcr_array = np.array(self.zcr_window)
            # Basic statistics
            self.energy_stats['mean'] = np.mean(energy_array)
            self.energy_stats['std'] = np.std(energy_array)
            self.energy_stats['min'] = np.min(energy_array)
            self.energy_stats['max'] = np.max(energy_array)
            self.energy_stats['median'] = np.median(energy_array)
            self.energy_stats['q25'] = np.percentile(energy_array, 25)
            self.energy_stats['q75'] = np.percentile(energy_array, 75)
            self.zcr_stats['mean'] = np.mean(zcr_array)
            self.zcr_stats['std'] = np.std(zcr_array)
            self.zcr_stats['min'] = np.min(zcr_array)
            self.zcr_stats['max'] = np.max(zcr_array)
            self.zcr_stats['median'] = np.median(zcr_array)
            self.zcr_stats['q25'] = np.percentile(zcr_array, 25)
            self.zcr_stats['q75'] = np.percentile(zcr_array, 75)

    def get_adaptive_thresholds(self) -> Dict[str, float]:
        """Return the current adaptive thresholds"""
        if len(self.energy_window) < 30:
            # Fall back to more sensitive fixed thresholds
            return {
                'energy_threshold': 120,
                'zcr_min': 2000,
                'zcr_max': 13000
            }
        # Dynamic energy threshold based on the median and standard deviation,
        # with a conservative multiplier
        base_energy_threshold = (self.energy_stats['median'] +
                                 self.energy_multiplier * self.energy_stats['std'])
        # Use the interquartile range to resist outliers
        q75 = self.energy_stats['q75']
        q25 = self.energy_stats['q25']
        iqr = q75 - q25  # interquartile range
        # IQR-based robust threshold - more sensitive
        iqr_threshold = q75 + 0.5 * iqr
        # Combine both estimates, taking the lower of the two
        energy_threshold = max(self.min_energy_threshold,
                               min(base_energy_threshold * 0.7, iqr_threshold))
        # Dynamic ZCR thresholds
        zcr_center = self.zcr_stats['median']
        zcr_spread = self.zcr_std_multiplier * self.zcr_stats['std']
        # Keep the ZCR range within reasonable bounds - more permissive
        zcr_min = max(1500, min(3000, zcr_center - zcr_spread))
        zcr_max = min(14000, max(6000, zcr_center + zcr_spread * 2.0))
        # Enforce a minimum range width
        if zcr_max - zcr_min < 2000:
            zcr_max = zcr_min + 2000
        return {
            'energy_threshold': energy_threshold,
            'zcr_min': zcr_min,
            'zcr_max': zcr_max
        }

    def is_voice_basic(self, energy: float, zcr: float) -> bool:
        """Single-frame voice decision"""
        thresholds = self.get_adaptive_thresholds()
        # Energy check
        energy_ok = energy > thresholds['energy_threshold']
        # ZCR check
        zcr_ok = thresholds['zcr_min'] < zcr < thresholds['zcr_max']
        # Both conditions must hold
        return energy_ok and zcr_ok

    def is_voice_advanced(self, audio_data: bytes) -> Dict[str, Any]:
        """Stateful voice detection"""
        # Compute features
        energy = self.calculate_energy(audio_data)
        zcr = self.calculate_zcr(audio_data)
        # Update statistics
        self.update_statistics(energy, zcr)
        # Total sample counter
        self.total_samples += 1
        # Calibration phase
        if self.calibration_mode:
            self.calibration_samples += 1
            if self.calibration_samples >= self.required_calibration:
                self.calibration_mode = False
                if self.debug_mode:
                    print("\n🎯 Calibration complete!")
                    print(f"  Energy stats: {self.energy_stats['median']:.0f}±{self.energy_stats['std']:.0f}")
                    print(f"  ZCR stats: {self.zcr_stats['median']:.0f}±{self.zcr_stats['std']:.0f}")
            return {
                'is_voice': False,
                'energy': energy,
                'zcr': zcr,
                'calibrating': True,
                'calibration_progress': self.calibration_samples / self.required_calibration,
                'confidence': 0.0
            }
        # Per-frame decision
        is_voice_frame = self.is_voice_basic(energy, zcr)
        # State machine
        if is_voice_frame:
            self.consecutive_voice_count += 1
            self.consecutive_silence_count = 0
            self.last_voice_time = time.time()
        else:
            self.consecutive_silence_count += 1
            if self.consecutive_silence_count >= self.consecutive_silence_threshold:
                self.consecutive_voice_count = 0
        # Final decision (requires consecutive voice frames)
        final_voice_detected = self.consecutive_voice_count >= self.consecutive_voice_threshold
        # Count rising edges only (fixed: the original condition had an
        # operator-precedence bug; _last_voice_state is initialized in __init__)
        if final_voice_detected and not self._last_voice_state:
            self.voice_count += 1
        # Remember the last state
        self._last_voice_state = final_voice_detected
        # Confidence estimate
        thresholds = self.get_adaptive_thresholds()
        energy_confidence = min(1.0, energy / thresholds['energy_threshold'])
        zcr_confidence = 1.0 if thresholds['zcr_min'] < zcr < thresholds['zcr_max'] else 0.0
        confidence = (energy_confidence + zcr_confidence) / 2
        return {
            'is_voice': final_voice_detected,
            'energy': energy,
            'zcr': zcr,
            'confidence': confidence,
            'energy_threshold': thresholds['energy_threshold'],
            'zcr_min': thresholds['zcr_min'],
            'zcr_max': thresholds['zcr_max'],
            'consecutive_voice_count': self.consecutive_voice_count,
            'consecutive_silence_count': self.consecutive_silence_count,
            'calibrating': False,
            'voice_detection_rate': self.voice_count / self.total_samples if self.total_samples > 0 else 0
        }

    def get_debug_info(self) -> str:
        """Return a debug summary string"""
        if self.calibration_mode:
            return f"Calibrating: {self.calibration_samples}/{self.required_calibration}"
        thresholds = self.get_adaptive_thresholds()
        return (f"Energy threshold: {thresholds['energy_threshold']:.0f} | "
                f"ZCR range: {thresholds['zcr_min']:.0f}-{thresholds['zcr_max']:.0f} | "
                f"Detection rate: {self.voice_count}/{self.total_samples} ({self.voice_count/self.total_samples*100:.1f}%)")

    def reset(self):
        """Reset the detector state"""
        self.energy_window.clear()
        self.zcr_window.clear()
        self.calibration_mode = True
        self.calibration_samples = 0
        self.consecutive_voice_count = 0
        self.consecutive_silence_count = 0
        self.voice_count = 0
        self.total_samples = 0


class VoiceDetectorTester:
    """Interactive tester for the voice detector"""

    def __init__(self):
        self.detector = EnhancedVoiceDetector()

    def run_test(self, duration=30):
        """Run a live microphone test"""
        print("🎙️ Enhanced voice detector test")
        print("=" * 50)
        print("📊 Algorithm: energy + ZCR dual detection")
        print("📈 Sample rate: 16000 Hz")
        print("🔄 Adaptive thresholds: enabled")
        print(f"⏱️ Test duration: {duration} s")
        print("💡 Speak to test detection...")
        print("🛑 Press Ctrl+C to stop early")
        print("=" * 50)
        try:
            # Initialize audio
            audio = pyaudio.PyAudio()
            stream = audio.open(
                format=pyaudio.paInt16,
                channels=1,
                rate=16000,
                input=True,
                frames_per_buffer=1024
            )
            start_time = time.time()
            voice_segments = []
            current_segment = None
            while time.time() - start_time < duration:
                # Read audio data
                data = stream.read(1024, exception_on_overflow=False)
                # Detect voice
                result = self.detector.is_voice_advanced(data)
                # Track voice segments
                if result['is_voice']:
                    if current_segment is None:
                        current_segment = {
                            'start_time': time.time(),
                            'start_sample': self.detector.total_samples
                        }
                else:
                    if current_segment is not None:
                        current_segment['end_time'] = time.time()
                        current_segment['end_sample'] = self.detector.total_samples
                        current_segment['duration'] = current_segment['end_time'] - current_segment['start_time']
                        voice_segments.append(current_segment)
                        current_segment = None
                # Render the status line
                if result['calibrating']:
                    progress = result['calibration_progress'] * 100
                    status = f"\r🔧 Calibrating: {progress:.0f}% | energy: {result['energy']:.0f} | ZCR: {result['zcr']:.0f}"
                else:
                    status_icon = "🎤" if result['is_voice'] else "🔇"
                    status_color = "\033[92m" if result['is_voice'] else "\033[90m"
                    reset_color = "\033[0m"
                    status = (f"{status_color}{status_icon} "
                              f"energy: {result['energy']:.0f}/{result['energy_threshold']:.0f} | "
                              f"ZCR: {result['zcr']:.0f} ({result['zcr_min']:.0f}-{result['zcr_max']:.0f}) | "
                              f"confidence: {result['confidence']:.2f} | "
                              f"streaks: {result['consecutive_voice_count']}/{result['consecutive_silence_count']}{reset_color}")
                print(f"\r{status}", end='', flush=True)
                time.sleep(0.01)
            # Close the last open segment
            if current_segment is not None:
                current_segment['end_time'] = time.time()
                current_segment['duration'] = current_segment['end_time'] - current_segment['start_time']
                voice_segments.append(current_segment)
            # Print statistics
            print("\n\n📊 Test results:")
            print(f"  Total duration: {duration} s")
            print(f"  Voice segments detected: {len(voice_segments)}")
            print(f"  Total voiced time: {sum(s['duration'] for s in voice_segments):.1f} s")
            print(f"  Voiced ratio: {sum(s['duration'] for s in voice_segments)/duration*100:.1f}%")
            if voice_segments:
                print(f"  Mean segment duration: {np.mean([s['duration'] for s in voice_segments]):.1f} s")
                print(f"  Longest segment: {max(s['duration'] for s in voice_segments):.1f} s")
            print("\n🎯 Detector state:")
            print(f"  {self.detector.get_debug_info()}")
        except KeyboardInterrupt:
            print("\n\n🛑 Test interrupted by user")
        except Exception as e:
            print(f"\n\n❌ Test error: {e}")
        finally:
            try:
                if 'stream' in locals():
                    stream.stop_stream()
                    stream.close()
                if 'audio' in locals():
                    audio.terminate()
            except Exception:
                pass


def main():
    """Entry point"""
    tester = VoiceDetectorTester()
    tester.run_test()


if __name__ == "__main__":
    main()

@@ -1,74 +0,0 @@
#!/bin/bash
# Smart voice assistant - installation script
# For Raspberry Pi and Linux systems

echo "🚀 Smart voice assistant - installation script"
echo "================================"

# Refuse to run as root
if [ "$EUID" -eq 0 ]; then
    echo "⚠️ Please do not run this script as root"
    echo "  Run it as a regular user instead: ./install.sh"
    exit 1
fi

# Update the package manager
echo "📦 Updating package lists..."
sudo apt-get update

# Install system dependencies
echo "🔧 Installing system dependencies..."
sudo apt-get install -y \
    python3 \
    python3-pip \
    portaudio19-dev \
    python3-dev \
    alsa-utils

# Install Python dependencies
echo "🐍 Installing Python dependencies..."
pip3 install --user \
    websockets \
    requests \
    pyaudio \
    numpy

# Check the audio player
echo "🔊 Checking the audio player..."
if command -v aplay >/dev/null 2>&1; then
    echo "✅ aplay is installed; PCM/WAV playback supported"
else
    echo "❌ aplay installation failed"
fi

# Check Python modules
echo "🧪 Checking Python modules..."
python3 -c "import websockets, requests, pyaudio, numpy" 2>/dev/null
if [ $? -eq 0 ]; then
    echo "✅ All Python dependencies are installed"
else
    echo "❌ Some Python dependencies failed to install"
fi

echo ""
echo "✅ Installation complete!"
echo ""
echo "📋 Usage:"
echo "1. Set the API key (needed for the large language model):"
echo "   export ARK_API_KEY='your_api_key_here'"
echo ""
echo "2. Run the program:"
echo "   python3 recorder.py"
echo ""
echo "3. Troubleshooting:"
echo "   - For permission problems, make sure the user is in the audio group:"
echo "     sudo usermod -a -G audio \$USER"
echo "   - Then log in again or reboot"
echo ""
echo "🎯 Features:"
echo "- 🎙️ Smart voice recording"
echo "- 🤖 Online speech recognition"
echo "- 💬 AI conversation"
echo "- 🔊 Spoken replies (TTS)"
echo "- 📁 Automatic file management"

@@ -0,0 +1,6 @@
2025-09-21 11:56:17 - InputProcess_logger - INFO - Logging initialized - process: InputProcess
2025-09-21 11:56:17 - InputProcess_logger - INFO - Log file: logs/InputProcess_20250921_115617.log
2025-09-21 11:56:17 - InputProcess_logger - INFO - [InputProcess] TTS worker thread started
2025-09-21 11:56:17 - InputProcess_logger - INFO - Logging initialized - process: InputProcess
2025-09-21 11:56:17 - InputProcess_logger - INFO - Log file: logs/InputProcess_20250921_115617.log
2025-09-21 11:56:17 - InputProcess_logger - INFO - [InputProcess] TTS worker thread started

@@ -0,0 +1,8 @@
2025-09-21 11:56:14 - OutputProcess_logger - INFO - Logging initialized - process: OutputProcess
2025-09-21 11:56:14 - OutputProcess_logger - INFO - Log file: logs/OutputProcess_20250921_115614.log
2025-09-21 11:56:14 - OutputProcess_logger - INFO - [OutputProcess] Playback worker thread started
2025-09-21 11:56:14 - OutputProcess_logger - INFO - [OutputProcess] TTS worker thread started
2025-09-21 11:56:14 - OutputProcess_logger - INFO - Logging initialized - process: OutputProcess
2025-09-21 11:56:14 - OutputProcess_logger - INFO - Log file: logs/OutputProcess_20250921_115614.log
2025-09-21 11:56:14 - OutputProcess_logger - INFO - [OutputProcess] Playback worker thread started
2025-09-21 11:56:14 - OutputProcess_logger - INFO - [OutputProcess] TTS worker thread started

305
multiprocess_recorder.py Normal file

@@ -0,0 +1,305 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Multi-process audio recording system
Audio processing architecture based on process isolation
"""
import os
import sys
import argparse
import json
import time
from typing import Dict, Any


def check_dependencies():
    """Check required third-party libraries"""
    missing_deps = []
    try:
        import pyaudio
    except ImportError:
        missing_deps.append("pyaudio")
    try:
        import numpy
    except ImportError:
        missing_deps.append("numpy")
    try:
        import requests
    except ImportError:
        missing_deps.append("requests")
    try:
        import websockets
    except ImportError:
        missing_deps.append("websockets")
    if missing_deps:
        print("❌ Missing dependencies:")
        for dep in missing_deps:
            print(f"  - {dep}")
        print("\nInstall them with:")
        print(f"pip install {' '.join(missing_deps)}")
        return False
    return True


def check_environment():
    """Check the runtime environment"""
    print("🔍 Checking runtime environment...")
    # Python version
    python_version = sys.version_info
    if python_version.major < 3 or (python_version.major == 3 and python_version.minor < 7):
        print(f"❌ Python version too old: {python_version.major}.{python_version.minor}")
        print("Python 3.7 or newer is required")
        return False
    print(f"✅ Python version: {python_version.major}.{python_version.minor}.{python_version.micro}")
    # Operating system
    import platform
    system = platform.system().lower()
    print(f"✅ Operating system: {system}")
    # Audio devices
    try:
        import pyaudio
        audio = pyaudio.PyAudio()
        device_count = audio.get_device_count()
        print(f"✅ Audio devices: {device_count}")
        if device_count == 0:
            print("❌ No audio devices detected")
            return False
        audio.terminate()
    except Exception as e:
        print(f"❌ Audio device check failed: {e}")
        return False
    # Network connectivity
    try:
        import requests
        requests.get("https://www.baidu.com", timeout=5)
        print("✅ Network connection OK")
    except Exception:
        print("⚠️ Network may be unavailable; online AI features will be affected")
    # API key
    api_key = os.environ.get("ARK_API_KEY")
    if api_key:
        print("✅ ARK_API_KEY is set")
    else:
        print("⚠️ ARK_API_KEY is not set; LLM features will be disabled")
        print("  Run: export ARK_API_KEY='your_api_key_here'")
    return True


def list_characters():
    """List available characters"""
    characters_dir = os.path.join(os.path.dirname(__file__), "characters")
    if not os.path.exists(characters_dir):
        print("❌ Character directory does not exist")
        return
    characters = []
    for file in os.listdir(characters_dir):
        if file.endswith('.json'):
            character_name = file[:-5]
            config_file = os.path.join(characters_dir, file)
            try:
                with open(config_file, 'r', encoding='utf-8') as f:
                    config = json.load(f)
                name = config.get('name', character_name)
                desc = config.get('description', 'no description')
                characters.append(f"{character_name}: {name} - {desc}")
            except Exception:
                characters.append(f"{character_name}: failed to read config file")
    if characters:
        print("🎭 Available characters:")
        for char in characters:
            print(f"  - {char}")
    else:
        print("❌ No character config files found")


def create_sample_config():
    """Create a sample configuration file"""
    config = {
        "system": {
            "max_queue_size": 1000,
            "process_timeout": 30,
            "heartbeat_interval": 1.0,
            "log_level": "INFO"
        },
        "audio": {
            "sample_rate": 16000,
            "channels": 1,
            "chunk_size": 1024,
            "format": "paInt16"
        },
        "recording": {
            "min_duration": 2.0,
            "max_duration": 30.0,
            "silence_threshold": 3.0,
            "pre_record_duration": 2.0
        },
        "processing": {
            "enable_asr": True,
            "enable_llm": True,
            "enable_tts": True,
            "character": "libai",
            "max_tokens": 50
        },
        "detection": {
            "zcr_min": 2400,
            "zcr_max": 12000,
            "consecutive_silence_count": 30,
            "max_zcr_history": 30
        },
        "playback": {
            "buffer_size": 1000,
            "show_progress": True,
            "progress_interval": 100,
            "chunk_size": 512
        }
    }
    config_file = "config.json"
    try:
        with open(config_file, 'w', encoding='utf-8') as f:
            json.dump(config, f, indent=2, ensure_ascii=False)
        print(f"✅ Sample config file created: {config_file}")
    except Exception as e:
        print(f"❌ Failed to create config file: {e}")


def main():
    """Entry point"""
    parser = argparse.ArgumentParser(
        description='Multi-process audio recording system',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python multiprocess_recorder.py                  # default character
  python multiprocess_recorder.py -c zhubajie      # pick a character
  python multiprocess_recorder.py -l               # list characters
  python multiprocess_recorder.py --create-config  # create a config file
"""
    )
    parser.add_argument('--character', '-c', type=str, default='libai',
                        help='character to use (default: libai)')
    parser.add_argument('--list-characters', '-l', action='store_true',
                        help='list all available characters')
    parser.add_argument('--config', type=str,
                        help='path to a config file')
    parser.add_argument('--create-config', action='store_true',
                        help='create a sample config file')
    parser.add_argument('--check-env', action='store_true',
                        help='check the runtime environment')
    parser.add_argument('--verbose', '-v', action='store_true',
                        help='verbose output')
    args = parser.parse_args()
    # Banner
    print("🚀 Multi-process audio recording system")
    print("=" * 60)
    # Check dependencies
    if not check_dependencies():
        sys.exit(1)
    # Create a config file
    if args.create_config:
        create_sample_config()
        return
    # Check the environment
    if args.check_env:
        check_environment()
        return
    # List characters
    if args.list_characters:
        list_characters()
        return
    # Check the characters directory
    characters_dir = os.path.join(os.path.dirname(__file__), "characters")
    if not os.path.exists(characters_dir):
        print(f"⚠️ Character directory does not exist: {characters_dir}")
        print("Make sure the characters directory exists and contains character configs")
    # Check the chosen character
    character_file = os.path.join(characters_dir, f"{args.character}.json")
    if not os.path.exists(character_file):
        print(f"⚠️ Character file does not exist: {character_file}")
        print("Available characters:")
        list_characters()
        return
    print(f"🎭 Current character: {args.character}")
    print("🎯 System highlights:")
    print("  - Multi-process architecture: input and output fully isolated")
    print("  - Zero switching latency: no audio device resets")
    print("  - Real-time response: recording and playback in parallel")
    print("  - Smart detection: ZCR-based voice detection")
    print("  - Streaming TTS: real-time audio generation and playback")
    print("  - Role play: multiple AI characters supported")
    print("=" * 60)
    # Usage notes
    print("📖 Usage:")
    print("  - Recording starts automatically when speech is detected")
    print("  - Recording stops after 3 seconds of continuous silence")
    print("  - Audio is processed and played back automatically")
    print("  - Press Ctrl+C to quit")
    print("=" * 60)
    # Load the configuration
    config = None
    if args.config:
        try:
            with open(args.config, 'r', encoding='utf-8') as f:
                config = json.load(f)
            print(f"📋 Loaded config file: {args.config}")
        except Exception as e:
            print(f"⚠️ Failed to load config file: {e}")
            print("Falling back to the default configuration")
    try:
        # Import the control system
        from control_system import ControlSystem
        # Create the control system
        control_system = ControlSystem(config)
        # Apply the chosen character
        control_system.config['processing']['character'] = args.character
        # Set the log level
        if args.verbose:
            control_system.config['system']['log_level'] = "DEBUG"
        # Start the system
        control_system.start()
    except KeyboardInterrupt:
        print("\n👋 Interrupted by user")
    except Exception as e:
        print(f"❌ System startup failed: {e}")
        if args.verbose:
            import traceback
            traceback.print_exc()
    finally:
        print("👋 System exited")


if __name__ == "__main__":
    main()

98
process_logger.py Normal file

@@ -0,0 +1,98 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
日志配置模块
为多进程录音系统提供日志记录功能
"""
import logging
import os
import sys
from datetime import datetime
from typing import Optional
def setup_process_logger(process_name: str, log_dir: str = "logs") -> logging.Logger:
"""
为进程设置日志记录器
Args:
process_name: 进程名称用于日志文件名
log_dir: 日志目录路径
Returns:
配置好的日志记录器
"""
# 创建日志目录
if not os.path.exists(log_dir):
os.makedirs(log_dir)
# 生成日志文件名(包含时间戳)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
log_file = os.path.join(log_dir, f"{process_name}_{timestamp}.log")
# 创建日志记录器
logger = logging.getLogger(f"{process_name}_logger")
logger.setLevel(logging.DEBUG)
# 清除现有的处理器
logger.handlers.clear()
# 文件处理器(记录所有级别)
file_handler = logging.FileHandler(log_file, encoding='utf-8')
file_handler.setLevel(logging.DEBUG)
# 控制台处理器只记录INFO及以上级别
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
# 创建格式化器
file_formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
console_formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s',
datefmt='%H:%M:%S'
)
# 设置格式化器
file_handler.setFormatter(file_formatter)
console_handler.setFormatter(console_formatter)
# 添加处理器
logger.addHandler(file_handler)
logger.addHandler(console_handler)
logger.info(f"日志系统初始化完成 - 进程: {process_name}")
logger.info(f"日志文件: {log_file}")
return logger
class ProcessLogger:
"""进程日志包装器"""
def __init__(self, process_name: str, log_dir: str = "logs"):
self.process_name = process_name
self.logger = setup_process_logger(process_name, log_dir)
def debug(self, message: str):
"""调试日志"""
self.logger.debug(f"[{self.process_name}] {message}")
def info(self, message: str):
"""信息日志"""
self.logger.info(f"[{self.process_name}] {message}")
def warning(self, message: str):
"""警告日志"""
self.logger.warning(f"[{self.process_name}] {message}")
def error(self, message: str):
"""错误日志"""
self.logger.error(f"[{self.process_name}] {message}")
def critical(self, message: str):
"""严重错误日志"""
self.logger.critical(f"[{self.process_name}] {message}")
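process_logger.py 的配置模式(进程独立 logger、清空旧处理器、文件处理器记录 DEBUG)可以用下面这个可独立运行的小例子验证(示意脚本,make_process_logger 是演示用的简化版,未挂控制台处理器):

```python
import logging
import os
import tempfile
from datetime import datetime

def make_process_logger(process_name, log_dir):
    """简化版 setup_process_logger:只挂文件处理器,返回 (logger, 日志路径)"""
    os.makedirs(log_dir, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_file = os.path.join(log_dir, f"{process_name}_{timestamp}.log")
    logger = logging.getLogger(f"{process_name}_logger")
    logger.setLevel(logging.DEBUG)
    logger.handlers.clear()          # 避免重复初始化时叠加处理器
    handler = logging.FileHandler(log_file, encoding="utf-8")
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S"))
    logger.addHandler(handler)
    return logger, log_file

log_dir = tempfile.mkdtemp()
logger, log_file = make_process_logger("input_process", log_dir)
logger.info("录音开始")
logger.debug("缓冲区: 3 块")
```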


@@ -401,12 +401,16 @@ class EnergyBasedRecorder:
if time_since_last >= self.tts_accumulation_time and len(self.tts_buffer) >= self.tts_buffer_min_size:
return True
# 检查句子特征 - 长句子优先调整为100字符防止回声
if len(sentence) > 100: # 超过100字符的长句子立即触发
# 检查是否为完整句子(使用新的严格检测
if self._is_complete_sentence(sentence):
return True
# 中等长度句子80-100字符如果有结束标点也触发
if len(sentence) > 80:
# 检查句子特征 - 长句子优先50字符以上
if len(sentence) > 50: # 超过50字符的句子立即触发
return True
# 中等长度句子30-50字符如果有结束标点也触发
if len(sentence) > 30:
end_punctuations = ['。', '!', '?', '.', '!', '?']
if any(sentence.strip().endswith(p) for p in end_punctuations):
return True
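上面这段触发逻辑(新阈值 50/30 字符)可以抽成独立函数来单测(示意实现,should_flush 为假设的函数名,未包含时间累积条件):

```python
def should_flush(sentence: str) -> bool:
    """判断一句 TTS 文本是否应立即触发合成(仅复现长度/标点条件)"""
    sentence = sentence.strip()
    # 长句子优先:超过 50 字符立即触发
    if len(sentence) > 50:
        return True
    # 中等长度句子(30-50 字符):以结束标点收尾也触发
    if len(sentence) > 30:
        end_punctuations = ['。', '!', '?', '.', '!', '?']
        return any(sentence.endswith(p) for p in end_punctuations)
    return False
```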
@@ -1808,12 +1812,18 @@ class EnergyBasedRecorder:
if not text or len(text.strip()) == 0:
return False
# 增加最小长度要求 - 至少10个字符才考虑作为完整句子
if len(text.strip()) < 10:
return False
# 句子结束标点符号
sentence_endings = r'[。!?.!?]'
# 检查是否以句子结束符结尾
if re.search(sentence_endings + r'\s*$', text):
return True
# 对于以结束符结尾的句子要求至少15个字符
if len(text.strip()) >= 15:
return True
# 检查是否包含句子结束符(可能在句子中间)
if re.search(sentence_endings, text):
@@ -1821,18 +1831,25 @@ class EnergyBasedRecorder:
remaining_text = re.split(sentence_endings, text, 1)[-1]
if len(remaining_text.strip()) > 0:
return False
# 对于包含结束符的句子要求至少20个字符
if len(text.strip()) >= 20:
return True
# 对于较长的文本超过50字符即使没有结束符也可以考虑
if len(text.strip()) >= 50:
return True
# 对于较短的文本,如果包含常见完整句式模式
common_patterns = [
r'^[是的有没有来去在把被让叫请使].*[的得了吗呢吧啊呀]',
r'^(你好|谢谢|再见|是的|不是|好的|没问题)',
r'^[\u4e00-\u9fff]{2,4}[的得了]$' # 2-4个中文字+的/了/得
]
for pattern in common_patterns:
if re.match(pattern, text):
return True
# 对于中等长度的文本,如果包含常见完整句式模式
if len(text.strip()) >= 20:
common_patterns = [
r'^[是的有没有来去在把被让叫请使].*[的得了吗呢吧啊呀]',
r'^(你好|谢谢|再见|是的|不是|好的|没问题)',
r'^[\u4e00-\u9fff]{4,8}[的得了]$' # 4-8个中文字+的/了/得
]
for pattern in common_patterns:
if re.match(pattern, text):
return True
return False
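改动后的 _is_complete_sentence 主干逻辑可以近似成下面的独立函数(示意实现,省略了常见句式模式匹配分支):

```python
import re

SENTENCE_ENDINGS = r'[。!?.!?]'

def is_complete_sentence(text: str) -> bool:
    """近似复现改动后的完整句判定:最小长度 + 结束符位置 + 超长放行"""
    if not text or not text.strip():
        return False
    text = text.strip()
    # 最小长度要求:不足 10 字符一律视为不完整
    if len(text) < 10:
        return False
    # 以结束符结尾:还要求至少 15 字符
    if re.search(SENTENCE_ENDINGS + r'\s*$', text):
        return len(text) >= 15
    # 结束符在句中且其后还有内容:不完整
    if re.search(SENTENCE_ENDINGS, text):
        remaining = re.split(SENTENCE_ENDINGS, text, 1)[-1]
        if remaining.strip():
            return False
        return len(text) >= 20
    # 没有结束符:超长文本(>= 50 字符)也可接受
    return len(text) >= 50
```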

start_with_logging.py Normal file

@@ -0,0 +1,113 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
启动脚本示例
演示如何使用带日志记录的多进程录音系统
"""
import os
import sys
import argparse
from datetime import datetime
def ensure_logs_directory():
"""确保日志目录存在"""
log_dir = "logs"
if not os.path.exists(log_dir):
os.makedirs(log_dir)
print(f"✅ 创建日志目录: {log_dir}")
return log_dir
def cleanup_old_logs(log_dir="logs", max_files=10):
"""清理旧的日志文件"""
if not os.path.exists(log_dir):
return
log_files = []
for file in os.listdir(log_dir):
if file.endswith('.log'):
file_path = os.path.join(log_dir, file)
log_files.append((file_path, os.path.getmtime(file_path)))
# 按修改时间排序,删除最旧的文件
log_files.sort(key=lambda x: x[1])
while len(log_files) > max_files:
oldest_file = log_files[0][0]
try:
os.remove(oldest_file)
print(f"🗑️ 删除旧日志文件: {oldest_file}")
log_files.pop(0)
except Exception as e:
print(f"⚠️ 删除日志文件失败 {oldest_file}: {e}")
break
def main():
"""主函数"""
parser = argparse.ArgumentParser(
description='带日志记录的多进程录音系统启动器',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
使用示例:
python start_with_logging.py # 使用默认设置
python start_with_logging.py --clean-logs # 清理旧日志
python start_with_logging.py --log-dir my_logs # 指定日志目录
"""
)
parser.add_argument('--character', '-c', type=str, default='libai',
help='选择角色 (默认: libai)')
parser.add_argument('--log-dir', type=str, default='logs',
help='日志目录路径 (默认: logs)')
parser.add_argument('--clean-logs', action='store_true',
help='清理旧日志文件')
parser.add_argument('--max-log-files', type=int, default=10,
help='保留的最大日志文件数量 (默认: 10)')
parser.add_argument('--config', type=str,
help='配置文件路径')
parser.add_argument('--verbose', '-v', action='store_true',
help='详细输出')
args = parser.parse_args()
print("🚀 带日志记录的多进程录音系统")
print("=" * 60)
# 确保日志目录存在
log_dir = ensure_logs_directory()
# 清理旧日志文件
if args.clean_logs:
cleanup_old_logs(log_dir, args.max_log_files)
# 显示日志配置信息
print(f"📁 日志目录: {log_dir}")
print(f"🎭 角色: {args.character}")
print("=" * 60)
# 导入主模块并启动
try:
# 修改sys.argv以传递参数给主程序
sys.argv = ['multiprocess_recorder.py']
if args.character:
sys.argv.extend(['-c', args.character])
if args.config:
sys.argv.extend(['--config', args.config])
if args.verbose:
sys.argv.append('--verbose')
# 导入并运行主程序
import multiprocess_recorder
multiprocess_recorder.main()
except KeyboardInterrupt:
print("\n👋 用户中断")
except Exception as e:
print(f"❌ 启动失败: {e}")
if args.verbose:
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()
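cleanup_old_logs 的"按修改时间只保留最新 N 个"策略可以这样独立验证(示意实现,用 os.utime 固定 mtime 以保证结果确定):

```python
import os
import tempfile

def cleanup_old_logs(log_dir, max_files=10):
    """删除最旧的日志,只保留最新 max_files 个,返回被删除的路径列表"""
    logs = [os.path.join(log_dir, f)
            for f in os.listdir(log_dir) if f.endswith(".log")]
    logs.sort(key=os.path.getmtime)        # 最旧的排在最前
    removed = []
    for path in logs[:-max_files] if max_files > 0 else logs:
        os.remove(path)
        removed.append(path)
    return removed

d = tempfile.mkdtemp()
for i in range(5):
    path = os.path.join(d, f"proc_{i}.log")
    open(path, "w").close()
    os.utime(path, (i, i))                 # 人为设定 mtime,i 越大越新
removed = cleanup_old_logs(d, max_files=3)
```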


@@ -1,96 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试大语言模型API功能
"""
import os
import requests
import json
def test_llm_api():
"""测试大语言模型API"""
# 检查API密钥
api_key = os.environ.get("ARK_API_KEY")
if not api_key:
print("❌ 未设置 ARK_API_KEY 环境变量")
return False
print(f"✅ API密钥已设置: {api_key[:20]}...")
# API配置
api_url = "https://ark.cn-beijing.volces.com/api/v3/chat/completions"
model = "doubao-1-5-pro-32k-250115"
# 测试消息
test_message = "你好,请简单介绍一下自己"
try:
print("🤖 测试大语言模型API...")
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}"
}
data = {
"model": model,
"messages": [
{
"role": "system",
"content": "你是一个智能助手,请根据用户的语音输入提供有帮助的回答。保持回答简洁明了。"
},
{
"role": "user",
"content": test_message
}
]
}
response = requests.post(api_url, headers=headers, json=data, timeout=30)
print(f"📡 HTTP状态码: {response.status_code}")
if response.status_code == 200:
result = response.json()
print("✅ API调用成功")
if "choices" in result and len(result["choices"]) > 0:
llm_response = result["choices"][0]["message"]["content"]
print(f"💬 AI回复: {llm_response}")
# 显示完整响应结构
print("\n📋 完整响应结构:")
print(json.dumps(result, indent=2, ensure_ascii=False))
return True
else:
print("❌ 响应格式错误")
print(f"响应内容: {response.text}")
return False
else:
print(f"❌ API调用失败: {response.status_code}")
print(f"响应内容: {response.text}")
return False
except requests.exceptions.RequestException as e:
print(f"❌ 网络请求失败: {e}")
return False
except Exception as e:
print(f"❌ 测试失败: {e}")
return False
if __name__ == "__main__":
print("🧪 测试大语言模型API功能")
print("=" * 50)
success = test_llm_api()
if success:
print("\n✅ 大语言模型功能测试通过!")
print("🚀 现在可以运行完整的语音助手系统了")
else:
print("\n❌ 大语言模型功能测试失败")
print("🔧 请检查API密钥和网络连接")


@@ -1,108 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试流式响应解析的脚本
"""
import json
import requests
import os
def test_streaming_response():
"""测试流式响应解析"""
# 检查API密钥
api_key = os.environ.get("ARK_API_KEY")
if not api_key:
print("❌ 请设置 ARK_API_KEY 环境变量")
return
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}"
}
data = {
"messages": [
{
"content": "你是一个智能助手,请回答问题。",
"role": "system"
},
{
"content": "你好,请简单介绍一下自己",
"role": "user"
}
],
"model": "doubao-1-5-pro-32k-250115",
"stream": True
}
print("🚀 开始测试流式响应...")
try:
response = requests.post(
"https://ark.cn-beijing.volces.com/api/v3/chat/completions",
headers=headers,
json=data,
stream=True,
timeout=30
)
print(f"📊 响应状态: {response.status_code}")
if response.status_code != 200:
print(f"❌ 请求失败: {response.text}")
return
print("🔍 开始解析流式响应...")
accumulated_text = ""
line_count = 0
for line in response.iter_lines(decode_unicode=True):
line_count += 1
if not line or not line.strip():
continue
# 预处理
line = line.strip()
print(f"\n--- 第{line_count}行 ---")
print(f"原始内容: {repr(line)}")
if line.startswith("data: "):
data_str = line[6:] # 移除 "data: " 前缀
print(f"处理后: {repr(data_str)}")
if data_str == "[DONE]":
print("✅ 流结束")
break
try:
chunk_data = json.loads(data_str)
print(f"✅ JSON解析成功: {chunk_data}")
if "choices" in chunk_data and len(chunk_data["choices"]) > 0:
delta = chunk_data["choices"][0].get("delta", {})
content = delta.get("content", "")
if content:
accumulated_text += content
print(f"💬 累计内容: {accumulated_text}")
except json.JSONDecodeError as e:
print(f"❌ JSON解析失败: {e}")
print(f"🔍 问题数据: {repr(data_str)}")
except Exception as e:
print(f"❌ 其他错误: {e}")
print(f"\n✅ 测试完成,总共处理了 {line_count}")
print(f"📝 最终内容: {accumulated_text}")
except Exception as e:
print(f"❌ 测试失败: {e}")
if __name__ == "__main__":
test_streaming_response()
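上面逐行解析 "data: " 前缀的流程可以收敛成一个可复用的生成器(示意实现,iter_sse_content 为假设的名字),无法解析的行直接跳过而不中断整个流:

```python
import json

def iter_sse_content(lines):
    """逐行解析 "data: " 前缀的流式响应,产出每个 chunk 的增量文本"""
    for line in lines:
        line = (line or "").strip()
        if not line.startswith("data: "):
            continue                      # 跳过空行和非数据行
        data_str = line[6:]               # 移除 "data: " 前缀
        if data_str == "[DONE]":
            return                        # 流结束
        try:
            chunk = json.loads(data_str)
        except json.JSONDecodeError:
            continue                      # 跳过无法解析的行
        for choice in chunk.get("choices", []):
            content = choice.get("delta", {}).get("content", "")
            if content:
                yield content

# 模拟一小段流式响应
sample = [
    'data: {"choices":[{"delta":{"content":"你"}}]}',
    'data: {"choices":[{"delta":{"content":"好"}}]}',
    "",
    "data: [DONE]",
]
text = "".join(iter_sse_content(sample))
```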


@@ -1,840 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
语音交互聊天系统 - 集成豆包AI
基于能量检测的录音 + 豆包语音识别 + TTS回复
"""
import sys
import os
import time
import threading
import asyncio
import subprocess
import wave
import struct
import json
import gzip
import uuid
from typing import Dict, Any, Optional
import pyaudio
import numpy as np
import websockets
# 豆包协议常量
PROTOCOL_VERSION = 0b0001
CLIENT_FULL_REQUEST = 0b0001
CLIENT_AUDIO_ONLY_REQUEST = 0b0010
SERVER_FULL_RESPONSE = 0b1001
SERVER_ACK = 0b1011
SERVER_ERROR_RESPONSE = 0b1111
NO_SEQUENCE = 0b0000
MSG_WITH_EVENT = 0b0100
NO_SERIALIZATION = 0b0000
JSON = 0b0001
GZIP = 0b0001
class DoubaoClient:
"""豆包音频处理客户端"""
def __init__(self):
self.base_url = "wss://openspeech.bytedance.com/api/v3/realtime/dialogue"
self.app_id = "8718217928"
self.access_key = "ynJMX-5ix1FsJvswC9KTNlGUdubcchqc"
self.app_key = "PlgvMymc7f3tQnJ6"
self.resource_id = "volc.speech.dialog"
self.session_id = str(uuid.uuid4())
self.ws = None
self.log_id = ""
def get_headers(self) -> Dict[str, str]:
"""获取请求头"""
return {
"X-Api-App-ID": self.app_id,
"X-Api-Access-Key": self.access_key,
"X-Api-Resource-Id": self.resource_id,
"X-Api-App-Key": self.app_key,
"X-Api-Connect-Id": str(uuid.uuid4()),
}
def generate_header(self, message_type=CLIENT_FULL_REQUEST,
message_type_specific_flags=MSG_WITH_EVENT,
serial_method=JSON, compression_type=GZIP) -> bytes:
"""生成协议头"""
header = bytearray()
header.append((PROTOCOL_VERSION << 4) | 1) # version + header_size
header.append((message_type << 4) | message_type_specific_flags)
header.append((serial_method << 4) | compression_type)
header.append(0x00) # reserved
return bytes(header)
async def connect(self) -> None:
"""建立WebSocket连接"""
print(f"🔗 连接豆包服务器...")
try:
self.ws = await websockets.connect(
self.base_url,
additional_headers=self.get_headers(),
ping_interval=None
)
# 获取log_id
if hasattr(self.ws, 'response_headers'):
self.log_id = self.ws.response_headers.get("X-Tt-Logid")
elif hasattr(self.ws, 'headers'):
self.log_id = self.ws.headers.get("X-Tt-Logid")
print(f"✅ 连接成功, log_id: {self.log_id}")
# 发送StartConnection请求
await self._send_start_connection()
# 发送StartSession请求
await self._send_start_session()
except Exception as e:
print(f"❌ 连接失败: {e}")
raise
def parse_response(self, response):
"""解析响应"""
if len(response) < 4:
return None
protocol_version = response[0] >> 4
header_size = response[0] & 0x0f
message_type = response[1] >> 4
flags = response[1] & 0x0f
payload_start = header_size * 4
payload = response[payload_start:]
result = {
'protocol_version': protocol_version,
'header_size': header_size,
'message_type': message_type,
'flags': flags,
'payload': payload,
'payload_size': len(payload)
}
# 解析payload
if len(payload) >= 4:
result['event'] = int.from_bytes(payload[:4], 'big')
if len(payload) >= 8:
session_id_len = int.from_bytes(payload[4:8], 'big')
if len(payload) >= 8 + session_id_len:
result['session_id'] = payload[8:8+session_id_len].decode()
if len(payload) >= 12 + session_id_len:
data_size = int.from_bytes(payload[8+session_id_len:12+session_id_len], 'big')
result['data_size'] = data_size
result['data'] = payload[12+session_id_len:12+session_id_len+data_size]
# 尝试解析JSON数据
try:
result['json_data'] = json.loads(result['data'].decode('utf-8'))
except:
pass
return result
async def _send_start_connection(self) -> None:
"""发送StartConnection请求"""
request = bytearray(self.generate_header())
request.extend(int(1).to_bytes(4, 'big'))
payload_bytes = b"{}"
payload_bytes = gzip.compress(payload_bytes)
request.extend(len(payload_bytes).to_bytes(4, 'big'))
request.extend(payload_bytes)
await self.ws.send(request)
response = await self.ws.recv()
async def _send_start_session(self) -> None:
"""发送StartSession请求"""
session_config = {
"asr": {"extra": {"end_smooth_window_ms": 1500}},
"tts": {
"speaker": "zh_female_vv_jupiter_bigtts",
"audio_config": {"channel": 1, "format": "pcm", "sample_rate": 24000}
},
"dialog": {
"bot_name": "豆包",
"system_role": "你使用活泼灵动的女声,性格开朗,热爱生活。",
"speaking_style": "你的说话风格简洁明了,语速适中,语调自然。",
"location": {"city": "北京"},
"extra": {
"strict_audit": False,
"audit_response": "支持客户自定义安全审核回复话术。",
"recv_timeout": 30,
"input_mod": "audio",
},
},
}
request = bytearray(self.generate_header())
request.extend(int(100).to_bytes(4, 'big'))
request.extend(len(self.session_id).to_bytes(4, 'big'))
request.extend(self.session_id.encode())
payload_bytes = json.dumps(session_config).encode()
payload_bytes = gzip.compress(payload_bytes)
request.extend(len(payload_bytes).to_bytes(4, 'big'))
request.extend(payload_bytes)
await self.ws.send(request)
response = await self.ws.recv()
await asyncio.sleep(1.0)
async def process_audio(self, audio_data: bytes) -> tuple[str, bytes]:
"""处理音频并返回(识别文本, TTS音频)"""
try:
# 发送音频数据 - 使用与doubao_simple.py相同的格式
task_request = bytearray(
self.generate_header(message_type=CLIENT_AUDIO_ONLY_REQUEST,
serial_method=NO_SERIALIZATION))
task_request.extend(int(200).to_bytes(4, 'big'))
task_request.extend(len(self.session_id).to_bytes(4, 'big'))
task_request.extend(self.session_id.encode())
payload_bytes = gzip.compress(audio_data)
task_request.extend(len(payload_bytes).to_bytes(4, 'big'))
task_request.extend(payload_bytes)
await self.ws.send(task_request)
print("📤 音频数据已发送")
recognized_text = ""
tts_audio = b""
response_count = 0
# 接收响应 - 使用与doubao_simple.py相同的解析逻辑
audio_chunks = []
max_responses = 30
while response_count < max_responses:
try:
response = await asyncio.wait_for(self.ws.recv(), timeout=30.0)
response_count += 1
parsed = self.parse_response(response)
if not parsed:
continue
print(f"📥 响应 {response_count}: message_type={parsed['message_type']}, event={parsed.get('event', 'N/A')}, size={parsed['payload_size']}")
# 处理不同类型的响应
if parsed['message_type'] == 11: # SERVER_ACK - 可能包含音频
if 'data' in parsed and parsed['data_size'] > 0:
audio_chunks.append(parsed['data'])
print(f"收集到音频块: {parsed['data_size']} 字节")
elif parsed['message_type'] == 9: # SERVER_FULL_RESPONSE
event = parsed.get('event', 0)
if event == 450: # ASR开始
print("🎤 ASR处理开始")
elif event == 451: # ASR结果
if 'json_data' in parsed and 'results' in parsed['json_data']:
text = parsed['json_data']['results'][0].get('text', '')
recognized_text = text
print(f"🧠 识别结果: {text}")
elif event == 459: # ASR结束
print("✅ ASR处理结束")
elif event == 350: # TTS开始
print("🎵 TTS生成开始")
elif event == 359: # TTS结束
print("✅ TTS生成结束")
break
elif event == 550: # TTS音频数据
if 'data' in parsed and parsed['data_size'] > 0:
# 检查是否是JSON音频元数据还是实际音频数据
try:
json.loads(parsed['data'].decode('utf-8'))
print("收到TTS音频元数据")
except:
# 不是JSON可能是音频数据
audio_chunks.append(parsed['data'])
print(f"收集到TTS音频块: {parsed['data_size']} 字节")
except asyncio.TimeoutError:
print(f"⏰ 等待响应 {response_count + 1} 超时")
break
except websockets.exceptions.ConnectionClosed:
print("🔌 连接已关闭")
break
print(f"共收到 {response_count} 个响应,收集到 {len(audio_chunks)} 个音频块")
# 合并音频数据
if audio_chunks:
tts_audio = b''.join(audio_chunks)
print(f"合并后的音频数据: {len(tts_audio)} 字节")
# 转换TTS音频格式32位浮点 -> 16位整数
if tts_audio:
# 检查是否是GZIP压缩数据
try:
decompressed = gzip.decompress(tts_audio)
print(f"解压缩后音频数据: {len(decompressed)} 字节")
audio_to_write = decompressed
except:
print("音频数据不是GZIP压缩格式直接使用原始数据")
audio_to_write = tts_audio
# 检查音频数据长度是否是4的倍数32位浮点
if len(audio_to_write) % 4 != 0:
print(f"警告:音频数据长度 {len(audio_to_write)} 不是4的倍数截断到最近的倍数")
audio_to_write = audio_to_write[:len(audio_to_write) // 4 * 4]
# 将32位浮点转换为16位整数
float_count = len(audio_to_write) // 4
int16_data = bytearray(float_count * 2)
for i in range(float_count):
# 读取32位浮点数小端序
float_value = struct.unpack('<f', audio_to_write[i*4:i*4+4])[0]
# 将浮点数限制在[-1.0, 1.0]范围内
float_value = max(-1.0, min(1.0, float_value))
# 转换为16位整数
int16_value = int(float_value * 32767)
# 写入16位整数小端序
int16_data[i*2:i*2+2] = struct.pack('<h', int16_value)
tts_audio = bytes(int16_data)
print(f"✅ 音频转换完成: {len(tts_audio)} 字节")
return recognized_text, tts_audio
except Exception as e:
print(f"❌ 处理失败: {e}")
import traceback
traceback.print_exc()
return "", b""
async def send_silence_data(self, duration_ms=100) -> None:
"""发送静音数据保持连接活跃"""
try:
# 生成静音音频数据
samples = int(16000 * duration_ms / 1000) # 16kHz采样率
silence_data = bytes(samples * 2) # 16位PCM
# 发送静音数据
task_request = bytearray(
self.generate_header(message_type=CLIENT_AUDIO_ONLY_REQUEST,
serial_method=NO_SERIALIZATION))
task_request.extend(int(200).to_bytes(4, 'big'))
task_request.extend(len(self.session_id).to_bytes(4, 'big'))
task_request.extend(self.session_id.encode())
payload_bytes = gzip.compress(silence_data)
task_request.extend(len(payload_bytes).to_bytes(4, 'big'))
task_request.extend(payload_bytes)
await self.ws.send(task_request)
print("💓 发送心跳数据保持连接")
# 简单处理响应(不等待完整响应)
try:
response = await asyncio.wait_for(self.ws.recv(), timeout=5.0)
# 只确认收到响应,不处理内容
except asyncio.TimeoutError:
print("⚠️ 心跳响应超时")
except websockets.exceptions.ConnectionClosed:
print("❌ 心跳时连接已关闭")
raise
except Exception as e:
print(f"❌ 发送心跳数据失败: {e}")
async def close(self) -> None:
"""关闭连接"""
if self.ws:
try:
await self.ws.close()
except:
pass
print("🔌 连接已关闭")
class VoiceChatRecorder:
"""语音聊天录音系统"""
def __init__(self, enable_ai_chat=True):
# 音频参数
self.FORMAT = pyaudio.paInt16
self.CHANNELS = 1
self.RATE = 16000
self.CHUNK_SIZE = 1024
# 能量检测参数
self.energy_threshold = 500
self.silence_threshold = 2.0
self.min_recording_time = 1.0
self.max_recording_time = 20.0
# 状态变量
self.audio = None
self.stream = None
self.running = False
self.recording = False
self.recorded_frames = []
self.recording_start_time = None
self.last_sound_time = None
self.energy_history = []
self.zcr_history = []
# AI聊天功能
self.enable_ai_chat = enable_ai_chat
self.doubao_client = None
self.is_processing_ai = False
self.heartbeat_thread = None
self.last_heartbeat_time = time.time()
self.heartbeat_interval = 10.0 # 每10秒发送一次心跳
# 预录音缓冲区
self.pre_record_buffer = []
self.pre_record_max_frames = int(2.0 * self.RATE / self.CHUNK_SIZE)
# 播放状态
self.is_playing = False
# ZCR检测参数
self.consecutive_low_zcr_count = 0
self.low_zcr_threshold_count = 15
self.voice_activity_history = []
self._setup_audio()
def _setup_audio(self):
"""设置音频设备"""
try:
self.audio = pyaudio.PyAudio()
self.stream = self.audio.open(
format=self.FORMAT,
channels=self.CHANNELS,
rate=self.RATE,
input=True,
frames_per_buffer=self.CHUNK_SIZE
)
print("✅ 音频设备初始化成功")
except Exception as e:
print(f"❌ 音频设备初始化失败: {e}")
def generate_silence_audio(self, duration_ms=100):
"""生成静音音频数据"""
# 生成指定时长的静音音频16位PCM值为0
samples = int(self.RATE * duration_ms / 1000)
silence_data = bytes(samples * 2) # 16位 = 2字节每样本
return silence_data
def calculate_energy(self, audio_data):
"""计算音频能量"""
if len(audio_data) == 0:
return 0
audio_array = np.frombuffer(audio_data, dtype=np.int16)
rms = np.sqrt(np.mean(audio_array ** 2))
if not self.recording:
self.energy_history.append(rms)
if len(self.energy_history) > 50:
self.energy_history.pop(0)
return rms
def calculate_zero_crossing_rate(self, audio_data):
"""计算零交叉率"""
if len(audio_data) == 0:
return 0
audio_array = np.frombuffer(audio_data, dtype=np.int16)
zero_crossings = np.sum(np.diff(np.sign(audio_array)) != 0)
zcr = zero_crossings / len(audio_array) * self.RATE
self.zcr_history.append(zcr)
if len(self.zcr_history) > 30:
self.zcr_history.pop(0)
return zcr
def is_voice_active(self, energy, zcr):
"""使用ZCR进行语音活动检测"""
# 16000Hz采样率下的语音ZCR范围
zcr_condition = 2400 < zcr < 12000
return zcr_condition
def save_recording(self, audio_data, filename=None):
"""保存录音"""
if filename is None:
timestamp = time.strftime("%Y%m%d_%H%M%S")
filename = f"recording_{timestamp}.wav"
try:
with wave.open(filename, 'wb') as wf:
wf.setnchannels(self.CHANNELS)
wf.setsampwidth(self.audio.get_sample_size(self.FORMAT))
wf.setframerate(self.RATE)
wf.writeframes(audio_data)
print(f"✅ 录音已保存: {filename}")
return True, filename
except Exception as e:
print(f"❌ 保存录音失败: {e}")
return False, None
def play_audio(self, filename):
"""播放音频文件"""
try:
# 停止当前录音
if self.recording:
self.recording = False
self.recorded_frames = []
# 关闭输入流
if self.stream:
self.stream.stop_stream()
self.stream.close()
self.stream = None
self.is_playing = True
time.sleep(0.2)
# 使用系统播放器
print(f"🔊 播放: {filename}")
subprocess.run(['aplay', filename], check=True)
print("✅ 播放完成")
except Exception as e:
print(f"❌ 播放失败: {e}")
finally:
self.is_playing = False
time.sleep(0.2)
self._setup_audio()
def update_pre_record_buffer(self, audio_data):
"""更新预录音缓冲区"""
self.pre_record_buffer.append(audio_data)
if len(self.pre_record_buffer) > self.pre_record_max_frames:
self.pre_record_buffer.pop(0)
def start_recording(self):
"""开始录音"""
print("🎙️ 检测到声音,开始录音...")
self.recording = True
self.recorded_frames = []
self.recorded_frames.extend(self.pre_record_buffer)
self.pre_record_buffer = []
self.recording_start_time = time.time()
self.last_sound_time = time.time()
self.consecutive_low_zcr_count = 0
def stop_recording(self):
"""停止录音"""
if len(self.recorded_frames) > 0:
audio_data = b''.join(self.recorded_frames)
duration = len(audio_data) / (self.RATE * 2)
print(f"📝 录音完成,时长: {duration:.2f}")
if self.enable_ai_chat:
# AI聊天模式
self.process_with_ai(audio_data)
else:
# 普通录音模式
success, filename = self.save_recording(audio_data)
if success and filename:
print("=" * 50)
print("🔊 播放刚才录制的音频...")
self.play_audio(filename)
print("=" * 50)
self.recording = False
self.recorded_frames = []
self.recording_start_time = None
self.last_sound_time = None
def process_with_ai(self, audio_data):
"""使用AI处理录音"""
if self.is_processing_ai:
print("⏳ AI正在处理中请稍候...")
return
self.is_processing_ai = True
# 在新线程中处理AI
ai_thread = threading.Thread(target=self._ai_processing_thread, args=(audio_data,))
ai_thread.daemon = True
ai_thread.start()
def _heartbeat_thread(self):
"""心跳线程 - 定期发送静音数据保持连接活跃"""
while self.running and self.doubao_client and self.doubao_client.ws:
current_time = time.time()
if current_time - self.last_heartbeat_time >= self.heartbeat_interval:
try:
# 异步发送心跳数据
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(self.doubao_client.send_silence_data())
self.last_heartbeat_time = current_time
except Exception as e:
print(f"❌ 心跳失败: {e}")
# 如果心跳失败,可能需要重新连接
break
finally:
loop.close()
except Exception as e:
print(f"❌ 心跳线程异常: {e}")
break
# 睡眠一段时间
time.sleep(1.0)
print("📡 心跳线程结束")
def _ai_processing_thread(self, audio_data):
"""AI处理线程"""
try:
print("🤖 开始AI处理...")
print("🧠 正在进行语音识别...")
# 异步处理
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
# 连接豆包
self.doubao_client = DoubaoClient()
loop.run_until_complete(self.doubao_client.connect())
# 启动心跳线程
self.last_heartbeat_time = time.time()
self.heartbeat_thread = threading.Thread(target=self._heartbeat_thread)
self.heartbeat_thread.daemon = True
self.heartbeat_thread.start()
print("💓 心跳线程已启动")
# 语音识别和TTS回复
recognized_text, tts_audio = loop.run_until_complete(
self.doubao_client.process_audio(audio_data)
)
if recognized_text:
print(f"🗣️ 你说: {recognized_text}")
if tts_audio:
# 保存TTS音频
tts_filename = "ai_response.wav"
with wave.open(tts_filename, 'wb') as wav_file:
wav_file.setnchannels(1)
wav_file.setsampwidth(2)
wav_file.setframerate(24000)
wav_file.writeframes(tts_audio)
print("🎵 AI回复生成完成")
print("=" * 50)
print("🔊 播放AI回复...")
self.play_audio(tts_filename)
print("=" * 50)
else:
print("❌ 未收到AI回复")
# 等待一段时间再关闭连接,以便心跳继续工作
print("⏳ 等待5秒后关闭连接...")
time.sleep(5)
except Exception as e:
print(f"❌ AI处理失败: {e}")
finally:
# 停止心跳线程
if self.heartbeat_thread and self.heartbeat_thread.is_alive():
print("🛑 停止心跳线程")
self.heartbeat_thread = None
# 关闭连接
if self.doubao_client:
loop.run_until_complete(self.doubao_client.close())
loop.close()
except Exception as e:
print(f"❌ AI处理线程失败: {e}")
finally:
self.is_processing_ai = False
def run(self):
"""运行语音聊天系统"""
if not self.stream:
print("❌ 音频设备未初始化")
return
self.running = True
if self.enable_ai_chat:
print("🤖 语音聊天AI助手")
print("=" * 50)
print("🎯 功能特点:")
print("- 🎙️ 智能语音检测")
print("- 🧠 豆包AI语音识别")
print("- 🗣️ AI智能回复")
print("- 🔊 TTS语音播放")
print("- 🔄 实时对话")
print("=" * 50)
print("📖 使用说明:")
print("- 说话自动录音")
print("- 静音2秒结束录音")
print("- AI自动识别并回复")
print("- 按 Ctrl+C 退出")
print("=" * 50)
else:
print("🎙️ 智能录音系统")
print("=" * 50)
print("📖 使用说明:")
print("- 说话自动录音")
print("- 静音2秒结束录音")
print("- 录音完成后自动播放")
print("- 按 Ctrl+C 退出")
print("=" * 50)
try:
while self.running:
# 如果正在播放AI回复跳过音频处理
if self.is_playing or self.is_processing_ai:
status = "🤖 AI处理中..."
print(f"\r{status}", end='', flush=True)
time.sleep(0.1)
continue
# 读取音频数据
data = self.stream.read(self.CHUNK_SIZE, exception_on_overflow=False)
if len(data) == 0:
continue
# 计算能量和ZCR
energy = self.calculate_energy(data)
zcr = self.calculate_zero_crossing_rate(data)
if self.recording:
# 录音模式
self.recorded_frames.append(data)
recording_duration = time.time() - self.recording_start_time
# 检测语音活动
if self.is_voice_active(energy, zcr):
self.last_sound_time = time.time()
self.consecutive_low_zcr_count = 0
else:
self.consecutive_low_zcr_count += 1
# 检查是否应该结束录音
should_stop = False
# ZCR静音检测
if self.consecutive_low_zcr_count >= self.low_zcr_threshold_count:
should_stop = True
# 时间静音检测
if not should_stop and time.time() - self.last_sound_time > self.silence_threshold:
should_stop = True
# 执行停止录音
if should_stop and recording_duration >= self.min_recording_time:
print(f"\n🔇 检测到静音,结束录音")
self.stop_recording()
# 检查最大录音时间
if recording_duration > self.max_recording_time:
print(f"\n⏰ 达到最大录音时间")
self.stop_recording()
# 显示录音状态
is_voice = self.is_voice_active(energy, zcr)
zcr_count = f"{self.consecutive_low_zcr_count}/{self.low_zcr_threshold_count}"
status = f"录音中... {recording_duration:.1f}s | ZCR: {zcr:.0f} | 语音: {is_voice} | 静音计数: {zcr_count}"
print(f"\r{status}", end='', flush=True)
else:
# 监听模式
self.update_pre_record_buffer(data)
if self.is_voice_active(energy, zcr):
# 检测到声音,开始录音
self.start_recording()
else:
# 显示监听状态
is_voice = self.is_voice_active(energy, zcr)
buffer_usage = len(self.pre_record_buffer) / self.pre_record_max_frames * 100
status = f"监听中... ZCR: {zcr:.0f} | 语音: {is_voice} | 缓冲: {buffer_usage:.0f}%"
print(f"\r{status}", end='', flush=True)
time.sleep(0.01)
except KeyboardInterrupt:
print("\n👋 退出")
except Exception as e:
print(f"❌ 错误: {e}")
finally:
self.stop()
def stop(self):
"""停止系统"""
self.running = False
# 停止心跳线程
if self.heartbeat_thread and self.heartbeat_thread.is_alive():
print("🛑 停止心跳线程")
self.heartbeat_thread = None
if self.recording:
self.stop_recording()
if self.stream:
self.stream.stop_stream()
self.stream.close()
if self.audio:
self.audio.terminate()
# 关闭AI连接
if self.doubao_client and self.doubao_client.ws:
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self.doubao_client.close())
loop.close()
except:
pass
def main():
"""主函数"""
import argparse
parser = argparse.ArgumentParser(description='语音聊天AI助手')
parser.add_argument('--no-ai', action='store_true', help='禁用AI功能仅录音')
args = parser.parse_args()
enable_ai = not args.no_ai
if enable_ai:
print("🚀 语音聊天AI助手")
else:
print("🚀 智能录音系统")
print("=" * 50)
# 创建语音聊天系统
recorder = VoiceChatRecorder(enable_ai_chat=enable_ai)
print("✅ 系统初始化成功")
print("=" * 50)
# 开始运行
recorder.run()
if __name__ == "__main__":
main()
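process_audio 中逐样本的 struct 循环(32 位浮点转 16 位整数)也可以用 numpy 向量化(示意实现,假设输入是小端 32 位浮点 PCM;截断取整的行为与原循环一致):

```python
import struct
import numpy as np

def float32_to_int16(raw: bytes) -> bytes:
    """32 位小端浮点 PCM -> 16 位整数 PCM(向量化等价实现)"""
    raw = raw[: len(raw) // 4 * 4]            # 截断到 4 字节整数倍
    samples = np.frombuffer(raw, dtype="<f4")
    samples = np.clip(samples, -1.0, 1.0)     # 限幅到 [-1.0, 1.0]
    return (samples * 32767).astype("<i2").tobytes()

# 两个样本:满幅正值和半幅负值
pcm = float32_to_int16(np.array([1.0, -0.5], dtype="<f4").tobytes())
vals = struct.unpack("<2h", pcm)
```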

zcr_monitor.py Normal file

@@ -0,0 +1,198 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
实时ZCR监控工具
用于观察实际的ZCR值和测试语音检测
"""
import threading
import time
import numpy as np
import pyaudio
class ZCRMonitor:
"""ZCR实时监控器"""
def __init__(self):
self.FORMAT = pyaudio.paInt16
self.CHANNELS = 1
self.RATE = 16000
self.CHUNK_SIZE = 1024
# 监控参数
self.running = False
self.zcr_history = []
self.max_history = 100
# 音频设备
self.audio = None
self.stream = None
# 检测阈值匹配recorder.py的设置
self.zcr_min = 2400
self.zcr_max = 12000
def setup_audio(self):
"""设置音频设备"""
try:
self.audio = pyaudio.PyAudio()
self.stream = self.audio.open(
format=self.FORMAT,
channels=self.CHANNELS,
rate=self.RATE,
input=True,
frames_per_buffer=self.CHUNK_SIZE
)
return True
except Exception as e:
print(f"❌ 音频设备初始化失败: {e}")
return False
def calculate_zcr(self, audio_data):
"""计算零交叉率"""
if len(audio_data) == 0:
return 0
audio_array = np.frombuffer(audio_data, dtype=np.int16)
zero_crossings = np.sum(np.diff(np.sign(audio_array)) != 0)
zcr = zero_crossings / len(audio_array) * self.RATE
return zcr
def is_voice(self, zcr):
"""简单的语音检测"""
return self.zcr_min < zcr < self.zcr_max
def monitor_callback(self, in_data, frame_count, time_info, status):
"""音频回调函数"""
zcr = self.calculate_zcr(in_data)
# 更新历史
self.zcr_history.append(zcr)
if len(self.zcr_history) > self.max_history:
self.zcr_history.pop(0)
# 计算统计信息
if len(self.zcr_history) > 10:
avg_zcr = np.mean(self.zcr_history[-10:]) # 最近10个值的平均
std_zcr = np.std(self.zcr_history[-10:])
else:
avg_zcr = zcr
std_zcr = 0
# 判断是否为语音
voice_detected = self.is_voice(zcr)
# 实时显示(改用局部名 mark,避免覆盖回调参数 status)
mark = "🎤" if voice_detected else "🔇"
color = "\033[92m" if voice_detected else "\033[90m" # 绿色或灰色
reset = "\033[0m"
# 显示信息
info = (f"{color}{mark} ZCR: {zcr:.0f} | "
f"阈值: {self.zcr_min}-{self.zcr_max} | "
f"平均: {avg_zcr:.0f}±{std_zcr:.0f}{reset}")
print(f"\r{info}", end='', flush=True)
return (in_data, pyaudio.paContinue)
def start_monitoring(self):
"""开始监控"""
print("🎙️ ZCR实时监控工具")
print("=" * 50)
print("📊 当前检测阈值:")
print(f" ZCR范围: {self.zcr_min} - {self.zcr_max}")
print("💡 请说话测试语音检测...")
print("🛑 按 Ctrl+C 停止监控")
print("=" * 50)
try:
# setup_audio 已打开一个阻塞模式的输入流,先关闭它再以回调模式重开,避免设备被重复占用
if self.stream:
self.stream.stop_stream()
self.stream.close()
# 使用回调模式
self.stream = self.audio.open(
format=self.FORMAT,
channels=self.CHANNELS,
rate=self.RATE,
input=True,
frames_per_buffer=self.CHUNK_SIZE,
stream_callback=self.monitor_callback
)
self.stream.start_stream()
self.running = True
# 主循环
while self.running:
time.sleep(0.1)
except KeyboardInterrupt:
print("\n🛑 监控停止")
finally:
self.cleanup()
def show_statistics(self):
"""显示统计信息"""
if not self.zcr_history:
return
print("\n📊 ZCR统计信息:")
print(f" 样本数量: {len(self.zcr_history)}")
print(f" 最小值: {min(self.zcr_history):.0f}")
print(f" 最大值: {max(self.zcr_history):.0f}")
print(f" 平均值: {np.mean(self.zcr_history):.0f}")
print(f" 标准差: {np.std(self.zcr_history):.0f}")
# 分析语音检测
voice_count = sum(1 for zcr in self.zcr_history if self.is_voice(zcr))
voice_percentage = voice_count / len(self.zcr_history) * 100
print(f" 语音检测: {voice_count}/{len(self.zcr_history)} ({voice_percentage:.1f}%)")
# 建议新的阈值
avg_zcr = np.mean(self.zcr_history)
std_zcr = np.std(self.zcr_history)
suggested_min = max(800, avg_zcr + std_zcr)
suggested_max = min(8000, avg_zcr + 4 * std_zcr)
print(f"\n🎯 建议的检测阈值:")
print(f" 最小值: {suggested_min:.0f}")
print(f" 最大值: {suggested_max:.0f}")
def cleanup(self):
"""清理资源(可安全重复调用)"""
self.running = False
if self.stream:
try:
self.stream.stop_stream()
self.stream.close()
except Exception:
pass
self.stream = None
if self.audio:
try:
self.audio.terminate()
except Exception:
pass
self.audio = None
# 显示最终统计(置空 self.audio 后重复调用不会再打印)
self.show_statistics()
def main():
"""主函数"""
monitor = ZCRMonitor()
if not monitor.setup_audio():
print("❌ 无法初始化音频设备")
return
try:
monitor.start_monitoring()
except Exception as e:
print(f"❌ 监控过程中出错: {e}")
finally:
monitor.cleanup()
if __name__ == "__main__":
main()
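calculate_zcr 的口径可以用合成正弦波验证:频率 f 的正弦每周期过零 2 次,按"每秒过零次数"计算的 ZCR 应约等于 2f(示意脚本,假设 16kHz 采样率;2400-12000 的检测阈值大致对应 1.2-6kHz 的过零频率):

```python
import numpy as np

RATE = 16000

def calculate_zcr(audio_bytes: bytes) -> float:
    """与 ZCRMonitor.calculate_zcr 相同的口径:每秒过零次数"""
    audio_array = np.frombuffer(audio_bytes, dtype=np.int16)
    if len(audio_array) == 0:
        return 0.0
    zero_crossings = np.sum(np.diff(np.sign(audio_array)) != 0)
    return zero_crossings / len(audio_array) * RATE

# 1 秒 997Hz 正弦(选 997 避免采样点恰好落在零上),理论 ZCR ≈ 2 × 997
t = np.arange(RATE) / RATE
sine = (np.sin(2 * np.pi * 997 * t) * 10000).astype(np.int16)
zcr = calculate_zcr(sine.tobytes())
```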