#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 多进程音频控制系统 实现主控制进程和状态管理 """ import asyncio import base64 import gzip import json import multiprocessing as mp import os import queue import sys import threading import time import uuid from dataclasses import asdict, dataclass from enum import Enum from typing import Any, Dict, List, Optional import requests import websockets from audio_processes import (ControlCommand, InputProcess, OutputProcess, ProcessEvent, RecordingState) from nfc_manager import get_nfc_manager # 导入LED控制器 try: from led_controller import SystemState, set_led_state LED_AVAILABLE = True except ImportError: LED_AVAILABLE = False def input_process_target(command_queue, event_queue, config): """输入进程的目标函数 - 在子进程中创建InputProcess实例""" try: print("🎙️ 输入进程目标函数开始执行...") input_process = InputProcess(command_queue, event_queue, config) print("🎙️ InputProcess实例创建成功,开始运行...") input_process.run() print("🎙️ 输入进程运行完成") except Exception as e: print(f"❌ 输入进程出错: {e}") import traceback traceback.print_exc() def output_process_target(audio_queue, config, event_queue): """输出进程的目标函数 - 在子进程中创建OutputProcess实例""" try: print("🔊 输出进程目标函数开始执行...") output_process = OutputProcess(audio_queue, config, event_queue) print("🔊 OutputProcess实例创建成功,开始运行...") output_process.run() print("🔊 输出进程运行完成") except Exception as e: print(f"❌ 输出进程出错: {e}") import traceback traceback.print_exc() class ControlSystem: """主控制系统""" def __init__(self, config: Dict[str, Any] = None): self.config = config or self._get_default_config() # 进程间通信 self.input_command_queue = mp.Queue(maxsize=100) # 主进程 → 输入进程 self.input_event_queue = mp.Queue(maxsize=100) # 输入进程 → 主进程 self.output_audio_queue = mp.Queue(maxsize=1000) # 主进程 → 输出进程 self.output_event_queue = mp.Queue(maxsize=100) # 输出进程 → 主进程 # 进程 self.input_process = None self.output_process = None # 状态管理 self.state = RecordingState.IDLE self.processing_complete = False self.playback_complete = False # 当前处理的数据 self.current_audio_data = None self.current_audio_metadata = None # API配置 self.api_config = self._setup_api_config() # 统计信息 self.stats = { 'total_conversations': 0, 'total_recording_time': 0, 'successful_processing': 0, 'failed_processing': 0 } # 聊天历史记录 self.chat_history = [] # 从配置中读取聊天历史设置 self.enable_chat_memory = self.config.get('processing', {}).get('enable_chat_memory', True) self.max_history_length = self.config.get('processing', {}).get('max_history_length', 5) # 显示聊天记忆状态 if self.enable_chat_memory: print(f"💬 聊天记忆功能已启用,最多保存 {self.max_history_length} 轮对话") else: print("💬 聊天记忆功能已禁用") # 运行状态 self.running = True # NFC管理器 self.nfc_manager = None self.nfc_enabled = False # 检查依赖 self._check_dependencies() def _get_default_config(self) -> Dict[str, Any]: """获取默认配置""" return { 'system': { 'max_queue_size': 1000, 'process_timeout': 30, 'heartbeat_interval': 1.0 }, 'audio': { 'sample_rate': 16000, 'channels': 1, 'chunk_size': 1024 }, 'recording': { 'min_duration': 2.0, 'max_duration': 30.0, 'silence_threshold': 3.0 }, 'processing': { 'enable_asr': True, 'enable_llm': True, 'enable_tts': True, 'character': 'libai' } } def _setup_api_config(self) -> Dict[str, Any]: """设置API配置""" config = { 'asr': { 'appid': "8718217928", 'token': "ynJMX-5ix1FsJvswC9KTNlGUdubcchqc", 'cluster': "volcengine_input_common", 'ws_url': "wss://openspeech.bytedance.com/api/v2/asr" }, 'llm': { 'api_url': "https://ark.cn-beijing.volces.com/api/v3/chat/completions", 'model': "doubao-seed-1-6-flash-250828", 'api_key': "f8e43677-1c23-4e21-8a4c-66e7103a8155", 'max_tokens': 50 }, 'tts': { 'url': 
"https://openspeech.bytedance.com/api/v3/tts/unidirectional", 'app_id': "8718217928", 'access_key': "ynJMX-5ix1FsJvswC9KTNlGUdubcchqc", 'resource_id': "volc.service_type.10029", 'app_key': "aGjiRDfUWi", 'speaker': "zh_female_wanqudashu_moon_bigtts" } } # 加载角色配置 character_config = self._load_character_config(self.config['processing']['character']) if character_config and "voice" in character_config: config['tts']['speaker'] = character_config["voice"] return config def _load_character_config(self, character_name: str) -> Optional[Dict[str, Any]]: """加载角色配置""" characters_dir = os.path.join(os.path.dirname(__file__), "characters") config_file = os.path.join(characters_dir, f"{character_name}.json") if not os.path.exists(config_file): print(f"⚠️ 角色配置文件不存在: {config_file}") return None try: with open(config_file, 'r', encoding='utf-8') as f: config = json.load(f) print(f"✅ 加载角色: {config.get('name', character_name)}") return config except Exception as e: print(f"❌ 加载角色配置失败: {e}") return None def _check_dependencies(self): """检查依赖库""" missing_deps = [] try: import pyaudio except ImportError: missing_deps.append("pyaudio") try: import numpy except ImportError: missing_deps.append("numpy") try: import requests except ImportError: missing_deps.append("requests") try: import websockets except ImportError: missing_deps.append("websockets") if missing_deps: print(f"❌ 缺少依赖库: {', '.join(missing_deps)}") print("请安装: pip install " + " ".join(missing_deps)) sys.exit(1) # 检查API密钥 if not self.api_config['llm']['api_key']: print("⚠️ 未设置 ARK_API_KEY 环境变量,大语言模型功能将被禁用") self.config['processing']['enable_llm'] = False def _start_processes(self): """内部方法:启动进程但不启动控制循环""" print("🚀 启动进程...") # 创建并启动输入进程 input_config = { 'zcr_min': 2400, 'zcr_max': 12000, 'min_recording_time': self.config['recording']['min_duration'], 'max_recording_time': self.config['recording']['max_duration'], 'silence_threshold': self.config['recording']['silence_threshold'], 'pre_record_duration': 2.0 } self.input_process = mp.Process( target=input_process_target, args=(self.input_command_queue, self.input_event_queue, input_config) ) # 创建并启动输出进程 output_config = { 'buffer_size': 1000, 'show_progress': True, 'progress_interval': 100, 'tts_speaker': self.api_config['tts']['speaker'] } self.output_process = mp.Process( target=output_process_target, args=(self.output_audio_queue, output_config, self.output_event_queue) ) # 启动进程 self.input_process.start() self.output_process.start() print("✅ 所有进程已启动") print("🎙️ 输入进程:负责录音和语音检测") print("🔊 输出进程:负责音频播放") print("🎯 主控制:负责协调和AI处理") def start(self, auto_calibration=True, auto_monitoring=True): """启动系统 Args: auto_calibration: 是否自动启动校准 auto_monitoring: 是否自动启动监听 """ print("🚀 启动多进程音频控制系统") print("=" * 60) # 启动进程 self._start_processes() print("=" * 60) if auto_calibration: # 自动启动校准 print("🎯 自动启动语音检测器校准...") success = self.start_calibration() if not success: print("⚠️ 校准启动失败,继续运行...") else: # 等待校准完成 if self.wait_for_calibration_complete(timeout=30): print("✅ 校准完成") # 校准完成后恢复LED状态 if LED_AVAILABLE: if hasattr(self, 'nfc_enabled') and self.nfc_enabled: set_led_state(SystemState.WAITING_NFC) else: set_led_state(SystemState.IDLE_MONITORING) # 如果启用了NFC,开始持续执行NFC检测,但不播放打招呼或启动监听 if hasattr(self, 'nfc_enabled') and self.nfc_enabled: print("📱 开始持续执行NFC检测,等待NFC卡片...") # NFC检测已经在enable_nfc()中启动,这里只需确认状态 if self.nfc_manager and self.nfc_manager.running: print("✅ NFC检测已在运行") else: print("⚠️ NFC检测未运行,尝试启动...") try: self.nfc_manager.start() print("✅ NFC检测已启动") except Exception as e: print(f"❌ 启动NFC检测失败: {e}") else: # 
如果没有启用NFC,播放打招呼并启动监听 print("🎭 播放角色打招呼...") greeting_success = self.play_greeting() if not greeting_success: print("⚠️ 打招呼播放失败,继续运行...") # 打招呼失败时设置LED为空闲监听状态 if LED_AVAILABLE: set_led_state(SystemState.IDLE_MONITORING) print("🎯 启动音频监听...") success = self.start_monitoring() if success: print("✅ 监听已启动") # 注意:不立即设置LED状态,让打招呼效果持续到播放完成 else: print("⚠️ 监听启动失败") # 监听启动失败时设置LED为空闲监听状态 if LED_AVAILABLE: set_led_state(SystemState.IDLE_MONITORING) else: print("⚠️ 校准超时,继续运行...") # 校准超时也恢复LED状态 if LED_AVAILABLE: if hasattr(self, 'nfc_enabled') and self.nfc_enabled: set_led_state(SystemState.WAITING_NFC) else: set_led_state(SystemState.IDLE_MONITORING) # 注释掉自动启动监听功能,让打招呼播放完成后自动开启监听 # if auto_monitoring: # # 自动启动监听 # print("🎯 自动启动音频监听...") # success = self.start_monitoring() # if success: # print("✅ 监听已启动") # else: # print("⚠️ 监听启动失败") print("=" * 60) if self.nfc_enabled: print("📱 NFC模式:等待NFC卡片进行角色切换...") else: print("🎙️ 系统就绪,开始检测语音...") print("=" * 60) # 启动主控制循环 self._control_loop() def start_calibration(self): """启动语音检测器校准""" print("🎯 启动语音检测器校准...") # 更新LED状态为校准模式 if LED_AVAILABLE: set_led_state(SystemState.CALIBRATING) self.input_command_queue.put(ControlCommand('start_calibration')) return True def start_monitoring(self): """启动音频监听""" print("🎯 启动音频监听...") self.input_command_queue.put(ControlCommand('start_monitoring')) return True def stop_monitoring(self): """停止音频监听""" print("🎯 停止音频监听...") self.input_command_queue.put(ControlCommand('stop_monitoring')) return True def get_calibration_status(self): """获取校准状态""" self.input_command_queue.put(ControlCommand('get_calibration_status')) # 等待响应 start_time = time.time() while time.time() - start_time < 5.0: # 5秒超时 try: event = self.input_event_queue.get(timeout=0.1) if event.event_type == 'calibration_status': return event.metadata except queue.Empty: continue return None def get_monitoring_status(self): """获取监听状态""" self.input_command_queue.put(ControlCommand('get_monitoring_status')) # 等待响应 start_time = time.time() while time.time() - start_time < 5.0: # 5秒超时 try: event = self.input_event_queue.get(timeout=0.1) if event.event_type == 'monitoring_status': return event.metadata except queue.Empty: continue return None def wait_for_calibration_complete(self, timeout=30): """等待校准完成""" print(f"⏱️ 等待校准完成,超时时间: {timeout}秒...") start_time = time.time() while time.time() - start_time < timeout: status = self.get_calibration_status() if status and not status['calibrating']: print("✅ 校准完成") return True # 显示进度 if status: progress = status['progress'] * 100 print(f"\r🔧 校准进度: {progress:.1f}%", end='', flush=True) time.sleep(0.5) print(f"\n⚠️ 校准超时") return False def _control_loop(self): """主控制循环""" print("🎯 主控制循环启动") try: while self.running: # 根据状态处理不同逻辑 if self.state == RecordingState.IDLE: self._handle_idle_state() elif self.state == RecordingState.RECORDING: self._handle_recording_state() elif self.state == RecordingState.PROCESSING: self._handle_processing_state() elif self.state == RecordingState.PLAYING: self._handle_playing_state() # 检查进程事件 self._check_events() # 显示状态 self._display_status() # 控制循环频率 time.sleep(0.1) except KeyboardInterrupt: print("\n👋 收到退出信号...") self.shutdown() except Exception as e: print(f"❌ 主控制循环错误: {e}") self.shutdown() def _handle_idle_state(self): """处理空闲状态""" if self.state == RecordingState.IDLE: # 检查是否应该立即启用录音 # 如果刚刚从播放完成状态切换过来,不要立即启用录音 if hasattr(self, '_just_finished_playing') and self._just_finished_playing: print("🎯 状态:IDLE(播放刚完成,等待延迟启用录音)") self._just_finished_playing = False # 重置标志 else: # 如果启用了NFC,不自动启动音频监听,等待NFC触发 if 
self.nfc_enabled: print("🎯 状态:IDLE(NFC模式已启用,等待NFC卡片触发)") return # 检查监听状态 monitoring_status = self.get_monitoring_status() if monitoring_status and monitoring_status['enabled']: # 监听已启用,切换到录音状态 self.state = RecordingState.RECORDING # 更新LED状态 if LED_AVAILABLE: set_led_state(SystemState.RECORDING) print("🎯 状态:IDLE → RECORDING(监听已启用)") else: # 监听未启用,尝试启用 print("🎯 状态:IDLE(监听未启用,尝试启用)") success = self.start_monitoring() if success: # 监听启用成功,等待状态更新后进入录音状态 time.sleep(0.5) # 等待状态更新 self.state = RecordingState.RECORDING # 更新LED状态 if LED_AVAILABLE: set_led_state(SystemState.RECORDING) print("🎯 状态:IDLE → RECORDING(监听已启用)") else: print("⚠️ 监听启用失败,保持IDLE状态") def _handle_recording_state(self): """处理录音状态""" # 等待输入进程发送录音完成事件 pass def _handle_processing_state(self): """处理状态""" if not self.processing_complete: self._process_audio_pipeline() def _handle_playing_state(self): """处理播放状态""" # 现在主要由输出进程的播放完成事件驱动 pass def _check_events(self): """检查进程事件""" # 检查输入进程事件 try: while True: event = self.input_event_queue.get_nowait() if event.event_type == 'recording_complete': print("📡 主控制:收到录音完成事件") self._handle_recording_complete(event) except queue.Empty: pass # 检查输出进程事件 try: while True: event = self.output_event_queue.get_nowait() if event.event_type == 'playback_complete': print("📡 主控制:收到播放完成事件") self._handle_playback_complete(event) except queue.Empty: pass def check_events(self): """公开方法:检查进程事件""" return self._check_events() def _handle_recording_complete(self, event: ProcessEvent): """处理录音完成事件""" # 禁用输入进程录音功能 self.input_command_queue.put(ControlCommand('disable_recording')) # 保存录音数据 self.current_audio_data = event.data self.current_audio_metadata = event.metadata # 更新统计 self.stats['total_recording_time'] += event.metadata['duration'] # 切换到处理状态 self.state = RecordingState.PROCESSING self.processing_complete = False self.playback_complete = False # 更新LED状态 if LED_AVAILABLE: set_led_state(SystemState.PROCESSING) print(f"🎯 状态:RECORDING → PROCESSING (时长: {event.metadata['duration']:.2f}s)") def _handle_playback_complete(self, event: ProcessEvent): """处理播放完成事件""" # 标记播放完成 self.playback_complete = True # 更新统计 self.stats['total_conversations'] += 1 # 切换到空闲状态,但先不启用录音 old_state = self.state.value self.state = RecordingState.IDLE # 设置标志,表示刚从播放状态切换过来 self._just_finished_playing = True # 更新LED状态 if LED_AVAILABLE: if self.nfc_enabled: set_led_state(SystemState.WAITING_NFC) else: set_led_state(SystemState.IDLE_MONITORING) print(f"🎯 状态:{old_state} → IDLE") # 延迟重新启用录音,确保音频设备完全停止 # 延迟启用录音的函数 def delayed_enable_recording(): import threading import time # 等待更长时间确保音频完全停止 time.sleep(2.0) # 增加到2秒 # 检查输出队列是否还有音频数据 output_queue_size = self.output_audio_queue.qsize() if output_queue_size > 0: time.sleep(1.0) # 重新启用输入进程录音功能 try: self.input_command_queue.put(ControlCommand('enable_recording')) # 更新状态为录音状态 self.state = RecordingState.RECORDING # 更新LED状态 if LED_AVAILABLE: set_led_state(SystemState.RECORDING) print(f"🎯 状态:IDLE → RECORDING(延迟启用)") except Exception as e: print(f"❌ 主控制:延迟发送 enable_recording 命令失败: {e}") # 在新线程中延迟启用录音 threading.Thread(target=delayed_enable_recording, daemon=True).start() def _process_audio_pipeline(self): """处理音频流水线:STT + LLM + TTS""" try: print("🤖 开始处理音频流水线") # 1. 语音识别 (STT) if self.config['processing']['enable_asr']: text = self._speech_to_text(self.current_audio_data) if not text: print("❌ 语音识别失败") self._handle_processing_failure() return print(f"📝 识别结果: {text}") else: text = "语音识别功能已禁用" # 2. 
大语言模型 (LLM) - 使用流式处理支持智能缓冲 if self.config['processing']['enable_llm']: success = self._call_llm_streaming(text) if not success: print("❌ 大语言模型调用失败") self._handle_processing_failure() return else: # 如果禁用LLM,直接发送文本到TTS response = "大语言模型功能已禁用" if self.config['processing']['enable_tts']: success = self._send_text_to_output_process(response) if not success: print("❌ 文本转语音失败") self._handle_processing_failure() return # 在禁用LLM的情况下,也需要发送完成信号 # 由于LLM未启用,直接发送LLM完成信号 self._notify_llm_complete() # 发送TTS完成信号 tts_complete_command = "TTS_COMPLETE:" self.output_audio_queue.put(tts_complete_command) # 发送结束信号 self.output_audio_queue.put(None) else: print("ℹ️ 文本转语音功能已禁用") # 直接发送结束信号 self.output_audio_queue.put(None) # 标记处理完成 self.processing_complete = True self.state = RecordingState.PLAYING self.stats['successful_processing'] += 1 # 更新LED状态 if LED_AVAILABLE: set_led_state(SystemState.PLAYING) print("🎯 状态:PROCESSING → PLAYING") except Exception as e: print(f"❌ 处理流水线错误: {e}") self._handle_processing_failure() def _handle_processing_failure(self): """处理失败情况""" self.stats['failed_processing'] += 1 self.state = RecordingState.IDLE self.processing_complete = True self.playback_complete = True # 更新LED状态 if LED_AVAILABLE: set_led_state(SystemState.ERROR) time.sleep(2) # 显示错误状态2秒 if self.nfc_enabled: set_led_state(SystemState.WAITING_NFC) else: set_led_state(SystemState.IDLE_MONITORING) # 发送完成信号,防止输出进程等待 try: # 发送LLM完成信号 self._notify_llm_complete() # 发送TTS完成信号 tts_complete_command = "TTS_COMPLETE:" self.output_audio_queue.put(tts_complete_command) # 关键修复:处理失败时也要发送all_audio_received=True信号 # 这解决了语音识别失败但TTS完成信号已发送的死锁问题 all_audio_received_command = "ALL_AUDIO_RECEIVED:" self.output_audio_queue.put(all_audio_received_command) print(f"🔧 处理失败,发送all_audio_received=True信号以避免死锁") # 发送结束信号 self.output_audio_queue.put(None) except Exception as e: print(f"❌ 发送失败处理信号失败: {e}") print("🎯 状态:PROCESSING → IDLE (失败)") def _speech_to_text(self, audio_data: bytes) -> Optional[str]: """语音转文字""" try: print(f"🔍 开始语音识别,音频大小: {len(audio_data)} 字节") result = asyncio.run(self._recognize_audio_async(audio_data)) if result: print(f"✅ 语音识别成功: {result}") else: print(f"❌ 语音识别返回空结果") return result except Exception as e: print(f"❌ 语音识别异常: {e}") import traceback print(f"❌ 详细错误信息:\n{traceback.format_exc()}") return None async def _recognize_audio_async(self, audio_data: bytes) -> Optional[str]: """异步语音识别""" if not self.config['processing']['enable_asr']: return "语音识别功能已禁用" try: # 验证音频数据 print(f"🎵 音频数据验证:") print(f" - 大小: {len(audio_data)} 字节") print(f" - 是否为空: {len(audio_data) == 0}") if len(audio_data) == 0: print("❌ 音频数据为空") return None # 检查是否有WAV头部 has_wav_header = audio_data.startswith(b'RIFF') print(f" - 有WAV头部: {has_wav_header}") if has_wav_header: # 解析WAV头部 print(f" - WAV格式,可能需要提取PCM数据") riff_size = int.from_bytes(audio_data[4:8], 'little') wave_fmt = audio_data[8:12] if wave_fmt == b'WAVE': print(f" - WAVE格式正确") # 查找fmt块 fmt_pos = audio_data.find(b'fmt ') if fmt_pos > 0: fmt_size = int.from_bytes(audio_data[fmt_pos+4:fmt_pos+8], 'little') audio_format = int.from_bytes(audio_data[fmt_pos+8:fmt_pos+10], 'little') channels = int.from_bytes(audio_data[fmt_pos+10:fmt_pos+12], 'little') sample_rate = int.from_bytes(audio_data[fmt_pos+12:fmt_pos+16], 'little') print(f" - 音频格式: {audio_format}") print(f" - 声道数: {channels}") print(f" - 采样率: {sample_rate}") else: print(f" - 纯PCM数据") # 检查音频数据格式(假设是16位PCM) if len(audio_data) % 2 != 0: print(f"⚠️ 音频数据长度不是2的倍数: {len(audio_data)}") # 计算音频时长 sample_rate = self.config['audio']['sample_rate'] channels = 
self.config['audio']['channels'] bytes_per_second = sample_rate * channels * 2 # 16位 = 2字节 duration = len(audio_data) / bytes_per_second print(f" - 配置采样率: {sample_rate} Hz") print(f" - 配置声道数: {channels}") print(f" - 估算时长: {duration:.2f} 秒") if duration < 0.5: print(f"⚠️ 音频时长过短: {duration:.2f} 秒") import websockets print(f"🔗 连接WebSocket ASR服务: {self.api_config['asr']['ws_url']}") # 生成ASR头部 def generate_asr_header(message_type=1, message_type_specific_flags=0): PROTOCOL_VERSION = 0b0001 DEFAULT_HEADER_SIZE = 0b0001 JSON = 0b0001 GZIP = 0b0001 header = bytearray() header.append((PROTOCOL_VERSION << 4) | DEFAULT_HEADER_SIZE) header.append((message_type << 4) | message_type_specific_flags) header.append((JSON << 4) | GZIP) header.append(0x00) # reserved return header # 解析ASR响应 - 基于recorder.py的工作实现 def parse_asr_response(res): """解析ASR响应""" PROTOCOL_VERSION = res[0] >> 4 header_size = res[0] & 0x0f message_type = res[1] >> 4 message_type_specific_flags = res[1] & 0x0f serialization_method = res[2] >> 4 message_compression = res[2] & 0x0f reserved = res[3] header_extensions = res[4:header_size * 4] payload = res[header_size * 4:] result = {} payload_msg = None payload_size = 0 print(f"🔍 响应头信息: message_type={message_type}, compression={message_compression}, serialization={serialization_method}") if message_type == 0b1001: # SERVER_FULL_RESPONSE payload_size = int.from_bytes(payload[:4], "big", signed=True) payload_msg = payload[4:] print(f"📦 Full响应: payload_size={payload_size}") elif message_type == 0b1011: # SERVER_ACK seq = int.from_bytes(payload[:4], "big", signed=True) result['seq'] = seq if len(payload) >= 8: payload_size = int.from_bytes(payload[4:8], "big", signed=False) payload_msg = payload[8:] print(f"📦 ACK响应: seq={seq}, payload_size={payload_size}") elif message_type == 0b1111: # SERVER_ERROR_RESPONSE code = int.from_bytes(payload[:4], "big", signed=False) result['code'] = code payload_size = int.from_bytes(payload[4:8], "big", signed=False) payload_msg = payload[8:] print(f"❌ 错误响应: code={code}") if payload_msg is None: return result if message_compression == 0b0001: # GZIP payload_msg = gzip.decompress(payload_msg) print(f"📦 解压后大小: {len(payload_msg)} 字节") if serialization_method == 0b0001: # JSON payload_msg = json.loads(str(payload_msg, "utf-8")) result['payload_msg'] = payload_msg result['payload_size'] = payload_size return result # 构建请求参数 reqid = str(uuid.uuid4()) request_params = { 'app': { 'appid': self.api_config['asr']['appid'], 'cluster': self.api_config['asr']['cluster'], 'token': self.api_config['asr']['token'], }, 'user': { 'uid': 'multiprocess_asr' }, 'request': { 'reqid': reqid, 'nbest': 1, 'workflow': 'audio_in,resample,partition,vad,fe,decode,itn,nlu_punctuate', 'show_language': False, 'show_utterances': False, 'result_type': 'full', "sequence": 1 }, 'audio': { 'format': 'pcm', 'rate': self.config['audio']['sample_rate'], 'language': 'zh-CN', 'bits': 16, 'channel': self.config['audio']['channels'], 'codec': 'raw' } } # 构建请求 print(f"📋 ASR请求参数:") print(f" - audio.format: {request_params['audio']['format']}") print(f" - audio.rate: {request_params['audio']['rate']}") print(f" - audio.channel: {request_params['audio']['channel']}") print(f" - audio.bits: {request_params['audio']['bits']}") print(f" - audio.codec: {request_params['audio']['codec']}") print(f" - request.workflow: {request_params['request']['workflow']}") payload_bytes = str.encode(json.dumps(request_params)) payload_bytes = gzip.compress(payload_bytes) full_client_request = bytearray(generate_asr_header()) 
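            # Editor's note (added comment, derived from generate_asr_header() above and the
            # two extend() calls below): the full client request is a binary frame laid out as
            #   byte 0   : protocol version (high 4 bits) | header size in 4-byte units (low 4 bits)
            #   byte 1   : message type (high 4 bits)     | message-type flags (low 4 bits)
            #   byte 2   : serialization = JSON (high)    | compression = GZIP (low)
            #   byte 3   : reserved (0x00)
            #   bytes 4-7: payload length, big-endian
            #   bytes 8- : gzip-compressed JSON payload
            # Audio-only chunks reuse the same layout with message_type=0b0010, and the last
            # chunk sets flags=0b0010.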
full_client_request.extend((len(payload_bytes)).to_bytes(4, 'big')) full_client_request.extend(payload_bytes) # 设置认证头 additional_headers = {'Authorization': 'Bearer; {}'.format(self.api_config['asr']['token'])} # 连接WebSocket print(f"📡 尝试连接WebSocket...") print(f"🔗 WebSocket URL: {self.api_config['asr']['ws_url']}") print(f"📋 Headers: {additional_headers}") async with websockets.connect( self.api_config['asr']['ws_url'], additional_headers=additional_headers, max_size=1000000000, ping_interval=20, ping_timeout=60 ) as ws: print(f"✅ WebSocket连接成功") # 发送请求 print(f"📤 发送ASR请求...") print(f"📦 请求大小: {len(full_client_request)} 字节") await ws.send(full_client_request) res = await ws.recv() print(f"📥 收到ASR响应,大小: {len(res)} 字节") result = parse_asr_response(res) print(f"🔍 解析ASR响应: {result}") # 发送音频数据 - 基于recorder.py实现 chunk_size = int(self.config['audio']['channels'] * 2 * self.config['audio']['sample_rate'] * 15000 / 1000) print(f"🎵 开始发送音频数据:") print(f" - 总大小: {len(audio_data)} 字节") print(f" - 分块大小: {chunk_size} 字节") print(f" - 预计分块数: {(len(audio_data) + chunk_size - 1) // chunk_size}") total_chunks = (len(audio_data) + chunk_size - 1) // chunk_size chunks_sent = 0 for offset in range(0, len(audio_data), chunk_size): chunks_sent += 1 chunk = audio_data[offset:offset + chunk_size] last = (offset + chunk_size) >= len(audio_data) print(f"📦 发送第 {chunks_sent}/{total_chunks} 块:") print(f" - 当前块大小: {len(chunk)} 字节") print(f" - 偏移量: {offset}-{offset + len(chunk)}") print(f" - 是否最后一块: {last}") try: payload_bytes = gzip.compress(chunk) audio_only_request = bytearray( generate_asr_header( message_type=0b0010, message_type_specific_flags=0b0010 if last else 0 ) ) audio_only_request.extend((len(payload_bytes)).to_bytes(4, 'big')) audio_only_request.extend(payload_bytes) print(f" - 压缩后大小: {len(payload_bytes)} 字节") print(f" - 总请求数据大小: {len(audio_only_request)} 字节") await ws.send(audio_only_request) print(f" ✅ 第 {chunks_sent} 块发送成功") # 等待服务器响应 try: res = await asyncio.wait_for(ws.recv(), timeout=30.0) print(f" 📥 收到第 {chunks_sent} 块响应,大小: {len(res)} 字节") result = parse_asr_response(res) print(f" 🔍 第 {chunks_sent} 块响应解析: {result}") except asyncio.TimeoutError: print(f" ⏰ 第 {chunks_sent} 块响应超时") raise Exception("音频块响应超时") # 检查每个响应是否有错误 if 'code' in result: print(f" 🔍 第 {chunks_sent} 块响应码: {result['code']}") if result['code'] != 1000: print(f" ❌ 第 {chunks_sent} 块数据发送失败: {result}") return None if 'payload_msg' in result and result['payload_msg'].get('code') != 1000: print(f" ❌ 第 {chunks_sent} 块数据发送失败: {result['payload_msg']}") return None except Exception as chunk_error: print(f" ❌ 第 {chunks_sent} 块发送异常: {chunk_error}") raise chunk_error if last: print(f"📨 发送最后一块音频数据完成") print(f"🎯 所有音频数据发送完成,共发送 {chunks_sent} 块") # 检查最后一个响应中是否包含识别结果 print(f"🎯 检查最终识别结果...") print(f"📋 最后一个响应: {result}") if 'payload_msg' in result: payload_msg = result['payload_msg'] print(f"📋 最终Payload结构: {list(payload_msg.keys()) if isinstance(payload_msg, dict) else type(payload_msg)}") print(f"📋 最终Payload内容: {payload_msg}") if isinstance(payload_msg, dict): # 检查响应码 if 'code' in payload_msg: code = payload_msg['code'] print(f"🔢 最终响应码: {code}") if code == 1000: print(f"✅ ASR识别成功") else: print(f"❌ ASR服务返回错误: {payload_msg.get('message', '未知错误')}") return None # 查找结果 - 与recorder.py保持一致 if 'result' in payload_msg: results = payload_msg['result'] print(f"📝 找到结果字段 'result': {results}") if isinstance(results, list) and results: text = results[0].get('text', '识别失败') print(f"✅ 提取识别文本: {text}") return text elif isinstance(results, str): print(f"✅ 提取识别文本: {results}") 
return results else: print(f"❌ 未找到result字段,可用字段: {list(payload_msg.keys())}") else: print(f"❌ Payload不是字典类型: {type(payload_msg)}") else: print(f"❌ 响应中没有payload_msg字段") print(f"可用字段: {list(result.keys())}") if 'code' in result: print(f"错误码: {result['code']}") return None except Exception as e: print(f"❌ 语音识别失败: {e}") return None def _call_llm(self, text: str) -> Optional[str]: """调用大语言模型""" if not self.config['processing']['enable_llm']: return "大语言模型功能已禁用" try: # 获取角色配置 character_config = self._load_character_config(self.config['processing']['character']) if character_config and "system_prompt" in character_config: system_prompt = character_config["system_prompt"] else: system_prompt = "你是一个智能助手,请根据用户的语音输入提供有帮助的回答。保持回答简洁明了。" # 构建请求 headers = { "Content-Type": "application/json", "Authorization": f"Bearer {self.api_config['llm']['api_key']}" } # 构建消息列表 messages = [ { "role": "system", "content": system_prompt } ] # 如果启用聊天记忆,添加历史对话记录 if self.enable_chat_memory: for history_item in self.chat_history: messages.extend([ {"role": "user", "content": history_item["user"]}, {"role": "assistant", "content": history_item["assistant"]} ]) # 添加当前用户消息 messages.append({ "role": "user", "content": text }) data = { "model": self.api_config['llm']['model'], "messages": messages, "max_tokens": self.api_config['llm']['max_tokens'], "stream": False # 非流式,简化实现 } response = requests.post( self.api_config['llm']['api_url'], headers=headers, json=data, timeout=30 ) if response.status_code == 200: result = response.json() if 'choices' in result and len(result['choices']) > 0: content = result['choices'][0]['message']['content'] filtered_content = self._filter_parentheses_content(content.strip()) # 保存对话到历史记录 if self.enable_chat_memory: self._add_to_chat_history(text, filtered_content) print(f"💬 对话已保存到历史记录 (当前历史长度: {len(self.chat_history)})") return filtered_content print(f"❌ LLM API调用失败: {response.status_code}") return None except Exception as e: print(f"❌ 大语言模型调用失败: {e}") return None def _call_llm_streaming(self, text: str) -> bool: """流式调用大语言模型并实时处理响应""" if not self.config['processing']['enable_llm']: return False try: print("🤖 调用大语言模型(流式输出)...") # 获取角色配置 character_config = self._load_character_config(self.config['processing']['character']) if character_config and "system_prompt" in character_config: system_prompt = character_config["system_prompt"] else: system_prompt = "你是一个智能助手,请根据用户的语音输入提供有帮助的回答。保持回答简洁明了。" # 获取角色配置中的最大token数 max_tokens = 50 if character_config and "max_tokens" in character_config: max_tokens = character_config["max_tokens"] headers = { "Content-Type": "application/json", "Authorization": f"Bearer {self.api_config['llm']['api_key']}" } # 构建消息列表 messages = [ { "role": "system", "content": system_prompt } ] # 如果启用聊天记忆,添加历史对话记录 if self.enable_chat_memory: for history_item in self.chat_history: messages.extend([ {"role": "user", "content": history_item["user"]}, {"role": "assistant", "content": history_item["assistant"]} ]) # 添加当前用户消息 messages.append({ "role": "user", "content": text }) data = { "model": self.api_config['llm']['model'], "messages": messages, "max_tokens": max_tokens, "stream": True # 启用流式输出 } # 使用流式请求 response = requests.post( self.api_config['llm']['api_url'], headers=headers, json=data, stream=True, timeout=30 ) if response.status_code == 200: print("🔄 开始接收流式响应...") # 处理流式响应 accumulated_text = "" sentence_buffer = "" # 使用行缓冲区处理多字节字符 buffer = "" for line_bytes in response.iter_lines(): if not line_bytes: continue # 解码为字符串,处理可能的编码问题 try: line = line_bytes.decode('utf-8') except 
UnicodeDecodeError: try: line = line_bytes.decode('latin-1') except Exception: continue # 添加到缓冲区 buffer += line # 检查是否包含完整的数据行 while buffer: # 查找完整的数据行 if '\n' in buffer: line, buffer = buffer.split('\n', 1) else: line = buffer buffer = "" line = line.strip() if not line: continue try: # 解析SSE格式的响应 if line.startswith("data: "): data_str = line[6:] # 移除 "data: " 前缀 if data_str == "[DONE]": break # 尝试解析JSON try: chunk_data = json.loads(data_str) except json.JSONDecodeError as e: # 尝试修复JSON:检查是否是不完整的字符串 if not data_str.endswith('}'): # 这可能是一个不完整的JSON,等待下一行 buffer = data_str + buffer continue print(f"⚠️ JSON解析错误: {e}") continue # 处理解析后的数据 if "choices" in chunk_data and len(chunk_data["choices"]) > 0: delta = chunk_data["choices"][0].get("delta", {}) content = delta.get("content", "") if content: accumulated_text += content sentence_buffer += content # 检查是否有完整句子 if self._is_complete_sentence(sentence_buffer): print(f"📝 检测到完整句子: {sentence_buffer}") # 过滤括号内容 filtered_sentence = self._filter_parentheses_content(sentence_buffer.strip()) if filtered_sentence: print(f"🎵 发送到智能缓冲区: {filtered_sentence}") # 发送到输出进程的智能缓冲系统 self._send_streaming_text_to_output_process(filtered_sentence) # 重置句子缓冲区 sentence_buffer = "" # 显示进度 print(f"\r💬 已生成: {accumulated_text}", end='', flush=True) except Exception as e: print(f"⚠️ 处理流式响应时出错: {e}") continue print(f"\n✅ 流式响应完成: {accumulated_text}") # 强制刷新缓冲区,确保所有内容都被处理 if sentence_buffer: filtered_sentence = self._filter_parentheses_content(sentence_buffer.strip()) if filtered_sentence: print(f"🎵 发送剩余句子: {filtered_sentence}") self._send_streaming_text_to_output_process(filtered_sentence) # 保存完整回复到历史记录 if accumulated_text and self.enable_chat_memory: filtered_response = self._filter_parentheses_content(accumulated_text.strip()) self._add_to_chat_history(text, filtered_response) print(f"💬 对话已保存到历史记录 (当前历史长度: {len(self.chat_history)})") # 通知输出进程LLM生成已完成 self._notify_llm_complete() # 通知输出进程所有文本已发送完成 self._flush_output_process_tts_buffer() # 在流式模式下,只发送结束信号,不发送TTS完成信号 # 让OutputProcess在真正完成TTS生成时自己设置TTS完成状态 self.output_audio_queue.put(None) return accumulated_text != "" else: print(f"❌ LLM API调用失败: {response.status_code}") return False except Exception as e: print(f"❌ 流式LLM调用失败: {e}") return False def _send_streaming_text_to_output_process(self, text: str): """发送流式文本到输出进程的智能缓冲系统""" try: # 发送特殊命令到输出进程,指示这是流式文本 streaming_command = f"STREAMING_TEXT:{text}" self.output_audio_queue.put(streaming_command) return True except Exception as e: print(f"❌ 发送流式文本失败: {e}") return False def _send_text_to_output_process(self, text: str) -> bool: """发送完整文本到输出进程""" try: # 发送特殊命令到输出进程,指示这是完整文本 complete_command = f"COMPLETE_TEXT:{text}" self.output_audio_queue.put(complete_command) return True except Exception as e: print(f"❌ 发送文本失败: {e}") return False def _send_greeting_to_output_process(self, text: str, character_name: str) -> bool: """发送打招呼文本到输出进程(带缓存支持)""" try: # 发送特殊命令到输出进程,包含角色名称用于缓存 greeting_command = f"GREETING_TEXT:{text}:{character_name}" self.output_audio_queue.put(greeting_command) return True except Exception as e: print(f"❌ 发送打招呼文本失败: {e}") return False def _flush_output_process_tts_buffer(self): """通知输出进程刷新TTS缓冲区""" try: # 发送刷新缓冲区命令 flush_command = "FLUSH_TTS_BUFFER:" self.output_audio_queue.put(flush_command) return True except Exception as e: print(f"❌ 刷新缓冲区失败: {e}") return False def _notify_llm_complete(self): """通知输出进程LLM生成已完成""" try: # 发送LLM完成信号 llm_complete_command = "LLM_COMPLETE:" self.output_audio_queue.put(llm_complete_command) return True except Exception as e: print(f"❌ 
发送LLM完成信号失败: {e}")
            return False

    def _is_complete_sentence(self, text):
        """检测是否为完整句子 - 从 recorder.py 移植"""
        import re

        if not text or len(text.strip()) == 0:
            return False

        # 增加最小长度要求 - 至少10个字符才考虑作为完整句子
        if len(text.strip()) < 10:
            return False

        # 句子结束标点符号
        sentence_endings = r'[。!?.!?]'

        # 检查是否以句子结束符结尾
        if re.search(sentence_endings + r'\s*$', text):
            # 对于以结束符结尾的句子,要求至少15个字符
            if len(text.strip()) >= 15:
                return True

        # 检查是否包含句子结束符(可能在句子中间)
        if re.search(sentence_endings, text):
            # 如果后面有其他标点或字符,可能不是完整句子
            remaining_text = re.split(sentence_endings, text, 1)[-1]
            if len(remaining_text.strip()) > 0:
                return False
            # 对于包含结束符的句子,要求至少20个字符
            if len(text.strip()) >= 20:
                return True

        # 对于较长的文本(超过50字符),即使没有结束符也可以考虑
        if len(text.strip()) >= 50:
            return True

        # 对于中等长度的文本,如果包含常见完整句式模式
        if len(text.strip()) >= 20:
            common_patterns = [
                r'^[是的有没有来去在把被让叫请使].*[的得了吗呢吧啊呀]',
                r'^(你好|谢谢|再见|是的|不是|好的|没问题)',
                r'^[\u4e00-\u9fff]{4,8}[的得了]$'  # 4-8个中文字+的/了/得
            ]
            for pattern in common_patterns:
                if re.match(pattern, text):
                    return True

        return False

    def _filter_parentheses_content(self, text):
        """过滤文本中的括号内容(包括中文和英文括号)- 从 recorder.py 移植"""
        import re

        # 移除中文括号内容:(内容)
        filtered_text = re.sub(r'([^)]*)', '', text)
        # 移除英文括号内容:(content)
        filtered_text = re.sub(r'\([^)]*\)', '', filtered_text)
        # 移除方括号内容:【内容】
        filtered_text = re.sub(r'【[^】]*】', '', filtered_text)
        # 移除方括号内容:[content]
        filtered_text = re.sub(r'\[[^\]]*\]', '', filtered_text)
        # 移除直角引号内容:「内容」
        filtered_text = re.sub(r'「[^」]*」', '', filtered_text)
        # 清理多余空格
        filtered_text = re.sub(r'\s+', ' ', filtered_text).strip()

        return filtered_text

    def play_greeting(self):
        """播放角色打招呼语音"""
        try:
            # 获取角色配置
            character_config = self._load_character_config(self.config['processing']['character'])
            if not character_config or "greeting" not in character_config:
                print("⚠️ 角色配置中没有找到greeting字段,跳过打招呼")
                return True

            greeting_text = character_config["greeting"]
            print(f"🎭 播放角色打招呼: {greeting_text}")

            # 禁用录音功能,防止打招呼时录音
            print("🛑 打招呼前禁用录音功能...")
            self.input_command_queue.put(ControlCommand('disable_recording'))

            # 设置状态为播放状态
            self.state = RecordingState.PLAYING

            # 更新LED状态为打招呼效果
            if LED_AVAILABLE:
                set_led_state(SystemState.GREETING)

            # 发送打招呼文本到TTS(带缓存支持)
            character_name = character_config.get("name", self.config['processing']['character'])
            print(f"📡 准备发送打招呼文本到输出进程: {greeting_text} (角色: {character_name})")
            success = self._send_greeting_to_output_process(greeting_text, character_name)
            if not success:
                print("❌ 打招呼TTS生成失败")
                return False
            else:
                print(f"✅ 打招呼文本已成功发送到输出进程")

            # 手动设置LLM完成状态(因为打招呼没有LLM生成过程)
            self._notify_llm_complete()

            # 发送结束信号(TTS_COMPLETE信号由TTS进程在真正完成时发送)
            self.output_audio_queue.put(None)

            print("✅ 打招呼语音已发送到输出队列")
            return True

        except Exception as e:
            print(f"❌ 播放打招呼失败: {e}")
            return False

    def _text_to_speech_streaming(self, text: str) -> bool:
        """文本转语音(流式)"""
        if not self.config['processing']['enable_tts']:
            return False

        try:
            print("🎵 开始文本转语音")
            print(f"📝 待转换文本: {text}")

            # 发送元数据
            metadata_msg = f"METADATA:{text[:30]}..."
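            # Editor's note (added comment): output_audio_queue carries a mix of raw PCM bytes
            # and string commands. The prefixes used throughout this file are:
            #   "METADATA:<text>"                 - preview of the text being synthesized
            #   "STREAMING_TEXT:<text>"           - streamed sentence for the smart TTS buffer
            #   "COMPLETE_TEXT:<text>"            - full text when LLM streaming is disabled
            #   "GREETING_TEXT:<text>:<character>" - greeting with character name for caching
            #   "FLUSH_TTS_BUFFER:" / "LLM_COMPLETE:" / "TTS_COMPLETE:" / "ALL_AUDIO_RECEIVED:"
            #   "UPDATE_TTS_SPEAKER:<voice>" / "EMERGENCY_STOP:"
            #   raw bytes = one PCM audio chunk; None = end-of-stream sentinel.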
print(f"📦 发送元数据: {metadata_msg}") self.output_audio_queue.put(metadata_msg) # 构建请求头 headers = { "X-Api-App-Id": self.api_config['tts']['app_id'], "X-Api-Access-Key": self.api_config['tts']['access_key'], "X-Api-Resource-Id": self.api_config['tts']['resource_id'], "X-Api-App-Key": self.api_config['tts']['app_key'], "Content-Type": "application/json", "Connection": "keep-alive" } # 构建请求参数 payload = { "user": { "uid": "multiprocess_tts" }, "req_params": { "text": text, "speaker": self.api_config['tts']['speaker'], "audio_params": { "format": "pcm", "sample_rate": self.config['audio']['sample_rate'], "enable_timestamp": True }, "additions": "{\"explicit_language\":\"zh\",\"disable_markdown_filter\":true, \"enable_timestamp\":true}\"}" } } # 发送请求 session = requests.Session() try: print(f"🌐 发送TTS请求到: {self.api_config['tts']['url']}") response = session.post( self.api_config['tts']['url'], headers=headers, json=payload, stream=True ) if response.status_code != 200: print(f"❌ TTS请求失败: {response.status_code}") return False print(f"✅ TTS请求成功,开始接收音频流") # 处理流式响应 total_audio_size = 0 chunk_count = 0 queue_size_before = self.output_audio_queue.qsize() for chunk in response.iter_lines(decode_unicode=True): if not chunk: continue try: data = json.loads(chunk) if data.get("code", 0) == 0 and "data" in data and data["data"]: chunk_audio = base64.b64decode(data["data"]) audio_size = len(chunk_audio) total_audio_size += audio_size chunk_count += 1 # 检查队列状态 current_queue_size = self.output_audio_queue.qsize() print(f"📦 发送音频块 {chunk_count}: {audio_size} 字节, 队列大小: {current_queue_size}") # 发送到输出进程 self.output_audio_queue.put(chunk_audio) # 检查是否发送成功 new_queue_size = self.output_audio_queue.qsize() if new_queue_size == current_queue_size + 1: print(f"✅ 音频块 {chunk_count} 发送成功") else: print(f"⚠️ 音频块 {chunk_count} 发送后队列大小异常: {current_queue_size} -> {new_queue_size}") # 显示进度 if chunk_count % 5 == 0: # 更频繁显示进度 progress = f"📥 TTS生成: {chunk_count} 块 | {total_audio_size / 1024:.1f} KB" print(f"\r{progress}", end='', flush=True) elif data.get("code", 0) == 0 and "sentence" in data and data["sentence"]: # 处理句子信息 - 新增 sentence_info = data["sentence"] print(f"📝 TTS句子信息: {sentence_info}") continue elif data.get("code", 0) == 20000000: print(f"🏁 收到TTS流结束信号") break elif data.get("code", 0) > 0: print(f"❌ TTS错误响应: {data}") except json.JSONDecodeError as e: print(f"❌ JSON解析错误: {e}") print(f"原始数据: {chunk}") continue print(f"\n✅ TTS音频生成完成: {chunk_count} 块, {total_audio_size / 1024:.1f} KB") print(f"📊 队列大小变化: {queue_size_before} -> {self.output_audio_queue.qsize()}") # 注意:TTS音频数据已经全部发送到队列,不需要等待队列清空 # 立即发送结束信号,让OutputProcess的增强播放完成检测机制处理 print(f"📦 TTS音频数据已全部发送到队列,立即发送结束信号") print(f"📦 让OutputProcess的增强播放完成检测机制来等待真正的播放完成") # 发送TTS完成信号 print(f"📦 发送TTS完成信号到输出进程") try: tts_complete_command = "TTS_COMPLETE:" self.output_audio_queue.put(tts_complete_command) print(f"📡 已发送TTS完成信号到输出进程") except Exception as e: print(f"❌ 发送TTS完成信号失败: {e}") # 发送结束信号,通知输出进程所有音频已发送完成 print(f"📦 发送结束信号到输出进程") print(f"📊 音频队列当前大小: {self.output_audio_queue.qsize()}") print(f"📦 注意:结束信号不会立即触发播放完成,会等待所有音频播放完成") try: self.output_audio_queue.put(None) print(f"📡 已发送结束信号到输出进程") except Exception as e: print(f"❌ 发送结束信号失败: {e}") return chunk_count > 0 finally: response.close() session.close() except Exception as e: print(f"❌ 文本转语音失败: {e}") import traceback print(f"❌ 详细错误: {traceback.format_exc()}") return False def _display_status(self): """显示系统状态""" # 每秒显示一次状态 if hasattr(self, '_last_status_time'): if time.time() - self._last_status_time < 1.0: return self._last_status_time = 
time.time() # 状态显示 status_lines = [ f"🎯 状态: {self.state.value}", f"📊 统计: 对话{self.stats['total_conversations']} | " f"录音{self.stats['total_recording_time']:.1f}s | " f"成功{self.stats['successful_processing']} | " f"失败{self.stats['failed_processing']}" ] # 队列状态 input_queue_size = self.input_command_queue.qsize() output_queue_size = self.output_audio_queue.qsize() if input_queue_size > 0 or output_queue_size > 0: status_lines.append(f"📦 队列: 输入{input_queue_size} | 输出{output_queue_size}") # 显示状态 status_str = " | ".join(status_lines) print(f"\r{status_str}", end='', flush=True) def display_status(self): """公开方法:显示系统状态""" return self._display_status() def shutdown(self): """关闭系统""" print("\n🛑 正在关闭系统...") self.running = False # 发送关闭命令 try: self.input_command_queue.put(ControlCommand('shutdown')) self.output_audio_queue.put(None) except: pass # 等待进程结束 if self.input_process: try: self.input_process.join(timeout=5) except: pass if self.output_process: try: self.output_process.join(timeout=5) except: pass # 显示最终统计 print("\n📊 最终统计:") print(f" 总对话次数: {self.stats['total_conversations']}") print(f" 总录音时长: {self.stats['total_recording_time']:.1f} 秒") print(f" 成功处理: {self.stats['successful_processing']}") print(f" 失败处理: {self.stats['failed_processing']}") success_rate = (self.stats['successful_processing'] / max(1, self.stats['successful_processing'] + self.stats['failed_processing']) * 100) print(f" 成功率: {success_rate:.1f}%") print("👋 系统已关闭") def enable_nfc(self): """启用NFC功能""" if self.nfc_enabled: print("⚠️ NFC功能已启用") return try: # 获取NFC管理器单例 self.nfc_manager = get_nfc_manager() self.nfc_manager.set_character_switch_callback(self._on_character_switch) self.nfc_manager.start() self.nfc_enabled = True print("✅ NFC功能已启用") except Exception as e: print(f"❌ 启用NFC功能失败: {e}") def disable_nfc(self): """禁用NFC功能""" if not self.nfc_enabled: return try: if self.nfc_manager: self.nfc_manager.stop() self.nfc_enabled = False print("🛑 NFC功能已禁用") except Exception as e: print(f"❌ 禁用NFC功能失败: {e}") def _on_character_switch(self, character_name: str): """角色切换回调函数""" print(f"🎭 NFC触发角色切换: {character_name}") # 检查角色配置是否存在 character_config = self._load_character_config(character_name) if not character_config: print(f"❌ 角色配置不存在: {character_name}") return # 显示角色切换LED效果 if LED_AVAILABLE: set_led_state(SystemState.CHARACTER_SWITCH) # === 紧急停止所有音频活动 === print("🚨 NFC切换:紧急停止所有音频活动...") self._emergency_stop_all_audio() # 更新角色配置 old_character = self.config['processing']['character'] self.config['processing']['character'] = character_name # 更新TTS语音配置 if character_config and "voice" in character_config: self.api_config['tts']['speaker'] = character_config["voice"] print(f"🎵 更新TTS语音: {character_config['voice']}") # 向输出进程发送更新TTS speaker的命令 try: update_command = f"UPDATE_TTS_SPEAKER:{character_config['voice']}" self.output_audio_queue.put(update_command) print(f"📡 已发送TTS speaker更新命令到输出进程") except Exception as e: print(f"❌ 发送TTS speaker更新命令失败: {e}") print(f"🔄 角色已切换: {old_character} -> {character_name}") # 清空聊天历史 self.clear_chat_history() print(f"🔄 角色切换,聊天历史已清空") # 播放新角色打招呼 print("🎭 播放新角色打招呼...") greeting_success = self.play_greeting() if greeting_success: print("✅ 角色切换完成") # 注意:不立即恢复NFC等待状态,让打招呼LED效果持续显示 # LED状态会在播放完成后自动恢复 else: print("⚠️ 打招呼播放失败,继续运行...") # 即使打招呼失败,也恢复NFC等待状态 if LED_AVAILABLE: set_led_state(SystemState.WAITING_NFC) def _emergency_stop_all_audio(self): """紧急停止所有音频活动 - 用于NFC切换时立即停止""" print("🚨 开始紧急停止所有音频活动...") try: # 1. 
立即停止录音和监听 print("🛑 立即停止录音和监听...") if hasattr(self, '_monitoring_active') and self._monitoring_active: self.input_command_queue.put(ControlCommand('stop_monitoring')) self._monitoring_active = False if self.state == RecordingState.RECORDING: self.input_command_queue.put(ControlCommand('stop_recording')) # 强制禁用录音功能,防止立即重新开始 self.input_command_queue.put(ControlCommand('disable_recording')) # 发送紧急停止命令到输入进程 self.input_command_queue.put(ControlCommand('emergency_stop')) print("✅ 录音和监听已停止,紧急停止命令已发送") # 2. 立即停止音频播放 print("🛑 立即停止音频播放...") # 向输出进程发送紧急停止命令 try: emergency_stop_command = "EMERGENCY_STOP:" self.output_audio_queue.put(emergency_stop_command) print("✅ 紧急停止命令已发送到输出进程") except Exception as e: print(f"❌ 发送紧急停止命令失败: {e}") # 3. 清空所有音频队列 print("🛑 清空音频队列...") try: # 清空输入队列 while not self.input_command_queue.empty(): try: self.input_command_queue.get_nowait() except queue.Empty: break # 清空输出队列(保留None结束信号) temp_queue = [] while not self.output_audio_queue.empty(): try: item = self.output_audio_queue.get_nowait() if item is None: # 保留结束信号 temp_queue.append(item) except queue.Empty: break # 将保留的信号放回队列 for item in temp_queue: self.output_audio_queue.put(item) print("✅ 音频队列已清空") except Exception as e: print(f"❌ 清空队列时出错: {e}") # 4. 重置所有状态 print("🛑 重置所有状态...") self.state = RecordingState.IDLE self.processing_complete = True self.playback_complete = True self.current_audio_data = None self.current_audio_metadata = None # 重置录音状态标志 if hasattr(self, '_monitoring_active'): self._monitoring_active = False if hasattr(self, '_just_finished_playing'): self._just_finished_playing = True # 防止立即重新开始录音 print("✅ 所有状态已重置") # 5. 等待一小段时间确保音频设备完全停止 print("⏱️ 等待音频设备完全停止...") time.sleep(0.5) print("🚨 紧急停止完成") except Exception as e: print(f"❌ 紧急停止过程中出错: {e}") import traceback traceback.print_exc() def get_nfc_status(self): """获取NFC状态""" if not self.nfc_enabled or not self.nfc_manager: return {"enabled": False, "current_character": None} return { "enabled": True, "current_character": self.nfc_manager.get_current_character(), "running": self.nfc_manager.running } def _add_to_chat_history(self, user_message, assistant_response): """添加对话到历史记录""" import time # 如果历史记录超过最大长度,移除最早的记录 if len(self.chat_history) >= self.max_history_length: self.chat_history.pop(0) # 添加新的对话记录 self.chat_history.append({ "user": user_message, "assistant": assistant_response, "timestamp": time.time() }) def clear_chat_history(self): """清空聊天历史""" self.chat_history = [] print("💬 聊天历史已清空") def get_chat_history_summary(self): """获取聊天历史摘要""" if not self.chat_history: return "暂无聊天历史" return f"当前有 {len(self.chat_history)} 轮对话记录" def main(): """主函数""" import argparse parser = argparse.ArgumentParser(description='多进程音频控制系统') parser.add_argument('--character', '-c', type=str, default='libai', help='选择角色 (默认: libai)') parser.add_argument('--config', type=str, help='配置文件路径') args = parser.parse_args() # 加载配置 config = None if args.config: try: with open(args.config, 'r', encoding='utf-8') as f: config = json.load(f) except Exception as e: print(f"⚠️ 配置文件加载失败: {e}") # 创建控制系统 control_system = ControlSystem(config) # 设置角色 if args.character: control_system.config['processing']['character'] = args.character # 启动系统 control_system.start() if __name__ == "__main__": main()
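

# ---------------------------------------------------------------------------
# Editor's note: the two functions below are NOT part of the running system.
# They are minimal, self-contained sketches added for illustration only.
#
# This first sketch shows the queue topology ControlSystem sets up: a command
# queue flowing main process -> child process, an event queue flowing back,
# and a None sentinel for shutdown. The worker here is a hypothetical stand-in,
# not the real InputProcess/OutputProcess from audio_processes.
# ---------------------------------------------------------------------------
def _demo_worker(command_queue, event_queue):
    """Hypothetical stand-in worker: acknowledge each command until shutdown."""
    while True:
        cmd = command_queue.get()
        if cmd is None:  # None is the shutdown sentinel, same convention as output_audio_queue
            break
        event_queue.put(('ack', cmd))


def _queue_wiring_demo():
    """Wire one command queue and one event queue to a child process, then shut it down."""
    command_queue = mp.Queue(maxsize=100)   # main process -> child (like input_command_queue)
    event_queue = mp.Queue(maxsize=100)     # child -> main process (like input_event_queue)

    proc = mp.Process(target=_demo_worker, args=(command_queue, event_queue))
    proc.start()

    command_queue.put('start_monitoring')
    try:
        print('demo event:', event_queue.get(timeout=5))
    except queue.Empty:
        print('demo event: timeout')

    command_queue.put(None)                 # shutdown sentinel
    proc.join(timeout=5)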
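

# ---------------------------------------------------------------------------
# Editor's note: a second illustrative sketch of the streaming-LLM handling in
# _call_llm_streaming: consume "data: {...}" SSE lines, accumulate the delta
# content, and flush the buffer to TTS whenever sentence-ending punctuation
# appears. The input below is a hard-coded list of fake SSE lines; the real
# code iterates response.iter_lines() from the Ark chat/completions endpoint
# and uses the stricter _is_complete_sentence() heuristic.
# ---------------------------------------------------------------------------
def _sse_sentence_buffer_demo():
    """Parse fake SSE chunks and flush sentence-sized pieces, as the LLM streaming path does."""
    import re

    fake_sse_lines = [
        'data: {"choices": [{"delta": {"content": "你好,"}}]}',
        'data: {"choices": [{"delta": {"content": "我是示例回复。"}}]}',
        'data: {"choices": [{"delta": {"content": "这一段还没有结束"}}]}',
        'data: [DONE]',
    ]

    sentence_endings = re.compile(r'[。!?.!?]')
    sentence_buffer = ""
    for line in fake_sse_lines:
        if not line.startswith("data: "):
            continue
        data_str = line[6:]
        if data_str == "[DONE]":
            break
        chunk = json.loads(data_str)
        delta = chunk["choices"][0].get("delta", {})
        content = delta.get("content", "")
        sentence_buffer += content
        if sentence_endings.search(sentence_buffer):
            print("flush to TTS:", sentence_buffer)
            sentence_buffer = ""
    if sentence_buffer:
        print("flush remaining:", sentence_buffer)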