Files
iot-logic/src/signal_processor/soundsignalprocessor.py

65 lines
1.8 KiB
Python

#!/usr/bin/env python3
import logging
import requests
from sensor import SoundSensor
from sender import SoundSender
from preprocessor import SoundPreProcessor
from .abcsignalprocessor import AbcSignalProcessor
import os
from utils import BirbnetesIoTPlatformStatusDriver
"""
Sound signal processor: reads a sound sample, preprocesses it and sends it on.
"""
__author__ = "@tormakris"
__copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "soundsignalprocessor"
__version__text__ = "1"
class SoundSignalProcessor(AbcSignalProcessor):
    """
    SoundSignalProcessor class, responsible for handling the sound signal processor pipeline.

    The pipeline reads a sample from the sensor, runs it through the
    preprocessor and, when the preprocessor yields a truthy decision, hands
    the sample and the decision to the sender.
    """

    def __init__(self):
        """
        Create dependency objects.
        """
        self.soundsensor = SoundSensor()
        self.soundpreprocessor = SoundPreProcessor()
        self.soundsender = SoundSender()

    def processcurrentsignal(self):
        """
        Process a sound sample.

        Fetches the current sample from the sensor. When the sensor returns a
        falsy value there is nothing to do. Otherwise the sample is
        preprocessed and, if the resulting decision is truthy, sent with the
        sender. Failures in preprocessing or sending are logged and signalled
        with a red status pattern; they are not re-raised.

        The sample file is removed with ``os.unlink`` on every path once a
        sample has been obtained, including when an unexpected exception
        propagates out of this method.

        :return: None
        """
        soundsample_name = self.soundsensor.getvalue()
        if not soundsample_name:  # No new sample... nothing to do
            return

        # A single ``finally`` guarantees the sample file is cleaned up no
        # matter how the pipeline exits, so files cannot pile up on the device
        # when an exception escapes the handlers below.
        try:
            try:
                sample_decision = self.soundpreprocessor.preprocesssignal(soundsample_name)
            except Exception as e:
                logging.exception(e)
                BirbnetesIoTPlatformStatusDriver.enqueue_pattern('red', [1, 1, 1])
                return

            if not sample_decision:
                return

            BirbnetesIoTPlatformStatusDriver.enqueue_pattern('green', [1, 0, 1])
            try:
                self.soundsender.sendvalue(soundsample_name, sample_decision)
            # requests' own ConnectionError and Timeout derive from
            # requests.RequestException, not from the builtin ConnectionError,
            # so the base class is needed to catch them. RequestException also
            # covers requests.HTTPError. The builtin stays for non-requests
            # transport errors.
            except (ConnectionError, requests.RequestException) as e:
                logging.exception(e)
                BirbnetesIoTPlatformStatusDriver.enqueue_pattern('red', [1, 0, 1, 0, 1])
        finally:
            os.unlink(soundsample_name)