diff --git a/characters/tangseng.json b/characters/tangseng.json index 89f340f..d4c10ca 100644 --- a/characters/tangseng.json +++ b/characters/tangseng.json @@ -4,7 +4,7 @@ "system_prompt": "贫僧唐三藏,法号玄奘,奉唐王之命前往西天取经。说话要温和慈悲,常常引用佛经教诲,劝人向善。对众生都要有慈悲心,即使是妖魔鬼怪也要度化。说话要文雅,常用'阿弥陀佛'、'善哉善哉'等佛家用语。回答要体现出高僧的智慧和慈悲心。", "voice": "zh_male_tangseng_mars_bigtts", "max_tokens": 500, - "greeting": "阿弥陀佛,贫僧唐三藏。今日与君相遇,实乃有缘。愿与施主共论佛法,同修善果。", + "greeting": "阿弥陀佛,贫僧唐三葬。今日与君相遇,实乃有缘。愿与施主共论佛法,同修善果。", "nfc_uid": "1DC8C90D0D1080", "author": "Claude" } diff --git a/control_system.py b/control_system.py index a1afbf3..241626c 100644 --- a/control_system.py +++ b/control_system.py @@ -28,6 +28,13 @@ from audio_processes import (ControlCommand, InputProcess, OutputProcess, ProcessEvent, RecordingState) from nfc_manager import get_nfc_manager +# 导入LED控制器 +try: + from led_controller import SystemState, set_led_state + LED_AVAILABLE = True +except ImportError: + LED_AVAILABLE = False + def input_process_target(command_queue, event_queue, config): """输入进程的目标函数 - 在子进程中创建InputProcess实例""" @@ -292,6 +299,13 @@ class ControlSystem: if self.wait_for_calibration_complete(timeout=30): print("✅ 校准完成") + # 校准完成后恢复LED状态 + if LED_AVAILABLE: + if hasattr(self, 'nfc_enabled') and self.nfc_enabled: + set_led_state(SystemState.WAITING_NFC) + else: + set_led_state(SystemState.IDLE_MONITORING) + # 如果启用了NFC,开始持续执行NFC检测,但不播放打招呼或启动监听 if hasattr(self, 'nfc_enabled') and self.nfc_enabled: print("📱 开始持续执行NFC检测,等待NFC卡片...") @@ -311,15 +325,29 @@ class ControlSystem: greeting_success = self.play_greeting() if not greeting_success: print("⚠️ 打招呼播放失败,继续运行...") + # 打招呼失败时设置LED为空闲监听状态 + if LED_AVAILABLE: + set_led_state(SystemState.IDLE_MONITORING) print("🎯 启动音频监听...") success = self.start_monitoring() if success: print("✅ 监听已启动") + # 注意:不立即设置LED状态,让打招呼效果持续到播放完成 else: print("⚠️ 监听启动失败") + # 监听启动失败时设置LED为空闲监听状态 + if LED_AVAILABLE: + set_led_state(SystemState.IDLE_MONITORING) else: print("⚠️ 校准超时,继续运行...") + + # 校准超时也恢复LED状态 + if LED_AVAILABLE: + if hasattr(self, 'nfc_enabled') and self.nfc_enabled: + set_led_state(SystemState.WAITING_NFC) + else: + set_led_state(SystemState.IDLE_MONITORING) # 注释掉自动启动监听功能,让打招呼播放完成后自动开启监听 # if auto_monitoring: @@ -344,6 +372,11 @@ class ControlSystem: def start_calibration(self): """启动语音检测器校准""" print("🎯 启动语音检测器校准...") + + # 更新LED状态为校准模式 + if LED_AVAILABLE: + set_led_state(SystemState.CALIBRATING) + self.input_command_queue.put(ControlCommand('start_calibration')) return True @@ -462,6 +495,9 @@ class ControlSystem: if monitoring_status and monitoring_status['enabled']: # 监听已启用,切换到录音状态 self.state = RecordingState.RECORDING + # 更新LED状态 + if LED_AVAILABLE: + set_led_state(SystemState.RECORDING) print("🎯 状态:IDLE → RECORDING(监听已启用)") else: # 监听未启用,尝试启用 @@ -471,6 +507,9 @@ class ControlSystem: # 监听启用成功,等待状态更新后进入录音状态 time.sleep(0.5) # 等待状态更新 self.state = RecordingState.RECORDING + # 更新LED状态 + if LED_AVAILABLE: + set_led_state(SystemState.RECORDING) print("🎯 状态:IDLE → RECORDING(监听已启用)") else: print("⚠️ 监听启用失败,保持IDLE状态") @@ -537,6 +576,10 @@ class ControlSystem: self.processing_complete = False self.playback_complete = False + # 更新LED状态 + if LED_AVAILABLE: + set_led_state(SystemState.PROCESSING) + print(f"🎯 状态:RECORDING → PROCESSING (时长: {event.metadata['duration']:.2f}s)") def _handle_playback_complete(self, event: ProcessEvent): @@ -553,6 +596,14 @@ class ControlSystem: self.state = RecordingState.IDLE # 设置标志,表示刚从播放状态切换过来 self._just_finished_playing = True + + # 更新LED状态 + if LED_AVAILABLE: + if self.nfc_enabled: + set_led_state(SystemState.WAITING_NFC) + else: + set_led_state(SystemState.IDLE_MONITORING) + print(f"🎯 状态:{old_state} → IDLE") # 延迟重新启用录音,确保音频设备完全停止 @@ -575,6 +626,9 @@ class ControlSystem: self.input_command_queue.put(ControlCommand('enable_recording')) # 更新状态为录音状态 self.state = RecordingState.RECORDING + # 更新LED状态 + if LED_AVAILABLE: + set_led_state(SystemState.RECORDING) print(f"🎯 状态:IDLE → RECORDING(延迟启用)") except Exception as e: print(f"❌ 主控制:延迟发送 enable_recording 命令失败: {e}") @@ -635,6 +689,10 @@ class ControlSystem: self.state = RecordingState.PLAYING self.stats['successful_processing'] += 1 + # 更新LED状态 + if LED_AVAILABLE: + set_led_state(SystemState.PLAYING) + print("🎯 状态:PROCESSING → PLAYING") except Exception as e: @@ -648,6 +706,15 @@ class ControlSystem: self.processing_complete = True self.playback_complete = True + # 更新LED状态 + if LED_AVAILABLE: + set_led_state(SystemState.ERROR) + time.sleep(2) # 显示错误状态2秒 + if self.nfc_enabled: + set_led_state(SystemState.WAITING_NFC) + else: + set_led_state(SystemState.IDLE_MONITORING) + # 发送完成信号,防止输出进程等待 try: # 发送LLM完成信号 @@ -1395,6 +1462,10 @@ class ControlSystem: # 设置状态为播放状态 self.state = RecordingState.PLAYING + # 更新LED状态为打招呼效果 + if LED_AVAILABLE: + set_led_state(SystemState.GREETING) + # 发送打招呼文本到TTS(带缓存支持) character_name = character_config.get("name", self.config['processing']['character']) print(f"📡 准备发送打招呼文本到输出进程: {greeting_text} (角色: {character_name})") @@ -1680,6 +1751,10 @@ class ControlSystem: print(f"❌ 角色配置不存在: {character_name}") return + # 显示角色切换LED效果 + if LED_AVAILABLE: + set_led_state(SystemState.CHARACTER_SWITCH) + # === 紧急停止所有音频活动 === print("🚨 NFC切换:紧急停止所有音频活动...") self._emergency_stop_all_audio() @@ -1712,9 +1787,16 @@ class ControlSystem: greeting_success = self.play_greeting() if greeting_success: print("✅ 角色切换完成") + + # 注意:不立即恢复NFC等待状态,让打招呼LED效果持续显示 + # LED状态会在播放完成后自动恢复 else: print("⚠️ 打招呼播放失败,继续运行...") + # 即使打招呼失败,也恢复NFC等待状态 + if LED_AVAILABLE: + set_led_state(SystemState.WAITING_NFC) + def _emergency_stop_all_audio(self): """紧急停止所有音频活动 - 用于NFC切换时立即停止""" diff --git a/led_controller.py b/led_controller.py new file mode 100644 index 0000000..c478e6c --- /dev/null +++ b/led_controller.py @@ -0,0 +1,572 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +LED灯光控制器 +为多进程音频录音系统提供状态LED灯光效果 +""" + +import time +import threading +import random +import math +from typing import Dict, Any, Optional, Tuple +from enum import Enum +import logging + +# 尝试导入LED驱动库 +try: + import RPi.GPIO as GPIO + from rpi_ws281x import Color, PixelStrip + LED_AVAILABLE = True +except ImportError: + LED_AVAILABLE = False + print("⚠️ LED驱动库未安装,LED功能将被禁用") + +class SystemState(Enum): + """系统状态枚举""" + STARTUP = "startup" + WAITING_NFC = "waiting_nfc" + CALIBRATING = "calibrating" + IDLE_MONITORING = "idle_monitoring" + RECORDING = "recording" + PROCESSING = "processing" + PLAYING = "playing" + CHARACTER_SWITCH = "character_switch" + GREETING = "greeting" # 新增:打招呼状态 + ERROR = "error" + SHUTDOWN = "shutdown" + +class LEDController: + """LED灯光控制器""" + + def __init__(self, led_count=10, brightness=100, led_pin=10): + """ + 初始化LED控制器 + + Args: + led_count: LED数量 + brightness: 亮度 (0-255) + led_pin: GPIO引脚号 + """ + self.led_count = led_count + self.brightness = brightness + self.led_pin = led_pin + self.available = LED_AVAILABLE + + # LED控制对象 + self.strip = None + + # 状态控制 + self.current_state = SystemState.SHUTDOWN + self.running = False + self.effect_thread = None + self.effect_lock = threading.Lock() + + # 特效参数 + self.effect_params = {} + + # 初始化LED + if self.available: + self._init_led() + + # 设置日志 + self.logger = logging.getLogger(__name__) + + def _init_led(self): + """初始化LED硬件""" + try: + # WS2812B配置 + LED_FREQ_HZ = 800000 # LED信号频率 + LED_DMA = 10 # DMA通道 + LED_INVERT = False # 是否反转信号 + LED_CHANNEL = 0 # PWM通道 + + # 创建PixelStrip对象 + self.strip = PixelStrip( + self.led_count, self.led_pin, LED_FREQ_HZ, + LED_DMA, LED_INVERT, self.brightness, LED_CHANNEL + ) + self.strip.begin() + + # 初始化后清空 + time.sleep(0.1) + self.clear_all() + + print(f"✅ LED控制器初始化成功: {self.led_count}个LED, 引脚GPIO{self.led_pin}") + + except Exception as e: + print(f"❌ LED初始化失败: {e}") + self.available = False + + def start(self): + """启动LED控制器""" + if not self.available: + print("⚠️ LED不可用,跳过启动") + return False + + self.running = True + + # 启动特效线程 + self.effect_thread = threading.Thread(target=self._effect_loop, daemon=True) + self.effect_thread.start() + + # 设置启动状态 + self.set_state(SystemState.STARTUP) + + print("✅ LED控制器已启动") + return True + + def stop(self): + """停止LED控制器""" + if not self.available or not self.running: + return + + self.running = False + + # 等待特效线程结束 + if self.effect_thread and self.effect_thread.is_alive(): + self.effect_thread.join(timeout=2) + + # 关闭LED + self.clear_all() + + print("🛑 LED控制器已停止") + + def set_state(self, state: SystemState, **params): + """ + 设置系统状态和对应的LED效果 + + Args: + state: 系统状态 + **params: 特效参数 + """ + if not self.available: + return + + with self.effect_lock: + self.current_state = state + self.effect_params = params + + self.logger.info(f"LED状态切换: {state.value}") + + def clear_all(self): + """清空所有LED""" + if not self.available: + return + + try: + for i in range(self.strip.numPixels()): + self.strip.setPixelColor(i, Color(0, 0, 0)) + self.strip.show() + time.sleep(0.1) + except Exception as e: + self.logger.error(f"清空LED失败: {e}") + + def _effect_loop(self): + """特效循环""" + while self.running: + try: + with self.effect_lock: + state = self.current_state + params = self.effect_params.copy() + + # 根据状态执行对应特效 + if state == SystemState.STARTUP: + self._startup_effect() + elif state == SystemState.WAITING_NFC: + self._waiting_nfc_effect() + elif state == SystemState.CALIBRATING: + self._calibrating_effect() + elif state == SystemState.IDLE_MONITORING: + self._idle_monitoring_effect() + elif state == SystemState.RECORDING: + self._recording_effect() + elif state == SystemState.PROCESSING: + self._processing_effect() + elif state == SystemState.PLAYING: + self._playing_effect() + elif state == SystemState.CHARACTER_SWITCH: + self._character_switch_effect() + elif state == SystemState.GREETING: + self._greeting_effect() + elif state == SystemState.ERROR: + self._error_effect() + elif state == SystemState.SHUTDOWN: + self._shutdown_effect() + + time.sleep(0.05) # 控制特效刷新率 + + except Exception as e: + self.logger.error(f"特效循环错误: {e}") + time.sleep(1) + + def _startup_effect(self): + """启动效果 - 彩虹渐变""" + colors = self._rainbow_colors() + self._show_moving_colors(colors, speed=0.1) + + def _waiting_nfc_effect(self): + """等待NFC效果 - 蓝色呼吸灯""" + self._breathing_effect((0, 100, 255), min_brightness=0.1, max_brightness=0.8, speed=0.05) + + def _calibrating_effect(self): + """校准效果 - 黄色跑马灯""" + self._running_light_effect((255, 255, 0), speed=0.15) + + def _idle_monitoring_effect(self): + """空闲监听效果 - 绿色微弱闪烁""" + self._gentle_blink_effect((0, 255, 0), intensity=0.3, speed=1.0) + + def _recording_effect(self): + """录音效果 - 红色脉冲""" + self._pulse_effect((255, 0, 0), speed=0.3) + + def _processing_effect(self): + """处理效果 - 紫色旋转""" + self._rotating_effect((128, 0, 128), speed=0.2) + + def _playing_effect(self): + """播放效果 - 青色流动""" + self._wave_effect((0, 255, 255), speed=0.4) + + def _character_switch_effect(self): + """角色切换效果 - 彩色波浪""" + colors = [(255, 0, 0), (255, 127, 0), (255, 255, 0), (0, 255, 0), (0, 0, 255), (75, 0, 130), (148, 0, 211)] + self._wave_effect_multi(colors, speed=0.3) + + def _error_effect(self): + """错误效果 - 红色闪烁""" + self._blink_effect((255, 0, 0), speed=0.5) + + def _shutdown_effect(self): + """关闭效果 - 渐暗""" + self._fade_out_effect(speed=0.1) + + # ===== 基础特效函数 ===== + + def _breathing_effect(self, color: Tuple[int, int, int], min_brightness=0.1, max_brightness=0.8, speed=0.05): + """呼吸效果""" + steps = 50 + brightness = min_brightness + direction = 1 + + for _ in range(steps): + brightness += direction * (max_brightness - min_brightness) / steps + if brightness >= max_brightness or brightness <= min_brightness: + direction *= -1 + + r = int(color[0] * brightness) + g = int(color[1] * brightness) + b = int(color[2] * brightness) + + colors = [(r, g, b)] * self.led_count + self._show_colors(colors) + time.sleep(speed) + + def _running_light_effect(self, color: Tuple[int, int, int], speed=0.15): + """跑马灯效果""" + colors = [(0, 0, 0)] * self.led_count + + if not hasattr(self, '_running_position'): + self._running_position = 0 + + colors[self._running_position] = color + + # 添加拖尾效果 + for i in range(1, 4): + pos = (self._running_position - i) % self.led_count + brightness = 1.0 - (i * 0.3) + if brightness > 0: + colors[pos] = tuple(int(c * brightness) for c in color) + + self._show_colors(colors) + self._running_position = (self._running_position + 1) % self.led_count + time.sleep(speed) + + def _pulse_effect(self, color: Tuple[int, int, int], speed=0.3): + """脉冲效果""" + colors = [(0, 0, 0)] * self.led_count + + if not hasattr(self, '_pulse_phase'): + self._pulse_phase = 0 + + # 计算脉冲位置 + center = self.led_count // 2 + radius = abs(math.sin(self._pulse_phase)) * (self.led_count // 2) + + for i in range(self.led_count): + distance = abs(i - center) + if distance <= radius: + brightness = 1.0 - (distance / max(radius, 1)) + colors[i] = tuple(int(c * brightness) for c in color) + + self._show_colors(colors) + self._pulse_phase += speed * 0.5 + time.sleep(speed) + + def _rotating_effect(self, color: Tuple[int, int, int], speed=0.2): + """旋转效果""" + colors = [(0, 0, 0)] * self.led_count + + if not hasattr(self, '_rotate_angle'): + self._rotate_angle = 0 + + # 创建旋转图案 + for i in range(self.led_count): + angle = (i * 2 * math.pi / self.led_count) + self._rotate_angle + brightness = (math.sin(angle) + 1) / 2 + colors[i] = tuple(int(c * brightness) for c in color) + + self._show_colors(colors) + self._rotate_angle += speed + time.sleep(speed) + + def _wave_effect(self, color: Tuple[int, int, int], speed=0.4): + """波浪效果""" + colors = [(0, 0, 0)] * self.led_count + + if not hasattr(self, '_wave_offset'): + self._wave_offset = 0 + + for i in range(self.led_count): + # 使用正弦波创建波浪效果 + wave_value = math.sin((i + self._wave_offset) * 0.5) + brightness = (wave_value + 1) / 2 + colors[i] = tuple(int(c * brightness) for c in color) + + self._show_colors(colors) + self._wave_offset += speed + time.sleep(speed) + + def _wave_effect_multi(self, colors: list, speed=0.3): + """多彩波浪效果""" + color_list = [(0, 0, 0)] * self.led_count + + if not hasattr(self, '_wave_multi_offset'): + self._wave_multi_offset = 0 + + for i in range(self.led_count): + # 创建波浪效果并映射到多种颜色 + wave_value = math.sin((i + self._wave_multi_offset) * 0.3) + color_index = int((wave_value + 1) / 2 * len(colors)) % len(colors) + brightness = (math.sin((i + self._wave_multi_offset) * 0.7) + 1) / 2 + color_list[i] = tuple(int(c * brightness) for c in colors[color_index]) + + self._show_colors(color_list) + self._wave_multi_offset += speed + time.sleep(speed) + + def _blink_effect(self, color: Tuple[int, int, int], speed=0.5): + """闪烁效果""" + if not hasattr(self, '_blink_state'): + self._blink_state = False + + if self._blink_state: + colors = [color] * self.led_count + else: + colors = [(0, 0, 0)] * self.led_count + + self._show_colors(colors) + self._blink_state = not self._blink_state + time.sleep(speed) + + def _gentle_blink_effect(self, color: Tuple[int, int, int], intensity=0.3, speed=1.0): + """微弱闪烁效果""" + if not hasattr(self, '_gentle_blink_phase'): + self._gentle_blink_phase = 0 + + brightness = intensity * (math.sin(self._gentle_blink_phase) + 1) / 2 + colors = [tuple(int(c * brightness) for c in color)] * self.led_count + + self._show_colors(colors) + self._gentle_blink_phase += speed * 0.1 + time.sleep(speed) + + def _fade_out_effect(self, speed=0.1): + """渐暗效果""" + if not hasattr(self, '_fade_brightness'): + self._fade_brightness = 1.0 + + brightness = max(0, self._fade_brightness - speed) + colors = [(int(255 * brightness), int(255 * brightness), int(255 * brightness))] * self.led_count + + self._show_colors(colors) + self._fade_brightness = brightness + + if brightness <= 0: + self.clear_all() + + def _show_moving_colors(self, colors: list, speed=0.1): + """移动色彩效果""" + if not hasattr(self, '_color_offset'): + self._color_offset = 0 + + color_list = [] + for i in range(self.led_count): + color_index = (i + self._color_offset) % len(colors) + color_list.append(colors[color_index]) + + self._show_colors(color_list) + self._color_offset = (self._color_offset + 1) % len(colors) + time.sleep(speed) + + def _greeting_effect(self): + """打招呼效果 - 温暖的彩色脉冲""" + colors = [(0, 0, 0)] * self.led_count + + if not hasattr(self, '_greeting_phase'): + self._greeting_phase = 0 + self._greeting_start_time = time.time() + + # 打招呼效果持续10秒 + elapsed_time = time.time() - self._greeting_start_time + if elapsed_time > 10: + # 自动重置,避免卡在打招呼状态 + self._greeting_phase = 0 + self._greeting_start_time = time.time() + + # 创建温暖的彩色脉冲效果 + for i in range(self.led_count): + # 使用多个正弦波创建复杂的脉冲效果 + wave1 = math.sin((i * 0.8 + self._greeting_phase * 0.1) * math.pi) + wave2 = math.sin((i * 0.5 + self._greeting_phase * 0.08) * math.pi * 1.5) + + # 混合波形 + combined_wave = (wave1 + wave2) / 2 + + # 确保亮度在合理范围内 + brightness = max(0, min(1, (combined_wave + 1) / 2)) + + # 创建温暖的颜色变化 + if i < self.led_count // 3: + # 红橙色部分 + color = (255, int(200 * brightness), int(50 * brightness)) + elif i < 2 * self.led_count // 3: + # 黄色部分 + color = (int(255 * brightness), int(255 * brightness), int(100 * brightness)) + else: + # 粉紫色部分 + color = (int(255 * brightness), int(150 * brightness), int(255 * brightness)) + + colors[i] = tuple(int(c * brightness) for c in color) + + self._show_colors(colors) + self._greeting_phase += 1 + time.sleep(0.08) + + # ===== 工具函数 ===== + + def _rainbow_colors(self, count=7): + """生成彩虹色彩""" + colors = [] + for i in range(count): + hue = i * 360 / count + colors.append(self._hsv_to_rgb(hue, 1.0, 1.0)) + return colors + + def _hsv_to_rgb(self, h: float, s: float, v: float) -> Tuple[int, int, int]: + """HSV颜色转RGB""" + h = h / 60.0 + c = v * s + x = c * (1 - abs((h % 2) - 1)) + m = v - c + + if 0 <= h < 1: + r, g, b = c, x, 0 + elif 1 <= h < 2: + r, g, b = x, c, 0 + elif 2 <= h < 3: + r, g, b = 0, c, x + elif 3 <= h < 4: + r, g, b = 0, x, c + elif 4 <= h < 5: + r, g, b = x, 0, c + else: + r, g, b = c, 0, x + + return (int((r + m) * 255), int((g + m) * 255), int((b + m) * 255)) + + def _show_colors(self, colors: list): + """显示颜色""" + if not self.available: + return + + try: + for i, (r, g, b) in enumerate(colors): + if i < self.strip.numPixels(): + self.strip.setPixelColor(i, Color(r, g, b)) + self.strip.show() + except Exception as e: + self.logger.error(f"显示颜色失败: {e}") + +# ===== 全局LED控制器实例 ===== +_led_controller = None + +def get_led_controller(led_count=10, brightness=100, led_pin=10) -> LEDController: + """获取LED控制器单例""" + global _led_controller + + if _led_controller is None: + _led_controller = LEDController(led_count, brightness, led_pin) + + return _led_controller + +def start_led_effects(led_count=10, brightness=100, led_pin=10): + """启动LED特效系统""" + controller = get_led_controller(led_count, brightness, led_pin) + return controller.start() + +def stop_led_effects(): + """停止LED特效系统""" + global _led_controller + + if _led_controller: + _led_controller.stop() + _led_controller = None + +def set_led_state(state: SystemState, **params): + """设置LED状态""" + global _led_controller + + if _led_controller: + _led_controller.set_state(state, **params) + +if __name__ == "__main__": + # 测试LED控制器 + print("🎆 测试LED控制器...") + + controller = LEDController(led_count=10, brightness=100) + + if not controller.available: + print("❌ LED不可用") + exit(1) + + try: + controller.start() + + # 测试各种状态 + states = [ + SystemState.STARTUP, + SystemState.WAITING_NFC, + SystemState.CALIBRATING, + SystemState.IDLE_MONITORING, + SystemState.RECORDING, + SystemState.PROCESSING, + SystemState.PLAYING, + SystemState.CHARACTER_SWITCH, + SystemState.ERROR, + SystemState.SHUTDOWN + ] + + for state in states: + print(f"\n🎭 测试状态: {state.value}") + controller.set_state(state) + time.sleep(3) + + print("\n✅ LED控制器测试完成") + + except KeyboardInterrupt: + print("\n👋 用户中断") + finally: + controller.stop() \ No newline at end of file diff --git a/led_demo/ws2812b_effects_show.py b/led_demo/ws2812b_effects_show.py new file mode 100644 index 0000000..fa4cc56 --- /dev/null +++ b/led_demo/ws2812b_effects_show.py @@ -0,0 +1,463 @@ +#!/usr/bin/env python3 +""" +WS2812B LED特效展示脚本 +基于精确时序的多种LED效果 +""" + +import time +import random +import math +import RPi.GPIO as GPIO +from rpi_ws281x import Color, PixelStrip + + +class WS2812B_WS281X: + def __init__(self, led_count=10, brightness=100): + self.led_count = led_count + self.brightness = brightness + + # WS2812B配置 + LED_PIN = 10 # GPIO引脚(SPI模式) + LED_FREQ_HZ = 800000 # LED信号频率 + LED_DMA = 10 # DMA通道 + LED_INVERT = False # 是否反转信号 + LED_CHANNEL = 0 # PWM通道 + + # 创建PixelStrip对象 + self.strip = PixelStrip(led_count, LED_PIN, LED_FREQ_HZ, LED_DMA, LED_INVERT, brightness, LED_CHANNEL) + self.strip.begin() + + # 初始化后清空 + time.sleep(0.1) + for i in range(led_count): + self.strip.setPixelColor(i, Color(0, 0, 0)) + self.strip.show() + time.sleep(0.1) + + def show(self, colors): + """显示颜色""" + for i, (r, g, b) in enumerate(colors): + self.strip.setPixelColor(i, Color(r, g, b)) + self.strip.show() + + def cleanup(self): + """清理资源""" + # 清空所有LED + for i in range(self.led_count): + self.strip.setPixelColor(i, Color(0, 0, 0)) + self.strip.show() + +# ===== 新增LED特效函数 ===== + +def breathing_effect(ws, color=(255, 0, 0), cycles=5, min_brightness=0.1, max_brightness=0.8): + """呼吸灯效果""" + print("呼吸灯效果开始...") + + steps = 50 + for cycle in range(cycles): + print(f" 呼吸循环 {cycle + 1}/{cycles}") + + # 呼吸(渐亮) + for i in range(steps): + brightness = min_brightness + (max_brightness - min_brightness) * (i / steps) + r = int(color[0] * brightness) + g = int(color[1] * brightness) + b = int(color[2] * brightness) + + colors = [(r, g, b)] * ws.led_count + ws.show(colors) + time.sleep(0.02) + + # 呼吸(渐暗) + for i in range(steps): + brightness = max_brightness - (max_brightness - min_brightness) * (i / steps) + r = int(color[0] * brightness) + g = int(color[1] * brightness) + b = int(color[2] * brightness) + + colors = [(r, g, b)] * ws.led_count + ws.show(colors) + time.sleep(0.02) + + ws.show([(0, 0, 0)] * ws.led_count) + print("呼吸灯效果完成!") + +def color_wipe(ws, color=(255, 0, 0), delay=0.1, cycles=3): + """颜色擦除效果""" + print("颜色擦除效果开始...") + + for cycle in range(cycles): + print(f" 擦除循环 {cycle + 1}/{cycles}") + + # 正向擦除 + for i in range(ws.led_count): + colors = [(0, 0, 0)] * ws.led_count + for j in range(i + 1): + colors[j] = color + ws.show(colors) + time.sleep(delay) + + # 反向擦除 + for i in range(ws.led_count - 1, -1, -1): + colors = [(0, 0, 0)] * ws.led_count + for j in range(i + 1): + colors[j] = color + ws.show(colors) + time.sleep(delay) + + ws.show([(0, 0, 0)] * ws.led_count) + print("颜色擦除效果完成!") + +def theater_chase(ws, color1=(255, 0, 0), color2=(0, 0, 0), delay=0.1, cycles=10): + """剧院追逐效果""" + print("剧院追逐效果开始...") + + for cycle in range(cycles): + print(f" 剧院循环 {cycle + 1}/{cycles}") + + for i in range(3): + colors = [] + for j in range(ws.led_count): + if (j + i) % 3 == 0: + colors.append(color1) + else: + colors.append(color2) + ws.show(colors) + time.sleep(delay) + + ws.show([(0, 0, 0)] * ws.led_count) + print("剧院追逐效果完成!") + +def rainbow_cycle(ws, delay=0.02, cycles=3): + """彩虹循环效果""" + print("彩虹循环效果开始...") + + for cycle in range(cycles): + print(f" 彩虹循环 {cycle + 1}/{cycles}") + + for i in range(256): + colors = [] + for j in range(ws.led_count): + pixel_index = (i * 256 // ws.led_count + j) % 256 + colors.append(wheel(pixel_index)) + ws.show(colors) + time.sleep(delay) + + ws.show([(0, 0, 0)] * ws.led_count) + print("彩虹循环效果完成!") + +def wheel(pos): + """生成彩虹颜色""" + pos = 255 - pos + if pos < 85: + return (255 - pos * 3, 0, pos * 3) + elif pos < 170: + pos -= 85 + return (0, pos * 3, 255 - pos * 3) + else: + pos -= 170 + return (pos * 3, 255 - pos * 3, 0) + +def fire_effect(ws, delay=0.1, cycles=10): + """火焰效果""" + print("火焰效果开始...") + + for cycle in range(cycles): + print(f" 火焰循环 {cycle + 1}/{cycles}") + + colors = [] + for i in range(ws.led_count): + # 随机生成火焰颜色(红、橙、黄) + rand = random.random() + if rand < 0.3: + colors.append((255, 0, 0)) # 红 + elif rand < 0.6: + colors.append((255, 100, 0)) # 橙 + else: + colors.append((255, 255, 0)) # 黄 + + ws.show(colors) + time.sleep(delay) + + ws.show([(0, 0, 0)] * ws.led_count) + print("火焰效果完成!") + +def sparkle_effect(ws, base_color=(50, 50, 50), sparkle_color=(255, 255, 255), delay=0.1, cycles=20): + """闪烁效果""" + print("闪烁效果开始...") + + for cycle in range(cycles): + colors = [base_color] * ws.led_count + + # 随机选择几个LED进行闪烁 + sparkle_count = random.randint(1, 3) + for _ in range(sparkle_count): + pos = random.randint(0, ws.led_count - 1) + colors[pos] = sparkle_color + + ws.show(colors) + time.sleep(delay) + + ws.show([(0, 0, 0)] * ws.led_count) + print("闪烁效果完成!") + +def pulse_effect(ws, color=(255, 0, 0), cycles=5): + """脉冲效果""" + print("脉冲效果开始...") + + for cycle in range(cycles): + print(f" 脉冲循环 {cycle + 1}/{cycles}") + + # 从中心向外扩散 + for i in range(ws.led_count // 2 + 1): + colors = [(0, 0, 0)] * ws.led_count + + # 设置左右对称的LED + if i < ws.led_count: + colors[i] = color + colors[ws.led_count - 1 - i] = color + + ws.show(colors) + time.sleep(0.05) + + # 从边缘向中心收缩 + for i in range(ws.led_count // 2, -1, -1): + colors = [(0, 0, 0)] * ws.led_count + + if i < ws.led_count: + colors[i] = color + colors[ws.led_count - 1 - i] = color + + ws.show(colors) + time.sleep(0.05) + + ws.show([(0, 0, 0)] * ws.led_count) + print("脉冲效果完成!") + +def color_mixing(ws, color1=(255, 0, 0), color2=(0, 0, 255), delay=0.1, cycles=10): + """颜色混合效果""" + print("颜色混合效果开始...") + + for cycle in range(cycles): + print(f" 混合循环 {cycle + 1}/{cycles}") + + # 计算混合比例 + ratio = (cycle % 20) / 20.0 + if ratio > 0.5: + ratio = 1.0 - ratio + + # 混合颜色 + r = int(color1[0] * (1 - ratio) + color2[0] * ratio) + g = int(color1[1] * (1 - ratio) + color2[1] * ratio) + b = int(color1[2] * (1 - ratio) + color2[2] * ratio) + + colors = [(r, g, b)] * ws.led_count + ws.show(colors) + time.sleep(delay) + + ws.show([(0, 0, 0)] * ws.led_count) + print("颜色混合效果完成!") + +def running_light(ws, colors=None, delay=0.15, cycles=2): + """跑马灯效果(保留原有函数)""" + if colors is None: + colors = [(150, 0, 0), (0, 150, 0), (0, 0, 150)] + + print("跑马灯效果开始...") + + for cycle in range(cycles): + print(f" 跑马灯循环 {cycle + 1}/{cycles}") + ws.show([(0, 0, 0)] * ws.led_count) + time.sleep(0.2) + + for i in range(ws.led_count): + led_colors = [(0, 0, 0)] * ws.led_count + for j, color in enumerate(colors): + pos = (i + j) % ws.led_count + led_colors[pos] = color + + ws.show(led_colors) + time.sleep(delay) + + ws.show([(0, 0, 0)] * ws.led_count) + print("跑马灯效果完成!") + +def simple_single_chase(ws, color=(255, 0, 0), delay=0.15, cycles=2): + """简化版单点追逐效果(保留原有函数)""" + print("简化版单点追逐效果开始...") + + color = (min(color[0], 150), min(color[1], 150), min(color[2], 150)) + + for cycle in range(cycles): + print(f" 循环 {cycle + 1}/{cycles}") + ws.show([(0, 0, 0)] * ws.led_count) + time.sleep(0.3) + + for i in range(ws.led_count): + colors = [(0, 0, 0)] * ws.led_count + colors[i] = color + ws.show(colors) + time.sleep(delay) + + ws.show([(0, 0, 0)] * ws.led_count) + time.sleep(0.2) + + ws.show([(0, 0, 0)] * ws.led_count) + time.sleep(0.2) + print("简化版单点追逐效果完成!") + +def rainbow_effect(ws, delay=0.2, cycles=3): + """彩虹效果(保留原有函数)""" + print("彩虹效果开始...") + + rainbow_colors = [ + (150, 0, 0), # 红 + (150, 75, 0), # 橙 + (150, 150, 0), # 黄 + (0, 150, 0), # 绿 + (0, 0, 150), # 蓝 + (50, 0, 100), # 靛 + (100, 0, 150) # 紫 + ] + + for cycle in range(cycles): + colors = [] + for i in range(ws.led_count): + color_index = i % len(rainbow_colors) + colors.append(rainbow_colors[color_index]) + + for shift in range(len(colors)): + shifted_colors = [] + for i in range(ws.led_count): + color_index = (i + shift) % len(rainbow_colors) + shifted_colors.append(rainbow_colors[color_index]) + + ws.show(shifted_colors) + time.sleep(delay) + + ws.show([(0, 0, 0)] * ws.led_count) + print("彩虹效果完成!") + +def rainbow_simple(ws, wait_ms=20, iterations=1): + """简单彩虹效果(基于原始文件)""" + print("简单彩虹效果开始...") + + for j in range(256 * iterations): + colors = [] + for i in range(ws.led_count): + colors.append(wheel((i + j) & 255)) + ws.show(colors) + time.sleep(wait_ms / 1000.0) + + ws.show([(0, 0, 0)] * ws.led_count) + print("简单彩虹效果完成!") + +def reset_strip(ws): + """重置LED灯带""" + print("重置LED灯带...") + for i in range(ws.led_count): + ws.strip.setPixelColor(i, Color(0, 0, 0)) + ws.strip.show() + time.sleep(0.1) + +def test_single_color_with_reset(ws, color, color_name): + """测试单个颜色并立即重置""" + print(f"测试{color_name}...") + + try: + # 设置颜色 + for i in range(ws.led_count): + ws.strip.setPixelColor(i, color) + ws.strip.show() + time.sleep(0.001) + + # 短暂显示 + time.sleep(0.5) + + # 立即清空 + for i in range(ws.led_count): + ws.strip.setPixelColor(i, Color(0, 0, 0)) + ws.strip.show() + time.sleep(0.1) + + print(f"{color_name}测试完成") + return True + + except Exception as e: + print(f"{color_name}测试失败: {e}") + return False + +def safe_color_test(ws): + """安全颜色测试""" + print("安全颜色测试...") + + success = True + success &= test_single_color_with_reset(ws, Color(255, 0, 0), "红色") + time.sleep(0.5) + + success &= test_single_color_with_reset(ws, Color(0, 255, 0), "绿色") + time.sleep(0.5) + + success &= test_single_color_with_reset(ws, Color(0, 0, 255), "蓝色") + time.sleep(0.5) + + success &= test_single_color_with_reset(ws, Color(255, 255, 255), "白色") + + if success: + print("所有颜色测试完成") + else: + print("颜色测试过程中出现问题") + reset_strip(ws) + +# ===== 主函数:按顺序播放所有特效 ===== + +def show_all_effects(): + """按顺序播放所有LED特效""" + ws = WS2812B_WS281X(led_count=10, brightness=100) + + try: + print("🎆 LED特效展示开始!") + print("=" * 50) + + # 特效列表 + effects = [ + ("安全颜色测试", lambda: safe_color_test(ws)), + ("呼吸灯效果", lambda: breathing_effect(ws, color=(255, 0, 0), cycles=3)), + ("颜色擦除效果", lambda: color_wipe(ws, color=(0, 255, 0), cycles=2)), + ("剧院追逐效果", lambda: theater_chase(ws, color1=(255, 255, 0), color2=(0, 0, 0), cycles=8)), + ("彩虹循环效果", lambda: rainbow_cycle(ws, delay=0.05, cycles=2)), + ("简单彩虹效果", lambda: rainbow_simple(ws, wait_ms=20, iterations=1)), + ("火焰效果", lambda: fire_effect(ws, delay=0.15, cycles=8)), + ("闪烁效果", lambda: sparkle_effect(ws, base_color=(20, 20, 20), sparkle_color=(255, 255, 255), cycles=15)), + ("脉冲效果", lambda: pulse_effect(ws, color=(255, 0, 255), cycles=3)), + ("颜色混合效果", lambda: color_mixing(ws, color1=(255, 0, 0), color2=(0, 255, 0), cycles=10)), + ("跑马灯效果", lambda: running_light(ws, colors=[(255, 0, 0), (0, 255, 0), (0, 0, 255)], delay=0.2, cycles=2)), + ("单点追逐效果", lambda: simple_single_chase(ws, color=(0, 255, 255), delay=0.15, cycles=2)), + ("彩虹效果", lambda: rainbow_effect(ws, delay=0.2, cycles=2)), + ] + + # 按顺序播放每个特效 + for i, (name, effect_func) in enumerate(effects): + print(f"\n🎭 特效 {i+1}/{len(effects)}: {name}") + print("-" * 30) + + effect_func() + + # 特效之间短暂停顿 + time.sleep(1) + + print("\n🎉 所有LED特效播放完成!") + print("=" * 50) + + except KeyboardInterrupt: + print("\n⏹️ 用户中断,停止播放...") + ws.show([(0, 0, 0)] * ws.led_count) + except Exception as e: + print(f"\n❌ 播放出错: {e}") + ws.show([(0, 0, 0)] * ws.led_count) + finally: + ws.cleanup() + print("🧹 清理完成") + +if __name__ == "__main__": + show_all_effects() \ No newline at end of file diff --git a/led_demo/ws2812b_fun_effects.py b/led_demo/ws2812b_fun_effects.py new file mode 100644 index 0000000..aa832b5 --- /dev/null +++ b/led_demo/ws2812b_fun_effects.py @@ -0,0 +1,524 @@ +#!/usr/bin/env python3 +""" +WS2812B 更多趣味LED特效 +包含物理效果、游戏效果和节日主题特效 +""" + +import time +import random +import math +import RPi.GPIO as GPIO +from rpi_ws281x import Color, PixelStrip + + +class WS2812B_WS281X: + def __init__(self, led_count=10, brightness=100): + self.led_count = led_count + self.brightness = brightness + + # WS2812B配置 + LED_PIN = 10 # GPIO引脚(SPI模式) + LED_FREQ_HZ = 800000 # LED信号频率 + LED_DMA = 10 # DMA通道 + LED_INVERT = False # 是否反转信号 + LED_CHANNEL = 0 # PWM通道 + + # 创建PixelStrip对象 + self.strip = PixelStrip(led_count, LED_PIN, LED_FREQ_HZ, LED_DMA, LED_INVERT, brightness, LED_CHANNEL) + self.strip.begin() + + # 初始化后清空 + time.sleep(0.1) + for i in range(led_count): + self.strip.setPixelColor(i, Color(0, 0, 0)) + self.strip.show() + time.sleep(0.1) + + def show(self, colors): + """显示颜色""" + for i, (r, g, b) in enumerate(colors): + self.strip.setPixelColor(i, Color(int(r), int(g), int(b))) + self.strip.show() + + def cleanup(self): + """清理资源""" + # 清空所有LED + for i in range(self.led_count): + self.strip.setPixelColor(i, Color(0, 0, 0)) + self.strip.show() + +# ===== 物理效果类特效 ===== + +def gravity_bounce(ws, ball_color=(255, 100, 0), cycles=5): + """重力弹跳效果""" + print("重力弹跳效果开始...") + + for cycle in range(cycles): + print(f" 弹跳 {cycle + 1}/{cycles}") + + # 模拟重力弹跳 + height = 0 + velocity = 0 + gravity = 0.5 + bounce_damping = 0.8 + + for _ in range(50): + velocity += gravity + height += velocity + + # 检查是否触底 + if height >= ws.led_count - 1: + height = ws.led_count - 1 + velocity = -velocity * bounce_damping + + # 显示球的位置 + colors = [(0, 0, 0)] * ws.led_count + ball_pos = int(height) + if 0 <= ball_pos < ws.led_count: + colors[ball_pos] = ball_color + + # 添加影子效果 + if ball_pos > 0: + shadow_intensity = max(0, int(50 - abs(height - ball_pos) * 20)) + colors[ball_pos - 1] = (shadow_intensity, shadow_intensity // 2, 0) + + ws.show(colors) + time.sleep(0.05) + + ws.show([(0, 0, 0)] * ws.led_count) + print("重力弹跳效果完成!") + +def wave_ripple(ws, center_color=(0, 100, 255), cycles=8): + """波纹扩散效果""" + print("波纹扩散效果开始...") + + for cycle in range(cycles): + print(f" 波纹 {cycle + 1}/{cycles}") + + for i in range(ws.led_count): + colors = [(0, 0, 0)] * ws.led_count + + # 从中心向两边扩散 + center = ws.led_count // 2 + distance = abs(i - center) + + # 在两边对称的位置显示波纹 + if center - distance >= 0: + intensity = max(0, 255 - distance * 50) + colors[center - distance] = ( + int(center_color[0] * intensity / 255), + int(center_color[1] * intensity / 255), + int(center_color[2] * intensity / 255) + ) + + if center + distance < ws.led_count and distance > 0: + intensity = max(0, 255 - distance * 50) + colors[center + distance] = ( + int(center_color[0] * intensity / 255), + int(center_color[1] * intensity / 255), + int(center_color[2] * intensity / 255) + ) + + ws.show(colors) + time.sleep(0.1) + + ws.show([(0, 0, 0)] * ws.led_count) + print("波纹扩散效果完成!") + +def magnet_effect(ws, metal_color=(150, 150, 150), magnet_color=(255, 0, 0), cycles=6): + """磁铁吸引效果""" + print("磁铁吸引效果开始...") + + for cycle in range(cycles): + print(f" 磁铁循环 {cycle + 1}/{cycles}") + + # 磁铁位置(左右两端) + magnet_positions = [0, ws.led_count - 1] + + for frame in range(30): + colors = [(0, 0, 0)] * ws.led_count + + # 显示磁铁 + for pos in magnet_positions: + colors[pos] = magnet_color + + # 计算金属粒子位置 + for i in range(3): # 3个金属粒子 + particle_pos = int(4 + 2 * math.sin(frame * 0.3 + i * 2)) + if 0 < particle_pos < ws.led_count - 1: + colors[particle_pos] = metal_color + + ws.show(colors) + time.sleep(0.1) + + ws.show([(0, 0, 0)] * ws.led_count) + print("磁铁吸引效果完成!") + +def pendulum_swing(ws, pendulum_color=(255, 255, 0), cycles=10): + """钟摆摆动效果""" + print("钟摆摆动效果开始...") + + for cycle in range(cycles): + print(f" 钟摆循环 {cycle + 1}/{cycles}") + + # 模拟钟摆运动 + for angle in range(-60, 61, 5): + colors = [(0, 0, 0)] * ws.led_count + + # 计算钟摆位置 + center = ws.led_count // 2 + position = int(center + angle / 15) + + if 0 <= position < ws.led_count: + colors[position] = pendulum_color + + # 添加运动模糊效果 + if angle > -60 and position > 0: + colors[position - 1] = (pendulum_color[0] // 3, pendulum_color[1] // 3, pendulum_color[2] // 3) + + ws.show(colors) + time.sleep(0.08) + + # 反向摆动 + for angle in range(60, -61, -5): + colors = [(0, 0, 0)] * ws.led_count + + center = ws.led_count // 2 + position = int(center + angle / 15) + + if 0 <= position < ws.led_count: + colors[position] = pendulum_color + + if angle < 60 and position < ws.led_count - 1: + colors[position + 1] = (pendulum_color[0] // 3, pendulum_color[1] // 3, pendulum_color[2] // 3) + + ws.show(colors) + time.sleep(0.08) + + ws.show([(0, 0, 0)] * ws.led_count) + print("钟摆摆动效果完成!") + +# ===== 游戏效果类特效 ===== + +def snake_game(ws, cycles=3): + """贪吃蛇游戏效果""" + print("贪吃蛇游戏效果开始...") + + snake_color = (0, 255, 0) + food_color = (255, 0, 0) + + for cycle in range(cycles): + print(f" 贪吃蛇 {cycle + 1}/{cycles}") + + # 初始化蛇的位置 + snake = [(0, 0), (0, 1), (0, 2)] # 简化为线性移动 + food = random.randint(3, ws.led_count - 1) + + for step in range(20): + colors = [(0, 0, 0)] * ws.led_count + + # 移动蛇 + if step < 15: + new_head = (snake[-1][0] + 1) % ws.led_count + snake.append((new_head, snake[-1][1] + 1)) + snake.pop(0) + + # 显示蛇 + for i, (pos, _) in enumerate(snake): + if i == len(snake) - 1: # 蛇头 + colors[pos % ws.led_count] = (0, 150, 0) + else: # 蛇身 + colors[pos % ws.led_count] = snake_color + + # 显示食物 + colors[food] = food_color + + ws.show(colors) + time.sleep(0.2) + + ws.show([(0, 0, 0)] * ws.led_count) + print("贪吃蛇游戏效果完成!") + +def space_invaders(ws, cycles=5): + """太空侵略者效果""" + print("太空侵略者效果开始...") + + alien_color = (0, 255, 0) + bullet_color = (255, 255, 0) + + for cycle in range(cycles): + print(f" 入侵者 {cycle + 1}/{cycles}") + + # 外星人移动 + for direction in [1, -1]: + for step in range(ws.led_count - 2): + colors = [(0, 0, 0)] * ws.led_count + + # 显示外星人 + alien_pos = 3 + step * direction + if 0 <= alien_pos < ws.led_count: + colors[alien_pos] = alien_color + + # 随机子弹 + if random.random() < 0.3: + bullet_pos = random.randint(0, ws.led_count - 1) + colors[bullet_pos] = bullet_color + + ws.show(colors) + time.sleep(0.15) + + ws.show([(0, 0, 0)] * ws.led_count) + print("太空侵略者效果完成!") + +def pong_game(ws, cycles=10): + """乒乓球游戏效果""" + print("乒乓球游戏效果开始...") + + paddle_color = (255, 255, 255) + ball_color = (255, 100, 100) + + for cycle in range(cycles): + print(f" 乒乓球 {cycle + 1}/{cycles}") + + # 球的初始位置和速度 + ball_pos = ws.led_count // 2 + ball_velocity = 1 + + for _ in range(30): + colors = [(0, 0, 0)] * ws.led_count + + # 显示球拍 + colors[0] = paddle_color + colors[ws.led_count - 1] = paddle_color + + # 显示球 + if 0 <= ball_pos < ws.led_count: + colors[ball_pos] = ball_color + + # 移动球 + ball_pos += ball_velocity + + # 碰撞检测 + if ball_pos <= 0 or ball_pos >= ws.led_count - 1: + ball_velocity = -ball_velocity + + ws.show(colors) + time.sleep(0.1) + + ws.show([(0, 0, 0)] * ws.led_count) + print("乒乓球游戏效果完成!") + +def matrix_rain(ws, cycles=8): + """黑客帝国数字雨效果""" + print("黑客帝国数字雨效果开始...") + + for cycle in range(cycles): + print(f" 数字雨 {cycle + 1}/{cycles}") + + for frame in range(20): + colors = [(0, 0, 0)] * ws.led_count + + # 随机生成数字雨 + for i in range(ws.led_count): + if random.random() < 0.3: + intensity = random.randint(100, 255) + colors[i] = (0, intensity, 0) + elif random.random() < 0.1: + # 更亮的"头部" + colors[i] = (100, 255, 100) + + ws.show(colors) + time.sleep(0.1) + + ws.show([(0, 0, 0)] * ws.led_count) + print("黑客帝国数字雨效果完成!") + +# ===== 节日主题特效 ===== + +def christmas_lights(ws, cycles=8): + """圣诞彩灯效果""" + print("圣诞彩灯效果开始...") + + colors_list = [ + (255, 0, 0), # 红 + (0, 255, 0), # 绿 + (255, 255, 255), # 白 + (255, 215, 0), # 金 + (0, 0, 255), # 蓝 + ] + + for cycle in range(cycles): + print(f" 圣诞彩灯 {cycle + 1}/{cycles}") + + # 随机闪烁 + for i in range(15): + colors = [(0, 0, 0)] * ws.led_count + + # 随机点亮一些LED + for j in range(ws.led_count): + if random.random() < 0.4: + colors[j] = random.choice(colors_list) + + ws.show(colors) + time.sleep(0.2) + + ws.show([(0, 0, 0)] * ws.led_count) + print("圣诞彩灯效果完成!") + +def fireworks(ws, cycles=6): + """烟花爆炸效果""" + print("烟花爆炸效果开始...") + + for cycle in range(cycles): + print(f" 烟花 {cycle + 1}/{cycles}") + + # 烟花上升 + for i in range(ws.led_count): + colors = [(0, 0, 0)] * ws.led_count + colors[i] = (255, 255, 0) # 黄色上升轨迹 + ws.show(colors) + time.sleep(0.05) + + # 爆炸效果 + explosion_colors = [(255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 0), (255, 0, 255)] + + for frame in range(15): + colors = [(0, 0, 0)] * ws.led_count + + # 随机爆炸粒子 + for _ in range(random.randint(3, 6)): + pos = random.randint(0, ws.led_count - 1) + colors[pos] = random.choice(explosion_colors) + + ws.show(colors) + time.sleep(0.1) + + ws.show([(0, 0, 0)] * ws.led_count) + print("烟花爆炸效果完成!") + +def valentine_heart(ws, cycles=5): + """情人节心跳效果""" + print("情人节心跳效果开始...") + + for cycle in range(cycles): + print(f" 心跳 {cycle + 1}/{cycles}") + + # 心跳模式 + for beat in range(3): + colors = [(255, 0, 100)] * ws.led_count # 粉红色 + ws.show(colors) + time.sleep(0.1) + + colors = [(150, 0, 50)] * ws.led_count # 暗粉色 + ws.show(colors) + time.sleep(0.1) + + # 暂停 + colors = [(0, 0, 0)] * ws.led_count + ws.show(colors) + time.sleep(0.5) + + ws.show([(0, 0, 0)] * ws.led_count) + print("情人节心跳效果完成!") + +def halloween_pumpkin(ws, cycles=8): + """万圣节南瓜灯效果""" + print("万圣节南瓜灯效果开始...") + + for cycle in range(cycles): + print(f" 南瓜灯 {cycle + 1}/{cycles}") + + # 摇曳的橙色光 + for i in range(20): + colors = [(0, 0, 0)] * ws.led_count + + for j in range(ws.led_count): + # 随机亮度变化模拟摇曳 + intensity = random.randint(100, 255) + colors[j] = (intensity, intensity // 2, 0) # 橙色 + + ws.show(colors) + time.sleep(0.1) + + ws.show([(0, 0, 0)] * ws.led_count) + print("万圣节南瓜灯效果完成!") + +def new_year_countdown(ws, cycles=3): + """新年倒计时效果""" + print("新年倒计时效果开始...") + + for cycle in range(cycles): + print(f" 倒计时 {cycle + 1}/{cycles}") + + # 倒计时数字效果 + for countdown in [10, 9, 8, 7, 6, 5, 4, 3, 2, 1]: + colors = [(255, 215, 0)] * countdown + [(0, 0, 0)] * (ws.led_count - countdown) + ws.show(colors) + time.sleep(0.5) + + # 新年庆祝 + for i in range(10): + colors = [] + for j in range(ws.led_count): + if random.random() < 0.5: + colors.append((random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))) + else: + colors.append((0, 0, 0)) + ws.show(colors) + time.sleep(0.1) + + ws.show([(0, 0, 0)] * ws.led_count) + print("新年倒计时效果完成!") + +# ===== 主展示函数 ===== + +def show_fun_effects(): + """展示所有趣味LED特效""" + ws = WS2812B_WS281X(led_count=10, brightness=100) + + try: + print("🎮 趣味LED特效展示开始!") + print("=" * 50) + + # 特效列表 + effects = [ + ("物理效果 - 重力弹跳", lambda: gravity_bounce(ws, cycles=3)), + ("物理效果 - 波纹扩散", lambda: wave_ripple(ws, cycles=6)), + ("物理效果 - 磁铁吸引", lambda: magnet_effect(ws, cycles=4)), + ("物理效果 - 钟摆摆动", lambda: pendulum_swing(ws, cycles=6)), + ("游戏效果 - 贪吃蛇", lambda: snake_game(ws, cycles=2)), + ("游戏效果 - 太空侵略者", lambda: space_invaders(ws, cycles=3)), + ("游戏效果 - 乒乓球", lambda: pong_game(ws, cycles=6)), + ("游戏效果 - 黑客帝国", lambda: matrix_rain(ws, cycles=5)), + ("节日效果 - 圣诞彩灯", lambda: christmas_lights(ws, cycles=6)), + ("节日效果 - 烟花爆炸", lambda: fireworks(ws, cycles=4)), + ("节日效果 - 情人心跳", lambda: valentine_heart(ws, cycles=4)), + ("节日效果 - 万圣南瓜", lambda: halloween_pumpkin(ws, cycles=6)), + ("节日效果 - 新年倒计时", lambda: new_year_countdown(ws, cycles=2)), + ] + + # 按顺序播放每个特效 + for i, (name, effect_func) in enumerate(effects): + print(f"\n🎭 特效 {i+1}/{len(effects)}: {name}") + print("-" * 30) + + effect_func() + + # 特效之间短暂停顿 + time.sleep(1) + + print("\n🎉 所有趣味特效播放完成!") + print("=" * 50) + + except KeyboardInterrupt: + print("\n⏹️ 用户中断,停止播放...") + ws.show([(0, 0, 0)] * ws.led_count) + except Exception as e: + print(f"\n❌ 播放出错: {e}") + ws.show([(0, 0, 0)] * ws.led_count) + finally: + ws.cleanup() + print("🧹 清理完成") + +if __name__ == "__main__": + show_fun_effects() \ No newline at end of file diff --git a/led_demo/ws2812b_spi_rpi_ws281x_test.py b/led_demo/ws2812b_spi_rpi_ws281x_test.py new file mode 100644 index 0000000..959edd6 --- /dev/null +++ b/led_demo/ws2812b_spi_rpi_ws281x_test.py @@ -0,0 +1,404 @@ +#!/usr/bin/env python3 +""" +WS2812B SPI测试脚本 - 基于rpi_ws281x库 +使用SPI接口和GPIO10控制WS2812B LED灯带 +rpi_ws281x库在SPI模式下只能使用GPIO10引脚 +""" + +import time +import subprocess +import sys + +import RPi.GPIO as GPIO +from rpi_ws281x import Color, PixelStrip + +# LED参数配置 - SPI模式 +LED_COUNT = 10 # LED数量 +LED_PIN = 10 # GPIO引脚(SPI模式只能使用GPIO10) +LED_FREQ_HZ = 800000 # LED信号频率 +LED_DMA = 10 # DMA通道 +LED_BRIGHTNESS = 100 # 亮度(0-255)- 恢复适中亮度 +LED_INVERT = False # 是否反转信号 +LED_CHANNEL = 0 # PWM通道(SPI模式) + +# SPI时序参数 +SPI_RESET_DELAY = 0.1 # 重置延时(秒) +SPI_SHOW_DELAY = 0.001 # show()后延时(秒) + +def color_wipe(strip, color, wait_ms=50): + """颜色擦除效果 - 逐个点亮LED""" + for i in range(strip.numPixels()): + strip.setPixelColor(i, color) + strip.show() + time.sleep(wait_ms / 1000.0) + time.sleep(SPI_SHOW_DELAY) # 添加show()后延时 + + # 颜色显示完成后添加重置延时 + time.sleep(SPI_RESET_DELAY) + +def theater_chase(strip, color, wait_ms=50, iterations=10): + """剧院追逐效果""" + for j in range(iterations): + for q in range(3): + for i in range(0, strip.numPixels(), 3): + strip.setPixelColor(i + q, color) + strip.show() + time.sleep(wait_ms / 1000.0) + for i in range(0, strip.numPixels(), 3): + strip.setPixelColor(i + q, 0) + +def rainbow(strip, wait_ms=20, iterations=1): + """彩虹效果""" + for j in range(256 * iterations): + for i in range(strip.numPixels()): + strip.setPixelColor(i, wheel((i + j) & 255)) + strip.show() + time.sleep(SPI_SHOW_DELAY) # 添加show后延时 + time.sleep(wait_ms / 1000.0) + +def rainbow_cycle(strip, wait_ms=20, iterations=5): + """彩虹循环效果""" + for j in range(256 * iterations): + for i in range(strip.numPixels()): + strip.setPixelColor(i, wheel((int(i * 256 / strip.numPixels()) + j) & 255)) + strip.show() + time.sleep(SPI_SHOW_DELAY) # 添加show后延时 + time.sleep(wait_ms / 1000.0) + +def wheel(pos): + """生成彩虹颜色""" + if pos < 85: + return Color(pos * 3, 255 - pos * 3, 0) + elif pos < 170: + pos -= 85 + return Color(255 - pos * 3, 0, pos * 3) + else: + pos -= 170 + return Color(0, pos * 3, 255 - pos * 3) + +def reset_strip(strip): + """重置LED灯带,解决颜色锁死问题""" + # 清空所有LED + for i in range(strip.numPixels()): + strip.setPixelColor(i, Color(0, 0, 0)) + strip.show() + + # 更长的重置延时确保WS2812B完全重置 + time.sleep(SPI_RESET_DELAY) + + # 发送额外的重置序列 + for i in range(5): + strip.show() + time.sleep(SPI_SHOW_DELAY) + + # 最终重置延时 + time.sleep(SPI_RESET_DELAY) + +def hard_reset_spi(): + """硬件级别的SPI重置""" + print("执行硬件级SPI重置...") + + # 尝试通过系统命令重置SPI + try: + # 重启SPI内核模块 + subprocess.run(["sudo", "rmmod", "spi_bcm2835"], check=False, capture_output=True) + subprocess.run(["sudo", "modprobe", "spi_bcm2835"], check=False, capture_output=True) + print("SPI内核模块重置完成") + except: + print("SPI内核模块重置失败") + + # 等待SPI重新初始化 + time.sleep(1) + +def complete_cleanup(strip): + """完全清理SPI资源""" + print("执行完全清理...") + + try: + # 清空LED + for i in range(strip.numPixels()): + strip.setPixelColor(i, Color(0, 0, 0)) + strip.show() + + # 长时间重置 + time.sleep(SPI_RESET_DELAY * 2) + + # 尝试释放SPI资源 + # 注意:rpi_ws281x库没有直接的close方法,所以我们通过其他方式清理 + + except Exception as e: + print(f"清理过程中出现错误: {e}") + + # 硬件重置 + hard_reset_spi() + + print("完全清理完成") + +def test_single_color_with_reset(strip, color, color_name): + """测试单个颜色并立即重置""" + print(f"测试{color_name}...") + + try: + # 设置颜色 + for i in range(strip.numPixels()): + strip.setPixelColor(i, color) + strip.show() + time.sleep(SPI_SHOW_DELAY) + + # 短暂显示 + time.sleep(0.5) + + # 立即清空 + for i in range(strip.numPixels()): + strip.setPixelColor(i, Color(0, 0, 0)) + strip.show() + time.sleep(SPI_RESET_DELAY) + + print(f"{color_name}测试完成") + + except Exception as e: + print(f"{color_name}测试失败: {e}") + return False + + return True + +def safe_color_test(strip): + """安全颜色测试,避免WS2812B锁死""" + print("安全颜色测试...") + + # 使用新的测试方法 + success = True + success &= test_single_color_with_reset(strip, Color(255, 0, 0), "红色") + time.sleep(0.5) + + success &= test_single_color_with_reset(strip, Color(0, 255, 0), "绿色") + time.sleep(0.5) + + success &= test_single_color_with_reset(strip, Color(0, 0, 255), "蓝色") + time.sleep(0.5) + + success &= test_single_color_with_reset(strip, Color(255, 255, 255), "白色") + + if success: + print("所有颜色测试完成") + else: + print("颜色测试过程中出现问题") + complete_cleanup(strip) + +def set_brightness(strip, brightness): + """设置所有LED的亮度""" + for i in range(strip.numPixels()): + # 获取当前颜色并应用新的亮度 + color = strip.getPixelColor(i) + r = (color >> 16) & 0xFF + g = (color >> 8) & 0xFF + b = color & 0xFF + + # 应用亮度 + r = int(r * brightness / 255) + g = int(g * brightness / 255) + b = int(b * brightness / 255) + + strip.setPixelColor(i, Color(r, g, b)) + strip.show() + +def test_basic_colors(strip): + """测试基本颜色""" + print("测试基本颜色...") + safe_color_test(strip) + +def test_brightness_levels(strip): + """测试不同亮度级别""" + print("测试亮度级别...") + + # 高亮度 + print("高亮度") + set_brightness(strip, 255) + time.sleep(2) + + # 中等亮度 + print("中等亮度") + set_brightness(strip, 128) + time.sleep(2) + + # 低亮度 + print("低亮度") + set_brightness(strip, 64) + time.sleep(2) + + # 恢复高亮度 + set_brightness(strip, 255) + +def test_patterns(strip): + """测试各种模式""" + print("测试彩虹效果...") + rainbow(strip) + time.sleep(1) + + print("测试彩虹循环...") + rainbow_cycle(strip) + time.sleep(1) + + print("测试剧院追逐...") + theater_chase(strip, Color(255, 0, 0)) + theater_chase(strip, Color(0, 255, 0)) + theater_chase(strip, Color(0, 0, 255)) + time.sleep(1) + +def test_spi_timing(strip): + """测试SPI时序""" + print("测试SPI时序...") + + # 快速刷新测试 + print("快速刷新测试") + for i in range(10): + color = Color((i * 25) % 256, (i * 50) % 256, (i * 75) % 256) + color_wipe(strip, color, 10) + + # 精确时序测试 + print("精确时序测试") + start_time = time.time() + for i in range(100): + strip.setPixelColor(0, Color(255, 0, 0)) + strip.show() + time.sleep(SPI_SHOW_DELAY) + strip.setPixelColor(0, Color(0, 255, 0)) + strip.show() + time.sleep(SPI_SHOW_DELAY) + strip.setPixelColor(0, Color(0, 0, 255)) + strip.show() + time.sleep(SPI_SHOW_DELAY) + end_time = time.time() + + print(f"300次刷新耗时: {end_time - start_time:.3f}秒") + print(f"平均每次刷新: {(end_time - start_time) / 300 * 1000:.2f}毫秒") + + # SPI模式验证 + print("SPI模式验证") + print("当前使用GPIO10 (MOSI)引脚进行SPI通信") + print("数据通过SPI0.0通道传输到WS2812B") + print(f"重置延时: {SPI_RESET_DELAY}秒") + print(f"Show延时: {SPI_SHOW_DELAY}秒") + +def diagnose_spi_issues(strip): + """诊断SPI问题""" + print("=== SPI问题诊断 ===") + + print("1. 测试单个LED颜色切换...") + for color_name, color in [("红色", Color(255, 0, 0)), + ("绿色", Color(0, 255, 0)), + ("蓝色", Color(0, 0, 255))]: + print(f" {color_name}...") + strip.setPixelColor(0, color) + strip.show() + time.sleep(SPI_SHOW_DELAY) + time.sleep(0.5) + reset_strip(strip) + time.sleep(0.5) + + print("2. 测试不同延时设置...") + for delay in [0.001, 0.005, 0.01, 0.05]: + print(f" 延时 {delay} 秒...") + strip.setPixelColor(0, Color(255, 0, 0)) + strip.show() + time.sleep(delay) + strip.setPixelColor(0, Color(0, 255, 0)) + strip.show() + time.sleep(delay) + strip.setPixelColor(0, Color(0, 0, 255)) + strip.show() + time.sleep(delay) + reset_strip(strip) + time.sleep(0.5) + + print("诊断完成") + +def initialize_strip(): + """初始化LED灯带""" + print("初始化WS2812B LED灯带...") + print("使用SPI模式和GPIO10引脚...") + + # 确保SPI处于干净状态 + hard_reset_spi() + + # 创建PixelStrip对象 + strip = PixelStrip(LED_COUNT, LED_PIN, LED_FREQ_HZ, LED_DMA, LED_INVERT, LED_BRIGHTNESS, LED_CHANNEL) + strip.begin() + + # 初始化后清空 + time.sleep(0.1) + for i in range(strip.numPixels()): + strip.setPixelColor(i, Color(0, 0, 0)) + strip.show() + time.sleep(SPI_RESET_DELAY) + + return strip + +def main(): + """主测试函数""" + # 初始化LED灯带 + strip = initialize_strip() + + print("WS2812B SPI测试开始...") + print(f"LED数量: {LED_COUNT}") + print(f"使用引脚: GPIO{LED_PIN} (SPI MOSI)") + print(f"SPI通道: SPI0.0") + print(f"亮度级别: {LED_BRIGHTNESS}") + print("注意: rpi_ws281x库在SPI模式下必须使用GPIO10引脚") + print("重要: 每次测试后都会执行完全清理以避免锁死") + + try: + while True: + print("\n=== WS2812B SPI测试菜单 ===") + print("1. 基本颜色测试(修复版)") + print("2. 亮度级别测试") + print("3. 特效模式测试") + print("4. SPI时序测试") + print("5. SPI问题诊断") + print("6. 全部测试") + print("7. 强制重置LED") + print("8. 退出") + print("当前使用GPIO10 (SPI模式)") + + choice = input("请选择测试模式 (1-8): ") + + if choice == '1': + test_basic_colors(strip) + elif choice == '2': + test_brightness_levels(strip) + elif choice == '3': + test_patterns(strip) + elif choice == '4': + test_spi_timing(strip) + elif choice == '5': + diagnose_spi_issues(strip) + elif choice == '6': + test_basic_colors(strip) + test_brightness_levels(strip) + test_patterns(strip) + test_spi_timing(strip) + elif choice == '7': + print("强制重置LED...") + complete_cleanup(strip) + print("完全重置完成,需要重新初始化...") + # 重新初始化 + strip = initialize_strip() + elif choice == '8': + print("执行完全清理并退出...") + complete_cleanup(strip) + break + else: + print("无效选择,请重试") + + except KeyboardInterrupt: + print("\n收到中断信号...") + except Exception as e: + print(f"测试过程中发生错误: {e}") + finally: + # 执行完全清理 + print("执行完全清理...") + complete_cleanup(strip) + print("测试结束") + +if __name__ == '__main__': + main() diff --git a/multiprocess_recorder.py b/multiprocess_recorder.py index ef5541d..1c93045 100644 --- a/multiprocess_recorder.py +++ b/multiprocess_recorder.py @@ -14,6 +14,14 @@ import time import subprocess from typing import Dict, Any +# 导入LED控制器 +try: + from led_controller import get_led_controller, SystemState, start_led_effects, stop_led_effects, set_led_state + LED_AVAILABLE = True +except ImportError: + LED_AVAILABLE = False + print("⚠️ LED控制器模块未找到,LED功能将被禁用") + def check_dependencies(): """检查系统依赖""" missing_deps = [] @@ -226,6 +234,14 @@ def main(): help='详细输出') parser.add_argument('--enable-nfc', action='store_true', help='启用NFC角色切换功能') + parser.add_argument('--enable-led', action='store_true', + help='启用LED灯光效果') + parser.add_argument('--led-count', type=int, default=10, + help='LED数量 (默认: 10)') + parser.add_argument('--led-brightness', type=int, default=100, + help='LED亮度 (默认: 100)') + parser.add_argument('--led-pin', type=int, default=10, + help='LED GPIO引脚 (默认: 10)') args = parser.parse_args() @@ -320,10 +336,46 @@ def main(): control_system.enable_nfc() nfc_enabled = True + # 启用LED功能(如果指定) + led_enabled = False + if args.enable_led: + if LED_AVAILABLE: + print("💡 启用LED灯光效果...") + print(f" LED数量: {args.led_count}") + print(f" LED亮度: {args.led_brightness}") + print(f" LED引脚: GPIO{args.led_pin}") + + try: + # 启动LED系统 + success = start_led_effects( + led_count=args.led_count, + brightness=args.led_brightness, + led_pin=args.led_pin + ) + if success: + led_enabled = True + print("✅ LED系统已启动") + + # 设置初始状态为启动效果 + set_led_state(SystemState.STARTUP) + else: + print("❌ LED系统启动失败") + except Exception as e: + print(f"❌ LED系统初始化失败: {e}") + else: + print("⚠️ LED功能不可用,请检查硬件和驱动") + # 启动系统(自动校准,但根据NFC状态决定是否启动监听) print("🚀 启动系统(包含自动校准)...") control_system.start(auto_calibration=True, auto_monitoring=not nfc_enabled) + # 如果启用了LED,设置系统状态为等待NFC或空闲监听 + if led_enabled: + if nfc_enabled: + set_led_state(SystemState.WAITING_NFC) + else: + set_led_state(SystemState.IDLE_MONITORING) + except KeyboardInterrupt: print("\n👋 用户中断") except Exception as e: @@ -332,6 +384,17 @@ def main(): import traceback traceback.print_exc() finally: + # 清理LED系统 + if LED_AVAILABLE and led_enabled: + try: + print("💡 关闭LED系统...") + set_led_state(SystemState.SHUTDOWN) + time.sleep(1) # 给LED一些时间显示关闭效果 + stop_led_effects() + print("✅ LED系统已关闭") + except Exception as e: + print(f"❌ 关闭LED系统失败: {e}") + print("👋 系统退出") if __name__ == "__main__":