Implemented blinkenlights
This commit is contained in:
1
birbnetes_iot_platform_raspberry/__init__.py
Normal file
1
birbnetes_iot_platform_raspberry/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
from .led_stuff import BirbnetesIoTPlatformStatusDriver
|
95
birbnetes_iot_platform_raspberry/led_stuff.py
Normal file
95
birbnetes_iot_platform_raspberry/led_stuff.py
Normal file
@ -0,0 +1,95 @@
|
||||
from threading import Thread
|
||||
from queue import Queue, Empty
|
||||
import time
|
||||
import RPi.GPIO as GPIO
|
||||
|
||||
GPIO.setmode(GPIO.BCM)
|
||||
|
||||
|
||||
class BlinkenLight:
|
||||
|
||||
def __init__(self, pin):
|
||||
self._pin = pin
|
||||
self._pattern_queue = Queue()
|
||||
self._active_pattern = None
|
||||
self._pointer = 0
|
||||
|
||||
GPIO.setup(pin, GPIO.OUT, initial=GPIO.LOW)
|
||||
|
||||
def clk(self):
|
||||
|
||||
if not self._active_pattern: # No pattern loaded
|
||||
|
||||
if self._pointer == -5: # Skip 5 ticks before new pattern
|
||||
|
||||
try:
|
||||
pattern = self._pattern_queue.get(block=False)
|
||||
self._active_pattern = pattern
|
||||
self._pointer = 0
|
||||
except Empty:
|
||||
return
|
||||
|
||||
else:
|
||||
self._pointer -= 1
|
||||
|
||||
# Meanwhile a pattern loaded
|
||||
if self._active_pattern: # There is a pattern loaded
|
||||
|
||||
if self._pointer == len(self._active_pattern):
|
||||
self._active_pattern = None
|
||||
self._pointer = 0
|
||||
GPIO.output(self._pin, False)
|
||||
else:
|
||||
GPIO.output(self._pin, bool(self._active_pattern[self._pointer]))
|
||||
|
||||
self._pointer += 1
|
||||
|
||||
def enqueue_pattern(self, pattern: list):
|
||||
self._pattern_queue.put(pattern)
|
||||
|
||||
def cleanup(self):
|
||||
GPIO.cleanup(self._pin)
|
||||
|
||||
|
||||
class BlinkenLights(Thread):
|
||||
LED_TARGETS = {
|
||||
'red': 23,
|
||||
'green': 24
|
||||
}
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._led_controllers = {}
|
||||
for name, pin in self.LED_TARGETS.items():
|
||||
self._led_controllers[name] = BlinkenLight(pin)
|
||||
self._active = True
|
||||
|
||||
def run(self):
|
||||
|
||||
while self._active:
|
||||
time.sleep(200)
|
||||
for name, led in self._led_controllers:
|
||||
led.clk()
|
||||
|
||||
for name, led in self._led_controllers:
|
||||
led.cleanup()
|
||||
|
||||
def enqueue_pattern(self, target: str, pattern: list):
|
||||
self._led_controllers[target].enqueue_pattern(pattern)
|
||||
|
||||
def stop(self):
|
||||
self._active = False
|
||||
|
||||
|
||||
class BirbnetesIoTPlatformStatusDriver:
|
||||
|
||||
blinken_lights = None
|
||||
|
||||
@classmethod
|
||||
def init(cls):
|
||||
cls.blinken_lights = BlinkenLights()
|
||||
cls.blinken_lights.start()
|
||||
|
||||
@classmethod
|
||||
def enqueue_pattern(cls, target: str, pattern: list):
|
||||
cls.blinken_lights.enqueue_pattern(target, pattern)
|
1
birbnetes_iot_platform_raspberry/playback_stuff.py
Normal file
1
birbnetes_iot_platform_raspberry/playback_stuff.py
Normal file
@ -0,0 +1 @@
|
||||
import alsaaudio
|
0
birbnetes_iot_platform_raspberry/record_stuff.py
Normal file
0
birbnetes_iot_platform_raspberry/record_stuff.py
Normal file
Reference in New Issue
Block a user