#!/usr/bin/env python3 import logging import requests from sensor import SoundSensor from sender import SoundSender from preprocessor import SoundPreProcessor from .abcsignalprocessor import AbcSignalProcessor import os from utils import BirbnetesIoTPlatformStatusDriver """ Abstract base class for signalprocessor """ __author__ = "@tormakris" __copyright__ = "Copyright 2020, Birbnetes Team" __module_name__ = "soundsignalprocessor" __version__text__ = "1" class SoundSignalProcessor(AbcSignalProcessor): """ SoundSignalProcessor class, responsible for handling the sound signal processor pipeline. """ def __init__(self): """ Create dependency objects. """ self.soundsensor = SoundSensor() self.soundpreprocessor = SoundPreProcessor() self.soundsender = SoundSender() def processcurrentsignal(self): """ Process a sound sample. :return: """ soundsample_name = self.soundsensor.getvalue() if not soundsample_name: # No new sample... nothing to do return try: sample_decision = self.soundpreprocessor.preprocesssignal(soundsample_name) except Exception as e: logging.exception(e) BirbnetesIoTPlatformStatusDriver.enqueue_pattern('red', [1, 1, 1]) os.unlink(soundsample_name) return if sample_decision: BirbnetesIoTPlatformStatusDriver.enqueue_pattern('green', [1, 0, 1]) try: self.soundsender.sendvalue(soundsample_name, sample_decision) except (ConnectionError, requests.HTTPError) as e: logging.exception(e) BirbnetesIoTPlatformStatusDriver.enqueue_pattern('red', [1, 0, 1, 0, 1]) os.unlink(soundsample_name) return os.unlink(soundsample_name)