#!/usr/bin/env python3 import logging import sentry_sdk from sentry_sdk.integrations.logging import LoggingIntegration from utils import config, LoopingTimer from signal_processor import SoundSignalProcessor from birbnetes_iot_platform_raspberry import BirbnetesIoTPlatformStatusDriver from actuator import Loudspeaker import paho.mqtt.client import json import sys """ Main Entrypoint """ __author__ = "@tormakris" __copyright__ = "Copyright 2020, Birbnetes Team" __module_name__ = "app" __version__text__ = "1" sentry_logging = LoggingIntegration( level=logging.INFO, event_level=logging.ERROR ) if config.SENTRY_DSN: sentry_sdk.init( dsn=config.SENTRY_DSN, integrations=[sentry_logging], traces_sample_rate=1.0, send_default_pii=True, release=config.RELEASE_ID, environment=config.RELEASEMODE, _experiments={"auto_enabling_integrations": True} ) def timer_tick(*args) -> None: """ Tick of a timer :param listofabcsignaprocessors: :return: """ for abcsignaprocessor in args: abcsignaprocessor.processcurrentsignal() def mqtt_on_connect(client, userdata, flags, rc): BirbnetesIoTPlatformStatusDriver.enqueue_pattern('green', [1]) client.subscribe(f"command/{config.DEVICE_ID}") logging.info("MQTT Connected!") def mqtt_on_disconnect(client, userdata, rc): if rc != 0: logging.error(f"Unexpected MQTT Disconnect. RC: {rc}") BirbnetesIoTPlatformStatusDriver.enqueue_pattern('red', [1]) else: logging.debug("MQTT Disconnected") def mqtt_on_command(client, userdata, message): try: msg = json.loads(message.payload.decode()) except (UnicodeError, json.JSONDecodeError) as e: logging.error(f"MQTT Invalid message recieved: {e}") return if msg.get("command") == 'doAlert': userdata.act() def main() -> None: """ Main function :return: """ logging.basicConfig(stream=sys.stdout, format="%(asctime)s - %(name)s [%(levelname)s]: %(message)s", level=logging.DEBUG if '--debug' in sys.argv else logging.INFO) BirbnetesIoTPlatformStatusDriver.init() listofabcsignaprocessors = [SoundSignalProcessor()] loopingtimer = LoopingTimer(function=timer_tick, tick_args=listofabcsignaprocessors, interval=config.TICK_INTERVAL) loopingtimer.start() client = paho.mqtt.client.Client(userdata=Loudspeaker(config.ENEMY_SOUNDS), client_id=config.DEVICE_ID) client.on_connect = mqtt_on_connect client.on_disconnect = mqtt_on_disconnect client.on_message = mqtt_on_command if config.MQTT_USERNAME: client.username_pw_set(config.MQTT_USERNAME, config.MQTT_PASSWORD) client.connect(config.MQTT_HOSTNAME, config.MQTT_PORT, 60) try: client.loop_forever() # This blocks except KeyboardInterrupt: logging.info("SIGINT recieved! Stopping...") pass client.disconnect() loopingtimer.stop() BirbnetesIoTPlatformStatusDriver.cleanup() if __name__ == "__main__": main()