This commit is contained in:
朱潮 2025-09-22 00:40:12 +08:00
parent 137ff6edfd
commit 67549566ec
5 changed files with 24 additions and 545 deletions

View File

@ -134,7 +134,7 @@ class ControlSystem:
'llm': {
'api_url': "https://ark.cn-beijing.volces.com/api/v3/chat/completions",
'model': "doubao-seed-1-6-flash-250828",
'api_key': os.environ.get("ARK_API_KEY", ""),
'api_key': "f8e43677-1c23-4e21-8a4c-66e7103a8155",
'max_tokens': 50
},
'tts': {

23
simple_nfc_reader.py Normal file
View File

@ -0,0 +1,23 @@
#!/usr/bin/env python3
import subprocess
import os
import re
import sys
def read_uid():
os.environ['LIBNFC_DRIVER'] = 'pn532_i2c'
os.environ['LIBNFC_DEVICE'] = 'pn532_i2c:/dev/i2c-1'
try:
result = subprocess.run(['nfc-list'], capture_output=True, text=True, timeout=5)
if result.returncode == 0:
match = re.search(r'UID\s*\(NFCID1\):\s*([0-9A-Fa-f\s]+)', result.stdout)
if match:
return match.group(1).strip().replace(' ', '').upper()
except:
pass
return None
if __name__ == "__main__":
uid = read_uid()
print(uid if uid else "000000")

View File

@ -1,194 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试新的自动校准和监听启动流程
"""
import sys
import os
import time
import signal
from control_system import ControlSystem
def signal_handler(sig, frame):
"""信号处理函数"""
print('\n👋 收到中断信号,正在关闭系统...')
if 'control_system' in globals():
control_system.shutdown()
sys.exit(0)
def main():
"""主函数"""
print("🧪 测试自动校准和监听启动流程")
print("=" * 60)
# 注册信号处理
signal.signal(signal.SIGINT, signal_handler)
try:
# 1. 创建控制系统
print("📦 创建控制系统...")
config = {
'system': {
'max_queue_size': 1000,
'process_timeout': 30,
'heartbeat_interval': 1.0,
'log_level': "INFO"
},
'audio': {
'sample_rate': 16000,
'channels': 1,
'chunk_size': 1024
},
'recording': {
'min_duration': 2.0,
'max_duration': 30.0,
'silence_threshold': 3.0
},
'processing': {
'enable_asr': True,
'enable_llm': True,
'enable_tts': True,
'character': 'libai'
}
}
global control_system
control_system = ControlSystem(config)
# 设置角色
control_system.config['processing']['character'] = 'libai'
print("✅ 控制系统创建完成")
# 2. 测试手动启动流程
print("\n🎯 测试步骤1手动启动流程")
print("-" * 40)
# 2.1 启动进程但不自动校准和监听
print("🔧 启动进程...")
control_system._start_processes()
time.sleep(2) # 等待进程启动
# 2.2 检查初始状态
print("\n🔍 检查初始状态...")
monitoring_status = control_system.get_monitoring_status()
calibration_status = control_system.get_calibration_status()
print(f" 监听状态: {monitoring_status}")
print(f" 校准状态: {calibration_status}")
# 2.3 手动启动校准
print("\n🎯 手动启动校准...")
success = control_system.start_calibration()
print(f" 校准启动结果: {success}")
if success:
print("\n⏱️ 等待校准完成...")
if control_system.wait_for_calibration_complete(timeout=30):
print("✅ 校准完成")
else:
print("⚠️ 校准超时")
# 2.4 手动启动监听
print("\n🎯 手动启动监听...")
success = control_system.start_monitoring()
print(f" 监听启动结果: {success}")
time.sleep(1) # 等待监听启动
# 2.5 验证监听状态
monitoring_status = control_system.get_monitoring_status()
print(f" 监听状态: {monitoring_status}")
# 3. 运行一段时间测试
print("\n🎙️ 系统运行测试")
print("-" * 40)
print("💡 请说话测试语音检测功能")
print("⏱️ 运行10秒...")
start_time = time.time()
while time.time() - start_time < 10:
try:
control_system.check_events()
time.sleep(0.1)
except KeyboardInterrupt:
break
# 4. 停止监听
print("\n🛑 停止监听...")
success = control_system.stop_monitoring()
print(f" 停止结果: {success}")
time.sleep(1)
# 5. 关闭系统
print("\n🔄 关闭当前系统...")
control_system.shutdown()
time.sleep(2)
# 6. 测试自动启动流程
print("\n🎯 测试步骤2自动启动流程")
print("-" * 40)
# 创建新的控制系统实例
control_system = ControlSystem(config)
control_system.config['processing']['character'] = 'libai'
print("\n🚀 启动系统(自动校准和监听)...")
# 这里我们只启动进程,不进入主循环
control_system._start_processes()
# 手动执行自动校准和监听流程
print("\n🎯 自动启动校准...")
success = control_system.start_calibration()
if success and control_system.wait_for_calibration_complete(timeout=30):
print("✅ 自动校准完成")
print("\n🎯 自动启动监听...")
success = control_system.start_monitoring()
if success:
print("✅ 自动监听启动完成")
time.sleep(2)
# 检查最终状态
monitoring_status = control_system.get_monitoring_status()
calibration_status = control_system.get_calibration_status()
print(f"\n📊 最终状态:")
print(f" 监听状态: {monitoring_status}")
print(f" 校准状态: {calibration_status}")
# 7. 运行一段时间
print("\n🎙️ 最终运行测试")
print("-" * 40)
print("💡 请说话测试最终功能")
print("⏱️ 运行10秒...")
start_time = time.time()
while time.time() - start_time < 10:
try:
control_system.check_events()
control_system.display_status()
time.sleep(0.1)
except KeyboardInterrupt:
break
print("\n🎉 测试完成!")
except Exception as e:
print(f"❌ 测试过程中出错: {e}")
import traceback
traceback.print_exc()
finally:
# 确保系统关闭
if 'control_system' in globals():
print("\n🛑 确保系统关闭...")
control_system.shutdown()
print("✅ 测试结束")
if __name__ == "__main__":
main()

View File

@ -1,177 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
首次播放逻辑专项测试
验证第一句TTS触发机制的优化效果
"""
import sys
def test_first_playback_logic():
"""测试首次播放逻辑"""
print("🎵 首次播放逻辑专项测试")
print("=" * 50)
# 测试场景1: 短句子积累
print("📝 测试场景1: 短句子积累")
print(" 模拟LLM生成的短句子流:")
short_sentences = [
"你好",
"",
"这个",
"问题",
"确实",
"需要",
"仔细",
"思考",
]
buffer = []
total_length = 0
first_trigger = None
for i, sentence in enumerate(short_sentences):
buffer.append(sentence)
total_length = sum(len(s) for s in buffer)
# 模拟首次播放逻辑
trigger_reason = None
if total_length >= 40 and len(buffer) >= 2:
trigger_reason = f"总长度{total_length}字符,{len(buffer)}个句子"
elif len(buffer) >= 5:
trigger_reason = f"缓冲区达到最大值{len(buffer)}"
if trigger_reason and first_trigger is None:
first_trigger = i + 1
print(f" 🎵 第{first_trigger}句触发TTS: {trigger_reason}")
print(f" 📝 发送内容: '{''.join(buffer)}'")
break
else:
print(f" ⏳ 第{i+1}句: '{sentence}' (累计: {total_length}字符, {len(buffer)}个句子)")
if first_trigger is None:
print(" ⚠️ 未触发TTS需要超时机制")
print()
# 测试场景2: 中等长度句子
print("📝 测试场景2: 中等长度句子")
medium_sentences = [
"你好,很高兴见到你。",
"今天天气真不错呢。",
"我们可以一起去公园玩吗?",
"我想那会是一个很好的主意。",
]
buffer = []
total_length = 0
first_trigger = None
for i, sentence in enumerate(medium_sentences):
buffer.append(sentence)
total_length = sum(len(s) for s in buffer)
# 模拟首次播放逻辑
trigger_reason = None
if total_length >= 40 and len(buffer) >= 2:
trigger_reason = f"总长度{total_length}字符,{len(buffer)}个句子"
elif len(sentence) >= 25 and sentence.endswith(('', '', '', '.', '!', '?')) and len(buffer) >= 2:
trigger_reason = f"长句子{len(sentence)}字符+缓冲内容"
if trigger_reason and first_trigger is None:
first_trigger = i + 1
print(f" 🎵 第{first_trigger}句触发TTS: {trigger_reason}")
print(f" 📝 发送内容: '{''.join(buffer)}'")
break
else:
print(f" ⏳ 第{i+1}句: '{sentence}' (累计: {total_length}字符, {len(buffer)}个句子)")
if first_trigger is None:
print(" ⚠️ 未触发TTS需要超时机制")
print()
# 测试场景3: 长句子
print("📝 测试场景3: 长句子")
long_sentences = [
"你好,",
"我认为这个问题需要我们从多个角度来分析。",
"首先,让我们仔细了解一下具体情况。",
]
buffer = []
total_length = 0
first_trigger = None
for i, sentence in enumerate(long_sentences):
buffer.append(sentence)
total_length = sum(len(s) for s in buffer)
# 模拟首次播放逻辑
trigger_reason = None
if total_length >= 40 and len(buffer) >= 2:
trigger_reason = f"总长度{total_length}字符,{len(buffer)}个句子"
elif len(sentence) >= 25 and sentence.endswith(('', '', '', '.', '!', '?')) and len(buffer) >= 2:
trigger_reason = f"长句子{len(sentence)}字符+缓冲内容"
if trigger_reason and first_trigger is None:
first_trigger = i + 1
print(f" 🎵 第{first_trigger}句触发TTS: {trigger_reason}")
print(f" 📝 发送内容: '{''.join(buffer)}'")
break
else:
print(f" ⏳ 第{i+1}句: '{sentence}' (累计: {total_length}字符, {len(buffer)}个句子)")
if first_trigger is None:
print(" ⚠️ 未触发TTS需要超时机制")
print()
# 测试场景4: 超长单句
print("📝 测试场景4: 超长单句")
ultra_long_sentence = "根据我的分析,这个问题的解决方案需要综合考虑多个因素,包括时间成本、资源投入以及最终的实施效果。"
buffer = ["你好"]
buffer.append(ultra_long_sentence)
total_length = sum(len(s) for s in buffer)
if total_length >= 40 and len(buffer) >= 2:
print(f" 🎵 第2句触发TTS: 总长度{total_length}字符,{len(buffer)}个句子")
print(f" 📝 发送内容: '{''.join(buffer)[:50]}...'")
else:
print(" ⚠️ 未触发TTS")
print()
def show_optimization_comparison():
"""显示优化对比"""
print("📈 首次播放逻辑优化对比")
print("=" * 50)
comparison = {
"优化前": {
"触发条件": "任何完整句子或长句子",
"最小长度": "无明确要求",
"积攒机制": "基本没有",
"可能导致": "播放卡顿,等待数据",
},
"优化后": {
"触发条件": "40+字符且2+句子 或 25+字符完整句+缓冲",
"最小长度": "总长度40字符或单句25字符",
"积攒机制": "智能积累多个句子",
"超时保护": "5秒超时机制",
"效果": "确保有足够数据才开始播放"
}
}
for aspect, details in comparison.items():
print(f"\n🔧 {aspect}:")
for key, value in details.items():
print(f"{key}: {value}")
print(f"\n🎯 核心改进: 确保首次播放有足够的内容,避免因为数据不足导致的播放卡顿")
if __name__ == "__main__":
if len(sys.argv) > 1 and sys.argv[1] == "--comparison":
show_optimization_comparison()
else:
test_first_playback_logic()
show_optimization_comparison()

View File

@ -1,173 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
音频播放优化测试脚本
用于验证缓冲区优化和播放性能改进
"""
import time
import json
import os
import sys
def test_optimization():
"""测试优化效果"""
print("🧪 音频播放优化测试")
print("=" * 50)
# 显示优化后的配置参数
print("📋 优化后的配置参数:")
print(" - 预加载缓冲区: 6个音频块原:3个")
print(" - 智能句子缓冲: 最多5个句子原:3个")
print(" - 最小触发句子: 2个句子原:1个")
print(" - 积累时间窗口: 150ms原:200ms")
print(" - TTS任务队列: 20个任务原:10个")
print(" - 音频块大小: 1024字节原:2048字节")
print(" - 播放冷却期: 0.05秒(原:0.1秒)")
print(" - 长句子触发阈值: 30字符原:50字符")
print(" - 播放缓冲区维护: 4个块原:3个")
print()
# 测试智能句子缓冲逻辑
print("🧠 测试智能句子缓冲逻辑:")
print(" 🔄 首次播放逻辑测试:")
test_sentences = [
"你好", # 短句子
"今天天气怎么样?", # 中等长度
"我觉得这个方案很不错,我们可以试试看。", # 长句子
"这是一个超过三十个字符的句子应该会立即触发TTS生成。", # 超过30字符
"短句。", # 带标点的短句
]
# 模拟首次播放的缓冲区状态
tts_buffer = []
first_playback_started = False
total_buffered_text = ""
trigger_count = 0
for i, sentence in enumerate(test_sentences):
tts_buffer.append(sentence)
total_buffered_text = ''.join(tts_buffer)
# 首次播放逻辑
if not first_playback_started:
should_trigger = False
trigger_reason = ""
# 条件1: 总文本长度超过40字符且至少有2个句子
if len(total_buffered_text) >= 40 and len(tts_buffer) >= 2:
should_trigger = True
trigger_reason = f"总长度{len(total_buffered_text)}字符,{len(tts_buffer)}个句子"
# 条件2: 有1个完整长句子超过25字符
elif len(sentence) >= 25 and sentence.endswith(('', '', '', '.', '!', '?')) and len(tts_buffer) >= 2:
should_trigger = True
trigger_reason = f"长句子{len(sentence)}字符+缓冲内容"
# 条件3: 缓冲区达到最大值(5个)
elif len(tts_buffer) >= 5:
should_trigger = True
trigger_reason = f"缓冲区达到最大值{len(tts_buffer)}"
# 条件4: 超过500ms模拟
if should_trigger:
trigger_count += 1
first_playback_started = True
print(f" 🎵 首次触发TTS: {trigger_reason}")
print(f" 📝 发送内容: '{total_buffered_text[:50]}...'")
tts_buffer = []
else:
print(f" ⏳ 首次缓冲: '{sentence}' (累计: {len(total_buffered_text)}字符, {len(tts_buffer)}个句子)")
else:
# 正常播放逻辑
if len(sentence) > 30 or len(tts_buffer) >= 3:
should_trigger = True
trigger_reason = "长句子" if len(sentence) > 30 else "缓冲区满"
if should_trigger:
trigger_count += 1
print(f" ✅ 正常触发TTS: {trigger_reason}")
print(f" 📝 发送内容: '{''.join(tts_buffer)[:30]}...'")
tts_buffer = []
else:
print(f" ⏳ 正常缓冲: '{sentence}'")
print(f" 📊 总触发次数: {trigger_count}")
print()
print(" 📋 首次播放优化效果:")
print(" • 确保首句有足够长度40+字符或25+字符完整句)")
print(" • 积累多个句子避免播放卡顿")
print(" • 5秒超时机制防止无限等待")
print(" • 后续句子正常流式处理")
print()
# 显示性能监控信息
print("📊 性能监控功能:")
print(" - 实时缓冲区大小统计")
print(" - 平均和最大缓冲区大小")
print(" - 播放块数和音频大小统计")
print(" - 每5秒自动输出性能报告")
print()
print("🎯 预期改进效果:")
print(" 1. ✅ 减少音频播放卡顿(更大的缓冲区)")
print(" 2. ✅ 更快的TTS响应优化的触发条件")
print(" 3. ✅ 更流畅的播放体验(减少冷却期)")
print(" 4. ✅ 更好的资源利用(更小的音频块)")
print(" 5. ✅ 实时性能监控(调试和优化)")
print()
print("📝 测试建议:")
print(" 1. 运行主程序观察播放流畅度")
print(" 2. 查看性能统计输出")
print(" 3. 监控缓冲区大小变化")
print(" 4. 测试不同长度的语音响应")
print()
print("🚀 测试完成!可以运行主程序验证优化效果。")
def show_optimization_summary():
"""显示优化总结"""
print("📈 音频播放优化总结")
print("=" * 50)
summary = {
"缓冲区优化": [
"预加载缓冲区: 3→6个块",
"智能句子缓冲: 3→5个句子",
"最小触发缓冲: 1→2个句子",
"TTS任务队列: 10→20个任务"
],
"响应性优化": [
"积累时间窗口: 200ms→150ms",
"长句子触发: 50→30字符",
"中等长度触发: 30→20字符",
"播放冷却期: 0.1s→0.05s"
],
"播放优化": [
"音频块大小: 2048→1024字节",
"播放缓冲维护: 3→4个块",
"数据转移: 2→3个块/次"
],
"监控功能": [
"实时性能统计",
"缓冲区大小监控",
"自动性能报告",
"播放进度追踪"
]
}
for category, improvements in summary.items():
print(f"\n🔧 {category}:")
for improvement in improvements:
print(f"{improvement}")
print(f"\n🎯 总体目标: 减少音频播放卡顿,提升用户体验")
print(f"📊 预期效果: 更流畅的实时语音交互")
if __name__ == "__main__":
if len(sys.argv) > 1 and sys.argv[1] == "--summary":
show_optimization_summary()
else:
test_optimization()