增NFC呼功能

This commit is contained in:
朱潮 2025-09-22 02:06:00 +08:00
parent 67549566ec
commit f416ef0036
9 changed files with 743 additions and 45 deletions

View File

@ -874,6 +874,12 @@ class OutputProcess:
if not self.currently_playing:
self.currently_playing = True
self.last_audio_chunk_time = time.time() # 记录最后播放时间
# 新增只要有音频播放就设置all_audio_received为True
if not self.all_audio_received:
self.all_audio_received = True
print(f"🎵 音频开始播放设置all_audio_received=True")
print(f"🎵 播放状态变化: currently_playing = True (开始播放)")
print(f"🎵 设置last_audio_chunk_time = {self.last_audio_chunk_time}")
@ -1168,7 +1174,17 @@ class OutputProcess:
self.all_audio_received = False
self.completion_sent = False
print(f"🔊 准备处理完整文本当前tts_speaker: {self.tts_speaker}")
self.process_complete_text(complete_text)
print(f"✅ 完整文本处理完成")
continue
if isinstance(audio_data, str) and audio_data.startswith("UPDATE_TTS_SPEAKER:"):
# 更新TTS speaker配置
new_speaker = audio_data[19:] # 移除 "UPDATE_TTS_SPEAKER:" 前缀19个字符
print(f"📥 输出进程收到TTS speaker更新: {new_speaker}")
self.tts_speaker = new_speaker
print(f"✅ TTS speaker已更新为: {self.tts_speaker}")
continue
if isinstance(audio_data, str) and audio_data.startswith("FLUSH_TTS_BUFFER:"):
@ -1220,10 +1236,10 @@ class OutputProcess:
self.is_playing = True
self.last_playback_time = 0 # 重置播放时间,避免立即触发冷却期
# 音频真正开始播放设置all_audio_received为True
if self.end_signal_received and not self.all_audio_received:
# 备用在缓冲区转移时也设置all_audio_received为True兜底机制
if not self.all_audio_received:
self.all_audio_received = True
print(f"🎵 音频开始播放设置all_audio_received=True")
print(f"🎵 缓冲区转移完成设置all_audio_received=True备用机制")
# 确保播放工作线程知道有数据要播放
if not self.currently_playing:
# 播放工作线程会自动检测播放缓冲区并开始播放
@ -1242,10 +1258,10 @@ class OutputProcess:
self.is_playing = True
self.last_playback_time = 0
# 强制转移后设置all_audio_received为True
# 备用:强制转移后设置all_audio_received为True(兜底机制)
if not self.all_audio_received:
self.all_audio_received = True
print(f"🎵 强制转移预加载缓冲区后设置all_audio_received=True")
print(f"🎵 强制转移预加载缓冲区后设置all_audio_received=True(备用机制)")
print(f"🎵 强制开始播放音频,播放缓冲区大小: {len(self.playback_buffer)}")
@ -1288,10 +1304,10 @@ class OutputProcess:
self.is_playing = True
self.last_playback_time = 0 # 重置播放时间,避免立即触发冷却期
# 音频真正开始播放设置all_audio_received为True
if self.end_signal_received and not self.all_audio_received:
# 备用最小缓冲区模式下也设置all_audio_received为True兜底机制
if not self.all_audio_received:
self.all_audio_received = True
print(f"🎵 音频开始播放最小缓冲区模式设置all_audio_received=True")
print(f"🎵 音频开始播放最小缓冲区模式设置all_audio_received=True(备用机制)")
print(f"🎵 开始播放音频(最小缓冲区满足)")
print(f"🔍 已重置last_playback_time避免立即触发冷却期")
elif self.is_playing and len(self.playback_buffer) < 2 and len(self.preload_buffer) > 0:
@ -2076,11 +2092,14 @@ class OutputProcess:
def process_complete_text(self, text):
"""处理完整文本 - 强制刷新缓冲区"""
print(f"🔊 process_complete_text 被调用,输入文本: {text}")
if not text.strip():
print("🔊 文本为空,返回")
return
# 过滤括号内容
filtered_text = self._filter_parentheses_content(text.strip())
print(f"🔊 过滤后文本: {filtered_text}")
if filtered_text:
# 重置所有播放完成检测状态 - 开始新的对话
@ -2096,8 +2115,14 @@ class OutputProcess:
print(f"🔊 处理完整文本:已重置所有播放完成检测状态")
# 直接添加到缓冲区并强制处理
print(f"🔊 添加文本到TTS缓冲区: {filtered_text}")
self.tts_buffer.append(filtered_text)
print(f"🔊 当前TTS缓冲区大小: {len(self.tts_buffer)}")
print(f"🔊 开始处理TTS缓冲区...")
self._process_tts_buffer()
print(f"🔊 TTS缓冲区处理完成")
else:
print("🔊 过滤后文本为空,不添加到缓冲区")
# ========== 原有方法保持不变 ==========

View File

@ -5,5 +5,6 @@
"voice": "ICL_zh_male_huzi_v1_tob",
"max_tokens": 500,
"greeting": "吾乃李白,字太白,号青莲居士。今天有幸与君相会,让我们畅谈诗词人生吧!",
"nfc_uid": "1DC6C90D0D1080",
"author": "Claude"
}

View File

@ -5,5 +5,6 @@
"voice": "zh_male_zhubajie_mars_bigtts",
"max_tokens": 500,
"greeting": "嘿!俺老猪来也!今天咱聊点啥好吃的?要不要一起去化缘啊?",
"nfc_uid": "1DC7C90D0D1080",
"author": "Claude"
}

View File

@ -6,27 +6,28 @@
实现主控制进程和状态管理
"""
import multiprocessing as mp
import queue
import time
import threading
import requests
import json
import asyncio
import base64
import gzip
import uuid
import asyncio
import websockets
from typing import Optional, Dict, Any, List
from dataclasses import dataclass, asdict
from enum import Enum
import json
import multiprocessing as mp
import os
import queue
import sys
import threading
import time
import uuid
from dataclasses import asdict, dataclass
from enum import Enum
from typing import Any, Dict, List, Optional
import requests
import websockets
from audio_processes import (ControlCommand, InputProcess, OutputProcess,
ProcessEvent, RecordingState)
from nfc_manager import get_nfc_manager
from audio_processes import (
InputProcess, OutputProcess,
RecordingState, ControlCommand, ProcessEvent
)
def input_process_target(command_queue, event_queue, config):
"""输入进程的目标函数 - 在子进程中创建InputProcess实例"""
@ -93,6 +94,10 @@ class ControlSystem:
# 运行状态
self.running = True
# NFC管理器
self.nfc_manager = None
self.nfc_enabled = False
# 检查依赖
self._check_dependencies()
@ -274,26 +279,51 @@ class ControlSystem:
# 等待校准完成
if self.wait_for_calibration_complete(timeout=30):
print("✅ 校准完成")
# 校准完成后播放打招呼
print("🎭 播放角色打招呼...")
greeting_success = self.play_greeting()
if not greeting_success:
print("⚠️ 打招呼播放失败,继续运行...")
# 如果启用了NFC开始持续执行NFC检测但不播放打招呼或启动监听
if hasattr(self, 'nfc_enabled') and self.nfc_enabled:
print("📱 开始持续执行NFC检测等待NFC卡片...")
# NFC检测已经在enable_nfc()中启动,这里只需确认状态
if self.nfc_manager and self.nfc_manager.running:
print("✅ NFC检测已在运行")
else:
print("⚠️ NFC检测未运行尝试启动...")
try:
self.nfc_manager.start()
print("✅ NFC检测已启动")
except Exception as e:
print(f"❌ 启动NFC检测失败: {e}")
else:
# 如果没有启用NFC播放打招呼并启动监听
print("🎭 播放角色打招呼...")
greeting_success = self.play_greeting()
if not greeting_success:
print("⚠️ 打招呼播放失败,继续运行...")
print("🎯 启动音频监听...")
success = self.start_monitoring()
if success:
print("✅ 监听已启动")
else:
print("⚠️ 监听启动失败")
else:
print("⚠️ 校准超时,继续运行...")
# 注释掉自动启动监听功能,让打招呼播放完成后自动开启监听
# if auto_monitoring:
# # 自动启动监听
# print("🎯 自动启动音频监听...")
# success = self.start_monitoring()
# if success:
# print("✅ 监听已启动")
# else:
# print("⚠️ 监听启动失败")
# 注释掉自动启动监听功能,让打招呼播放完成后自动开启监听
# if auto_monitoring:
# # 自动启动监听
# print("🎯 自动启动音频监听...")
# success = self.start_monitoring()
# if success:
# print("✅ 监听已启动")
# else:
# print("⚠️ 监听启动失败")
print("=" * 60)
print("🎙️ 系统就绪,开始检测语音...")
if self.nfc_enabled:
print("📱 NFC模式等待NFC卡片进行角色切换...")
else:
print("🎙️ 系统就绪,开始检测语音...")
print("=" * 60)
# 启动主控制循环
@ -410,6 +440,11 @@ class ControlSystem:
print("🎯 状态IDLE播放刚完成等待延迟启用录音")
self._just_finished_playing = False # 重置标志
else:
# 如果启用了NFC不自动启动音频监听等待NFC触发
if self.nfc_enabled:
print("🎯 状态IDLENFC模式已启用等待NFC卡片触发")
return
# 检查监听状态
monitoring_status = self.get_monitoring_status()
if monitoring_status and monitoring_status['enabled']:
@ -514,7 +549,7 @@ class ControlSystem:
def delayed_enable_recording():
import threading
import time
# 等待更长时间确保音频完全停止
time.sleep(2.0) # 增加到2秒
@ -1256,7 +1291,7 @@ class ControlSystem:
def _filter_parentheses_content(self, text):
"""过滤文本中的括号内容(包括中文和英文括号)- 从 recorder.py 移植"""
import re
# 移除中文括号内容:(内容)
filtered_text = re.sub(r'[^]*', '', text)
# 移除英文括号内容:(content)
@ -1289,10 +1324,13 @@ class ControlSystem:
self.state = RecordingState.PLAYING
# 发送打招呼文本到TTS
print(f"📡 准备发送打招呼文本到输出进程: {greeting_text}")
success = self._send_text_to_output_process(greeting_text)
if not success:
print("❌ 打招呼TTS生成失败")
return False
else:
print(f"✅ 打招呼文本已成功发送到输出进程")
# 手动设置LLM完成状态因为打招呼没有LLM生成过程
self._notify_llm_complete()
@ -1529,6 +1567,94 @@ class ControlSystem:
print(f" 成功率: {success_rate:.1f}%")
print("👋 系统已关闭")
def enable_nfc(self):
"""启用NFC功能"""
if self.nfc_enabled:
print("⚠️ NFC功能已启用")
return
try:
# 获取NFC管理器单例
self.nfc_manager = get_nfc_manager()
self.nfc_manager.set_character_switch_callback(self._on_character_switch)
self.nfc_manager.start()
self.nfc_enabled = True
print("✅ NFC功能已启用")
except Exception as e:
print(f"❌ 启用NFC功能失败: {e}")
def disable_nfc(self):
"""禁用NFC功能"""
if not self.nfc_enabled:
return
try:
if self.nfc_manager:
self.nfc_manager.stop()
self.nfc_enabled = False
print("🛑 NFC功能已禁用")
except Exception as e:
print(f"❌ 禁用NFC功能失败: {e}")
def _on_character_switch(self, character_name: str):
"""角色切换回调函数"""
print(f"🎭 NFC触发角色切换: {character_name}")
# 检查角色配置是否存在
character_config = self._load_character_config(character_name)
if not character_config:
print(f"❌ 角色配置不存在: {character_name}")
return
# 停止当前录音(如果有)
if self.state == RecordingState.RECORDING:
print("🛑 停止当前录音...")
self.input_command_queue.put(ControlCommand('stop_recording'))
# 停止当前监听
if hasattr(self, '_monitoring_active') and self._monitoring_active:
print("🛑 停止当前监听...")
self.input_command_queue.put(ControlCommand('stop_monitoring'))
# 更新角色配置
old_character = self.config['processing']['character']
self.config['processing']['character'] = character_name
# 更新TTS语音配置
if character_config and "voice" in character_config:
self.api_config['tts']['speaker'] = character_config["voice"]
print(f"🎵 更新TTS语音: {character_config['voice']}")
# 向输出进程发送更新TTS speaker的命令
try:
update_command = f"UPDATE_TTS_SPEAKER:{character_config['voice']}"
self.output_audio_queue.put(update_command)
print(f"📡 已发送TTS speaker更新命令到输出进程")
except Exception as e:
print(f"❌ 发送TTS speaker更新命令失败: {e}")
print(f"🔄 角色已切换: {old_character} -> {character_name}")
# 播放新角色打招呼
print("🎭 播放新角色打招呼...")
greeting_success = self.play_greeting()
if greeting_success:
print("✅ 角色切换完成")
else:
print("⚠️ 打招呼播放失败,继续运行...")
def get_nfc_status(self):
"""获取NFC状态"""
if not self.nfc_enabled or not self.nfc_manager:
return {"enabled": False, "current_character": None}
return {
"enabled": True,
"current_character": self.nfc_manager.get_current_character(),
"running": self.nfc_manager.running
}
def main():
"""主函数"""
@ -1562,4 +1688,4 @@ def main():
control_system.start()
if __name__ == "__main__":
main()
main()

View File

@ -11,6 +11,7 @@ import sys
import argparse
import json
import time
import subprocess
from typing import Dict, Any
def check_dependencies():
@ -47,6 +48,23 @@ def check_dependencies():
return True
def check_nfc_dependencies():
"""检查NFC相关依赖"""
try:
# 检查nfc-list命令是否存在
result = subprocess.run(['which', 'nfc-list'], capture_output=True, text=True)
if result.returncode != 0:
print("❌ 未找到nfc-list命令")
print(" 请安装libnfc工具:")
print(" Ubuntu/Debian: sudo apt-get install libnfc-bin")
print(" macOS: brew install libnfc")
return False
print("✅ NFC工具已安装")
return True
except Exception as e:
print(f"❌ NFC依赖检查失败: {e}")
return False
def check_environment():
"""检查运行环境"""
print("🔍 检查运行环境...")
@ -190,6 +208,7 @@ def main():
python multiprocess_recorder.py -c zhubajie # 指定角色
python multiprocess_recorder.py -l # 列出角色
python multiprocess_recorder.py --create-config # 创建配置文件
python multiprocess_recorder.py --enable-nfc # 启用NFC角色切换
"""
)
@ -205,6 +224,8 @@ def main():
help='检查运行环境')
parser.add_argument('--verbose', '-v', action='store_true',
help='详细输出')
parser.add_argument('--enable-nfc', action='store_true',
help='启用NFC角色切换功能')
args = parser.parse_args()
@ -288,9 +309,20 @@ def main():
if args.verbose:
control_system.config['system']['log_level'] = "DEBUG"
# 启动系统(自动校准和监听)
print("🚀 启动系统(包含自动校准和监听)...")
control_system.start(auto_calibration=True, auto_monitoring=True)
# 启用NFC功能如果指定- 在系统启动前启用确保校准完成后能立即开始NFC检测
nfc_enabled = False
if args.enable_nfc:
print("📱 检查NFC依赖...")
if not check_nfc_dependencies():
print("❌ NFC依赖检查失败无法启用NFC功能")
else:
print("📱 启用NFC角色切换功能...")
control_system.enable_nfc()
nfc_enabled = True
# 启动系统自动校准但根据NFC状态决定是否启动监听
print("🚀 启动系统(包含自动校准)...")
control_system.start(auto_calibration=True, auto_monitoring=not nfc_enabled)
except KeyboardInterrupt:
print("\n👋 用户中断")

176
nfc_manager.py Normal file
View File

@ -0,0 +1,176 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
NFC管理模块
实现NFC读取和角色切换功能
"""
import json
import os
import re
import subprocess
import threading
import time
from pathlib import Path
from typing import Callable, Dict, Optional
class NFCManager:
"""NFC管理器负责NFC读取和角色切换"""
def __init__(self, characters_dir: str = "characters"):
self.characters_dir = Path(characters_dir)
self.uid_to_character = {}
self.running = False
self.read_thread = None
self.current_uid = None
self.character_switch_callback: Optional[Callable] = None
# 加载角色NFC映射
self._load_character_mappings()
def _load_character_mappings(self):
"""加载角色NFC映射"""
self.uid_to_character = {}
if not self.characters_dir.exists():
print(f"⚠️ 角色目录不存在: {self.characters_dir}")
return
for config_file in self.characters_dir.glob("*.json"):
try:
with open(config_file, 'r', encoding='utf-8') as f:
config = json.load(f)
if 'nfc_uid' in config:
uid = config['nfc_uid'].upper().strip()
character_name = config_file.stem
self.uid_to_character[uid] = character_name
print(f"✅ 加载角色映射: {character_name} -> {uid}")
except Exception as e:
print(f"❌ 加载角色配置失败 {config_file}: {e}")
if not self.uid_to_character:
print("⚠️ 未找到任何带NFC UID的角色配置")
def _read_uid(self) -> Optional[str]:
"""读取NFC UID"""
os.environ['LIBNFC_DRIVER'] = 'pn532_i2c'
os.environ['LIBNFC_DEVICE'] = 'pn532_i2c:/dev/i2c-1'
try:
result = subprocess.run(['nfc-list'], capture_output=True, text=True, timeout=10)
if result.returncode == 0:
match = re.search(r'UID\s*\(NFCID1\):\s*([0-9A-Fa-f\s]+)', result.stdout)
if match:
return match.group(1).strip().replace(' ', '').upper()
except subprocess.TimeoutExpired:
print("⚠️ NFC读取超时")
except Exception as e:
print(f"❌ NFC读取失败: {e}")
return None
def _read_loop(self):
"""NFC读取循环"""
print("🔄 启动NFC读取循环...")
while self.running:
try:
uid = self._read_uid()
if uid and uid != self.current_uid:
print(f"🎯 检测到NFC卡片: {uid}")
if uid in self.uid_to_character:
character_name = self.uid_to_character[uid]
print(f"🎭 切换角色: {character_name}")
# 更新当前UID
self.current_uid = uid
# 调用角色切换回调
if self.character_switch_callback:
self.character_switch_callback(character_name)
else:
print(f"⚠️ 未找到UID对应角色: {uid}")
except Exception as e:
print(f"❌ NFC读取循环出错: {e}")
# 等待5秒
time.sleep(1)
def set_character_switch_callback(self, callback: Callable):
"""设置角色切换回调函数"""
self.character_switch_callback = callback
def start(self):
"""启动NFC管理器"""
if self.running:
print("⚠️ NFC管理器已在运行")
return
self.running = True
self.read_thread = threading.Thread(target=self._read_loop, daemon=True)
self.read_thread.start()
print("✅ NFC管理器已启动")
def stop(self):
"""停止NFC管理器"""
if not self.running:
return
self.running = False
if self.read_thread:
self.read_thread.join(timeout=5)
print("🛑 NFC管理器已停止")
def get_current_character(self) -> Optional[str]:
"""获取当前角色"""
if self.current_uid and self.current_uid in self.uid_to_character:
return self.uid_to_character[self.current_uid]
return None
def reload_mappings(self):
"""重新加载角色映射"""
print("🔄 重新加载角色NFC映射...")
self._load_character_mappings()
def __del__(self):
"""析构函数"""
self.stop()
# 单例模式
_nfc_manager_instance = None
def get_nfc_manager(characters_dir: str = "characters") -> NFCManager:
"""获取NFC管理器单例"""
global _nfc_manager_instance
if _nfc_manager_instance is None:
_nfc_manager_instance = NFCManager(characters_dir)
return _nfc_manager_instance
if __name__ == "__main__":
# 测试NFC管理器
def on_character_switch(character_name):
print(f"🎭 角色切换回调: {character_name}")
nfc_manager = get_nfc_manager()
nfc_manager.set_character_switch_callback(on_character_switch)
try:
nfc_manager.start()
print("🔄 NFC管理器运行中按Ctrl+C退出...")
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\n👋 用户中断")
finally:
nfc_manager.stop()

View File

@ -0,0 +1,65 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试 all_audio_received 修改后的逻辑
"""
import sys
import os
sys.path.append(os.path.dirname(__file__))
def test_all_audio_received_logic():
"""测试 all_audio_received 的设置逻辑"""
print("🧪 测试 all_audio_received 修改后的逻辑")
print("=" * 50)
# 模拟播放工作线程的逻辑
class MockAudioProcess:
def __init__(self):
self.currently_playing = False
self.all_audio_received = False
self.last_audio_chunk_time = 0
def simulate_audio_playback(self):
"""模拟音频播放过程"""
print("🔊 模拟音频播放开始...")
# 模拟第875行的逻辑
if not self.currently_playing:
self.currently_playing = True
import time
self.last_audio_chunk_time = time.time()
# 新增只要有音频播放就设置all_audio_received为True
if not self.all_audio_received:
self.all_audio_received = True
print(f"🎵 音频开始播放设置all_audio_received=True")
print(f"🎵 播放状态变化: currently_playing = True (开始播放)")
print(f"🎵 设置last_audio_chunk_time = {self.last_audio_chunk_time}")
# 测试1正常播放场景
print("\n📋 测试1正常播放场景")
mock1 = MockAudioProcess()
print(f"初始状态: currently_playing={mock1.currently_playing}, all_audio_received={mock1.all_audio_received}")
mock1.simulate_audio_playback()
print(f"播放后状态: currently_playing={mock1.currently_playing}, all_audio_received={mock1.all_audio_received}")
# 测试2重复播放场景不应该重复设置
print("\n📋 测试2重复播放场景")
mock2 = MockAudioProcess()
mock2.all_audio_received = True # 已经设置为True
print(f"初始状态: currently_playing={mock2.currently_playing}, all_audio_received={mock2.all_audio_received}")
mock2.simulate_audio_playback()
print(f"播放后状态: currently_playing={mock2.currently_playing}, all_audio_received={mock2.all_audio_received}")
print("\n✅ 所有测试完成")
print("\n📊 修改总结:")
print(" • 在播放工作线程中添加了 all_audio_received = True 的设置")
print(" • 移除了对 end_signal_received 的依赖")
print(" • 保留了原有逻辑作为备用机制")
print(" • 确保只要有音频播放就会设置 all_audio_received = True")
if __name__ == "__main__":
test_all_audio_received_logic()

141
test_nfc_functionality.py Normal file
View File

@ -0,0 +1,141 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试NFC角色切换功能
"""
import json
import os
import sys
from nfc_manager import NFCManager
def test_character_loading():
"""测试角色配置加载"""
print("🔍 测试角色配置加载...")
# 创建测试NFC管理器
nfc_manager = NFCManager()
# 检查映射是否正确加载
expected_mappings = {
'1DC6C90D0D1080': 'libai',
'1DC7C90D0D1080': 'zhubajie'
}
for uid, character in expected_mappings.items():
if uid in nfc_manager.uid_to_character:
actual_character = nfc_manager.uid_to_character[uid]
if actual_character == character:
print(f"{character} -> {uid}")
else:
print(f"{character} -> {uid} (实际: {actual_character})")
else:
print(f"❌ 未找到UID: {uid}")
return len(nfc_manager.uid_to_character) == len(expected_mappings)
def test_uid_reading():
"""测试UID读取功能需要真实NFC设备"""
print("\n🔍 测试UID读取功能...")
nfc_manager = NFCManager()
try:
uid = nfc_manager._read_uid()
if uid:
print(f"✅ 读取到UID: {uid}")
return True
else:
print("⚠️ 未读取到UID可能没有NFC设备或卡片")
return False
except Exception as e:
print(f"❌ UID读取失败: {e}")
return False
def test_character_switch_callback():
"""测试角色切换回调"""
print("\n🔍 测试角色切换回调...")
nfc_manager = NFCManager()
def callback(character_name):
print(f"🎭 角色切换回调被调用: {character_name}")
nfc_manager.set_character_switch_callback(callback)
# 模拟角色切换
test_uid = '1DC6C90D0D1080'
if test_uid in nfc_manager.uid_to_character:
character_name = nfc_manager.uid_to_character[test_uid]
print(f"📡 模拟检测到UID: {test_uid}")
print(f"🎭 应该切换到角色: {character_name}")
# 手动调用回调函数
if nfc_manager.character_switch_callback:
nfc_manager.character_switch_callback(character_name)
return True
return False
def test_character_configs():
"""测试角色配置文件"""
print("\n🔍 测试角色配置文件...")
characters_dir = "characters"
test_files = ['libai.json', 'zhubajie.json']
for filename in test_files:
filepath = os.path.join(characters_dir, filename)
if os.path.exists(filepath):
try:
with open(filepath, 'r', encoding='utf-8') as f:
config = json.load(f)
required_fields = ['name', 'nfc_uid', 'greeting']
missing_fields = [field for field in required_fields if field not in config]
if not missing_fields:
print(f"{filename}: {config['name']} (NFC: {config['nfc_uid']})")
else:
print(f"{filename}: 缺少字段 {missing_fields}")
except Exception as e:
print(f"{filename}: 读取失败 - {e}")
else:
print(f"{filename}: 文件不存在")
def main():
"""主测试函数"""
print("🧪 NFC角色切换功能测试")
print("=" * 50)
# 测试角色配置文件
test_character_configs()
# 测试角色加载
loading_success = test_character_loading()
# 测试UID读取
reading_success = test_uid_reading()
# 测试角色切换回调
callback_success = test_character_switch_callback()
print("\n" + "=" * 50)
print("📊 测试结果:")
print(f" 角色配置加载: {'✅ 通过' if loading_success else '❌ 失败'}")
print(f" UID读取功能: {'✅ 通过' if reading_success else '⚠️ 需要NFC设备'}")
print(f" 角色切换回调: {'✅ 通过' if callback_success else '❌ 失败'}")
if loading_success and callback_success:
print("\n🎉 核心功能测试通过!")
print("💡 提示: 要测试完整的NFC功能请:")
print(" 1. 安装libnfc工具")
print(" 2. 连接NFC读取器")
print(" 3. 运行: python multiprocess_recorder.py --enable-nfc")
else:
print("\n❌ 部分测试失败,请检查配置")
if __name__ == "__main__":
main()

131
test_raspberry_nfc.py Normal file
View File

@ -0,0 +1,131 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试树莓派上的NFC功能
"""
import subprocess
import os
import re
import sys
import time
def test_nfc_connection():
"""测试NFC连接"""
print("🔍 测试NFC连接...")
# 设置环境变量
os.environ['LIBNFC_DRIVER'] = 'pn532_i2c'
os.environ['LIBNFC_DEVICE'] = 'pn532_i2c:/dev/i2c-1'
try:
result = subprocess.run(['nfc-list'], capture_output=True, text=True, timeout=10)
print(f"NFC输出: {result.stdout}")
print(f"返回码: {result.returncode}")
if result.returncode == 0:
if "NFC device: user defined device opened" in result.stdout:
print("✅ NFC设备连接成功")
return True
else:
print("⚠️ NFC设备未正确初始化")
return False
else:
print(f"❌ NFC设备连接失败: {result.stderr}")
return False
except Exception as e:
print(f"❌ NFC测试异常: {e}")
return False
def test_nfc_reading():
"""测试NFC读取"""
print("\n🔍 测试NFC读取...")
# 设置环境变量
os.environ['LIBNFC_DRIVER'] = 'pn532_i2c'
os.environ['LIBNFC_DEVICE'] = 'pn532_i2c:/dev/i2c-1'
try:
result = subprocess.run(['nfc-list'], capture_output=True, text=True, timeout=10)
if result.returncode == 0:
match = re.search(r'UID\s*\(NFCID1\):\s*([0-9A-Fa-f\s]+)', result.stdout)
if match:
uid = match.group(1).strip().replace(' ', '').upper()
print(f"✅ 读取到UID: {uid}")
return uid
else:
print("⚠️ 未检测到NFC卡片")
return None
else:
print(f"❌ NFC读取失败: {result.stderr}")
return None
except Exception as e:
print(f"❌ NFC读取异常: {e}")
return None
def test_nfc_manager():
"""测试NFC管理器"""
print("\n🔍 测试NFC管理器...")
try:
# 导入NFC管理器
sys.path.append('/home/zhuchaowe/Local-Voice')
from nfc_manager import get_nfc_manager
# 创建NFC管理器
nfc_manager = get_nfc_manager()
# 检查映射
print(f"角色映射: {nfc_manager.uid_to_character}")
# 测试读取
uid = nfc_manager._read_uid()
if uid:
print(f"✅ NFC管理器读取到UID: {uid}")
# 检查映射
if uid in nfc_manager.uid_to_character:
character = nfc_manager.uid_to_character[uid]
print(f"🎭 对应角色: {character}")
else:
print("⚠️ UID未映射到角色")
else:
print("⚠️ 未读取到UID")
return True
except Exception as e:
print(f"❌ NFC管理器测试失败: {e}")
import traceback
traceback.print_exc()
return False
def main():
"""主测试函数"""
print("🧪 树莓派NFC功能测试")
print("=" * 50)
# 测试NFC连接
connection_ok = test_nfc_connection()
# 测试NFC读取
uid = test_nfc_reading()
# 测试NFC管理器
manager_ok = test_nfc_manager()
print("\n" + "=" * 50)
print("📊 测试结果:")
print(f" NFC连接: {'✅ 通过' if connection_ok else '❌ 失败'}")
print(f" UID读取: {'✅ 通过' if uid else '⚠️ 无卡片'}")
print(f" NFC管理器: {'✅ 通过' if manager_ok else '❌ 失败'}")
if uid:
print(f"\n🎯 当前检测到的UID: {uid}")
print("💡 提示: 您现在可以使用以下命令测试完整的NFC角色切换功能:")
print(" cd Local-Voice && python multiprocess_recorder.py --enable-nfc")
else:
print(f"\n💡 提示: 请确保NFC读取器已连接并放置了NFC卡片")
if __name__ == "__main__":
main()