This commit is contained in:
朱潮 2025-09-27 14:30:19 +08:00
parent 88e258611e
commit 37c038bcf7
7 changed files with 2109 additions and 1 deletions

View File

@ -4,7 +4,7 @@
"system_prompt": "贫僧唐三藏,法号玄奘,奉唐王之命前往西天取经。说话要温和慈悲,常常引用佛经教诲,劝人向善。对众生都要有慈悲心,即使是妖魔鬼怪也要度化。说话要文雅,常用'阿弥陀佛'、'善哉善哉'等佛家用语。回答要体现出高僧的智慧和慈悲心。",
"voice": "zh_male_tangseng_mars_bigtts",
"max_tokens": 500,
"greeting": "阿弥陀佛,贫僧唐三。今日与君相遇,实乃有缘。愿与施主共论佛法,同修善果。",
"greeting": "阿弥陀佛,贫僧唐三。今日与君相遇,实乃有缘。愿与施主共论佛法,同修善果。",
"nfc_uid": "1DC8C90D0D1080",
"author": "Claude"
}

View File

@ -28,6 +28,13 @@ from audio_processes import (ControlCommand, InputProcess, OutputProcess,
ProcessEvent, RecordingState)
from nfc_manager import get_nfc_manager
# 导入LED控制器
try:
from led_controller import SystemState, set_led_state
LED_AVAILABLE = True
except ImportError:
LED_AVAILABLE = False
def input_process_target(command_queue, event_queue, config):
"""输入进程的目标函数 - 在子进程中创建InputProcess实例"""
@ -292,6 +299,13 @@ class ControlSystem:
if self.wait_for_calibration_complete(timeout=30):
print("✅ 校准完成")
# 校准完成后恢复LED状态
if LED_AVAILABLE:
if hasattr(self, 'nfc_enabled') and self.nfc_enabled:
set_led_state(SystemState.WAITING_NFC)
else:
set_led_state(SystemState.IDLE_MONITORING)
# 如果启用了NFC开始持续执行NFC检测但不播放打招呼或启动监听
if hasattr(self, 'nfc_enabled') and self.nfc_enabled:
print("📱 开始持续执行NFC检测等待NFC卡片...")
@ -311,15 +325,29 @@ class ControlSystem:
greeting_success = self.play_greeting()
if not greeting_success:
print("⚠️ 打招呼播放失败,继续运行...")
# 打招呼失败时设置LED为空闲监听状态
if LED_AVAILABLE:
set_led_state(SystemState.IDLE_MONITORING)
print("🎯 启动音频监听...")
success = self.start_monitoring()
if success:
print("✅ 监听已启动")
# 注意不立即设置LED状态让打招呼效果持续到播放完成
else:
print("⚠️ 监听启动失败")
# 监听启动失败时设置LED为空闲监听状态
if LED_AVAILABLE:
set_led_state(SystemState.IDLE_MONITORING)
else:
print("⚠️ 校准超时,继续运行...")
# 校准超时也恢复LED状态
if LED_AVAILABLE:
if hasattr(self, 'nfc_enabled') and self.nfc_enabled:
set_led_state(SystemState.WAITING_NFC)
else:
set_led_state(SystemState.IDLE_MONITORING)
# 注释掉自动启动监听功能,让打招呼播放完成后自动开启监听
# if auto_monitoring:
@ -344,6 +372,11 @@ class ControlSystem:
def start_calibration(self):
"""启动语音检测器校准"""
print("🎯 启动语音检测器校准...")
# 更新LED状态为校准模式
if LED_AVAILABLE:
set_led_state(SystemState.CALIBRATING)
self.input_command_queue.put(ControlCommand('start_calibration'))
return True
@ -462,6 +495,9 @@ class ControlSystem:
if monitoring_status and monitoring_status['enabled']:
# 监听已启用,切换到录音状态
self.state = RecordingState.RECORDING
# 更新LED状态
if LED_AVAILABLE:
set_led_state(SystemState.RECORDING)
print("🎯 状态IDLE → RECORDING监听已启用")
else:
# 监听未启用,尝试启用
@ -471,6 +507,9 @@ class ControlSystem:
# 监听启用成功,等待状态更新后进入录音状态
time.sleep(0.5) # 等待状态更新
self.state = RecordingState.RECORDING
# 更新LED状态
if LED_AVAILABLE:
set_led_state(SystemState.RECORDING)
print("🎯 状态IDLE → RECORDING监听已启用")
else:
print("⚠️ 监听启用失败保持IDLE状态")
@ -537,6 +576,10 @@ class ControlSystem:
self.processing_complete = False
self.playback_complete = False
# 更新LED状态
if LED_AVAILABLE:
set_led_state(SystemState.PROCESSING)
print(f"🎯 状态RECORDING → PROCESSING (时长: {event.metadata['duration']:.2f}s)")
def _handle_playback_complete(self, event: ProcessEvent):
@ -553,6 +596,14 @@ class ControlSystem:
self.state = RecordingState.IDLE
# 设置标志,表示刚从播放状态切换过来
self._just_finished_playing = True
# 更新LED状态
if LED_AVAILABLE:
if self.nfc_enabled:
set_led_state(SystemState.WAITING_NFC)
else:
set_led_state(SystemState.IDLE_MONITORING)
print(f"🎯 状态:{old_state} → IDLE")
# 延迟重新启用录音,确保音频设备完全停止
@ -575,6 +626,9 @@ class ControlSystem:
self.input_command_queue.put(ControlCommand('enable_recording'))
# 更新状态为录音状态
self.state = RecordingState.RECORDING
# 更新LED状态
if LED_AVAILABLE:
set_led_state(SystemState.RECORDING)
print(f"🎯 状态IDLE → RECORDING延迟启用")
except Exception as e:
print(f"❌ 主控制:延迟发送 enable_recording 命令失败: {e}")
@ -635,6 +689,10 @@ class ControlSystem:
self.state = RecordingState.PLAYING
self.stats['successful_processing'] += 1
# 更新LED状态
if LED_AVAILABLE:
set_led_state(SystemState.PLAYING)
print("🎯 状态PROCESSING → PLAYING")
except Exception as e:
@ -648,6 +706,15 @@ class ControlSystem:
self.processing_complete = True
self.playback_complete = True
# 更新LED状态
if LED_AVAILABLE:
set_led_state(SystemState.ERROR)
time.sleep(2) # 显示错误状态2秒
if self.nfc_enabled:
set_led_state(SystemState.WAITING_NFC)
else:
set_led_state(SystemState.IDLE_MONITORING)
# 发送完成信号,防止输出进程等待
try:
# 发送LLM完成信号
@ -1395,6 +1462,10 @@ class ControlSystem:
# 设置状态为播放状态
self.state = RecordingState.PLAYING
# 更新LED状态为打招呼效果
if LED_AVAILABLE:
set_led_state(SystemState.GREETING)
# 发送打招呼文本到TTS带缓存支持
character_name = character_config.get("name", self.config['processing']['character'])
print(f"📡 准备发送打招呼文本到输出进程: {greeting_text} (角色: {character_name})")
@ -1680,6 +1751,10 @@ class ControlSystem:
print(f"❌ 角色配置不存在: {character_name}")
return
# 显示角色切换LED效果
if LED_AVAILABLE:
set_led_state(SystemState.CHARACTER_SWITCH)
# === 紧急停止所有音频活动 ===
print("🚨 NFC切换紧急停止所有音频活动...")
self._emergency_stop_all_audio()
@ -1712,9 +1787,16 @@ class ControlSystem:
greeting_success = self.play_greeting()
if greeting_success:
print("✅ 角色切换完成")
# 注意不立即恢复NFC等待状态让打招呼LED效果持续显示
# LED状态会在播放完成后自动恢复
else:
print("⚠️ 打招呼播放失败,继续运行...")
# 即使打招呼失败也恢复NFC等待状态
if LED_AVAILABLE:
set_led_state(SystemState.WAITING_NFC)
def _emergency_stop_all_audio(self):
"""紧急停止所有音频活动 - 用于NFC切换时立即停止"""

572
led_controller.py Normal file
View File

@ -0,0 +1,572 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
LED灯光控制器
为多进程音频录音系统提供状态LED灯光效果
"""
import time
import threading
import random
import math
from typing import Dict, Any, Optional, Tuple
from enum import Enum
import logging
# 尝试导入LED驱动库
try:
import RPi.GPIO as GPIO
from rpi_ws281x import Color, PixelStrip
LED_AVAILABLE = True
except ImportError:
LED_AVAILABLE = False
print("⚠️ LED驱动库未安装LED功能将被禁用")
class SystemState(Enum):
"""系统状态枚举"""
STARTUP = "startup"
WAITING_NFC = "waiting_nfc"
CALIBRATING = "calibrating"
IDLE_MONITORING = "idle_monitoring"
RECORDING = "recording"
PROCESSING = "processing"
PLAYING = "playing"
CHARACTER_SWITCH = "character_switch"
GREETING = "greeting" # 新增:打招呼状态
ERROR = "error"
SHUTDOWN = "shutdown"
class LEDController:
"""LED灯光控制器"""
def __init__(self, led_count=10, brightness=100, led_pin=10):
"""
初始化LED控制器
Args:
led_count: LED数量
brightness: 亮度 (0-255)
led_pin: GPIO引脚号
"""
self.led_count = led_count
self.brightness = brightness
self.led_pin = led_pin
self.available = LED_AVAILABLE
# LED控制对象
self.strip = None
# 状态控制
self.current_state = SystemState.SHUTDOWN
self.running = False
self.effect_thread = None
self.effect_lock = threading.Lock()
# 特效参数
self.effect_params = {}
# 初始化LED
if self.available:
self._init_led()
# 设置日志
self.logger = logging.getLogger(__name__)
def _init_led(self):
"""初始化LED硬件"""
try:
# WS2812B配置
LED_FREQ_HZ = 800000 # LED信号频率
LED_DMA = 10 # DMA通道
LED_INVERT = False # 是否反转信号
LED_CHANNEL = 0 # PWM通道
# 创建PixelStrip对象
self.strip = PixelStrip(
self.led_count, self.led_pin, LED_FREQ_HZ,
LED_DMA, LED_INVERT, self.brightness, LED_CHANNEL
)
self.strip.begin()
# 初始化后清空
time.sleep(0.1)
self.clear_all()
print(f"✅ LED控制器初始化成功: {self.led_count}个LED, 引脚GPIO{self.led_pin}")
except Exception as e:
print(f"❌ LED初始化失败: {e}")
self.available = False
def start(self):
"""启动LED控制器"""
if not self.available:
print("⚠️ LED不可用跳过启动")
return False
self.running = True
# 启动特效线程
self.effect_thread = threading.Thread(target=self._effect_loop, daemon=True)
self.effect_thread.start()
# 设置启动状态
self.set_state(SystemState.STARTUP)
print("✅ LED控制器已启动")
return True
def stop(self):
"""停止LED控制器"""
if not self.available or not self.running:
return
self.running = False
# 等待特效线程结束
if self.effect_thread and self.effect_thread.is_alive():
self.effect_thread.join(timeout=2)
# 关闭LED
self.clear_all()
print("🛑 LED控制器已停止")
def set_state(self, state: SystemState, **params):
"""
设置系统状态和对应的LED效果
Args:
state: 系统状态
**params: 特效参数
"""
if not self.available:
return
with self.effect_lock:
self.current_state = state
self.effect_params = params
self.logger.info(f"LED状态切换: {state.value}")
def clear_all(self):
"""清空所有LED"""
if not self.available:
return
try:
for i in range(self.strip.numPixels()):
self.strip.setPixelColor(i, Color(0, 0, 0))
self.strip.show()
time.sleep(0.1)
except Exception as e:
self.logger.error(f"清空LED失败: {e}")
def _effect_loop(self):
"""特效循环"""
while self.running:
try:
with self.effect_lock:
state = self.current_state
params = self.effect_params.copy()
# 根据状态执行对应特效
if state == SystemState.STARTUP:
self._startup_effect()
elif state == SystemState.WAITING_NFC:
self._waiting_nfc_effect()
elif state == SystemState.CALIBRATING:
self._calibrating_effect()
elif state == SystemState.IDLE_MONITORING:
self._idle_monitoring_effect()
elif state == SystemState.RECORDING:
self._recording_effect()
elif state == SystemState.PROCESSING:
self._processing_effect()
elif state == SystemState.PLAYING:
self._playing_effect()
elif state == SystemState.CHARACTER_SWITCH:
self._character_switch_effect()
elif state == SystemState.GREETING:
self._greeting_effect()
elif state == SystemState.ERROR:
self._error_effect()
elif state == SystemState.SHUTDOWN:
self._shutdown_effect()
time.sleep(0.05) # 控制特效刷新率
except Exception as e:
self.logger.error(f"特效循环错误: {e}")
time.sleep(1)
def _startup_effect(self):
"""启动效果 - 彩虹渐变"""
colors = self._rainbow_colors()
self._show_moving_colors(colors, speed=0.1)
def _waiting_nfc_effect(self):
"""等待NFC效果 - 蓝色呼吸灯"""
self._breathing_effect((0, 100, 255), min_brightness=0.1, max_brightness=0.8, speed=0.05)
def _calibrating_effect(self):
"""校准效果 - 黄色跑马灯"""
self._running_light_effect((255, 255, 0), speed=0.15)
def _idle_monitoring_effect(self):
"""空闲监听效果 - 绿色微弱闪烁"""
self._gentle_blink_effect((0, 255, 0), intensity=0.3, speed=1.0)
def _recording_effect(self):
"""录音效果 - 红色脉冲"""
self._pulse_effect((255, 0, 0), speed=0.3)
def _processing_effect(self):
"""处理效果 - 紫色旋转"""
self._rotating_effect((128, 0, 128), speed=0.2)
def _playing_effect(self):
"""播放效果 - 青色流动"""
self._wave_effect((0, 255, 255), speed=0.4)
def _character_switch_effect(self):
"""角色切换效果 - 彩色波浪"""
colors = [(255, 0, 0), (255, 127, 0), (255, 255, 0), (0, 255, 0), (0, 0, 255), (75, 0, 130), (148, 0, 211)]
self._wave_effect_multi(colors, speed=0.3)
def _error_effect(self):
"""错误效果 - 红色闪烁"""
self._blink_effect((255, 0, 0), speed=0.5)
def _shutdown_effect(self):
"""关闭效果 - 渐暗"""
self._fade_out_effect(speed=0.1)
# ===== 基础特效函数 =====
def _breathing_effect(self, color: Tuple[int, int, int], min_brightness=0.1, max_brightness=0.8, speed=0.05):
"""呼吸效果"""
steps = 50
brightness = min_brightness
direction = 1
for _ in range(steps):
brightness += direction * (max_brightness - min_brightness) / steps
if brightness >= max_brightness or brightness <= min_brightness:
direction *= -1
r = int(color[0] * brightness)
g = int(color[1] * brightness)
b = int(color[2] * brightness)
colors = [(r, g, b)] * self.led_count
self._show_colors(colors)
time.sleep(speed)
def _running_light_effect(self, color: Tuple[int, int, int], speed=0.15):
"""跑马灯效果"""
colors = [(0, 0, 0)] * self.led_count
if not hasattr(self, '_running_position'):
self._running_position = 0
colors[self._running_position] = color
# 添加拖尾效果
for i in range(1, 4):
pos = (self._running_position - i) % self.led_count
brightness = 1.0 - (i * 0.3)
if brightness > 0:
colors[pos] = tuple(int(c * brightness) for c in color)
self._show_colors(colors)
self._running_position = (self._running_position + 1) % self.led_count
time.sleep(speed)
def _pulse_effect(self, color: Tuple[int, int, int], speed=0.3):
"""脉冲效果"""
colors = [(0, 0, 0)] * self.led_count
if not hasattr(self, '_pulse_phase'):
self._pulse_phase = 0
# 计算脉冲位置
center = self.led_count // 2
radius = abs(math.sin(self._pulse_phase)) * (self.led_count // 2)
for i in range(self.led_count):
distance = abs(i - center)
if distance <= radius:
brightness = 1.0 - (distance / max(radius, 1))
colors[i] = tuple(int(c * brightness) for c in color)
self._show_colors(colors)
self._pulse_phase += speed * 0.5
time.sleep(speed)
def _rotating_effect(self, color: Tuple[int, int, int], speed=0.2):
"""旋转效果"""
colors = [(0, 0, 0)] * self.led_count
if not hasattr(self, '_rotate_angle'):
self._rotate_angle = 0
# 创建旋转图案
for i in range(self.led_count):
angle = (i * 2 * math.pi / self.led_count) + self._rotate_angle
brightness = (math.sin(angle) + 1) / 2
colors[i] = tuple(int(c * brightness) for c in color)
self._show_colors(colors)
self._rotate_angle += speed
time.sleep(speed)
def _wave_effect(self, color: Tuple[int, int, int], speed=0.4):
"""波浪效果"""
colors = [(0, 0, 0)] * self.led_count
if not hasattr(self, '_wave_offset'):
self._wave_offset = 0
for i in range(self.led_count):
# 使用正弦波创建波浪效果
wave_value = math.sin((i + self._wave_offset) * 0.5)
brightness = (wave_value + 1) / 2
colors[i] = tuple(int(c * brightness) for c in color)
self._show_colors(colors)
self._wave_offset += speed
time.sleep(speed)
def _wave_effect_multi(self, colors: list, speed=0.3):
"""多彩波浪效果"""
color_list = [(0, 0, 0)] * self.led_count
if not hasattr(self, '_wave_multi_offset'):
self._wave_multi_offset = 0
for i in range(self.led_count):
# 创建波浪效果并映射到多种颜色
wave_value = math.sin((i + self._wave_multi_offset) * 0.3)
color_index = int((wave_value + 1) / 2 * len(colors)) % len(colors)
brightness = (math.sin((i + self._wave_multi_offset) * 0.7) + 1) / 2
color_list[i] = tuple(int(c * brightness) for c in colors[color_index])
self._show_colors(color_list)
self._wave_multi_offset += speed
time.sleep(speed)
def _blink_effect(self, color: Tuple[int, int, int], speed=0.5):
"""闪烁效果"""
if not hasattr(self, '_blink_state'):
self._blink_state = False
if self._blink_state:
colors = [color] * self.led_count
else:
colors = [(0, 0, 0)] * self.led_count
self._show_colors(colors)
self._blink_state = not self._blink_state
time.sleep(speed)
def _gentle_blink_effect(self, color: Tuple[int, int, int], intensity=0.3, speed=1.0):
"""微弱闪烁效果"""
if not hasattr(self, '_gentle_blink_phase'):
self._gentle_blink_phase = 0
brightness = intensity * (math.sin(self._gentle_blink_phase) + 1) / 2
colors = [tuple(int(c * brightness) for c in color)] * self.led_count
self._show_colors(colors)
self._gentle_blink_phase += speed * 0.1
time.sleep(speed)
def _fade_out_effect(self, speed=0.1):
"""渐暗效果"""
if not hasattr(self, '_fade_brightness'):
self._fade_brightness = 1.0
brightness = max(0, self._fade_brightness - speed)
colors = [(int(255 * brightness), int(255 * brightness), int(255 * brightness))] * self.led_count
self._show_colors(colors)
self._fade_brightness = brightness
if brightness <= 0:
self.clear_all()
def _show_moving_colors(self, colors: list, speed=0.1):
"""移动色彩效果"""
if not hasattr(self, '_color_offset'):
self._color_offset = 0
color_list = []
for i in range(self.led_count):
color_index = (i + self._color_offset) % len(colors)
color_list.append(colors[color_index])
self._show_colors(color_list)
self._color_offset = (self._color_offset + 1) % len(colors)
time.sleep(speed)
def _greeting_effect(self):
"""打招呼效果 - 温暖的彩色脉冲"""
colors = [(0, 0, 0)] * self.led_count
if not hasattr(self, '_greeting_phase'):
self._greeting_phase = 0
self._greeting_start_time = time.time()
# 打招呼效果持续10秒
elapsed_time = time.time() - self._greeting_start_time
if elapsed_time > 10:
# 自动重置,避免卡在打招呼状态
self._greeting_phase = 0
self._greeting_start_time = time.time()
# 创建温暖的彩色脉冲效果
for i in range(self.led_count):
# 使用多个正弦波创建复杂的脉冲效果
wave1 = math.sin((i * 0.8 + self._greeting_phase * 0.1) * math.pi)
wave2 = math.sin((i * 0.5 + self._greeting_phase * 0.08) * math.pi * 1.5)
# 混合波形
combined_wave = (wave1 + wave2) / 2
# 确保亮度在合理范围内
brightness = max(0, min(1, (combined_wave + 1) / 2))
# 创建温暖的颜色变化
if i < self.led_count // 3:
# 红橙色部分
color = (255, int(200 * brightness), int(50 * brightness))
elif i < 2 * self.led_count // 3:
# 黄色部分
color = (int(255 * brightness), int(255 * brightness), int(100 * brightness))
else:
# 粉紫色部分
color = (int(255 * brightness), int(150 * brightness), int(255 * brightness))
colors[i] = tuple(int(c * brightness) for c in color)
self._show_colors(colors)
self._greeting_phase += 1
time.sleep(0.08)
# ===== 工具函数 =====
def _rainbow_colors(self, count=7):
"""生成彩虹色彩"""
colors = []
for i in range(count):
hue = i * 360 / count
colors.append(self._hsv_to_rgb(hue, 1.0, 1.0))
return colors
def _hsv_to_rgb(self, h: float, s: float, v: float) -> Tuple[int, int, int]:
"""HSV颜色转RGB"""
h = h / 60.0
c = v * s
x = c * (1 - abs((h % 2) - 1))
m = v - c
if 0 <= h < 1:
r, g, b = c, x, 0
elif 1 <= h < 2:
r, g, b = x, c, 0
elif 2 <= h < 3:
r, g, b = 0, c, x
elif 3 <= h < 4:
r, g, b = 0, x, c
elif 4 <= h < 5:
r, g, b = x, 0, c
else:
r, g, b = c, 0, x
return (int((r + m) * 255), int((g + m) * 255), int((b + m) * 255))
def _show_colors(self, colors: list):
"""显示颜色"""
if not self.available:
return
try:
for i, (r, g, b) in enumerate(colors):
if i < self.strip.numPixels():
self.strip.setPixelColor(i, Color(r, g, b))
self.strip.show()
except Exception as e:
self.logger.error(f"显示颜色失败: {e}")
# ===== 全局LED控制器实例 =====
_led_controller = None
def get_led_controller(led_count=10, brightness=100, led_pin=10) -> LEDController:
"""获取LED控制器单例"""
global _led_controller
if _led_controller is None:
_led_controller = LEDController(led_count, brightness, led_pin)
return _led_controller
def start_led_effects(led_count=10, brightness=100, led_pin=10):
"""启动LED特效系统"""
controller = get_led_controller(led_count, brightness, led_pin)
return controller.start()
def stop_led_effects():
"""停止LED特效系统"""
global _led_controller
if _led_controller:
_led_controller.stop()
_led_controller = None
def set_led_state(state: SystemState, **params):
"""设置LED状态"""
global _led_controller
if _led_controller:
_led_controller.set_state(state, **params)
if __name__ == "__main__":
# 测试LED控制器
print("🎆 测试LED控制器...")
controller = LEDController(led_count=10, brightness=100)
if not controller.available:
print("❌ LED不可用")
exit(1)
try:
controller.start()
# 测试各种状态
states = [
SystemState.STARTUP,
SystemState.WAITING_NFC,
SystemState.CALIBRATING,
SystemState.IDLE_MONITORING,
SystemState.RECORDING,
SystemState.PROCESSING,
SystemState.PLAYING,
SystemState.CHARACTER_SWITCH,
SystemState.ERROR,
SystemState.SHUTDOWN
]
for state in states:
print(f"\n🎭 测试状态: {state.value}")
controller.set_state(state)
time.sleep(3)
print("\n✅ LED控制器测试完成")
except KeyboardInterrupt:
print("\n👋 用户中断")
finally:
controller.stop()

View File

@ -0,0 +1,463 @@
#!/usr/bin/env python3
"""
WS2812B LED特效展示脚本
基于精确时序的多种LED效果
"""
import time
import random
import math
import RPi.GPIO as GPIO
from rpi_ws281x import Color, PixelStrip
class WS2812B_WS281X:
def __init__(self, led_count=10, brightness=100):
self.led_count = led_count
self.brightness = brightness
# WS2812B配置
LED_PIN = 10 # GPIO引脚SPI模式
LED_FREQ_HZ = 800000 # LED信号频率
LED_DMA = 10 # DMA通道
LED_INVERT = False # 是否反转信号
LED_CHANNEL = 0 # PWM通道
# 创建PixelStrip对象
self.strip = PixelStrip(led_count, LED_PIN, LED_FREQ_HZ, LED_DMA, LED_INVERT, brightness, LED_CHANNEL)
self.strip.begin()
# 初始化后清空
time.sleep(0.1)
for i in range(led_count):
self.strip.setPixelColor(i, Color(0, 0, 0))
self.strip.show()
time.sleep(0.1)
def show(self, colors):
"""显示颜色"""
for i, (r, g, b) in enumerate(colors):
self.strip.setPixelColor(i, Color(r, g, b))
self.strip.show()
def cleanup(self):
"""清理资源"""
# 清空所有LED
for i in range(self.led_count):
self.strip.setPixelColor(i, Color(0, 0, 0))
self.strip.show()
# ===== 新增LED特效函数 =====
def breathing_effect(ws, color=(255, 0, 0), cycles=5, min_brightness=0.1, max_brightness=0.8):
"""呼吸灯效果"""
print("呼吸灯效果开始...")
steps = 50
for cycle in range(cycles):
print(f" 呼吸循环 {cycle + 1}/{cycles}")
# 呼吸(渐亮)
for i in range(steps):
brightness = min_brightness + (max_brightness - min_brightness) * (i / steps)
r = int(color[0] * brightness)
g = int(color[1] * brightness)
b = int(color[2] * brightness)
colors = [(r, g, b)] * ws.led_count
ws.show(colors)
time.sleep(0.02)
# 呼吸(渐暗)
for i in range(steps):
brightness = max_brightness - (max_brightness - min_brightness) * (i / steps)
r = int(color[0] * brightness)
g = int(color[1] * brightness)
b = int(color[2] * brightness)
colors = [(r, g, b)] * ws.led_count
ws.show(colors)
time.sleep(0.02)
ws.show([(0, 0, 0)] * ws.led_count)
print("呼吸灯效果完成!")
def color_wipe(ws, color=(255, 0, 0), delay=0.1, cycles=3):
"""颜色擦除效果"""
print("颜色擦除效果开始...")
for cycle in range(cycles):
print(f" 擦除循环 {cycle + 1}/{cycles}")
# 正向擦除
for i in range(ws.led_count):
colors = [(0, 0, 0)] * ws.led_count
for j in range(i + 1):
colors[j] = color
ws.show(colors)
time.sleep(delay)
# 反向擦除
for i in range(ws.led_count - 1, -1, -1):
colors = [(0, 0, 0)] * ws.led_count
for j in range(i + 1):
colors[j] = color
ws.show(colors)
time.sleep(delay)
ws.show([(0, 0, 0)] * ws.led_count)
print("颜色擦除效果完成!")
def theater_chase(ws, color1=(255, 0, 0), color2=(0, 0, 0), delay=0.1, cycles=10):
"""剧院追逐效果"""
print("剧院追逐效果开始...")
for cycle in range(cycles):
print(f" 剧院循环 {cycle + 1}/{cycles}")
for i in range(3):
colors = []
for j in range(ws.led_count):
if (j + i) % 3 == 0:
colors.append(color1)
else:
colors.append(color2)
ws.show(colors)
time.sleep(delay)
ws.show([(0, 0, 0)] * ws.led_count)
print("剧院追逐效果完成!")
def rainbow_cycle(ws, delay=0.02, cycles=3):
"""彩虹循环效果"""
print("彩虹循环效果开始...")
for cycle in range(cycles):
print(f" 彩虹循环 {cycle + 1}/{cycles}")
for i in range(256):
colors = []
for j in range(ws.led_count):
pixel_index = (i * 256 // ws.led_count + j) % 256
colors.append(wheel(pixel_index))
ws.show(colors)
time.sleep(delay)
ws.show([(0, 0, 0)] * ws.led_count)
print("彩虹循环效果完成!")
def wheel(pos):
"""生成彩虹颜色"""
pos = 255 - pos
if pos < 85:
return (255 - pos * 3, 0, pos * 3)
elif pos < 170:
pos -= 85
return (0, pos * 3, 255 - pos * 3)
else:
pos -= 170
return (pos * 3, 255 - pos * 3, 0)
def fire_effect(ws, delay=0.1, cycles=10):
"""火焰效果"""
print("火焰效果开始...")
for cycle in range(cycles):
print(f" 火焰循环 {cycle + 1}/{cycles}")
colors = []
for i in range(ws.led_count):
# 随机生成火焰颜色(红、橙、黄)
rand = random.random()
if rand < 0.3:
colors.append((255, 0, 0)) # 红
elif rand < 0.6:
colors.append((255, 100, 0)) # 橙
else:
colors.append((255, 255, 0)) # 黄
ws.show(colors)
time.sleep(delay)
ws.show([(0, 0, 0)] * ws.led_count)
print("火焰效果完成!")
def sparkle_effect(ws, base_color=(50, 50, 50), sparkle_color=(255, 255, 255), delay=0.1, cycles=20):
"""闪烁效果"""
print("闪烁效果开始...")
for cycle in range(cycles):
colors = [base_color] * ws.led_count
# 随机选择几个LED进行闪烁
sparkle_count = random.randint(1, 3)
for _ in range(sparkle_count):
pos = random.randint(0, ws.led_count - 1)
colors[pos] = sparkle_color
ws.show(colors)
time.sleep(delay)
ws.show([(0, 0, 0)] * ws.led_count)
print("闪烁效果完成!")
def pulse_effect(ws, color=(255, 0, 0), cycles=5):
"""脉冲效果"""
print("脉冲效果开始...")
for cycle in range(cycles):
print(f" 脉冲循环 {cycle + 1}/{cycles}")
# 从中心向外扩散
for i in range(ws.led_count // 2 + 1):
colors = [(0, 0, 0)] * ws.led_count
# 设置左右对称的LED
if i < ws.led_count:
colors[i] = color
colors[ws.led_count - 1 - i] = color
ws.show(colors)
time.sleep(0.05)
# 从边缘向中心收缩
for i in range(ws.led_count // 2, -1, -1):
colors = [(0, 0, 0)] * ws.led_count
if i < ws.led_count:
colors[i] = color
colors[ws.led_count - 1 - i] = color
ws.show(colors)
time.sleep(0.05)
ws.show([(0, 0, 0)] * ws.led_count)
print("脉冲效果完成!")
def color_mixing(ws, color1=(255, 0, 0), color2=(0, 0, 255), delay=0.1, cycles=10):
"""颜色混合效果"""
print("颜色混合效果开始...")
for cycle in range(cycles):
print(f" 混合循环 {cycle + 1}/{cycles}")
# 计算混合比例
ratio = (cycle % 20) / 20.0
if ratio > 0.5:
ratio = 1.0 - ratio
# 混合颜色
r = int(color1[0] * (1 - ratio) + color2[0] * ratio)
g = int(color1[1] * (1 - ratio) + color2[1] * ratio)
b = int(color1[2] * (1 - ratio) + color2[2] * ratio)
colors = [(r, g, b)] * ws.led_count
ws.show(colors)
time.sleep(delay)
ws.show([(0, 0, 0)] * ws.led_count)
print("颜色混合效果完成!")
def running_light(ws, colors=None, delay=0.15, cycles=2):
"""跑马灯效果(保留原有函数)"""
if colors is None:
colors = [(150, 0, 0), (0, 150, 0), (0, 0, 150)]
print("跑马灯效果开始...")
for cycle in range(cycles):
print(f" 跑马灯循环 {cycle + 1}/{cycles}")
ws.show([(0, 0, 0)] * ws.led_count)
time.sleep(0.2)
for i in range(ws.led_count):
led_colors = [(0, 0, 0)] * ws.led_count
for j, color in enumerate(colors):
pos = (i + j) % ws.led_count
led_colors[pos] = color
ws.show(led_colors)
time.sleep(delay)
ws.show([(0, 0, 0)] * ws.led_count)
print("跑马灯效果完成!")
def simple_single_chase(ws, color=(255, 0, 0), delay=0.15, cycles=2):
"""简化版单点追逐效果(保留原有函数)"""
print("简化版单点追逐效果开始...")
color = (min(color[0], 150), min(color[1], 150), min(color[2], 150))
for cycle in range(cycles):
print(f" 循环 {cycle + 1}/{cycles}")
ws.show([(0, 0, 0)] * ws.led_count)
time.sleep(0.3)
for i in range(ws.led_count):
colors = [(0, 0, 0)] * ws.led_count
colors[i] = color
ws.show(colors)
time.sleep(delay)
ws.show([(0, 0, 0)] * ws.led_count)
time.sleep(0.2)
ws.show([(0, 0, 0)] * ws.led_count)
time.sleep(0.2)
print("简化版单点追逐效果完成!")
def rainbow_effect(ws, delay=0.2, cycles=3):
"""彩虹效果(保留原有函数)"""
print("彩虹效果开始...")
rainbow_colors = [
(150, 0, 0), # 红
(150, 75, 0), # 橙
(150, 150, 0), # 黄
(0, 150, 0), # 绿
(0, 0, 150), # 蓝
(50, 0, 100), # 靛
(100, 0, 150) # 紫
]
for cycle in range(cycles):
colors = []
for i in range(ws.led_count):
color_index = i % len(rainbow_colors)
colors.append(rainbow_colors[color_index])
for shift in range(len(colors)):
shifted_colors = []
for i in range(ws.led_count):
color_index = (i + shift) % len(rainbow_colors)
shifted_colors.append(rainbow_colors[color_index])
ws.show(shifted_colors)
time.sleep(delay)
ws.show([(0, 0, 0)] * ws.led_count)
print("彩虹效果完成!")
def rainbow_simple(ws, wait_ms=20, iterations=1):
"""简单彩虹效果(基于原始文件)"""
print("简单彩虹效果开始...")
for j in range(256 * iterations):
colors = []
for i in range(ws.led_count):
colors.append(wheel((i + j) & 255))
ws.show(colors)
time.sleep(wait_ms / 1000.0)
ws.show([(0, 0, 0)] * ws.led_count)
print("简单彩虹效果完成!")
def reset_strip(ws):
"""重置LED灯带"""
print("重置LED灯带...")
for i in range(ws.led_count):
ws.strip.setPixelColor(i, Color(0, 0, 0))
ws.strip.show()
time.sleep(0.1)
def test_single_color_with_reset(ws, color, color_name):
"""测试单个颜色并立即重置"""
print(f"测试{color_name}...")
try:
# 设置颜色
for i in range(ws.led_count):
ws.strip.setPixelColor(i, color)
ws.strip.show()
time.sleep(0.001)
# 短暂显示
time.sleep(0.5)
# 立即清空
for i in range(ws.led_count):
ws.strip.setPixelColor(i, Color(0, 0, 0))
ws.strip.show()
time.sleep(0.1)
print(f"{color_name}测试完成")
return True
except Exception as e:
print(f"{color_name}测试失败: {e}")
return False
def safe_color_test(ws):
"""安全颜色测试"""
print("安全颜色测试...")
success = True
success &= test_single_color_with_reset(ws, Color(255, 0, 0), "红色")
time.sleep(0.5)
success &= test_single_color_with_reset(ws, Color(0, 255, 0), "绿色")
time.sleep(0.5)
success &= test_single_color_with_reset(ws, Color(0, 0, 255), "蓝色")
time.sleep(0.5)
success &= test_single_color_with_reset(ws, Color(255, 255, 255), "白色")
if success:
print("所有颜色测试完成")
else:
print("颜色测试过程中出现问题")
reset_strip(ws)
# ===== 主函数:按顺序播放所有特效 =====
def show_all_effects():
"""按顺序播放所有LED特效"""
ws = WS2812B_WS281X(led_count=10, brightness=100)
try:
print("🎆 LED特效展示开始")
print("=" * 50)
# 特效列表
effects = [
("安全颜色测试", lambda: safe_color_test(ws)),
("呼吸灯效果", lambda: breathing_effect(ws, color=(255, 0, 0), cycles=3)),
("颜色擦除效果", lambda: color_wipe(ws, color=(0, 255, 0), cycles=2)),
("剧院追逐效果", lambda: theater_chase(ws, color1=(255, 255, 0), color2=(0, 0, 0), cycles=8)),
("彩虹循环效果", lambda: rainbow_cycle(ws, delay=0.05, cycles=2)),
("简单彩虹效果", lambda: rainbow_simple(ws, wait_ms=20, iterations=1)),
("火焰效果", lambda: fire_effect(ws, delay=0.15, cycles=8)),
("闪烁效果", lambda: sparkle_effect(ws, base_color=(20, 20, 20), sparkle_color=(255, 255, 255), cycles=15)),
("脉冲效果", lambda: pulse_effect(ws, color=(255, 0, 255), cycles=3)),
("颜色混合效果", lambda: color_mixing(ws, color1=(255, 0, 0), color2=(0, 255, 0), cycles=10)),
("跑马灯效果", lambda: running_light(ws, colors=[(255, 0, 0), (0, 255, 0), (0, 0, 255)], delay=0.2, cycles=2)),
("单点追逐效果", lambda: simple_single_chase(ws, color=(0, 255, 255), delay=0.15, cycles=2)),
("彩虹效果", lambda: rainbow_effect(ws, delay=0.2, cycles=2)),
]
# 按顺序播放每个特效
for i, (name, effect_func) in enumerate(effects):
print(f"\n🎭 特效 {i+1}/{len(effects)}: {name}")
print("-" * 30)
effect_func()
# 特效之间短暂停顿
time.sleep(1)
print("\n🎉 所有LED特效播放完成")
print("=" * 50)
except KeyboardInterrupt:
print("\n⏹️ 用户中断,停止播放...")
ws.show([(0, 0, 0)] * ws.led_count)
except Exception as e:
print(f"\n❌ 播放出错: {e}")
ws.show([(0, 0, 0)] * ws.led_count)
finally:
ws.cleanup()
print("🧹 清理完成")
if __name__ == "__main__":
show_all_effects()

View File

@ -0,0 +1,524 @@
#!/usr/bin/env python3
"""
WS2812B 更多趣味LED特效
包含物理效果游戏效果和节日主题特效
"""
import time
import random
import math
import RPi.GPIO as GPIO
from rpi_ws281x import Color, PixelStrip
class WS2812B_WS281X:
def __init__(self, led_count=10, brightness=100):
self.led_count = led_count
self.brightness = brightness
# WS2812B配置
LED_PIN = 10 # GPIO引脚SPI模式
LED_FREQ_HZ = 800000 # LED信号频率
LED_DMA = 10 # DMA通道
LED_INVERT = False # 是否反转信号
LED_CHANNEL = 0 # PWM通道
# 创建PixelStrip对象
self.strip = PixelStrip(led_count, LED_PIN, LED_FREQ_HZ, LED_DMA, LED_INVERT, brightness, LED_CHANNEL)
self.strip.begin()
# 初始化后清空
time.sleep(0.1)
for i in range(led_count):
self.strip.setPixelColor(i, Color(0, 0, 0))
self.strip.show()
time.sleep(0.1)
def show(self, colors):
"""显示颜色"""
for i, (r, g, b) in enumerate(colors):
self.strip.setPixelColor(i, Color(int(r), int(g), int(b)))
self.strip.show()
def cleanup(self):
"""清理资源"""
# 清空所有LED
for i in range(self.led_count):
self.strip.setPixelColor(i, Color(0, 0, 0))
self.strip.show()
# ===== 物理效果类特效 =====
def gravity_bounce(ws, ball_color=(255, 100, 0), cycles=5):
"""重力弹跳效果"""
print("重力弹跳效果开始...")
for cycle in range(cycles):
print(f" 弹跳 {cycle + 1}/{cycles}")
# 模拟重力弹跳
height = 0
velocity = 0
gravity = 0.5
bounce_damping = 0.8
for _ in range(50):
velocity += gravity
height += velocity
# 检查是否触底
if height >= ws.led_count - 1:
height = ws.led_count - 1
velocity = -velocity * bounce_damping
# 显示球的位置
colors = [(0, 0, 0)] * ws.led_count
ball_pos = int(height)
if 0 <= ball_pos < ws.led_count:
colors[ball_pos] = ball_color
# 添加影子效果
if ball_pos > 0:
shadow_intensity = max(0, int(50 - abs(height - ball_pos) * 20))
colors[ball_pos - 1] = (shadow_intensity, shadow_intensity // 2, 0)
ws.show(colors)
time.sleep(0.05)
ws.show([(0, 0, 0)] * ws.led_count)
print("重力弹跳效果完成!")
def wave_ripple(ws, center_color=(0, 100, 255), cycles=8):
"""波纹扩散效果"""
print("波纹扩散效果开始...")
for cycle in range(cycles):
print(f" 波纹 {cycle + 1}/{cycles}")
for i in range(ws.led_count):
colors = [(0, 0, 0)] * ws.led_count
# 从中心向两边扩散
center = ws.led_count // 2
distance = abs(i - center)
# 在两边对称的位置显示波纹
if center - distance >= 0:
intensity = max(0, 255 - distance * 50)
colors[center - distance] = (
int(center_color[0] * intensity / 255),
int(center_color[1] * intensity / 255),
int(center_color[2] * intensity / 255)
)
if center + distance < ws.led_count and distance > 0:
intensity = max(0, 255 - distance * 50)
colors[center + distance] = (
int(center_color[0] * intensity / 255),
int(center_color[1] * intensity / 255),
int(center_color[2] * intensity / 255)
)
ws.show(colors)
time.sleep(0.1)
ws.show([(0, 0, 0)] * ws.led_count)
print("波纹扩散效果完成!")
def magnet_effect(ws, metal_color=(150, 150, 150), magnet_color=(255, 0, 0), cycles=6):
"""磁铁吸引效果"""
print("磁铁吸引效果开始...")
for cycle in range(cycles):
print(f" 磁铁循环 {cycle + 1}/{cycles}")
# 磁铁位置(左右两端)
magnet_positions = [0, ws.led_count - 1]
for frame in range(30):
colors = [(0, 0, 0)] * ws.led_count
# 显示磁铁
for pos in magnet_positions:
colors[pos] = magnet_color
# 计算金属粒子位置
for i in range(3): # 3个金属粒子
particle_pos = int(4 + 2 * math.sin(frame * 0.3 + i * 2))
if 0 < particle_pos < ws.led_count - 1:
colors[particle_pos] = metal_color
ws.show(colors)
time.sleep(0.1)
ws.show([(0, 0, 0)] * ws.led_count)
print("磁铁吸引效果完成!")
def pendulum_swing(ws, pendulum_color=(255, 255, 0), cycles=10):
"""钟摆摆动效果"""
print("钟摆摆动效果开始...")
for cycle in range(cycles):
print(f" 钟摆循环 {cycle + 1}/{cycles}")
# 模拟钟摆运动
for angle in range(-60, 61, 5):
colors = [(0, 0, 0)] * ws.led_count
# 计算钟摆位置
center = ws.led_count // 2
position = int(center + angle / 15)
if 0 <= position < ws.led_count:
colors[position] = pendulum_color
# 添加运动模糊效果
if angle > -60 and position > 0:
colors[position - 1] = (pendulum_color[0] // 3, pendulum_color[1] // 3, pendulum_color[2] // 3)
ws.show(colors)
time.sleep(0.08)
# 反向摆动
for angle in range(60, -61, -5):
colors = [(0, 0, 0)] * ws.led_count
center = ws.led_count // 2
position = int(center + angle / 15)
if 0 <= position < ws.led_count:
colors[position] = pendulum_color
if angle < 60 and position < ws.led_count - 1:
colors[position + 1] = (pendulum_color[0] // 3, pendulum_color[1] // 3, pendulum_color[2] // 3)
ws.show(colors)
time.sleep(0.08)
ws.show([(0, 0, 0)] * ws.led_count)
print("钟摆摆动效果完成!")
# ===== 游戏效果类特效 =====
def snake_game(ws, cycles=3):
"""贪吃蛇游戏效果"""
print("贪吃蛇游戏效果开始...")
snake_color = (0, 255, 0)
food_color = (255, 0, 0)
for cycle in range(cycles):
print(f" 贪吃蛇 {cycle + 1}/{cycles}")
# 初始化蛇的位置
snake = [(0, 0), (0, 1), (0, 2)] # 简化为线性移动
food = random.randint(3, ws.led_count - 1)
for step in range(20):
colors = [(0, 0, 0)] * ws.led_count
# 移动蛇
if step < 15:
new_head = (snake[-1][0] + 1) % ws.led_count
snake.append((new_head, snake[-1][1] + 1))
snake.pop(0)
# 显示蛇
for i, (pos, _) in enumerate(snake):
if i == len(snake) - 1: # 蛇头
colors[pos % ws.led_count] = (0, 150, 0)
else: # 蛇身
colors[pos % ws.led_count] = snake_color
# 显示食物
colors[food] = food_color
ws.show(colors)
time.sleep(0.2)
ws.show([(0, 0, 0)] * ws.led_count)
print("贪吃蛇游戏效果完成!")
def space_invaders(ws, cycles=5):
"""太空侵略者效果"""
print("太空侵略者效果开始...")
alien_color = (0, 255, 0)
bullet_color = (255, 255, 0)
for cycle in range(cycles):
print(f" 入侵者 {cycle + 1}/{cycles}")
# 外星人移动
for direction in [1, -1]:
for step in range(ws.led_count - 2):
colors = [(0, 0, 0)] * ws.led_count
# 显示外星人
alien_pos = 3 + step * direction
if 0 <= alien_pos < ws.led_count:
colors[alien_pos] = alien_color
# 随机子弹
if random.random() < 0.3:
bullet_pos = random.randint(0, ws.led_count - 1)
colors[bullet_pos] = bullet_color
ws.show(colors)
time.sleep(0.15)
ws.show([(0, 0, 0)] * ws.led_count)
print("太空侵略者效果完成!")
def pong_game(ws, cycles=10):
"""乒乓球游戏效果"""
print("乒乓球游戏效果开始...")
paddle_color = (255, 255, 255)
ball_color = (255, 100, 100)
for cycle in range(cycles):
print(f" 乒乓球 {cycle + 1}/{cycles}")
# 球的初始位置和速度
ball_pos = ws.led_count // 2
ball_velocity = 1
for _ in range(30):
colors = [(0, 0, 0)] * ws.led_count
# 显示球拍
colors[0] = paddle_color
colors[ws.led_count - 1] = paddle_color
# 显示球
if 0 <= ball_pos < ws.led_count:
colors[ball_pos] = ball_color
# 移动球
ball_pos += ball_velocity
# 碰撞检测
if ball_pos <= 0 or ball_pos >= ws.led_count - 1:
ball_velocity = -ball_velocity
ws.show(colors)
time.sleep(0.1)
ws.show([(0, 0, 0)] * ws.led_count)
print("乒乓球游戏效果完成!")
def matrix_rain(ws, cycles=8):
"""黑客帝国数字雨效果"""
print("黑客帝国数字雨效果开始...")
for cycle in range(cycles):
print(f" 数字雨 {cycle + 1}/{cycles}")
for frame in range(20):
colors = [(0, 0, 0)] * ws.led_count
# 随机生成数字雨
for i in range(ws.led_count):
if random.random() < 0.3:
intensity = random.randint(100, 255)
colors[i] = (0, intensity, 0)
elif random.random() < 0.1:
# 更亮的"头部"
colors[i] = (100, 255, 100)
ws.show(colors)
time.sleep(0.1)
ws.show([(0, 0, 0)] * ws.led_count)
print("黑客帝国数字雨效果完成!")
# ===== 节日主题特效 =====
def christmas_lights(ws, cycles=8):
"""圣诞彩灯效果"""
print("圣诞彩灯效果开始...")
colors_list = [
(255, 0, 0), # 红
(0, 255, 0), # 绿
(255, 255, 255), # 白
(255, 215, 0), # 金
(0, 0, 255), # 蓝
]
for cycle in range(cycles):
print(f" 圣诞彩灯 {cycle + 1}/{cycles}")
# 随机闪烁
for i in range(15):
colors = [(0, 0, 0)] * ws.led_count
# 随机点亮一些LED
for j in range(ws.led_count):
if random.random() < 0.4:
colors[j] = random.choice(colors_list)
ws.show(colors)
time.sleep(0.2)
ws.show([(0, 0, 0)] * ws.led_count)
print("圣诞彩灯效果完成!")
def fireworks(ws, cycles=6):
"""烟花爆炸效果"""
print("烟花爆炸效果开始...")
for cycle in range(cycles):
print(f" 烟花 {cycle + 1}/{cycles}")
# 烟花上升
for i in range(ws.led_count):
colors = [(0, 0, 0)] * ws.led_count
colors[i] = (255, 255, 0) # 黄色上升轨迹
ws.show(colors)
time.sleep(0.05)
# 爆炸效果
explosion_colors = [(255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 0), (255, 0, 255)]
for frame in range(15):
colors = [(0, 0, 0)] * ws.led_count
# 随机爆炸粒子
for _ in range(random.randint(3, 6)):
pos = random.randint(0, ws.led_count - 1)
colors[pos] = random.choice(explosion_colors)
ws.show(colors)
time.sleep(0.1)
ws.show([(0, 0, 0)] * ws.led_count)
print("烟花爆炸效果完成!")
def valentine_heart(ws, cycles=5):
"""情人节心跳效果"""
print("情人节心跳效果开始...")
for cycle in range(cycles):
print(f" 心跳 {cycle + 1}/{cycles}")
# 心跳模式
for beat in range(3):
colors = [(255, 0, 100)] * ws.led_count # 粉红色
ws.show(colors)
time.sleep(0.1)
colors = [(150, 0, 50)] * ws.led_count # 暗粉色
ws.show(colors)
time.sleep(0.1)
# 暂停
colors = [(0, 0, 0)] * ws.led_count
ws.show(colors)
time.sleep(0.5)
ws.show([(0, 0, 0)] * ws.led_count)
print("情人节心跳效果完成!")
def halloween_pumpkin(ws, cycles=8):
"""万圣节南瓜灯效果"""
print("万圣节南瓜灯效果开始...")
for cycle in range(cycles):
print(f" 南瓜灯 {cycle + 1}/{cycles}")
# 摇曳的橙色光
for i in range(20):
colors = [(0, 0, 0)] * ws.led_count
for j in range(ws.led_count):
# 随机亮度变化模拟摇曳
intensity = random.randint(100, 255)
colors[j] = (intensity, intensity // 2, 0) # 橙色
ws.show(colors)
time.sleep(0.1)
ws.show([(0, 0, 0)] * ws.led_count)
print("万圣节南瓜灯效果完成!")
def new_year_countdown(ws, cycles=3):
"""新年倒计时效果"""
print("新年倒计时效果开始...")
for cycle in range(cycles):
print(f" 倒计时 {cycle + 1}/{cycles}")
# 倒计时数字效果
for countdown in [10, 9, 8, 7, 6, 5, 4, 3, 2, 1]:
colors = [(255, 215, 0)] * countdown + [(0, 0, 0)] * (ws.led_count - countdown)
ws.show(colors)
time.sleep(0.5)
# 新年庆祝
for i in range(10):
colors = []
for j in range(ws.led_count):
if random.random() < 0.5:
colors.append((random.randint(0, 255), random.randint(0, 255), random.randint(0, 255)))
else:
colors.append((0, 0, 0))
ws.show(colors)
time.sleep(0.1)
ws.show([(0, 0, 0)] * ws.led_count)
print("新年倒计时效果完成!")
# ===== 主展示函数 =====
def show_fun_effects():
"""展示所有趣味LED特效"""
ws = WS2812B_WS281X(led_count=10, brightness=100)
try:
print("🎮 趣味LED特效展示开始")
print("=" * 50)
# 特效列表
effects = [
("物理效果 - 重力弹跳", lambda: gravity_bounce(ws, cycles=3)),
("物理效果 - 波纹扩散", lambda: wave_ripple(ws, cycles=6)),
("物理效果 - 磁铁吸引", lambda: magnet_effect(ws, cycles=4)),
("物理效果 - 钟摆摆动", lambda: pendulum_swing(ws, cycles=6)),
("游戏效果 - 贪吃蛇", lambda: snake_game(ws, cycles=2)),
("游戏效果 - 太空侵略者", lambda: space_invaders(ws, cycles=3)),
("游戏效果 - 乒乓球", lambda: pong_game(ws, cycles=6)),
("游戏效果 - 黑客帝国", lambda: matrix_rain(ws, cycles=5)),
("节日效果 - 圣诞彩灯", lambda: christmas_lights(ws, cycles=6)),
("节日效果 - 烟花爆炸", lambda: fireworks(ws, cycles=4)),
("节日效果 - 情人心跳", lambda: valentine_heart(ws, cycles=4)),
("节日效果 - 万圣南瓜", lambda: halloween_pumpkin(ws, cycles=6)),
("节日效果 - 新年倒计时", lambda: new_year_countdown(ws, cycles=2)),
]
# 按顺序播放每个特效
for i, (name, effect_func) in enumerate(effects):
print(f"\n🎭 特效 {i+1}/{len(effects)}: {name}")
print("-" * 30)
effect_func()
# 特效之间短暂停顿
time.sleep(1)
print("\n🎉 所有趣味特效播放完成!")
print("=" * 50)
except KeyboardInterrupt:
print("\n⏹️ 用户中断,停止播放...")
ws.show([(0, 0, 0)] * ws.led_count)
except Exception as e:
print(f"\n❌ 播放出错: {e}")
ws.show([(0, 0, 0)] * ws.led_count)
finally:
ws.cleanup()
print("🧹 清理完成")
if __name__ == "__main__":
show_fun_effects()

View File

@ -0,0 +1,404 @@
#!/usr/bin/env python3
"""
WS2812B SPI测试脚本 - 基于rpi_ws281x库
使用SPI接口和GPIO10控制WS2812B LED灯带
rpi_ws281x库在SPI模式下只能使用GPIO10引脚
"""
import time
import subprocess
import sys
import RPi.GPIO as GPIO
from rpi_ws281x import Color, PixelStrip
# LED参数配置 - SPI模式
LED_COUNT = 10 # LED数量
LED_PIN = 10 # GPIO引脚SPI模式只能使用GPIO10
LED_FREQ_HZ = 800000 # LED信号频率
LED_DMA = 10 # DMA通道
LED_BRIGHTNESS = 100 # 亮度0-255- 恢复适中亮度
LED_INVERT = False # 是否反转信号
LED_CHANNEL = 0 # PWM通道SPI模式
# SPI时序参数
SPI_RESET_DELAY = 0.1 # 重置延时(秒)
SPI_SHOW_DELAY = 0.001 # show()后延时(秒)
def color_wipe(strip, color, wait_ms=50):
"""颜色擦除效果 - 逐个点亮LED"""
for i in range(strip.numPixels()):
strip.setPixelColor(i, color)
strip.show()
time.sleep(wait_ms / 1000.0)
time.sleep(SPI_SHOW_DELAY) # 添加show()后延时
# 颜色显示完成后添加重置延时
time.sleep(SPI_RESET_DELAY)
def theater_chase(strip, color, wait_ms=50, iterations=10):
"""剧院追逐效果"""
for j in range(iterations):
for q in range(3):
for i in range(0, strip.numPixels(), 3):
strip.setPixelColor(i + q, color)
strip.show()
time.sleep(wait_ms / 1000.0)
for i in range(0, strip.numPixels(), 3):
strip.setPixelColor(i + q, 0)
def rainbow(strip, wait_ms=20, iterations=1):
"""彩虹效果"""
for j in range(256 * iterations):
for i in range(strip.numPixels()):
strip.setPixelColor(i, wheel((i + j) & 255))
strip.show()
time.sleep(SPI_SHOW_DELAY) # 添加show后延时
time.sleep(wait_ms / 1000.0)
def rainbow_cycle(strip, wait_ms=20, iterations=5):
"""彩虹循环效果"""
for j in range(256 * iterations):
for i in range(strip.numPixels()):
strip.setPixelColor(i, wheel((int(i * 256 / strip.numPixels()) + j) & 255))
strip.show()
time.sleep(SPI_SHOW_DELAY) # 添加show后延时
time.sleep(wait_ms / 1000.0)
def wheel(pos):
"""生成彩虹颜色"""
if pos < 85:
return Color(pos * 3, 255 - pos * 3, 0)
elif pos < 170:
pos -= 85
return Color(255 - pos * 3, 0, pos * 3)
else:
pos -= 170
return Color(0, pos * 3, 255 - pos * 3)
def reset_strip(strip):
"""重置LED灯带解决颜色锁死问题"""
# 清空所有LED
for i in range(strip.numPixels()):
strip.setPixelColor(i, Color(0, 0, 0))
strip.show()
# 更长的重置延时确保WS2812B完全重置
time.sleep(SPI_RESET_DELAY)
# 发送额外的重置序列
for i in range(5):
strip.show()
time.sleep(SPI_SHOW_DELAY)
# 最终重置延时
time.sleep(SPI_RESET_DELAY)
def hard_reset_spi():
"""硬件级别的SPI重置"""
print("执行硬件级SPI重置...")
# 尝试通过系统命令重置SPI
try:
# 重启SPI内核模块
subprocess.run(["sudo", "rmmod", "spi_bcm2835"], check=False, capture_output=True)
subprocess.run(["sudo", "modprobe", "spi_bcm2835"], check=False, capture_output=True)
print("SPI内核模块重置完成")
except:
print("SPI内核模块重置失败")
# 等待SPI重新初始化
time.sleep(1)
def complete_cleanup(strip):
"""完全清理SPI资源"""
print("执行完全清理...")
try:
# 清空LED
for i in range(strip.numPixels()):
strip.setPixelColor(i, Color(0, 0, 0))
strip.show()
# 长时间重置
time.sleep(SPI_RESET_DELAY * 2)
# 尝试释放SPI资源
# 注意rpi_ws281x库没有直接的close方法所以我们通过其他方式清理
except Exception as e:
print(f"清理过程中出现错误: {e}")
# 硬件重置
hard_reset_spi()
print("完全清理完成")
def test_single_color_with_reset(strip, color, color_name):
"""测试单个颜色并立即重置"""
print(f"测试{color_name}...")
try:
# 设置颜色
for i in range(strip.numPixels()):
strip.setPixelColor(i, color)
strip.show()
time.sleep(SPI_SHOW_DELAY)
# 短暂显示
time.sleep(0.5)
# 立即清空
for i in range(strip.numPixels()):
strip.setPixelColor(i, Color(0, 0, 0))
strip.show()
time.sleep(SPI_RESET_DELAY)
print(f"{color_name}测试完成")
except Exception as e:
print(f"{color_name}测试失败: {e}")
return False
return True
def safe_color_test(strip):
"""安全颜色测试避免WS2812B锁死"""
print("安全颜色测试...")
# 使用新的测试方法
success = True
success &= test_single_color_with_reset(strip, Color(255, 0, 0), "红色")
time.sleep(0.5)
success &= test_single_color_with_reset(strip, Color(0, 255, 0), "绿色")
time.sleep(0.5)
success &= test_single_color_with_reset(strip, Color(0, 0, 255), "蓝色")
time.sleep(0.5)
success &= test_single_color_with_reset(strip, Color(255, 255, 255), "白色")
if success:
print("所有颜色测试完成")
else:
print("颜色测试过程中出现问题")
complete_cleanup(strip)
def set_brightness(strip, brightness):
"""设置所有LED的亮度"""
for i in range(strip.numPixels()):
# 获取当前颜色并应用新的亮度
color = strip.getPixelColor(i)
r = (color >> 16) & 0xFF
g = (color >> 8) & 0xFF
b = color & 0xFF
# 应用亮度
r = int(r * brightness / 255)
g = int(g * brightness / 255)
b = int(b * brightness / 255)
strip.setPixelColor(i, Color(r, g, b))
strip.show()
def test_basic_colors(strip):
"""测试基本颜色"""
print("测试基本颜色...")
safe_color_test(strip)
def test_brightness_levels(strip):
"""测试不同亮度级别"""
print("测试亮度级别...")
# 高亮度
print("高亮度")
set_brightness(strip, 255)
time.sleep(2)
# 中等亮度
print("中等亮度")
set_brightness(strip, 128)
time.sleep(2)
# 低亮度
print("低亮度")
set_brightness(strip, 64)
time.sleep(2)
# 恢复高亮度
set_brightness(strip, 255)
def test_patterns(strip):
"""测试各种模式"""
print("测试彩虹效果...")
rainbow(strip)
time.sleep(1)
print("测试彩虹循环...")
rainbow_cycle(strip)
time.sleep(1)
print("测试剧院追逐...")
theater_chase(strip, Color(255, 0, 0))
theater_chase(strip, Color(0, 255, 0))
theater_chase(strip, Color(0, 0, 255))
time.sleep(1)
def test_spi_timing(strip):
"""测试SPI时序"""
print("测试SPI时序...")
# 快速刷新测试
print("快速刷新测试")
for i in range(10):
color = Color((i * 25) % 256, (i * 50) % 256, (i * 75) % 256)
color_wipe(strip, color, 10)
# 精确时序测试
print("精确时序测试")
start_time = time.time()
for i in range(100):
strip.setPixelColor(0, Color(255, 0, 0))
strip.show()
time.sleep(SPI_SHOW_DELAY)
strip.setPixelColor(0, Color(0, 255, 0))
strip.show()
time.sleep(SPI_SHOW_DELAY)
strip.setPixelColor(0, Color(0, 0, 255))
strip.show()
time.sleep(SPI_SHOW_DELAY)
end_time = time.time()
print(f"300次刷新耗时: {end_time - start_time:.3f}")
print(f"平均每次刷新: {(end_time - start_time) / 300 * 1000:.2f}毫秒")
# SPI模式验证
print("SPI模式验证")
print("当前使用GPIO10 (MOSI)引脚进行SPI通信")
print("数据通过SPI0.0通道传输到WS2812B")
print(f"重置延时: {SPI_RESET_DELAY}")
print(f"Show延时: {SPI_SHOW_DELAY}")
def diagnose_spi_issues(strip):
"""诊断SPI问题"""
print("=== SPI问题诊断 ===")
print("1. 测试单个LED颜色切换...")
for color_name, color in [("红色", Color(255, 0, 0)),
("绿色", Color(0, 255, 0)),
("蓝色", Color(0, 0, 255))]:
print(f" {color_name}...")
strip.setPixelColor(0, color)
strip.show()
time.sleep(SPI_SHOW_DELAY)
time.sleep(0.5)
reset_strip(strip)
time.sleep(0.5)
print("2. 测试不同延时设置...")
for delay in [0.001, 0.005, 0.01, 0.05]:
print(f" 延时 {delay} 秒...")
strip.setPixelColor(0, Color(255, 0, 0))
strip.show()
time.sleep(delay)
strip.setPixelColor(0, Color(0, 255, 0))
strip.show()
time.sleep(delay)
strip.setPixelColor(0, Color(0, 0, 255))
strip.show()
time.sleep(delay)
reset_strip(strip)
time.sleep(0.5)
print("诊断完成")
def initialize_strip():
"""初始化LED灯带"""
print("初始化WS2812B LED灯带...")
print("使用SPI模式和GPIO10引脚...")
# 确保SPI处于干净状态
hard_reset_spi()
# 创建PixelStrip对象
strip = PixelStrip(LED_COUNT, LED_PIN, LED_FREQ_HZ, LED_DMA, LED_INVERT, LED_BRIGHTNESS, LED_CHANNEL)
strip.begin()
# 初始化后清空
time.sleep(0.1)
for i in range(strip.numPixels()):
strip.setPixelColor(i, Color(0, 0, 0))
strip.show()
time.sleep(SPI_RESET_DELAY)
return strip
def main():
"""主测试函数"""
# 初始化LED灯带
strip = initialize_strip()
print("WS2812B SPI测试开始...")
print(f"LED数量: {LED_COUNT}")
print(f"使用引脚: GPIO{LED_PIN} (SPI MOSI)")
print(f"SPI通道: SPI0.0")
print(f"亮度级别: {LED_BRIGHTNESS}")
print("注意: rpi_ws281x库在SPI模式下必须使用GPIO10引脚")
print("重要: 每次测试后都会执行完全清理以避免锁死")
try:
while True:
print("\n=== WS2812B SPI测试菜单 ===")
print("1. 基本颜色测试(修复版)")
print("2. 亮度级别测试")
print("3. 特效模式测试")
print("4. SPI时序测试")
print("5. SPI问题诊断")
print("6. 全部测试")
print("7. 强制重置LED")
print("8. 退出")
print("当前使用GPIO10 (SPI模式)")
choice = input("请选择测试模式 (1-8): ")
if choice == '1':
test_basic_colors(strip)
elif choice == '2':
test_brightness_levels(strip)
elif choice == '3':
test_patterns(strip)
elif choice == '4':
test_spi_timing(strip)
elif choice == '5':
diagnose_spi_issues(strip)
elif choice == '6':
test_basic_colors(strip)
test_brightness_levels(strip)
test_patterns(strip)
test_spi_timing(strip)
elif choice == '7':
print("强制重置LED...")
complete_cleanup(strip)
print("完全重置完成,需要重新初始化...")
# 重新初始化
strip = initialize_strip()
elif choice == '8':
print("执行完全清理并退出...")
complete_cleanup(strip)
break
else:
print("无效选择,请重试")
except KeyboardInterrupt:
print("\n收到中断信号...")
except Exception as e:
print(f"测试过程中发生错误: {e}")
finally:
# 执行完全清理
print("执行完全清理...")
complete_cleanup(strip)
print("测试结束")
if __name__ == '__main__':
main()

View File

@ -14,6 +14,14 @@ import time
import subprocess
from typing import Dict, Any
# 导入LED控制器
try:
from led_controller import get_led_controller, SystemState, start_led_effects, stop_led_effects, set_led_state
LED_AVAILABLE = True
except ImportError:
LED_AVAILABLE = False
print("⚠️ LED控制器模块未找到LED功能将被禁用")
def check_dependencies():
"""检查系统依赖"""
missing_deps = []
@ -226,6 +234,14 @@ def main():
help='详细输出')
parser.add_argument('--enable-nfc', action='store_true',
help='启用NFC角色切换功能')
parser.add_argument('--enable-led', action='store_true',
help='启用LED灯光效果')
parser.add_argument('--led-count', type=int, default=10,
help='LED数量 (默认: 10)')
parser.add_argument('--led-brightness', type=int, default=100,
help='LED亮度 (默认: 100)')
parser.add_argument('--led-pin', type=int, default=10,
help='LED GPIO引脚 (默认: 10)')
args = parser.parse_args()
@ -320,10 +336,46 @@ def main():
control_system.enable_nfc()
nfc_enabled = True
# 启用LED功能如果指定
led_enabled = False
if args.enable_led:
if LED_AVAILABLE:
print("💡 启用LED灯光效果...")
print(f" LED数量: {args.led_count}")
print(f" LED亮度: {args.led_brightness}")
print(f" LED引脚: GPIO{args.led_pin}")
try:
# 启动LED系统
success = start_led_effects(
led_count=args.led_count,
brightness=args.led_brightness,
led_pin=args.led_pin
)
if success:
led_enabled = True
print("✅ LED系统已启动")
# 设置初始状态为启动效果
set_led_state(SystemState.STARTUP)
else:
print("❌ LED系统启动失败")
except Exception as e:
print(f"❌ LED系统初始化失败: {e}")
else:
print("⚠️ LED功能不可用请检查硬件和驱动")
# 启动系统自动校准但根据NFC状态决定是否启动监听
print("🚀 启动系统(包含自动校准)...")
control_system.start(auto_calibration=True, auto_monitoring=not nfc_enabled)
# 如果启用了LED设置系统状态为等待NFC或空闲监听
if led_enabled:
if nfc_enabled:
set_led_state(SystemState.WAITING_NFC)
else:
set_led_state(SystemState.IDLE_MONITORING)
except KeyboardInterrupt:
print("\n👋 用户中断")
except Exception as e:
@ -332,6 +384,17 @@ def main():
import traceback
traceback.print_exc()
finally:
# 清理LED系统
if LED_AVAILABLE and led_enabled:
try:
print("💡 关闭LED系统...")
set_led_state(SystemState.SHUTDOWN)
time.sleep(1) # 给LED一些时间显示关闭效果
stop_led_effects()
print("✅ LED系统已关闭")
except Exception as e:
print(f"❌ 关闭LED系统失败: {e}")
print("👋 系统退出")
if __name__ == "__main__":