From f416ef0036d717f53e2aeae6305f5bd5c9a871cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E6=BD=AE?= Date: Mon, 22 Sep 2025 02:06:00 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9ENFC=E5=91=BC=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- audio_processes.py | 41 ++++++-- characters/libai.json | 1 + characters/zhubajie.json | 1 + control_system.py | 194 ++++++++++++++++++++++++++++++------- multiprocess_recorder.py | 38 +++++++- nfc_manager.py | 176 +++++++++++++++++++++++++++++++++ test_all_audio_received.py | 65 +++++++++++++ test_nfc_functionality.py | 141 +++++++++++++++++++++++++++ test_raspberry_nfc.py | 131 +++++++++++++++++++++++++ 9 files changed, 743 insertions(+), 45 deletions(-) create mode 100644 nfc_manager.py create mode 100644 test_all_audio_received.py create mode 100644 test_nfc_functionality.py create mode 100644 test_raspberry_nfc.py diff --git a/audio_processes.py b/audio_processes.py index 0d9fffa..5bb9fd8 100644 --- a/audio_processes.py +++ b/audio_processes.py @@ -874,6 +874,12 @@ class OutputProcess: if not self.currently_playing: self.currently_playing = True self.last_audio_chunk_time = time.time() # 记录最后播放时间 + + # 新增:只要有音频播放就设置all_audio_received为True + if not self.all_audio_received: + self.all_audio_received = True + print(f"🎵 音频开始播放,设置all_audio_received=True") + print(f"🎵 播放状态变化: currently_playing = True (开始播放)") print(f"🎵 设置last_audio_chunk_time = {self.last_audio_chunk_time}") @@ -1168,7 +1174,17 @@ class OutputProcess: self.all_audio_received = False self.completion_sent = False + print(f"🔊 准备处理完整文本,当前tts_speaker: {self.tts_speaker}") self.process_complete_text(complete_text) + print(f"✅ 完整文本处理完成") + continue + + if isinstance(audio_data, str) and audio_data.startswith("UPDATE_TTS_SPEAKER:"): + # 更新TTS speaker配置 + new_speaker = audio_data[19:] # 移除 "UPDATE_TTS_SPEAKER:" 前缀(19个字符) + print(f"📥 输出进程收到TTS speaker更新: {new_speaker}") + self.tts_speaker = new_speaker + print(f"✅ TTS speaker已更新为: {self.tts_speaker}") continue if isinstance(audio_data, str) and audio_data.startswith("FLUSH_TTS_BUFFER:"): @@ -1220,10 +1236,10 @@ class OutputProcess: self.is_playing = True self.last_playback_time = 0 # 重置播放时间,避免立即触发冷却期 - # 音频真正开始播放,设置all_audio_received为True - if self.end_signal_received and not self.all_audio_received: + # 备用:在缓冲区转移时也设置all_audio_received为True(兜底机制) + if not self.all_audio_received: self.all_audio_received = True - print(f"🎵 音频开始播放,设置all_audio_received=True") + print(f"🎵 缓冲区转移完成,设置all_audio_received=True(备用机制)") # 确保播放工作线程知道有数据要播放 if not self.currently_playing: # 播放工作线程会自动检测播放缓冲区并开始播放 @@ -1242,10 +1258,10 @@ class OutputProcess: self.is_playing = True self.last_playback_time = 0 - # 强制转移后设置all_audio_received为True + # 备用:强制转移后也设置all_audio_received为True(兜底机制) if not self.all_audio_received: self.all_audio_received = True - print(f"🎵 强制转移预加载缓冲区后,设置all_audio_received=True") + print(f"🎵 强制转移预加载缓冲区后,设置all_audio_received=True(备用机制)") print(f"🎵 强制开始播放音频,播放缓冲区大小: {len(self.playback_buffer)}") @@ -1288,10 +1304,10 @@ class OutputProcess: self.is_playing = True self.last_playback_time = 0 # 重置播放时间,避免立即触发冷却期 - # 音频真正开始播放,设置all_audio_received为True - if self.end_signal_received and not self.all_audio_received: + # 备用:最小缓冲区模式下也设置all_audio_received为True(兜底机制) + if not self.all_audio_received: self.all_audio_received = True - print(f"🎵 音频开始播放(最小缓冲区模式),设置all_audio_received=True") + print(f"🎵 音频开始播放(最小缓冲区模式),设置all_audio_received=True(备用机制)") print(f"🎵 开始播放音频(最小缓冲区满足)") print(f"🔍 已重置last_playback_time,避免立即触发冷却期") elif self.is_playing and len(self.playback_buffer) < 2 and len(self.preload_buffer) > 0: @@ -2076,11 +2092,14 @@ class OutputProcess: def process_complete_text(self, text): """处理完整文本 - 强制刷新缓冲区""" + print(f"🔊 process_complete_text 被调用,输入文本: {text}") if not text.strip(): + print("🔊 文本为空,返回") return # 过滤括号内容 filtered_text = self._filter_parentheses_content(text.strip()) + print(f"🔊 过滤后文本: {filtered_text}") if filtered_text: # 重置所有播放完成检测状态 - 开始新的对话 @@ -2096,8 +2115,14 @@ class OutputProcess: print(f"🔊 处理完整文本:已重置所有播放完成检测状态") # 直接添加到缓冲区并强制处理 + print(f"🔊 添加文本到TTS缓冲区: {filtered_text}") self.tts_buffer.append(filtered_text) + print(f"🔊 当前TTS缓冲区大小: {len(self.tts_buffer)}") + print(f"🔊 开始处理TTS缓冲区...") self._process_tts_buffer() + print(f"🔊 TTS缓冲区处理完成") + else: + print("🔊 过滤后文本为空,不添加到缓冲区") # ========== 原有方法保持不变 ========== diff --git a/characters/libai.json b/characters/libai.json index 94784cd..f9a4f8a 100644 --- a/characters/libai.json +++ b/characters/libai.json @@ -5,5 +5,6 @@ "voice": "ICL_zh_male_huzi_v1_tob", "max_tokens": 500, "greeting": "吾乃李白,字太白,号青莲居士。今天有幸与君相会,让我们畅谈诗词人生吧!", + "nfc_uid": "1DC6C90D0D1080", "author": "Claude" } diff --git a/characters/zhubajie.json b/characters/zhubajie.json index be235fc..c4ebfc4 100644 --- a/characters/zhubajie.json +++ b/characters/zhubajie.json @@ -5,5 +5,6 @@ "voice": "zh_male_zhubajie_mars_bigtts", "max_tokens": 500, "greeting": "嘿!俺老猪来也!今天咱聊点啥好吃的?要不要一起去化缘啊?", + "nfc_uid": "1DC7C90D0D1080", "author": "Claude" } diff --git a/control_system.py b/control_system.py index df0958e..f7672fd 100644 --- a/control_system.py +++ b/control_system.py @@ -6,27 +6,28 @@ 实现主控制进程和状态管理 """ -import multiprocessing as mp -import queue -import time -import threading -import requests -import json +import asyncio import base64 import gzip -import uuid -import asyncio -import websockets -from typing import Optional, Dict, Any, List -from dataclasses import dataclass, asdict -from enum import Enum +import json +import multiprocessing as mp import os +import queue import sys +import threading +import time +import uuid +from dataclasses import asdict, dataclass +from enum import Enum +from typing import Any, Dict, List, Optional + +import requests +import websockets + +from audio_processes import (ControlCommand, InputProcess, OutputProcess, + ProcessEvent, RecordingState) +from nfc_manager import get_nfc_manager -from audio_processes import ( - InputProcess, OutputProcess, - RecordingState, ControlCommand, ProcessEvent -) def input_process_target(command_queue, event_queue, config): """输入进程的目标函数 - 在子进程中创建InputProcess实例""" @@ -93,6 +94,10 @@ class ControlSystem: # 运行状态 self.running = True + # NFC管理器 + self.nfc_manager = None + self.nfc_enabled = False + # 检查依赖 self._check_dependencies() @@ -274,26 +279,51 @@ class ControlSystem: # 等待校准完成 if self.wait_for_calibration_complete(timeout=30): print("✅ 校准完成") - # 校准完成后播放打招呼 - print("🎭 播放角色打招呼...") - greeting_success = self.play_greeting() - if not greeting_success: - print("⚠️ 打招呼播放失败,继续运行...") + + # 如果启用了NFC,开始持续执行NFC检测,但不播放打招呼或启动监听 + if hasattr(self, 'nfc_enabled') and self.nfc_enabled: + print("📱 开始持续执行NFC检测,等待NFC卡片...") + # NFC检测已经在enable_nfc()中启动,这里只需确认状态 + if self.nfc_manager and self.nfc_manager.running: + print("✅ NFC检测已在运行") + else: + print("⚠️ NFC检测未运行,尝试启动...") + try: + self.nfc_manager.start() + print("✅ NFC检测已启动") + except Exception as e: + print(f"❌ 启动NFC检测失败: {e}") + else: + # 如果没有启用NFC,播放打招呼并启动监听 + print("🎭 播放角色打招呼...") + greeting_success = self.play_greeting() + if not greeting_success: + print("⚠️ 打招呼播放失败,继续运行...") + + print("🎯 启动音频监听...") + success = self.start_monitoring() + if success: + print("✅ 监听已启动") + else: + print("⚠️ 监听启动失败") else: print("⚠️ 校准超时,继续运行...") - # 注释掉自动启动监听功能,让打招呼播放完成后自动开启监听 - # if auto_monitoring: - # # 自动启动监听 - # print("🎯 自动启动音频监听...") - # success = self.start_monitoring() - # if success: - # print("✅ 监听已启动") - # else: - # print("⚠️ 监听启动失败") + # 注释掉自动启动监听功能,让打招呼播放完成后自动开启监听 + # if auto_monitoring: + # # 自动启动监听 + # print("🎯 自动启动音频监听...") + # success = self.start_monitoring() + # if success: + # print("✅ 监听已启动") + # else: + # print("⚠️ 监听启动失败") print("=" * 60) - print("🎙️ 系统就绪,开始检测语音...") + if self.nfc_enabled: + print("📱 NFC模式:等待NFC卡片进行角色切换...") + else: + print("🎙️ 系统就绪,开始检测语音...") print("=" * 60) # 启动主控制循环 @@ -410,6 +440,11 @@ class ControlSystem: print("🎯 状态:IDLE(播放刚完成,等待延迟启用录音)") self._just_finished_playing = False # 重置标志 else: + # 如果启用了NFC,不自动启动音频监听,等待NFC触发 + if self.nfc_enabled: + print("🎯 状态:IDLE(NFC模式已启用,等待NFC卡片触发)") + return + # 检查监听状态 monitoring_status = self.get_monitoring_status() if monitoring_status and monitoring_status['enabled']: @@ -514,7 +549,7 @@ class ControlSystem: def delayed_enable_recording(): import threading import time - + # 等待更长时间确保音频完全停止 time.sleep(2.0) # 增加到2秒 @@ -1256,7 +1291,7 @@ class ControlSystem: def _filter_parentheses_content(self, text): """过滤文本中的括号内容(包括中文和英文括号)- 从 recorder.py 移植""" import re - + # 移除中文括号内容:(内容) filtered_text = re.sub(r'([^)]*)', '', text) # 移除英文括号内容:(content) @@ -1289,10 +1324,13 @@ class ControlSystem: self.state = RecordingState.PLAYING # 发送打招呼文本到TTS + print(f"📡 准备发送打招呼文本到输出进程: {greeting_text}") success = self._send_text_to_output_process(greeting_text) if not success: print("❌ 打招呼TTS生成失败") return False + else: + print(f"✅ 打招呼文本已成功发送到输出进程") # 手动设置LLM完成状态(因为打招呼没有LLM生成过程) self._notify_llm_complete() @@ -1529,6 +1567,94 @@ class ControlSystem: print(f" 成功率: {success_rate:.1f}%") print("👋 系统已关闭") + + def enable_nfc(self): + """启用NFC功能""" + if self.nfc_enabled: + print("⚠️ NFC功能已启用") + return + + try: + # 获取NFC管理器单例 + self.nfc_manager = get_nfc_manager() + self.nfc_manager.set_character_switch_callback(self._on_character_switch) + self.nfc_manager.start() + self.nfc_enabled = True + print("✅ NFC功能已启用") + except Exception as e: + print(f"❌ 启用NFC功能失败: {e}") + + def disable_nfc(self): + """禁用NFC功能""" + if not self.nfc_enabled: + return + + try: + if self.nfc_manager: + self.nfc_manager.stop() + self.nfc_enabled = False + print("🛑 NFC功能已禁用") + except Exception as e: + print(f"❌ 禁用NFC功能失败: {e}") + + def _on_character_switch(self, character_name: str): + """角色切换回调函数""" + print(f"🎭 NFC触发角色切换: {character_name}") + + # 检查角色配置是否存在 + character_config = self._load_character_config(character_name) + if not character_config: + print(f"❌ 角色配置不存在: {character_name}") + return + + # 停止当前录音(如果有) + if self.state == RecordingState.RECORDING: + print("🛑 停止当前录音...") + self.input_command_queue.put(ControlCommand('stop_recording')) + + # 停止当前监听 + if hasattr(self, '_monitoring_active') and self._monitoring_active: + print("🛑 停止当前监听...") + self.input_command_queue.put(ControlCommand('stop_monitoring')) + + # 更新角色配置 + old_character = self.config['processing']['character'] + self.config['processing']['character'] = character_name + + # 更新TTS语音配置 + if character_config and "voice" in character_config: + self.api_config['tts']['speaker'] = character_config["voice"] + print(f"🎵 更新TTS语音: {character_config['voice']}") + + # 向输出进程发送更新TTS speaker的命令 + try: + update_command = f"UPDATE_TTS_SPEAKER:{character_config['voice']}" + self.output_audio_queue.put(update_command) + print(f"📡 已发送TTS speaker更新命令到输出进程") + except Exception as e: + print(f"❌ 发送TTS speaker更新命令失败: {e}") + + print(f"🔄 角色已切换: {old_character} -> {character_name}") + + # 播放新角色打招呼 + print("🎭 播放新角色打招呼...") + greeting_success = self.play_greeting() + if greeting_success: + print("✅ 角色切换完成") + else: + print("⚠️ 打招呼播放失败,继续运行...") + + + def get_nfc_status(self): + """获取NFC状态""" + if not self.nfc_enabled or not self.nfc_manager: + return {"enabled": False, "current_character": None} + + return { + "enabled": True, + "current_character": self.nfc_manager.get_current_character(), + "running": self.nfc_manager.running + } def main(): """主函数""" @@ -1562,4 +1688,4 @@ def main(): control_system.start() if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/multiprocess_recorder.py b/multiprocess_recorder.py index ab94ddf..ef5541d 100644 --- a/multiprocess_recorder.py +++ b/multiprocess_recorder.py @@ -11,6 +11,7 @@ import sys import argparse import json import time +import subprocess from typing import Dict, Any def check_dependencies(): @@ -47,6 +48,23 @@ def check_dependencies(): return True +def check_nfc_dependencies(): + """检查NFC相关依赖""" + try: + # 检查nfc-list命令是否存在 + result = subprocess.run(['which', 'nfc-list'], capture_output=True, text=True) + if result.returncode != 0: + print("❌ 未找到nfc-list命令") + print(" 请安装libnfc工具:") + print(" Ubuntu/Debian: sudo apt-get install libnfc-bin") + print(" macOS: brew install libnfc") + return False + print("✅ NFC工具已安装") + return True + except Exception as e: + print(f"❌ NFC依赖检查失败: {e}") + return False + def check_environment(): """检查运行环境""" print("🔍 检查运行环境...") @@ -190,6 +208,7 @@ def main(): python multiprocess_recorder.py -c zhubajie # 指定角色 python multiprocess_recorder.py -l # 列出角色 python multiprocess_recorder.py --create-config # 创建配置文件 + python multiprocess_recorder.py --enable-nfc # 启用NFC角色切换 """ ) @@ -205,6 +224,8 @@ def main(): help='检查运行环境') parser.add_argument('--verbose', '-v', action='store_true', help='详细输出') + parser.add_argument('--enable-nfc', action='store_true', + help='启用NFC角色切换功能') args = parser.parse_args() @@ -288,9 +309,20 @@ def main(): if args.verbose: control_system.config['system']['log_level'] = "DEBUG" - # 启动系统(自动校准和监听) - print("🚀 启动系统(包含自动校准和监听)...") - control_system.start(auto_calibration=True, auto_monitoring=True) + # 启用NFC功能(如果指定)- 在系统启动前启用,确保校准完成后能立即开始NFC检测 + nfc_enabled = False + if args.enable_nfc: + print("📱 检查NFC依赖...") + if not check_nfc_dependencies(): + print("❌ NFC依赖检查失败,无法启用NFC功能") + else: + print("📱 启用NFC角色切换功能...") + control_system.enable_nfc() + nfc_enabled = True + + # 启动系统(自动校准,但根据NFC状态决定是否启动监听) + print("🚀 启动系统(包含自动校准)...") + control_system.start(auto_calibration=True, auto_monitoring=not nfc_enabled) except KeyboardInterrupt: print("\n👋 用户中断") diff --git a/nfc_manager.py b/nfc_manager.py new file mode 100644 index 0000000..f2e4155 --- /dev/null +++ b/nfc_manager.py @@ -0,0 +1,176 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +NFC管理模块 +实现NFC读取和角色切换功能 +""" + +import json +import os +import re +import subprocess +import threading +import time +from pathlib import Path +from typing import Callable, Dict, Optional + + +class NFCManager: + """NFC管理器,负责NFC读取和角色切换""" + + def __init__(self, characters_dir: str = "characters"): + self.characters_dir = Path(characters_dir) + self.uid_to_character = {} + self.running = False + self.read_thread = None + self.current_uid = None + self.character_switch_callback: Optional[Callable] = None + + # 加载角色NFC映射 + self._load_character_mappings() + + def _load_character_mappings(self): + """加载角色NFC映射""" + self.uid_to_character = {} + + if not self.characters_dir.exists(): + print(f"⚠️ 角色目录不存在: {self.characters_dir}") + return + + for config_file in self.characters_dir.glob("*.json"): + try: + with open(config_file, 'r', encoding='utf-8') as f: + config = json.load(f) + + if 'nfc_uid' in config: + uid = config['nfc_uid'].upper().strip() + character_name = config_file.stem + self.uid_to_character[uid] = character_name + print(f"✅ 加载角色映射: {character_name} -> {uid}") + + except Exception as e: + print(f"❌ 加载角色配置失败 {config_file}: {e}") + + if not self.uid_to_character: + print("⚠️ 未找到任何带NFC UID的角色配置") + + def _read_uid(self) -> Optional[str]: + """读取NFC UID""" + os.environ['LIBNFC_DRIVER'] = 'pn532_i2c' + os.environ['LIBNFC_DEVICE'] = 'pn532_i2c:/dev/i2c-1' + + try: + result = subprocess.run(['nfc-list'], capture_output=True, text=True, timeout=10) + if result.returncode == 0: + match = re.search(r'UID\s*\(NFCID1\):\s*([0-9A-Fa-f\s]+)', result.stdout) + if match: + return match.group(1).strip().replace(' ', '').upper() + except subprocess.TimeoutExpired: + print("⚠️ NFC读取超时") + except Exception as e: + print(f"❌ NFC读取失败: {e}") + + return None + + def _read_loop(self): + """NFC读取循环""" + print("🔄 启动NFC读取循环...") + + while self.running: + try: + uid = self._read_uid() + + if uid and uid != self.current_uid: + print(f"🎯 检测到NFC卡片: {uid}") + + if uid in self.uid_to_character: + character_name = self.uid_to_character[uid] + print(f"🎭 切换角色: {character_name}") + + # 更新当前UID + self.current_uid = uid + + # 调用角色切换回调 + if self.character_switch_callback: + self.character_switch_callback(character_name) + else: + print(f"⚠️ 未找到UID对应角色: {uid}") + + + except Exception as e: + print(f"❌ NFC读取循环出错: {e}") + + # 等待5秒 + time.sleep(1) + + def set_character_switch_callback(self, callback: Callable): + """设置角色切换回调函数""" + self.character_switch_callback = callback + + def start(self): + """启动NFC管理器""" + if self.running: + print("⚠️ NFC管理器已在运行") + return + + self.running = True + self.read_thread = threading.Thread(target=self._read_loop, daemon=True) + self.read_thread.start() + print("✅ NFC管理器已启动") + + def stop(self): + """停止NFC管理器""" + if not self.running: + return + + self.running = False + if self.read_thread: + self.read_thread.join(timeout=5) + + print("🛑 NFC管理器已停止") + + def get_current_character(self) -> Optional[str]: + """获取当前角色""" + if self.current_uid and self.current_uid in self.uid_to_character: + return self.uid_to_character[self.current_uid] + return None + + def reload_mappings(self): + """重新加载角色映射""" + print("🔄 重新加载角色NFC映射...") + self._load_character_mappings() + + def __del__(self): + """析构函数""" + self.stop() + +# 单例模式 +_nfc_manager_instance = None + +def get_nfc_manager(characters_dir: str = "characters") -> NFCManager: + """获取NFC管理器单例""" + global _nfc_manager_instance + if _nfc_manager_instance is None: + _nfc_manager_instance = NFCManager(characters_dir) + return _nfc_manager_instance + +if __name__ == "__main__": + # 测试NFC管理器 + def on_character_switch(character_name): + print(f"🎭 角色切换回调: {character_name}") + + nfc_manager = get_nfc_manager() + nfc_manager.set_character_switch_callback(on_character_switch) + + try: + nfc_manager.start() + print("🔄 NFC管理器运行中,按Ctrl+C退出...") + + while True: + time.sleep(1) + + except KeyboardInterrupt: + print("\n👋 用户中断") + finally: + nfc_manager.stop() diff --git a/test_all_audio_received.py b/test_all_audio_received.py new file mode 100644 index 0000000..b571963 --- /dev/null +++ b/test_all_audio_received.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +测试 all_audio_received 修改后的逻辑 +""" + +import sys +import os +sys.path.append(os.path.dirname(__file__)) + +def test_all_audio_received_logic(): + """测试 all_audio_received 的设置逻辑""" + print("🧪 测试 all_audio_received 修改后的逻辑") + print("=" * 50) + + # 模拟播放工作线程的逻辑 + class MockAudioProcess: + def __init__(self): + self.currently_playing = False + self.all_audio_received = False + self.last_audio_chunk_time = 0 + + def simulate_audio_playback(self): + """模拟音频播放过程""" + print("🔊 模拟音频播放开始...") + + # 模拟第875行的逻辑 + if not self.currently_playing: + self.currently_playing = True + import time + self.last_audio_chunk_time = time.time() + + # 新增:只要有音频播放就设置all_audio_received为True + if not self.all_audio_received: + self.all_audio_received = True + print(f"🎵 音频开始播放,设置all_audio_received=True") + + print(f"🎵 播放状态变化: currently_playing = True (开始播放)") + print(f"🎵 设置last_audio_chunk_time = {self.last_audio_chunk_time}") + + # 测试1:正常播放场景 + print("\n📋 测试1:正常播放场景") + mock1 = MockAudioProcess() + print(f"初始状态: currently_playing={mock1.currently_playing}, all_audio_received={mock1.all_audio_received}") + mock1.simulate_audio_playback() + print(f"播放后状态: currently_playing={mock1.currently_playing}, all_audio_received={mock1.all_audio_received}") + + # 测试2:重复播放场景(不应该重复设置) + print("\n📋 测试2:重复播放场景") + mock2 = MockAudioProcess() + mock2.all_audio_received = True # 已经设置为True + print(f"初始状态: currently_playing={mock2.currently_playing}, all_audio_received={mock2.all_audio_received}") + mock2.simulate_audio_playback() + print(f"播放后状态: currently_playing={mock2.currently_playing}, all_audio_received={mock2.all_audio_received}") + + print("\n✅ 所有测试完成") + print("\n📊 修改总结:") + print(" • 在播放工作线程中添加了 all_audio_received = True 的设置") + print(" • 移除了对 end_signal_received 的依赖") + print(" • 保留了原有逻辑作为备用机制") + print(" • 确保只要有音频播放就会设置 all_audio_received = True") + +if __name__ == "__main__": + test_all_audio_received_logic() \ No newline at end of file diff --git a/test_nfc_functionality.py b/test_nfc_functionality.py new file mode 100644 index 0000000..99ab333 --- /dev/null +++ b/test_nfc_functionality.py @@ -0,0 +1,141 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +测试NFC角色切换功能 +""" + +import json +import os +import sys +from nfc_manager import NFCManager + +def test_character_loading(): + """测试角色配置加载""" + print("🔍 测试角色配置加载...") + + # 创建测试NFC管理器 + nfc_manager = NFCManager() + + # 检查映射是否正确加载 + expected_mappings = { + '1DC6C90D0D1080': 'libai', + '1DC7C90D0D1080': 'zhubajie' + } + + for uid, character in expected_mappings.items(): + if uid in nfc_manager.uid_to_character: + actual_character = nfc_manager.uid_to_character[uid] + if actual_character == character: + print(f"✅ {character} -> {uid}") + else: + print(f"❌ {character} -> {uid} (实际: {actual_character})") + else: + print(f"❌ 未找到UID: {uid}") + + return len(nfc_manager.uid_to_character) == len(expected_mappings) + +def test_uid_reading(): + """测试UID读取功能(需要真实NFC设备)""" + print("\n🔍 测试UID读取功能...") + + nfc_manager = NFCManager() + + try: + uid = nfc_manager._read_uid() + if uid: + print(f"✅ 读取到UID: {uid}") + return True + else: + print("⚠️ 未读取到UID(可能没有NFC设备或卡片)") + return False + except Exception as e: + print(f"❌ UID读取失败: {e}") + return False + +def test_character_switch_callback(): + """测试角色切换回调""" + print("\n🔍 测试角色切换回调...") + + nfc_manager = NFCManager() + + def callback(character_name): + print(f"🎭 角色切换回调被调用: {character_name}") + + nfc_manager.set_character_switch_callback(callback) + + # 模拟角色切换 + test_uid = '1DC6C90D0D1080' + if test_uid in nfc_manager.uid_to_character: + character_name = nfc_manager.uid_to_character[test_uid] + print(f"📡 模拟检测到UID: {test_uid}") + print(f"🎭 应该切换到角色: {character_name}") + + # 手动调用回调函数 + if nfc_manager.character_switch_callback: + nfc_manager.character_switch_callback(character_name) + return True + + return False + +def test_character_configs(): + """测试角色配置文件""" + print("\n🔍 测试角色配置文件...") + + characters_dir = "characters" + test_files = ['libai.json', 'zhubajie.json'] + + for filename in test_files: + filepath = os.path.join(characters_dir, filename) + if os.path.exists(filepath): + try: + with open(filepath, 'r', encoding='utf-8') as f: + config = json.load(f) + + required_fields = ['name', 'nfc_uid', 'greeting'] + missing_fields = [field for field in required_fields if field not in config] + + if not missing_fields: + print(f"✅ {filename}: {config['name']} (NFC: {config['nfc_uid']})") + else: + print(f"❌ {filename}: 缺少字段 {missing_fields}") + + except Exception as e: + print(f"❌ {filename}: 读取失败 - {e}") + else: + print(f"❌ {filename}: 文件不存在") + +def main(): + """主测试函数""" + print("🧪 NFC角色切换功能测试") + print("=" * 50) + + # 测试角色配置文件 + test_character_configs() + + # 测试角色加载 + loading_success = test_character_loading() + + # 测试UID读取 + reading_success = test_uid_reading() + + # 测试角色切换回调 + callback_success = test_character_switch_callback() + + print("\n" + "=" * 50) + print("📊 测试结果:") + print(f" 角色配置加载: {'✅ 通过' if loading_success else '❌ 失败'}") + print(f" UID读取功能: {'✅ 通过' if reading_success else '⚠️ 需要NFC设备'}") + print(f" 角色切换回调: {'✅ 通过' if callback_success else '❌ 失败'}") + + if loading_success and callback_success: + print("\n🎉 核心功能测试通过!") + print("💡 提示: 要测试完整的NFC功能,请:") + print(" 1. 安装libnfc工具") + print(" 2. 连接NFC读取器") + print(" 3. 运行: python multiprocess_recorder.py --enable-nfc") + else: + print("\n❌ 部分测试失败,请检查配置") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/test_raspberry_nfc.py b/test_raspberry_nfc.py new file mode 100644 index 0000000..17d0eb8 --- /dev/null +++ b/test_raspberry_nfc.py @@ -0,0 +1,131 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +测试树莓派上的NFC功能 +""" + +import subprocess +import os +import re +import sys +import time + +def test_nfc_connection(): + """测试NFC连接""" + print("🔍 测试NFC连接...") + + # 设置环境变量 + os.environ['LIBNFC_DRIVER'] = 'pn532_i2c' + os.environ['LIBNFC_DEVICE'] = 'pn532_i2c:/dev/i2c-1' + + try: + result = subprocess.run(['nfc-list'], capture_output=True, text=True, timeout=10) + print(f"NFC输出: {result.stdout}") + print(f"返回码: {result.returncode}") + + if result.returncode == 0: + if "NFC device: user defined device opened" in result.stdout: + print("✅ NFC设备连接成功") + return True + else: + print("⚠️ NFC设备未正确初始化") + return False + else: + print(f"❌ NFC设备连接失败: {result.stderr}") + return False + except Exception as e: + print(f"❌ NFC测试异常: {e}") + return False + +def test_nfc_reading(): + """测试NFC读取""" + print("\n🔍 测试NFC读取...") + + # 设置环境变量 + os.environ['LIBNFC_DRIVER'] = 'pn532_i2c' + os.environ['LIBNFC_DEVICE'] = 'pn532_i2c:/dev/i2c-1' + + try: + result = subprocess.run(['nfc-list'], capture_output=True, text=True, timeout=10) + if result.returncode == 0: + match = re.search(r'UID\s*\(NFCID1\):\s*([0-9A-Fa-f\s]+)', result.stdout) + if match: + uid = match.group(1).strip().replace(' ', '').upper() + print(f"✅ 读取到UID: {uid}") + return uid + else: + print("⚠️ 未检测到NFC卡片") + return None + else: + print(f"❌ NFC读取失败: {result.stderr}") + return None + except Exception as e: + print(f"❌ NFC读取异常: {e}") + return None + +def test_nfc_manager(): + """测试NFC管理器""" + print("\n🔍 测试NFC管理器...") + + try: + # 导入NFC管理器 + sys.path.append('/home/zhuchaowe/Local-Voice') + from nfc_manager import get_nfc_manager + + # 创建NFC管理器 + nfc_manager = get_nfc_manager() + + # 检查映射 + print(f"角色映射: {nfc_manager.uid_to_character}") + + # 测试读取 + uid = nfc_manager._read_uid() + if uid: + print(f"✅ NFC管理器读取到UID: {uid}") + + # 检查映射 + if uid in nfc_manager.uid_to_character: + character = nfc_manager.uid_to_character[uid] + print(f"🎭 对应角色: {character}") + else: + print("⚠️ UID未映射到角色") + else: + print("⚠️ 未读取到UID") + + return True + except Exception as e: + print(f"❌ NFC管理器测试失败: {e}") + import traceback + traceback.print_exc() + return False + +def main(): + """主测试函数""" + print("🧪 树莓派NFC功能测试") + print("=" * 50) + + # 测试NFC连接 + connection_ok = test_nfc_connection() + + # 测试NFC读取 + uid = test_nfc_reading() + + # 测试NFC管理器 + manager_ok = test_nfc_manager() + + print("\n" + "=" * 50) + print("📊 测试结果:") + print(f" NFC连接: {'✅ 通过' if connection_ok else '❌ 失败'}") + print(f" UID读取: {'✅ 通过' if uid else '⚠️ 无卡片'}") + print(f" NFC管理器: {'✅ 通过' if manager_ok else '❌ 失败'}") + + if uid: + print(f"\n🎯 当前检测到的UID: {uid}") + print("💡 提示: 您现在可以使用以下命令测试完整的NFC角色切换功能:") + print(" cd Local-Voice && python multiprocess_recorder.py --enable-nfc") + else: + print(f"\n💡 提示: 请确保NFC读取器已连接并放置了NFC卡片") + +if __name__ == "__main__": + main() \ No newline at end of file