"""Status LED driver: plays queued on/off patterns on GPIO-connected LEDs."""

import time
from queue import Empty, Queue
from threading import Thread

import RPi.GPIO as GPIO

# Pin numbers used throughout this module follow the BCM numbering scheme.
GPIO.setmode(GPIO.BCM)


class BlinkenLight:
    """Play queued blink patterns on a single GPIO output pin.

    A pattern is a sequence whose items are interpreted by truthiness:
    a truthy item switches the pin on for one tick, a falsy item switches
    it off. Progress is driven externally by calling :meth:`clk` once per
    tick.
    """

    # Number of idle ticks counted before the next pattern is fetched.
    _GAP_TICKS = 5

    def __init__(self, pin):
        """Configure *pin* as an output, initially low.

        Args:
            pin: GPIO channel number (BCM numbering, see module-level setmode).
        """
        self._pin = pin
        self._pattern_queue = Queue()
        self._active_pattern = None
        # Dual-purpose counter: while a pattern is playing it is the index of
        # the next step (>= 0); while idle it counts down from 0 to
        # -_GAP_TICKS to measure the pause between patterns.
        self._pointer = 0
        GPIO.setup(pin, GPIO.OUT, initial=GPIO.LOW)

    def clk(self):
        """Advance the state machine by one tick.

        While idle, count down the gap; once the gap has elapsed, try to
        fetch the next pattern without blocking. While a pattern is active,
        output its next step, or switch the pin off and return to idle when
        the pattern is exhausted.
        """
        if not self._active_pattern:
            if self._pointer != -self._GAP_TICKS:
                # Still inside the pause between patterns.
                self._pointer -= 1
                return
            try:
                self._active_pattern = self._pattern_queue.get(block=False)
            except Empty:
                # Nothing queued; stay at the end of the gap and poll again
                # on the next tick.
                return
            self._pointer = 0
            if not self._active_pattern:
                # An empty pattern has no steps to play; it only restarts
                # the gap countdown.
                return

        # A pattern is loaded (possibly fetched during this very tick).
        if self._pointer == len(self._active_pattern):
            self._active_pattern = None
            self._pointer = 0
            GPIO.output(self._pin, False)
        else:
            GPIO.output(self._pin, bool(self._active_pattern[self._pointer]))
            self._pointer += 1

    def enqueue_pattern(self, pattern: list):
        """Append *pattern* to the queue of patterns waiting to be played."""
        self._pattern_queue.put(pattern)

    def cleanup(self):
        """Release the GPIO channel used by this light."""
        GPIO.cleanup(self._pin)


class BlinkenLights(Thread):
    """Background thread that clocks a set of named :class:`BlinkenLight`s."""

    # Seconds slept between two consecutive ticks.
    _TICK_SECONDS = 0.2

    def __init__(self, targets: dict):
        """Create one light controller per entry of *targets*.

        Args:
            targets: Mapping of light name to GPIO pin number.
        """
        super().__init__()
        self._led_controllers = {
            name: BlinkenLight(pin) for name, pin in targets.items()
        }
        self._active = True

    def run(self):
        """Tick every light until :meth:`stop` is called, then release pins."""
        try:
            while self._active:
                time.sleep(self._TICK_SECONDS)
                for led in self._led_controllers.values():
                    led.clk()
        finally:
            # Release the pins even if a tick raised (e.g. a malformed
            # pattern); otherwise the channels would stay configured.
            for led in self._led_controllers.values():
                led.cleanup()

    def enqueue_pattern(self, target: str, pattern: list):
        """Queue *pattern* on the light named *target*.

        Raises:
            KeyError: If *target* is not one of the configured light names.
        """
        self._led_controllers[target].enqueue_pattern(pattern)

    def stop(self):
        """Ask the thread to finish after its current tick.

        Only clears the run flag; it does not wait for the thread to exit.
        """
        self._active = False


class BirbnetesIoTPlatformStatusDriver:
    """Class-level facade around a single shared :class:`BlinkenLights`."""

    # Shared worker thread; None until init() has been called.
    blinken_lights = None

    @classmethod
    def init(cls):
        """Create and start the worker for the red, green and blue LEDs."""
        cls.blinken_lights = BlinkenLights({'red': 24, 'green': 23, 'blue': 26})
        cls.blinken_lights.start()

    @classmethod
    def enqueue_pattern(cls, target: str, pattern: list):
        """Queue *pattern* on the LED named *target*.

        :meth:`init` must have been called first; otherwise the attribute
        access on ``None`` raises ``AttributeError``.
        """
        cls.blinken_lights.enqueue_pattern(target, pattern)

    @classmethod
    def cleanup(cls):
        """Signal the worker thread to stop; a no-op if never initialised."""
        if cls.blinken_lights is None:
            return
        cls.blinken_lights.stop()