From 67549566ec3a3efffe6dc1dd1ca91226c9c6a367 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E6=BD=AE?= Date: Mon, 22 Sep 2025 00:40:12 +0800 Subject: [PATCH] ndf --- control_system.py | 2 +- simple_nfc_reader.py | 23 +++++ test_auto_start.py | 194 ----------------------------------------- test_first_playback.py | 177 ------------------------------------- test_optimization.py | 173 ------------------------------------ 5 files changed, 24 insertions(+), 545 deletions(-) create mode 100644 simple_nfc_reader.py delete mode 100644 test_auto_start.py delete mode 100644 test_first_playback.py delete mode 100644 test_optimization.py diff --git a/control_system.py b/control_system.py index 25bc74e..df0958e 100644 --- a/control_system.py +++ b/control_system.py @@ -134,7 +134,7 @@ class ControlSystem: 'llm': { 'api_url': "https://ark.cn-beijing.volces.com/api/v3/chat/completions", 'model': "doubao-seed-1-6-flash-250828", - 'api_key': os.environ.get("ARK_API_KEY", ""), + 'api_key': "f8e43677-1c23-4e21-8a4c-66e7103a8155", 'max_tokens': 50 }, 'tts': { diff --git a/simple_nfc_reader.py b/simple_nfc_reader.py new file mode 100644 index 0000000..fe781d0 --- /dev/null +++ b/simple_nfc_reader.py @@ -0,0 +1,23 @@ +#!/usr/bin/env python3 +import subprocess +import os +import re +import sys + +def read_uid(): + os.environ['LIBNFC_DRIVER'] = 'pn532_i2c' + os.environ['LIBNFC_DEVICE'] = 'pn532_i2c:/dev/i2c-1' + + try: + result = subprocess.run(['nfc-list'], capture_output=True, text=True, timeout=5) + if result.returncode == 0: + match = re.search(r'UID\s*\(NFCID1\):\s*([0-9A-Fa-f\s]+)', result.stdout) + if match: + return match.group(1).strip().replace(' ', '').upper() + except: + pass + return None + +if __name__ == "__main__": + uid = read_uid() + print(uid if uid else "000000") \ No newline at end of file diff --git a/test_auto_start.py b/test_auto_start.py deleted file mode 100644 index c152318..0000000 --- a/test_auto_start.py +++ /dev/null @@ -1,194 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -""" -测试新的自动校准和监听启动流程 -""" - -import sys -import os -import time -import signal -from control_system import ControlSystem - -def signal_handler(sig, frame): - """信号处理函数""" - print('\n👋 收到中断信号,正在关闭系统...') - if 'control_system' in globals(): - control_system.shutdown() - sys.exit(0) - -def main(): - """主函数""" - print("🧪 测试自动校准和监听启动流程") - print("=" * 60) - - # 注册信号处理 - signal.signal(signal.SIGINT, signal_handler) - - try: - # 1. 创建控制系统 - print("📦 创建控制系统...") - config = { - 'system': { - 'max_queue_size': 1000, - 'process_timeout': 30, - 'heartbeat_interval': 1.0, - 'log_level': "INFO" - }, - 'audio': { - 'sample_rate': 16000, - 'channels': 1, - 'chunk_size': 1024 - }, - 'recording': { - 'min_duration': 2.0, - 'max_duration': 30.0, - 'silence_threshold': 3.0 - }, - 'processing': { - 'enable_asr': True, - 'enable_llm': True, - 'enable_tts': True, - 'character': 'libai' - } - } - - global control_system - control_system = ControlSystem(config) - - # 设置角色 - control_system.config['processing']['character'] = 'libai' - - print("✅ 控制系统创建完成") - - # 2. 测试手动启动流程 - print("\n🎯 测试步骤1:手动启动流程") - print("-" * 40) - - # 2.1 启动进程但不自动校准和监听 - print("🔧 启动进程...") - control_system._start_processes() - time.sleep(2) # 等待进程启动 - - # 2.2 检查初始状态 - print("\n🔍 检查初始状态...") - monitoring_status = control_system.get_monitoring_status() - calibration_status = control_system.get_calibration_status() - - print(f" 监听状态: {monitoring_status}") - print(f" 校准状态: {calibration_status}") - - # 2.3 手动启动校准 - print("\n🎯 手动启动校准...") - success = control_system.start_calibration() - print(f" 校准启动结果: {success}") - - if success: - print("\n⏱️ 等待校准完成...") - if control_system.wait_for_calibration_complete(timeout=30): - print("✅ 校准完成") - else: - print("⚠️ 校准超时") - - # 2.4 手动启动监听 - print("\n🎯 手动启动监听...") - success = control_system.start_monitoring() - print(f" 监听启动结果: {success}") - - time.sleep(1) # 等待监听启动 - - # 2.5 验证监听状态 - monitoring_status = control_system.get_monitoring_status() - print(f" 监听状态: {monitoring_status}") - - # 3. 运行一段时间测试 - print("\n🎙️ 系统运行测试") - print("-" * 40) - print("💡 请说话测试语音检测功能") - print("⏱️ 运行10秒...") - - start_time = time.time() - while time.time() - start_time < 10: - try: - control_system.check_events() - time.sleep(0.1) - except KeyboardInterrupt: - break - - # 4. 停止监听 - print("\n🛑 停止监听...") - success = control_system.stop_monitoring() - print(f" 停止结果: {success}") - - time.sleep(1) - - # 5. 关闭系统 - print("\n🔄 关闭当前系统...") - control_system.shutdown() - time.sleep(2) - - # 6. 测试自动启动流程 - print("\n🎯 测试步骤2:自动启动流程") - print("-" * 40) - - # 创建新的控制系统实例 - control_system = ControlSystem(config) - control_system.config['processing']['character'] = 'libai' - - print("\n🚀 启动系统(自动校准和监听)...") - # 这里我们只启动进程,不进入主循环 - control_system._start_processes() - - # 手动执行自动校准和监听流程 - print("\n🎯 自动启动校准...") - success = control_system.start_calibration() - if success and control_system.wait_for_calibration_complete(timeout=30): - print("✅ 自动校准完成") - - print("\n🎯 自动启动监听...") - success = control_system.start_monitoring() - if success: - print("✅ 自动监听启动完成") - - time.sleep(2) - - # 检查最终状态 - monitoring_status = control_system.get_monitoring_status() - calibration_status = control_system.get_calibration_status() - - print(f"\n📊 最终状态:") - print(f" 监听状态: {monitoring_status}") - print(f" 校准状态: {calibration_status}") - - # 7. 运行一段时间 - print("\n🎙️ 最终运行测试") - print("-" * 40) - print("💡 请说话测试最终功能") - print("⏱️ 运行10秒...") - - start_time = time.time() - while time.time() - start_time < 10: - try: - control_system.check_events() - control_system.display_status() - time.sleep(0.1) - except KeyboardInterrupt: - break - - print("\n🎉 测试完成!") - - except Exception as e: - print(f"❌ 测试过程中出错: {e}") - import traceback - traceback.print_exc() - finally: - # 确保系统关闭 - if 'control_system' in globals(): - print("\n🛑 确保系统关闭...") - control_system.shutdown() - - print("✅ 测试结束") - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/test_first_playback.py b/test_first_playback.py deleted file mode 100644 index 69c2333..0000000 --- a/test_first_playback.py +++ /dev/null @@ -1,177 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -""" -首次播放逻辑专项测试 -验证第一句TTS触发机制的优化效果 -""" - -import sys - -def test_first_playback_logic(): - """测试首次播放逻辑""" - print("🎵 首次播放逻辑专项测试") - print("=" * 50) - - # 测试场景1: 短句子积累 - print("📝 测试场景1: 短句子积累") - print(" 模拟LLM生成的短句子流:") - - short_sentences = [ - "你好", - "嗯", - "这个", - "问题", - "确实", - "需要", - "仔细", - "思考", - ] - - buffer = [] - total_length = 0 - first_trigger = None - - for i, sentence in enumerate(short_sentences): - buffer.append(sentence) - total_length = sum(len(s) for s in buffer) - - # 模拟首次播放逻辑 - trigger_reason = None - if total_length >= 40 and len(buffer) >= 2: - trigger_reason = f"总长度{total_length}字符,{len(buffer)}个句子" - elif len(buffer) >= 5: - trigger_reason = f"缓冲区达到最大值{len(buffer)}" - - if trigger_reason and first_trigger is None: - first_trigger = i + 1 - print(f" 🎵 第{first_trigger}句触发TTS: {trigger_reason}") - print(f" 📝 发送内容: '{''.join(buffer)}'") - break - else: - print(f" ⏳ 第{i+1}句: '{sentence}' (累计: {total_length}字符, {len(buffer)}个句子)") - - if first_trigger is None: - print(" ⚠️ 未触发TTS(需要超时机制)") - print() - - # 测试场景2: 中等长度句子 - print("📝 测试场景2: 中等长度句子") - medium_sentences = [ - "你好,很高兴见到你。", - "今天天气真不错呢。", - "我们可以一起去公园玩吗?", - "我想那会是一个很好的主意。", - ] - - buffer = [] - total_length = 0 - first_trigger = None - - for i, sentence in enumerate(medium_sentences): - buffer.append(sentence) - total_length = sum(len(s) for s in buffer) - - # 模拟首次播放逻辑 - trigger_reason = None - if total_length >= 40 and len(buffer) >= 2: - trigger_reason = f"总长度{total_length}字符,{len(buffer)}个句子" - elif len(sentence) >= 25 and sentence.endswith(('。', '!', '?', '.', '!', '?')) and len(buffer) >= 2: - trigger_reason = f"长句子{len(sentence)}字符+缓冲内容" - - if trigger_reason and first_trigger is None: - first_trigger = i + 1 - print(f" 🎵 第{first_trigger}句触发TTS: {trigger_reason}") - print(f" 📝 发送内容: '{''.join(buffer)}'") - break - else: - print(f" ⏳ 第{i+1}句: '{sentence}' (累计: {total_length}字符, {len(buffer)}个句子)") - - if first_trigger is None: - print(" ⚠️ 未触发TTS(需要超时机制)") - print() - - # 测试场景3: 长句子 - print("📝 测试场景3: 长句子") - long_sentences = [ - "你好,", - "我认为这个问题需要我们从多个角度来分析。", - "首先,让我们仔细了解一下具体情况。", - ] - - buffer = [] - total_length = 0 - first_trigger = None - - for i, sentence in enumerate(long_sentences): - buffer.append(sentence) - total_length = sum(len(s) for s in buffer) - - # 模拟首次播放逻辑 - trigger_reason = None - if total_length >= 40 and len(buffer) >= 2: - trigger_reason = f"总长度{total_length}字符,{len(buffer)}个句子" - elif len(sentence) >= 25 and sentence.endswith(('。', '!', '?', '.', '!', '?')) and len(buffer) >= 2: - trigger_reason = f"长句子{len(sentence)}字符+缓冲内容" - - if trigger_reason and first_trigger is None: - first_trigger = i + 1 - print(f" 🎵 第{first_trigger}句触发TTS: {trigger_reason}") - print(f" 📝 发送内容: '{''.join(buffer)}'") - break - else: - print(f" ⏳ 第{i+1}句: '{sentence}' (累计: {total_length}字符, {len(buffer)}个句子)") - - if first_trigger is None: - print(" ⚠️ 未触发TTS(需要超时机制)") - print() - - # 测试场景4: 超长单句 - print("📝 测试场景4: 超长单句") - ultra_long_sentence = "根据我的分析,这个问题的解决方案需要综合考虑多个因素,包括时间成本、资源投入以及最终的实施效果。" - - buffer = ["你好"] - buffer.append(ultra_long_sentence) - total_length = sum(len(s) for s in buffer) - - if total_length >= 40 and len(buffer) >= 2: - print(f" 🎵 第2句触发TTS: 总长度{total_length}字符,{len(buffer)}个句子") - print(f" 📝 发送内容: '{''.join(buffer)[:50]}...'") - else: - print(" ⚠️ 未触发TTS") - print() - -def show_optimization_comparison(): - """显示优化对比""" - print("📈 首次播放逻辑优化对比") - print("=" * 50) - - comparison = { - "优化前": { - "触发条件": "任何完整句子或长句子", - "最小长度": "无明确要求", - "积攒机制": "基本没有", - "可能导致": "播放卡顿,等待数据", - }, - "优化后": { - "触发条件": "40+字符且2+句子 或 25+字符完整句+缓冲", - "最小长度": "总长度40字符或单句25字符", - "积攒机制": "智能积累多个句子", - "超时保护": "5秒超时机制", - "效果": "确保有足够数据才开始播放" - } - } - - for aspect, details in comparison.items(): - print(f"\n🔧 {aspect}:") - for key, value in details.items(): - print(f" • {key}: {value}") - - print(f"\n🎯 核心改进: 确保首次播放有足够的内容,避免因为数据不足导致的播放卡顿") - -if __name__ == "__main__": - if len(sys.argv) > 1 and sys.argv[1] == "--comparison": - show_optimization_comparison() - else: - test_first_playback_logic() - show_optimization_comparison() \ No newline at end of file diff --git a/test_optimization.py b/test_optimization.py deleted file mode 100644 index 4ff70a1..0000000 --- a/test_optimization.py +++ /dev/null @@ -1,173 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -""" -音频播放优化测试脚本 -用于验证缓冲区优化和播放性能改进 -""" - -import time -import json -import os -import sys - -def test_optimization(): - """测试优化效果""" - print("🧪 音频播放优化测试") - print("=" * 50) - - # 显示优化后的配置参数 - print("📋 优化后的配置参数:") - print(" - 预加载缓冲区: 6个音频块(原:3个)") - print(" - 智能句子缓冲: 最多5个句子(原:3个)") - print(" - 最小触发句子: 2个句子(原:1个)") - print(" - 积累时间窗口: 150ms(原:200ms)") - print(" - TTS任务队列: 20个任务(原:10个)") - print(" - 音频块大小: 1024字节(原:2048字节)") - print(" - 播放冷却期: 0.05秒(原:0.1秒)") - print(" - 长句子触发阈值: 30字符(原:50字符)") - print(" - 播放缓冲区维护: 4个块(原:3个)") - print() - - # 测试智能句子缓冲逻辑 - print("🧠 测试智能句子缓冲逻辑:") - print(" 🔄 首次播放逻辑测试:") - - test_sentences = [ - "你好", # 短句子 - "今天天气怎么样?", # 中等长度 - "我觉得这个方案很不错,我们可以试试看。", # 长句子 - "这是一个超过三十个字符的句子,应该会立即触发TTS生成。", # 超过30字符 - "短句。", # 带标点的短句 - ] - - # 模拟首次播放的缓冲区状态 - tts_buffer = [] - first_playback_started = False - total_buffered_text = "" - trigger_count = 0 - - for i, sentence in enumerate(test_sentences): - tts_buffer.append(sentence) - total_buffered_text = ''.join(tts_buffer) - - # 首次播放逻辑 - if not first_playback_started: - should_trigger = False - trigger_reason = "" - - # 条件1: 总文本长度超过40字符且至少有2个句子 - if len(total_buffered_text) >= 40 and len(tts_buffer) >= 2: - should_trigger = True - trigger_reason = f"总长度{len(total_buffered_text)}字符,{len(tts_buffer)}个句子" - # 条件2: 有1个完整长句子(超过25字符) - elif len(sentence) >= 25 and sentence.endswith(('。', '!', '?', '.', '!', '?')) and len(tts_buffer) >= 2: - should_trigger = True - trigger_reason = f"长句子{len(sentence)}字符+缓冲内容" - # 条件3: 缓冲区达到最大值(5个) - elif len(tts_buffer) >= 5: - should_trigger = True - trigger_reason = f"缓冲区达到最大值{len(tts_buffer)}" - # 条件4: 超过500ms(模拟) - - if should_trigger: - trigger_count += 1 - first_playback_started = True - print(f" 🎵 首次触发TTS: {trigger_reason}") - print(f" 📝 发送内容: '{total_buffered_text[:50]}...'") - tts_buffer = [] - else: - print(f" ⏳ 首次缓冲: '{sentence}' (累计: {len(total_buffered_text)}字符, {len(tts_buffer)}个句子)") - else: - # 正常播放逻辑 - if len(sentence) > 30 or len(tts_buffer) >= 3: - should_trigger = True - trigger_reason = "长句子" if len(sentence) > 30 else "缓冲区满" - - if should_trigger: - trigger_count += 1 - print(f" ✅ 正常触发TTS: {trigger_reason}") - print(f" 📝 发送内容: '{''.join(tts_buffer)[:30]}...'") - tts_buffer = [] - else: - print(f" ⏳ 正常缓冲: '{sentence}'") - - print(f" 📊 总触发次数: {trigger_count}") - print() - - print(" 📋 首次播放优化效果:") - print(" • 确保首句有足够长度(40+字符或25+字符完整句)") - print(" • 积累多个句子避免播放卡顿") - print(" • 5秒超时机制防止无限等待") - print(" • 后续句子正常流式处理") - print() - - # 显示性能监控信息 - print("📊 性能监控功能:") - print(" - 实时缓冲区大小统计") - print(" - 平均和最大缓冲区大小") - print(" - 播放块数和音频大小统计") - print(" - 每5秒自动输出性能报告") - print() - - print("🎯 预期改进效果:") - print(" 1. ✅ 减少音频播放卡顿(更大的缓冲区)") - print(" 2. ✅ 更快的TTS响应(优化的触发条件)") - print(" 3. ✅ 更流畅的播放体验(减少冷却期)") - print(" 4. ✅ 更好的资源利用(更小的音频块)") - print(" 5. ✅ 实时性能监控(调试和优化)") - print() - - print("📝 测试建议:") - print(" 1. 运行主程序观察播放流畅度") - print(" 2. 查看性能统计输出") - print(" 3. 监控缓冲区大小变化") - print(" 4. 测试不同长度的语音响应") - print() - - print("🚀 测试完成!可以运行主程序验证优化效果。") - -def show_optimization_summary(): - """显示优化总结""" - print("📈 音频播放优化总结") - print("=" * 50) - - summary = { - "缓冲区优化": [ - "预加载缓冲区: 3→6个块", - "智能句子缓冲: 3→5个句子", - "最小触发缓冲: 1→2个句子", - "TTS任务队列: 10→20个任务" - ], - "响应性优化": [ - "积累时间窗口: 200ms→150ms", - "长句子触发: 50→30字符", - "中等长度触发: 30→20字符", - "播放冷却期: 0.1s→0.05s" - ], - "播放优化": [ - "音频块大小: 2048→1024字节", - "播放缓冲维护: 3→4个块", - "数据转移: 2→3个块/次" - ], - "监控功能": [ - "实时性能统计", - "缓冲区大小监控", - "自动性能报告", - "播放进度追踪" - ] - } - - for category, improvements in summary.items(): - print(f"\n🔧 {category}:") - for improvement in improvements: - print(f" • {improvement}") - - print(f"\n🎯 总体目标: 减少音频播放卡顿,提升用户体验") - print(f"📊 预期效果: 更流畅的实时语音交互") - -if __name__ == "__main__": - if len(sys.argv) > 1 and sys.argv[1] == "--summary": - show_optimization_summary() - else: - test_optimization() \ No newline at end of file