#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ LED灯光控制器 为多进程音频录音系统提供状态LED灯光效果 """ import time import threading import random import math from typing import Dict, Any, Optional, Tuple from enum import Enum import logging # 尝试导入LED驱动库 try: import RPi.GPIO as GPIO from rpi_ws281x import Color, PixelStrip LED_AVAILABLE = True except ImportError: LED_AVAILABLE = False print("⚠️ LED驱动库未安装,LED功能将被禁用") class SystemState(Enum): """系统状态枚举""" STARTUP = "startup" WAITING_NFC = "waiting_nfc" CALIBRATING = "calibrating" IDLE_MONITORING = "idle_monitoring" RECORDING = "recording" PROCESSING = "processing" PLAYING = "playing" CHARACTER_SWITCH = "character_switch" GREETING = "greeting" # 新增:打招呼状态 ERROR = "error" SHUTDOWN = "shutdown" class LEDController: """LED灯光控制器""" def __init__(self, led_count=10, brightness=100, led_pin=10): """ 初始化LED控制器 Args: led_count: LED数量 brightness: 亮度 (0-255) led_pin: GPIO引脚号 """ self.led_count = led_count self.brightness = brightness self.led_pin = led_pin self.available = LED_AVAILABLE # LED控制对象 self.strip = None # 状态控制 self.current_state = SystemState.SHUTDOWN self.running = False self.effect_thread = None self.effect_lock = threading.Lock() # 特效参数 self.effect_params = {} # 初始化LED if self.available: self._init_led() # 设置日志 self.logger = logging.getLogger(__name__) def _init_led(self): """初始化LED硬件""" try: # WS2812B配置 LED_FREQ_HZ = 800000 # LED信号频率 LED_DMA = 10 # DMA通道 LED_INVERT = False # 是否反转信号 LED_CHANNEL = 0 # PWM通道 # 创建PixelStrip对象 self.strip = PixelStrip( self.led_count, self.led_pin, LED_FREQ_HZ, LED_DMA, LED_INVERT, self.brightness, LED_CHANNEL ) self.strip.begin() # 初始化后清空 time.sleep(0.1) self.clear_all() print(f"✅ LED控制器初始化成功: {self.led_count}个LED, 引脚GPIO{self.led_pin}") except Exception as e: print(f"❌ LED初始化失败: {e}") self.available = False def start(self): """启动LED控制器""" if not self.available: print("⚠️ LED不可用,跳过启动") return False self.running = True # 启动特效线程 self.effect_thread = threading.Thread(target=self._effect_loop, daemon=True) self.effect_thread.start() # 设置启动状态 self.set_state(SystemState.STARTUP) print("✅ LED控制器已启动") return True def stop(self): """停止LED控制器""" if not self.available or not self.running: return self.running = False # 等待特效线程结束 if self.effect_thread and self.effect_thread.is_alive(): self.effect_thread.join(timeout=2) # 关闭LED self.clear_all() print("🛑 LED控制器已停止") def set_state(self, state: SystemState, **params): """ 设置系统状态和对应的LED效果 Args: state: 系统状态 **params: 特效参数 """ if not self.available: return with self.effect_lock: self.current_state = state self.effect_params = params self.logger.info(f"LED状态切换: {state.value}") def clear_all(self): """清空所有LED""" if not self.available: return try: for i in range(self.strip.numPixels()): self.strip.setPixelColor(i, Color(0, 0, 0)) self.strip.show() time.sleep(0.1) except Exception as e: self.logger.error(f"清空LED失败: {e}") def _effect_loop(self): """特效循环""" while self.running: try: with self.effect_lock: state = self.current_state params = self.effect_params.copy() # 根据状态执行对应特效 if state == SystemState.STARTUP: self._startup_effect() elif state == SystemState.WAITING_NFC: self._waiting_nfc_effect() elif state == SystemState.CALIBRATING: self._calibrating_effect() elif state == SystemState.IDLE_MONITORING: self._idle_monitoring_effect() elif state == SystemState.RECORDING: self._recording_effect() elif state == SystemState.PROCESSING: self._processing_effect() elif state == SystemState.PLAYING: self._playing_effect() elif state == SystemState.CHARACTER_SWITCH: self._character_switch_effect() elif state == SystemState.GREETING: self._greeting_effect() elif state == SystemState.ERROR: self._error_effect() elif state == SystemState.SHUTDOWN: self._shutdown_effect() time.sleep(0.05) # 控制特效刷新率 except Exception as e: self.logger.error(f"特效循环错误: {e}") time.sleep(1) def _startup_effect(self): """启动效果 - 彩虹渐变""" colors = self._rainbow_colors() self._show_moving_colors(colors, speed=0.1) def _waiting_nfc_effect(self): """等待NFC效果 - 彩虹效果""" self._rainbow_effect() def _calibrating_effect(self): """校准效果 - 黄色跑马灯""" self._running_light_effect((255, 255, 0), speed=0.15) def _idle_monitoring_effect(self): """空闲监听效果 - 脉冲效果""" self._pulse_effect_enhanced((0, 255, 0), speed=0.05) def _recording_effect(self): """录音效果 - 脉冲效果""" self._pulse_effect_enhanced((255, 0, 0), speed=0.05) def _processing_effect(self): """处理效果 - 剧院追逐效果""" self._theater_chase_effect() def _playing_effect(self): """播放效果 - 彩虹循环效果""" self._rainbow_cycle_effect() def _character_switch_effect(self): """角色切换效果 - 彩色波浪""" colors = [(255, 0, 0), (255, 127, 0), (255, 255, 0), (0, 255, 0), (0, 0, 255), (75, 0, 130), (148, 0, 211)] self._wave_effect_multi(colors, speed=0.3) def _error_effect(self): """错误效果 - 红色闪烁""" self._blink_effect((255, 0, 0), speed=0.5) def _shutdown_effect(self): """关闭效果 - 渐暗""" self._fade_out_effect(speed=0.1) # ===== 基础特效函数 ===== def _rainbow_effect(self): """彩虹效果""" if not hasattr(self, '_rainbow_phase'): self._rainbow_phase = 0 colors = [] for i in range(self.led_count): color_index = (i + self._rainbow_phase) % 7 colors.append(self._get_rainbow_color(color_index)) self._show_colors(colors) self._rainbow_phase = (self._rainbow_phase + 1) % 7 time.sleep(0.1) def _pulse_effect_enhanced(self, color: Tuple[int, int, int], speed=0.05): """真正的脉冲效果""" if not hasattr(self, '_pulse_enhanced_state'): self._pulse_enhanced_state = { 'phase': 'expand', # 'expand' 或 'contract' 'position': 0, 'max_position': self.led_count // 2 } state = self._pulse_enhanced_state colors = [(0, 0, 0)] * self.led_count if state['phase'] == 'expand': # 从中心向外扩散 if state['position'] <= state['max_position']: # 设置左右对称的LED if state['position'] < self.led_count: colors[state['position']] = color colors[self.led_count - 1 - state['position']] = color state['position'] += 1 else: # 切换到收缩阶段 state['phase'] = 'contract' state['position'] = state['max_position'] elif state['phase'] == 'contract': # 从边缘向中心收缩 if state['position'] >= 0: # 设置左右对称的LED if state['position'] < self.led_count: colors[state['position']] = color colors[self.led_count - 1 - state['position']] = color state['position'] -= 1 else: # 切换到扩散阶段 state['phase'] = 'expand' state['position'] = 0 self._show_colors(colors) time.sleep(speed) def _theater_chase_effect(self): """剧院追逐效果""" if not hasattr(self, '_theater_chase_phase'): self._theater_chase_phase = 0 colors = [] for i in range(self.led_count): if (i + self._theater_chase_phase) % 3 == 0: colors.append((255, 255, 0)) # 黄色 else: colors.append((0, 0, 0)) self._show_colors(colors) self._theater_chase_phase = (self._theater_chase_phase + 1) % 3 time.sleep(0.1) def _rainbow_cycle_effect(self): """彩虹循环效果""" if not hasattr(self, '_rainbow_cycle_phase'): self._rainbow_cycle_phase = 0 colors = [] for i in range(self.led_count): pixel_index = (self._rainbow_cycle_phase * 256 // self.led_count + i) % 256 colors.append(self._wheel(pixel_index)) self._show_colors(colors) self._rainbow_cycle_phase = (self._rainbow_cycle_phase + 1) % 256 time.sleep(0.02) def _get_rainbow_color(self, index): """获取彩虹颜色""" rainbow_colors = [ (150, 0, 0), # 红 (150, 75, 0), # 橙 (150, 150, 0), # 黄 (0, 150, 0), # 绿 (0, 0, 150), # 蓝 (50, 0, 100), # 靛 (100, 0, 150) # 紫 ] return rainbow_colors[index % len(rainbow_colors)] def _wheel(self, pos): """生成彩虹颜色(来自wheel函数)""" pos = 255 - pos if pos < 85: return (255 - pos * 3, 0, pos * 3) elif pos < 170: pos -= 85 return (0, pos * 3, 255 - pos * 3) else: pos -= 170 return (pos * 3, 255 - pos * 3, 0) def _breathing_effect(self, color: Tuple[int, int, int], min_brightness=0.1, max_brightness=0.8, speed=0.05): """呼吸效果""" steps = 50 brightness = min_brightness direction = 1 for _ in range(steps): brightness += direction * (max_brightness - min_brightness) / steps if brightness >= max_brightness or brightness <= min_brightness: direction *= -1 r = int(color[0] * brightness) g = int(color[1] * brightness) b = int(color[2] * brightness) colors = [(r, g, b)] * self.led_count self._show_colors(colors) time.sleep(speed) def _running_light_effect(self, color: Tuple[int, int, int], speed=0.15): """跑马灯效果""" colors = [(0, 0, 0)] * self.led_count if not hasattr(self, '_running_position'): self._running_position = 0 colors[self._running_position] = color # 添加拖尾效果 for i in range(1, 4): pos = (self._running_position - i) % self.led_count brightness = 1.0 - (i * 0.3) if brightness > 0: colors[pos] = tuple(int(c * brightness) for c in color) self._show_colors(colors) self._running_position = (self._running_position + 1) % self.led_count time.sleep(speed) def _pulse_effect(self, color: Tuple[int, int, int], speed=0.3): """脉冲效果""" colors = [(0, 0, 0)] * self.led_count if not hasattr(self, '_pulse_phase'): self._pulse_phase = 0 # 计算脉冲位置 center = self.led_count // 2 radius = abs(math.sin(self._pulse_phase)) * (self.led_count // 2) for i in range(self.led_count): distance = abs(i - center) if distance <= radius: brightness = 1.0 - (distance / max(radius, 1)) colors[i] = tuple(int(c * brightness) for c in color) self._show_colors(colors) self._pulse_phase += speed * 0.5 time.sleep(speed) def _rotating_effect(self, color: Tuple[int, int, int], speed=0.2): """旋转效果""" colors = [(0, 0, 0)] * self.led_count if not hasattr(self, '_rotate_angle'): self._rotate_angle = 0 # 创建旋转图案 for i in range(self.led_count): angle = (i * 2 * math.pi / self.led_count) + self._rotate_angle brightness = (math.sin(angle) + 1) / 2 colors[i] = tuple(int(c * brightness) for c in color) self._show_colors(colors) self._rotate_angle += speed time.sleep(speed) def _wave_effect(self, color: Tuple[int, int, int], speed=0.4): """波浪效果""" colors = [(0, 0, 0)] * self.led_count if not hasattr(self, '_wave_offset'): self._wave_offset = 0 for i in range(self.led_count): # 使用正弦波创建波浪效果 wave_value = math.sin((i + self._wave_offset) * 0.5) brightness = (wave_value + 1) / 2 colors[i] = tuple(int(c * brightness) for c in color) self._show_colors(colors) self._wave_offset += speed time.sleep(speed) def _wave_effect_multi(self, colors: list, speed=0.3): """多彩波浪效果""" color_list = [(0, 0, 0)] * self.led_count if not hasattr(self, '_wave_multi_offset'): self._wave_multi_offset = 0 for i in range(self.led_count): # 创建波浪效果并映射到多种颜色 wave_value = math.sin((i + self._wave_multi_offset) * 0.3) color_index = int((wave_value + 1) / 2 * len(colors)) % len(colors) brightness = (math.sin((i + self._wave_multi_offset) * 0.7) + 1) / 2 color_list[i] = tuple(int(c * brightness) for c in colors[color_index]) self._show_colors(color_list) self._wave_multi_offset += speed time.sleep(speed) def _blink_effect(self, color: Tuple[int, int, int], speed=0.5): """闪烁效果""" if not hasattr(self, '_blink_state'): self._blink_state = False if self._blink_state: colors = [color] * self.led_count else: colors = [(0, 0, 0)] * self.led_count self._show_colors(colors) self._blink_state = not self._blink_state time.sleep(speed) def _gentle_blink_effect(self, color: Tuple[int, int, int], intensity=0.3, speed=1.0): """微弱闪烁效果""" if not hasattr(self, '_gentle_blink_phase'): self._gentle_blink_phase = 0 brightness = intensity * (math.sin(self._gentle_blink_phase) + 1) / 2 colors = [tuple(int(c * brightness) for c in color)] * self.led_count self._show_colors(colors) self._gentle_blink_phase += speed * 0.1 time.sleep(speed) def _fade_out_effect(self, speed=0.1): """渐暗效果""" if not hasattr(self, '_fade_brightness'): self._fade_brightness = 1.0 brightness = max(0, self._fade_brightness - speed) colors = [(int(255 * brightness), int(255 * brightness), int(255 * brightness))] * self.led_count self._show_colors(colors) self._fade_brightness = brightness if brightness <= 0: self.clear_all() def _show_moving_colors(self, colors: list, speed=0.1): """移动色彩效果""" if not hasattr(self, '_color_offset'): self._color_offset = 0 color_list = [] for i in range(self.led_count): color_index = (i + self._color_offset) % len(colors) color_list.append(colors[color_index]) self._show_colors(color_list) self._color_offset = (self._color_offset + 1) % len(colors) time.sleep(speed) def _greeting_effect(self): """打招呼效果 - 温暖的彩色脉冲""" colors = [(0, 0, 0)] * self.led_count if not hasattr(self, '_greeting_phase'): self._greeting_phase = 0 self._greeting_start_time = time.time() # 打招呼效果持续10秒 elapsed_time = time.time() - self._greeting_start_time if elapsed_time > 10: # 自动重置,避免卡在打招呼状态 self._greeting_phase = 0 self._greeting_start_time = time.time() # 创建温暖的彩色脉冲效果 for i in range(self.led_count): # 使用多个正弦波创建复杂的脉冲效果 wave1 = math.sin((i * 0.8 + self._greeting_phase * 0.1) * math.pi) wave2 = math.sin((i * 0.5 + self._greeting_phase * 0.08) * math.pi * 1.5) # 混合波形 combined_wave = (wave1 + wave2) / 2 # 确保亮度在合理范围内 brightness = max(0, min(1, (combined_wave + 1) / 2)) # 创建温暖的颜色变化 if i < self.led_count // 3: # 红橙色部分 color = (255, int(200 * brightness), int(50 * brightness)) elif i < 2 * self.led_count // 3: # 黄色部分 color = (int(255 * brightness), int(255 * brightness), int(100 * brightness)) else: # 粉紫色部分 color = (int(255 * brightness), int(150 * brightness), int(255 * brightness)) colors[i] = tuple(int(c * brightness) for c in color) self._show_colors(colors) self._greeting_phase += 1 time.sleep(0.08) # ===== 工具函数 ===== def _rainbow_colors(self, count=7): """生成彩虹色彩""" colors = [] for i in range(count): hue = i * 360 / count colors.append(self._hsv_to_rgb(hue, 1.0, 1.0)) return colors def _hsv_to_rgb(self, h: float, s: float, v: float) -> Tuple[int, int, int]: """HSV颜色转RGB""" h = h / 60.0 c = v * s x = c * (1 - abs((h % 2) - 1)) m = v - c if 0 <= h < 1: r, g, b = c, x, 0 elif 1 <= h < 2: r, g, b = x, c, 0 elif 2 <= h < 3: r, g, b = 0, c, x elif 3 <= h < 4: r, g, b = 0, x, c elif 4 <= h < 5: r, g, b = x, 0, c else: r, g, b = c, 0, x return (int((r + m) * 255), int((g + m) * 255), int((b + m) * 255)) def _show_colors(self, colors: list): """显示颜色""" if not self.available: return try: for i, (r, g, b) in enumerate(colors): if i < self.strip.numPixels(): self.strip.setPixelColor(i, Color(r, g, b)) self.strip.show() except Exception as e: self.logger.error(f"显示颜色失败: {e}") # ===== 全局LED控制器实例 ===== _led_controller = None def get_led_controller(led_count=10, brightness=100, led_pin=10) -> LEDController: """获取LED控制器单例""" global _led_controller if _led_controller is None: _led_controller = LEDController(led_count, brightness, led_pin) return _led_controller def start_led_effects(led_count=10, brightness=100, led_pin=10): """启动LED特效系统""" controller = get_led_controller(led_count, brightness, led_pin) return controller.start() def stop_led_effects(): """停止LED特效系统""" global _led_controller if _led_controller: _led_controller.stop() _led_controller = None def set_led_state(state: SystemState, **params): """设置LED状态""" global _led_controller if _led_controller: _led_controller.set_state(state, **params) if __name__ == "__main__": # 测试LED控制器 print("🎆 测试LED控制器...") controller = LEDController(led_count=10, brightness=100) if not controller.available: print("❌ LED不可用") exit(1) try: controller.start() # 测试各种状态 states = [ SystemState.STARTUP, SystemState.WAITING_NFC, SystemState.CALIBRATING, SystemState.IDLE_MONITORING, SystemState.RECORDING, SystemState.PROCESSING, SystemState.PLAYING, SystemState.CHARACTER_SWITCH, SystemState.ERROR, SystemState.SHUTDOWN ] for state in states: print(f"\n🎭 测试状态: {state.value}") controller.set_state(state) time.sleep(3) print("\n✅ LED控制器测试完成") except KeyboardInterrupt: print("\n👋 用户中断") finally: controller.stop()