iot-logic/src/signal_processor/soundsignalprocessor.py

57 lines
1.6 KiB
Python

#!/usr/bin/env python3
import logging
import requests
from sensor import SoundSensor
from sender import SoundSender
from preprocessor import SoundPreProcessor
from .abcsignalprocessor import AbcSignalProcessor
from birbnetes_iot_platform_raspberry import BirbnetesIoTPlatformStatusDriver
"""
Sound signal processor implementation
"""
__author__ = "@tormakris"
__copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "soundsignalprocessor"
__version__text__ = "1"
class SoundSignalProcessor(AbcSignalProcessor):
    """
    SoundSignalProcessor class, responsible for handling the sound signal processor pipeline.

    The pipeline chains three collaborators created in ``__init__``: a sensor
    that supplies a value, a pre-processor that produces a decision for it,
    and a sender that receives both. Failures are logged and signalled through
    ``BirbnetesIoTPlatformStatusDriver``.
    """

    def __init__(self):
        """
        Create dependency objects.
        """
        self.soundsensor = SoundSensor()
        self.soundpreprocessor = SoundPreProcessor()
        self.soundsender = SoundSender()

    def processcurrentsignal(self):
        """
        Process a sound sample.

        Steps:
          1. Fetch the current value from the sensor.
          2. Run it through the pre-processor. Any exception is logged, a red
             ``[1, 1, 1]`` pattern is enqueued and processing stops.
          3. If the decision is truthy, enqueue a green ``[1, 0, 1]`` pattern.
          4. Pass the value and the decision to the sender. Transport errors
             are logged and a red ``[1, 0, 1, 0, 1]`` pattern is enqueued.

        NOTE(review): the send step is assumed to run regardless of the
        decision, since the decision is handed to the sender as an argument —
        confirm against the sender implementation.

        :return: None
        """
        soundsample_name = self.soundsensor.getvalue()

        try:
            sample_decision = self.soundpreprocessor.preprocesssignal(soundsample_name)
        except Exception as e:
            # Broad on purpose: whatever the pre-processor raises must not
            # escape this method; it is logged and signalled instead.
            logging.exception(e)
            BirbnetesIoTPlatformStatusDriver.enqueue_pattern('red', [1, 1, 1])
            return

        if sample_decision:
            BirbnetesIoTPlatformStatusDriver.enqueue_pattern('green', [1, 0, 1])

        try:
            self.soundsender.sendvalue(soundsample_name, sample_decision)
        except (ConnectionError, requests.RequestException) as e:
            # requests' own ConnectionError/Timeout derive from
            # requests.RequestException, not from the builtin ConnectionError,
            # so the builtin alone would let them escape. RequestException
            # also covers the previously handled requests.HTTPError.
            logging.exception(e)
            BirbnetesIoTPlatformStatusDriver.enqueue_pattern('red', [1, 0, 1, 0, 1])