#!/usr/bin/env python3 import logging import requests from sensor import SoundSensor from sender import SoundSender from preprocessor import SoundPreProcessor from .abcsignalprocessor import AbcSignalProcessor import os from birbnetes_iot_platform_raspberry import BirbnetesIoTPlatformStatusDriver """ Abstract base class for signalprocessor """ __author__ = "@tormakris" __copyright__ = "Copyright 2020, Birbnetes Team" __module_name__ = "soundsignalprocessor" __version__text__ = "1" class SoundSignalProcessor(AbcSignalProcessor): """ SoundSignalProcessor class, responsible for handling the sound signal processor pipeline. """ def __init__(self): """ Create dependency objects. """ self.soundsensor = SoundSensor() self.soundpreprocessor = SoundPreProcessor() self.soundsender = SoundSender() def processcurrentsignal(self): """ Process a sound sample. :return: """ soundsample_name = self.soundsensor.getvalue() try: sample_decision = self.soundpreprocessor.preprocesssignal(soundsample_name) except Exception as e: logging.exception(e) BirbnetesIoTPlatformStatusDriver.enqueue_pattern('red', [1, 1, 1]) os.unlink(soundsample_name) return if sample_decision: BirbnetesIoTPlatformStatusDriver.enqueue_pattern('green', [1, 0, 1]) try: self.soundsender.sendvalue(soundsample_name, sample_decision) except (ConnectionError, requests.HTTPError) as e: logging.exception(e) BirbnetesIoTPlatformStatusDriver.enqueue_pattern('red', [1, 0, 1, 0, 1]) os.unlink(soundsample_name) return os.unlink(soundsample_name)