iot-platform-raspberry/birbnetes_iot_platform_rasp.../led_stuff.py

95 lines
2.4 KiB
Python
Raw Normal View History

2020-09-30 00:04:50 +02:00
from threading import Thread
from queue import Queue, Empty
import time
import RPi.GPIO as GPIO
# Runs at import time: select Broadcom (BCM) channel numbering, so every pin
# number used in this module is interpreted as a BCM channel number.
GPIO.setmode(GPIO.BCM)
class BlinkenLight:
    """Play queued on/off patterns on a single GPIO output pin.

    A pattern is a sequence whose elements are converted with ``bool()``:
    truthy elements drive the pin high, falsy elements drive it low.  One
    element is emitted per call to :meth:`clk`.  Patterns are played in the
    order they were enqueued.
    """

    def __init__(self, pin):
        """Configure *pin* as an output that starts low, with nothing queued."""
        GPIO.setup(pin, GPIO.OUT, initial=GPIO.LOW)
        self._pin = pin
        self._pattern_queue = Queue()
        self._active_pattern = None
        # While a pattern plays this is the index of the next element to emit;
        # while idle it counts down from 0 to -5 to time the pause.
        self._pointer = 0

    def clk(self):
        """Advance the LED state machine by one tick.

        While idle, the pointer counts down one step per tick until it reaches
        -5; on the following tick the next pattern is fetched from the queue
        (or the tick ends immediately if the queue is empty).  A freshly
        fetched, non-empty pattern emits its first element in the same tick.
        When a pattern has been fully emitted, the next tick switches the pin
        low and returns the controller to the idle state.
        """
        if not self._active_pattern:
            if self._pointer != -5:
                # Still pausing between patterns.
                self._pointer -= 1
            else:
                try:
                    upcoming = self._pattern_queue.get_nowait()
                except Empty:
                    # Nothing to play; stay at -5 and retry on the next tick.
                    return
                self._active_pattern = upcoming
                self._pointer = 0

        # Either still idle, or an empty pattern was fetched: nothing to emit.
        if not self._active_pattern:
            return

        if self._pointer != len(self._active_pattern):
            level = bool(self._active_pattern[self._pointer])
            GPIO.output(self._pin, level)
            self._pointer += 1
            return

        # Pattern exhausted: go idle and make sure the LED is off.
        self._active_pattern = None
        self._pointer = 0
        GPIO.output(self._pin, False)

    def enqueue_pattern(self, pattern: list):
        """Append *pattern* to the queue of patterns waiting to be played."""
        self._pattern_queue.put(pattern)

    def cleanup(self):
        """Release this controller's GPIO pin."""
        GPIO.cleanup(self._pin)
class BlinkenLights(Thread):
    """Background thread that clocks a group of named LED controllers.

    One ``BlinkenLight`` is created per entry of *targets*.  Once the thread
    is started, every controller receives a ``clk()`` call each tick until
    :meth:`stop` is called; the controllers' pins are released when the
    thread finishes.
    """

    def __init__(self, targets: dict, tick_interval: float = 0.2):
        """Create one LED controller per target.

        :param targets: mapping of LED name to the GPIO pin driving it.
        :param tick_interval: seconds to sleep before each tick
            (defaults to the original fixed value of 0.2).
        """
        super().__init__()
        self._tick_interval = tick_interval
        self._led_controllers = {
            name: BlinkenLight(pin) for name, pin in targets.items()
        }
        self._active = True

    def run(self):
        """Tick every LED until stopped, then release all pins.

        The cleanup runs in a ``finally`` clause so the pins are released
        even when a controller's ``clk()`` raises; the exception still
        propagates afterwards.
        """
        try:
            while self._active:
                time.sleep(self._tick_interval)
                for led in self._led_controllers.values():
                    led.clk()
        finally:
            for led in self._led_controllers.values():
                led.cleanup()

    def enqueue_pattern(self, target: str, pattern: list):
        """Queue *pattern* on the LED named *target*.

        :raises KeyError: if *target* is not one of the configured names.
        """
        self._led_controllers[target].enqueue_pattern(pattern)

    def stop(self):
        """Ask the thread to exit; it does so after finishing its current tick."""
        self._active = False
class BirbnetesIoTPlatformStatusDriver:
    """Class-level facade around a single shared ``BlinkenLights`` thread.

    All methods are classmethods operating on one shared worker, so callers
    never need to instantiate this class.
    """

    # Shared LED worker thread; stays None until init() has been called.
    blinken_lights = None

    @classmethod
    def init(cls):
        """Create the worker for the 'red', 'green' and 'blue' LEDs and start it."""
        cls.blinken_lights = BlinkenLights({'red': 24, 'green': 23, 'blue': 26})
        cls.blinken_lights.start()

    @classmethod
    def enqueue_pattern(cls, target: str, pattern: list):
        """Queue *pattern* on the LED named *target*.

        :raises AttributeError: if :meth:`init` has not been called yet.
        """
        cls.blinken_lights.enqueue_pattern(target, pattern)

    @classmethod
    def cleanup(cls):
        """Ask the worker thread to stop.

        Does nothing when :meth:`init` was never called (or failed before the
        worker was created), so this is safe to call from shutdown paths.
        """
        if cls.blinken_lights is None:
            return
        cls.blinken_lights.stop()