Local-Voice/led_controller.py
2025-09-27 14:46:56 +08:00

684 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
LED灯光控制器
为多进程音频录音系统提供状态LED灯光效果
"""
import time
import threading
import random
import math
from typing import Dict, Any, Optional, Tuple
from enum import Enum
import logging
# 尝试导入LED驱动库
try:
import RPi.GPIO as GPIO
from rpi_ws281x import Color, PixelStrip
LED_AVAILABLE = True
except ImportError:
LED_AVAILABLE = False
print("⚠️ LED驱动库未安装LED功能将被禁用")
class SystemState(Enum):
"""系统状态枚举"""
STARTUP = "startup"
WAITING_NFC = "waiting_nfc"
CALIBRATING = "calibrating"
IDLE_MONITORING = "idle_monitoring"
RECORDING = "recording"
PROCESSING = "processing"
PLAYING = "playing"
CHARACTER_SWITCH = "character_switch"
GREETING = "greeting" # 新增:打招呼状态
ERROR = "error"
SHUTDOWN = "shutdown"
class LEDController:
"""LED灯光控制器"""
def __init__(self, led_count=10, brightness=100, led_pin=10):
"""
初始化LED控制器
Args:
led_count: LED数量
brightness: 亮度 (0-255)
led_pin: GPIO引脚号
"""
self.led_count = led_count
self.brightness = brightness
self.led_pin = led_pin
self.available = LED_AVAILABLE
# LED控制对象
self.strip = None
# 状态控制
self.current_state = SystemState.SHUTDOWN
self.running = False
self.effect_thread = None
self.effect_lock = threading.Lock()
# 特效参数
self.effect_params = {}
# 初始化LED
if self.available:
self._init_led()
# 设置日志
self.logger = logging.getLogger(__name__)
def _init_led(self):
"""初始化LED硬件"""
try:
# WS2812B配置
LED_FREQ_HZ = 800000 # LED信号频率
LED_DMA = 10 # DMA通道
LED_INVERT = False # 是否反转信号
LED_CHANNEL = 0 # PWM通道
# 创建PixelStrip对象
self.strip = PixelStrip(
self.led_count, self.led_pin, LED_FREQ_HZ,
LED_DMA, LED_INVERT, self.brightness, LED_CHANNEL
)
self.strip.begin()
# 初始化后清空
time.sleep(0.1)
self.clear_all()
print(f"✅ LED控制器初始化成功: {self.led_count}个LED, 引脚GPIO{self.led_pin}")
except Exception as e:
print(f"❌ LED初始化失败: {e}")
self.available = False
def start(self):
"""启动LED控制器"""
if not self.available:
print("⚠️ LED不可用跳过启动")
return False
self.running = True
# 启动特效线程
self.effect_thread = threading.Thread(target=self._effect_loop, daemon=True)
self.effect_thread.start()
# 设置启动状态
self.set_state(SystemState.STARTUP)
print("✅ LED控制器已启动")
return True
def stop(self):
"""停止LED控制器"""
if not self.available or not self.running:
return
self.running = False
# 等待特效线程结束
if self.effect_thread and self.effect_thread.is_alive():
self.effect_thread.join(timeout=2)
# 关闭LED
self.clear_all()
print("🛑 LED控制器已停止")
def set_state(self, state: SystemState, **params):
"""
设置系统状态和对应的LED效果
Args:
state: 系统状态
**params: 特效参数
"""
if not self.available:
return
with self.effect_lock:
self.current_state = state
self.effect_params = params
self.logger.info(f"LED状态切换: {state.value}")
def clear_all(self):
"""清空所有LED"""
if not self.available:
return
try:
for i in range(self.strip.numPixels()):
self.strip.setPixelColor(i, Color(0, 0, 0))
self.strip.show()
time.sleep(0.1)
except Exception as e:
self.logger.error(f"清空LED失败: {e}")
def _effect_loop(self):
"""特效循环"""
while self.running:
try:
with self.effect_lock:
state = self.current_state
params = self.effect_params.copy()
# 根据状态执行对应特效
if state == SystemState.STARTUP:
self._startup_effect()
elif state == SystemState.WAITING_NFC:
self._waiting_nfc_effect()
elif state == SystemState.CALIBRATING:
self._calibrating_effect()
elif state == SystemState.IDLE_MONITORING:
self._idle_monitoring_effect()
elif state == SystemState.RECORDING:
self._recording_effect()
elif state == SystemState.PROCESSING:
self._processing_effect()
elif state == SystemState.PLAYING:
self._playing_effect()
elif state == SystemState.CHARACTER_SWITCH:
self._character_switch_effect()
elif state == SystemState.GREETING:
self._greeting_effect()
elif state == SystemState.ERROR:
self._error_effect()
elif state == SystemState.SHUTDOWN:
self._shutdown_effect()
time.sleep(0.05) # 控制特效刷新率
except Exception as e:
self.logger.error(f"特效循环错误: {e}")
time.sleep(1)
def _startup_effect(self):
"""启动效果 - 彩虹渐变"""
colors = self._rainbow_colors()
self._show_moving_colors(colors, speed=0.1)
def _waiting_nfc_effect(self):
"""等待NFC效果 - 彩虹效果"""
self._rainbow_effect()
def _calibrating_effect(self):
"""校准效果 - 黄色跑马灯"""
self._running_light_effect((255, 255, 0), speed=0.15)
def _idle_monitoring_effect(self):
"""空闲监听效果 - 脉冲效果"""
self._pulse_effect_enhanced((0, 255, 0), speed=0.05)
def _recording_effect(self):
"""录音效果 - 脉冲效果"""
self._pulse_effect_enhanced((255, 0, 0), speed=0.05)
def _processing_effect(self):
"""处理效果 - 剧院追逐效果"""
self._theater_chase_effect()
def _playing_effect(self):
"""播放效果 - 彩虹循环效果"""
self._rainbow_cycle_effect()
def _character_switch_effect(self):
"""角色切换效果 - 彩色波浪"""
colors = [(255, 0, 0), (255, 127, 0), (255, 255, 0), (0, 255, 0), (0, 0, 255), (75, 0, 130), (148, 0, 211)]
self._wave_effect_multi(colors, speed=0.3)
def _error_effect(self):
"""错误效果 - 红色闪烁"""
self._blink_effect((255, 0, 0), speed=0.5)
def _shutdown_effect(self):
"""关闭效果 - 渐暗"""
self._fade_out_effect(speed=0.1)
# ===== 基础特效函数 =====
def _rainbow_effect(self):
"""彩虹效果"""
if not hasattr(self, '_rainbow_phase'):
self._rainbow_phase = 0
colors = []
for i in range(self.led_count):
color_index = (i + self._rainbow_phase) % 7
colors.append(self._get_rainbow_color(color_index))
self._show_colors(colors)
self._rainbow_phase = (self._rainbow_phase + 1) % 7
time.sleep(0.1)
def _pulse_effect_enhanced(self, color: Tuple[int, int, int], speed=0.05):
"""真正的脉冲效果"""
if not hasattr(self, '_pulse_enhanced_state'):
self._pulse_enhanced_state = {
'phase': 'expand', # 'expand' 或 'contract'
'position': 0,
'max_position': self.led_count // 2
}
state = self._pulse_enhanced_state
colors = [(0, 0, 0)] * self.led_count
if state['phase'] == 'expand':
# 从中心向外扩散
if state['position'] <= state['max_position']:
# 设置左右对称的LED
if state['position'] < self.led_count:
colors[state['position']] = color
colors[self.led_count - 1 - state['position']] = color
state['position'] += 1
else:
# 切换到收缩阶段
state['phase'] = 'contract'
state['position'] = state['max_position']
elif state['phase'] == 'contract':
# 从边缘向中心收缩
if state['position'] >= 0:
# 设置左右对称的LED
if state['position'] < self.led_count:
colors[state['position']] = color
colors[self.led_count - 1 - state['position']] = color
state['position'] -= 1
else:
# 切换到扩散阶段
state['phase'] = 'expand'
state['position'] = 0
self._show_colors(colors)
time.sleep(speed)
def _theater_chase_effect(self):
"""剧院追逐效果"""
if not hasattr(self, '_theater_chase_phase'):
self._theater_chase_phase = 0
colors = []
for i in range(self.led_count):
if (i + self._theater_chase_phase) % 3 == 0:
colors.append((255, 255, 0)) # 黄色
else:
colors.append((0, 0, 0))
self._show_colors(colors)
self._theater_chase_phase = (self._theater_chase_phase + 1) % 3
time.sleep(0.1)
def _rainbow_cycle_effect(self):
"""彩虹循环效果"""
if not hasattr(self, '_rainbow_cycle_phase'):
self._rainbow_cycle_phase = 0
colors = []
for i in range(self.led_count):
pixel_index = (self._rainbow_cycle_phase * 256 // self.led_count + i) % 256
colors.append(self._wheel(pixel_index))
self._show_colors(colors)
self._rainbow_cycle_phase = (self._rainbow_cycle_phase + 1) % 256
time.sleep(0.02)
def _get_rainbow_color(self, index):
"""获取彩虹颜色"""
rainbow_colors = [
(150, 0, 0), # 红
(150, 75, 0), # 橙
(150, 150, 0), # 黄
(0, 150, 0), # 绿
(0, 0, 150), # 蓝
(50, 0, 100), # 靛
(100, 0, 150) # 紫
]
return rainbow_colors[index % len(rainbow_colors)]
def _wheel(self, pos):
"""生成彩虹颜色来自wheel函数"""
pos = 255 - pos
if pos < 85:
return (255 - pos * 3, 0, pos * 3)
elif pos < 170:
pos -= 85
return (0, pos * 3, 255 - pos * 3)
else:
pos -= 170
return (pos * 3, 255 - pos * 3, 0)
def _breathing_effect(self, color: Tuple[int, int, int], min_brightness=0.1, max_brightness=0.8, speed=0.05):
"""呼吸效果"""
steps = 50
brightness = min_brightness
direction = 1
for _ in range(steps):
brightness += direction * (max_brightness - min_brightness) / steps
if brightness >= max_brightness or brightness <= min_brightness:
direction *= -1
r = int(color[0] * brightness)
g = int(color[1] * brightness)
b = int(color[2] * brightness)
colors = [(r, g, b)] * self.led_count
self._show_colors(colors)
time.sleep(speed)
def _running_light_effect(self, color: Tuple[int, int, int], speed=0.15):
"""跑马灯效果"""
colors = [(0, 0, 0)] * self.led_count
if not hasattr(self, '_running_position'):
self._running_position = 0
colors[self._running_position] = color
# 添加拖尾效果
for i in range(1, 4):
pos = (self._running_position - i) % self.led_count
brightness = 1.0 - (i * 0.3)
if brightness > 0:
colors[pos] = tuple(int(c * brightness) for c in color)
self._show_colors(colors)
self._running_position = (self._running_position + 1) % self.led_count
time.sleep(speed)
def _pulse_effect(self, color: Tuple[int, int, int], speed=0.3):
"""脉冲效果"""
colors = [(0, 0, 0)] * self.led_count
if not hasattr(self, '_pulse_phase'):
self._pulse_phase = 0
# 计算脉冲位置
center = self.led_count // 2
radius = abs(math.sin(self._pulse_phase)) * (self.led_count // 2)
for i in range(self.led_count):
distance = abs(i - center)
if distance <= radius:
brightness = 1.0 - (distance / max(radius, 1))
colors[i] = tuple(int(c * brightness) for c in color)
self._show_colors(colors)
self._pulse_phase += speed * 0.5
time.sleep(speed)
def _rotating_effect(self, color: Tuple[int, int, int], speed=0.2):
"""旋转效果"""
colors = [(0, 0, 0)] * self.led_count
if not hasattr(self, '_rotate_angle'):
self._rotate_angle = 0
# 创建旋转图案
for i in range(self.led_count):
angle = (i * 2 * math.pi / self.led_count) + self._rotate_angle
brightness = (math.sin(angle) + 1) / 2
colors[i] = tuple(int(c * brightness) for c in color)
self._show_colors(colors)
self._rotate_angle += speed
time.sleep(speed)
def _wave_effect(self, color: Tuple[int, int, int], speed=0.4):
"""波浪效果"""
colors = [(0, 0, 0)] * self.led_count
if not hasattr(self, '_wave_offset'):
self._wave_offset = 0
for i in range(self.led_count):
# 使用正弦波创建波浪效果
wave_value = math.sin((i + self._wave_offset) * 0.5)
brightness = (wave_value + 1) / 2
colors[i] = tuple(int(c * brightness) for c in color)
self._show_colors(colors)
self._wave_offset += speed
time.sleep(speed)
def _wave_effect_multi(self, colors: list, speed=0.3):
"""多彩波浪效果"""
color_list = [(0, 0, 0)] * self.led_count
if not hasattr(self, '_wave_multi_offset'):
self._wave_multi_offset = 0
for i in range(self.led_count):
# 创建波浪效果并映射到多种颜色
wave_value = math.sin((i + self._wave_multi_offset) * 0.3)
color_index = int((wave_value + 1) / 2 * len(colors)) % len(colors)
brightness = (math.sin((i + self._wave_multi_offset) * 0.7) + 1) / 2
color_list[i] = tuple(int(c * brightness) for c in colors[color_index])
self._show_colors(color_list)
self._wave_multi_offset += speed
time.sleep(speed)
def _blink_effect(self, color: Tuple[int, int, int], speed=0.5):
"""闪烁效果"""
if not hasattr(self, '_blink_state'):
self._blink_state = False
if self._blink_state:
colors = [color] * self.led_count
else:
colors = [(0, 0, 0)] * self.led_count
self._show_colors(colors)
self._blink_state = not self._blink_state
time.sleep(speed)
def _gentle_blink_effect(self, color: Tuple[int, int, int], intensity=0.3, speed=1.0):
"""微弱闪烁效果"""
if not hasattr(self, '_gentle_blink_phase'):
self._gentle_blink_phase = 0
brightness = intensity * (math.sin(self._gentle_blink_phase) + 1) / 2
colors = [tuple(int(c * brightness) for c in color)] * self.led_count
self._show_colors(colors)
self._gentle_blink_phase += speed * 0.1
time.sleep(speed)
def _fade_out_effect(self, speed=0.1):
"""渐暗效果"""
if not hasattr(self, '_fade_brightness'):
self._fade_brightness = 1.0
brightness = max(0, self._fade_brightness - speed)
colors = [(int(255 * brightness), int(255 * brightness), int(255 * brightness))] * self.led_count
self._show_colors(colors)
self._fade_brightness = brightness
if brightness <= 0:
self.clear_all()
def _show_moving_colors(self, colors: list, speed=0.1):
"""移动色彩效果"""
if not hasattr(self, '_color_offset'):
self._color_offset = 0
color_list = []
for i in range(self.led_count):
color_index = (i + self._color_offset) % len(colors)
color_list.append(colors[color_index])
self._show_colors(color_list)
self._color_offset = (self._color_offset + 1) % len(colors)
time.sleep(speed)
def _greeting_effect(self):
"""打招呼效果 - 温暖的彩色脉冲"""
colors = [(0, 0, 0)] * self.led_count
if not hasattr(self, '_greeting_phase'):
self._greeting_phase = 0
self._greeting_start_time = time.time()
# 打招呼效果持续10秒
elapsed_time = time.time() - self._greeting_start_time
if elapsed_time > 10:
# 自动重置,避免卡在打招呼状态
self._greeting_phase = 0
self._greeting_start_time = time.time()
# 创建温暖的彩色脉冲效果
for i in range(self.led_count):
# 使用多个正弦波创建复杂的脉冲效果
wave1 = math.sin((i * 0.8 + self._greeting_phase * 0.1) * math.pi)
wave2 = math.sin((i * 0.5 + self._greeting_phase * 0.08) * math.pi * 1.5)
# 混合波形
combined_wave = (wave1 + wave2) / 2
# 确保亮度在合理范围内
brightness = max(0, min(1, (combined_wave + 1) / 2))
# 创建温暖的颜色变化
if i < self.led_count // 3:
# 红橙色部分
color = (255, int(200 * brightness), int(50 * brightness))
elif i < 2 * self.led_count // 3:
# 黄色部分
color = (int(255 * brightness), int(255 * brightness), int(100 * brightness))
else:
# 粉紫色部分
color = (int(255 * brightness), int(150 * brightness), int(255 * brightness))
colors[i] = tuple(int(c * brightness) for c in color)
self._show_colors(colors)
self._greeting_phase += 1
time.sleep(0.08)
# ===== 工具函数 =====
def _rainbow_colors(self, count=7):
"""生成彩虹色彩"""
colors = []
for i in range(count):
hue = i * 360 / count
colors.append(self._hsv_to_rgb(hue, 1.0, 1.0))
return colors
def _hsv_to_rgb(self, h: float, s: float, v: float) -> Tuple[int, int, int]:
"""HSV颜色转RGB"""
h = h / 60.0
c = v * s
x = c * (1 - abs((h % 2) - 1))
m = v - c
if 0 <= h < 1:
r, g, b = c, x, 0
elif 1 <= h < 2:
r, g, b = x, c, 0
elif 2 <= h < 3:
r, g, b = 0, c, x
elif 3 <= h < 4:
r, g, b = 0, x, c
elif 4 <= h < 5:
r, g, b = x, 0, c
else:
r, g, b = c, 0, x
return (int((r + m) * 255), int((g + m) * 255), int((b + m) * 255))
def _show_colors(self, colors: list):
"""显示颜色"""
if not self.available:
return
try:
for i, (r, g, b) in enumerate(colors):
if i < self.strip.numPixels():
self.strip.setPixelColor(i, Color(r, g, b))
self.strip.show()
except Exception as e:
self.logger.error(f"显示颜色失败: {e}")
# ===== 全局LED控制器实例 =====
_led_controller = None
def get_led_controller(led_count=10, brightness=100, led_pin=10) -> LEDController:
"""获取LED控制器单例"""
global _led_controller
if _led_controller is None:
_led_controller = LEDController(led_count, brightness, led_pin)
return _led_controller
def start_led_effects(led_count=10, brightness=100, led_pin=10):
"""启动LED特效系统"""
controller = get_led_controller(led_count, brightness, led_pin)
return controller.start()
def stop_led_effects():
"""停止LED特效系统"""
global _led_controller
if _led_controller:
_led_controller.stop()
_led_controller = None
def set_led_state(state: SystemState, **params):
"""设置LED状态"""
global _led_controller
if _led_controller:
_led_controller.set_state(state, **params)
if __name__ == "__main__":
# 测试LED控制器
print("🎆 测试LED控制器...")
controller = LEDController(led_count=10, brightness=100)
if not controller.available:
print("❌ LED不可用")
exit(1)
try:
controller.start()
# 测试各种状态
states = [
SystemState.STARTUP,
SystemState.WAITING_NFC,
SystemState.CALIBRATING,
SystemState.IDLE_MONITORING,
SystemState.RECORDING,
SystemState.PROCESSING,
SystemState.PLAYING,
SystemState.CHARACTER_SWITCH,
SystemState.ERROR,
SystemState.SHUTDOWN
]
for state in states:
print(f"\n🎭 测试状态: {state.value}")
controller.set_state(state)
time.sleep(3)
print("\n✅ LED控制器测试完成")
except KeyboardInterrupt:
print("\n👋 用户中断")
finally:
controller.stop()