iot-logic/src/preprocessor/soundpreprocessor.py

163 lines
5.7 KiB
Python

#!/usr/bin/env python3
from utils import config
from .abcpreprocessor import AbcPreProcessor
import logging
if not config.DISABLE_AI:
import tempfile
import requests
from urllib.parse import urljoin
import os
from pyAudioAnalysis.audioTrainTest import load_model, load_model_knn, classifier_wrapper
from pyAudioAnalysis import audioBasicIO
from pyAudioAnalysis import MidTermFeatures
import numpy
from birbnetes_iot_platform_raspberry import BirbnetesIoTPlatformStatusDriver
"""
Abstract base class for Sender
"""
__author__ = "@tormakris"
__copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "soundpreprocessor"
__version__text__ = "1"
class SoundPreProcessorLegit(AbcPreProcessor):
"""
SoundPreProcessor class, responsible for detecting birb chirps in sound sample.
"""
def __init__(self):
logging.info("Downloading current model...")
temp_model_handle, self._temp_model_name = tempfile.mkstemp()
self._temp_means_name = self._temp_model_name + "MEANS"
logging.debug("Fetching model info...")
BirbnetesIoTPlatformStatusDriver.enqueue_pattern('green', [1])
if config.SVM_MODEL_ID:
model_id_to_get = config.SVM_MODEL_ID
else:
model_id_to_get = '$default'
model_root_url = urljoin(config.API_URL, f"/model/svm/{model_id_to_get}")
r = requests.get(model_root_url)
r.raise_for_status()
self._model_details = r.json()
logging.debug("Downloading model...")
BirbnetesIoTPlatformStatusDriver.enqueue_pattern('green', [1])
r = requests.get(urljoin(model_root_url, self._model_details['files']['model']))
r.raise_for_status()
with open(temp_model_handle, 'wb') as f: # bruhtastic
f.write(r.content)
logging.debug("Downloading MEANS...")
BirbnetesIoTPlatformStatusDriver.enqueue_pattern('green', [1])
r = requests.get(urljoin(model_root_url, self._model_details['files']['means']))
r.raise_for_status()
with open(self._temp_means_name, 'wb') as f:
f.write(r.content)
logging.info("Loading current model...")
if self._model_details['type'] == 'knn':
self._classifier, self._mean, self._std, self._classes, \
self._mid_window, self._mid_step, self._short_window, \
self._short_step, self._compute_beat = load_model_knn(self._temp_model_name)
else:
self._classifier, self._mean, self._std, self._classes, \
self._mid_window, self._mid_step, self._short_window, \
self._short_step, self._compute_beat = load_model(self._temp_model_name)
target_class_name = self._model_details['target_class_name']
logging.info("The loaded model contains the following classes: " + ", ".join(self._classes))
if target_class_name not in self._classes:
raise ValueError(
f"The specified target class {target_class_name} is not in the possible classes (Wrong model info?)"
)
self._target_id = self._classes.index(target_class_name)
def preprocesssignal(self, file_path: str) -> bool:
"""
Classify a sound sample.
:param file_path: Access path of the sound sample up for processing.
:return:
"""
logging.debug("Running extraction...")
sampling_rate, signal = audioBasicIO.read_audio_file(file_path)
signal = audioBasicIO.stereo_to_mono(signal)
if sampling_rate == 0:
raise AssertionError("Could not read the file properly: Sampling rate zero")
if signal.shape[0] / float(sampling_rate) <= self._mid_window:
raise AssertionError("Could not read the file properly: Signal shape is not good")
# feature extraction:
mid_features, s, _ = \
MidTermFeatures.mid_feature_extraction(signal, sampling_rate,
self._mid_window * sampling_rate,
self._mid_step * sampling_rate,
round(sampling_rate * self._short_window),
round(sampling_rate * self._short_step))
# long term averaging of mid-term statistics
mid_features = mid_features.mean(axis=1)
if self._compute_beat:
beat, beat_conf = MidTermFeatures.beat_extraction(s, self._short_step)
mid_features = numpy.append(mid_features, beat)
mid_features = numpy.append(mid_features, beat_conf)
logging.debug("Running classification...")
feature_vector = (mid_features - self._mean) / self._std
class_id, probability = classifier_wrapper(
self._classifier, self._model_details['type'].lower(), feature_vector
)
class_id = int(class_id) # faszom
logging.debug(
f"Sample {file_path} identified as {self._classes[class_id]} with the probablility of {probability[class_id]}"
)
return bool((class_id == self._target_id) and (probability[class_id] > 0.5))
def __del__(self):
try:
os.remove(self._temp_model_name)
except FileNotFoundError:
pass
try:
os.remove(self._temp_means_name)
except FileNotFoundError:
pass
class SoundPreProcessorDummy(AbcPreProcessor):
def __init__(self):
print("AI is disabled! Initializing dummy sound pre-processor...")
def preprocesssignal(self, file_path) -> bool:
return True
if config.DISABLE_AI:
SoundPreProcessor = SoundPreProcessorDummy
else:
SoundPreProcessor = SoundPreProcessorLegit