2020-07-18 12:34:52 +02:00
|
|
|
#!/usr/bin/env python3
|
2020-07-18 14:39:29 +02:00
|
|
|
import json
|
2020-07-30 15:33:28 +02:00
|
|
|
import logging
|
|
|
|
|
2020-07-18 12:34:52 +02:00
|
|
|
import sentry_sdk
|
2020-07-18 14:39:29 +02:00
|
|
|
import pika
|
|
|
|
import requests
|
2020-07-30 15:33:28 +02:00
|
|
|
from sentry_sdk.integrations.logging import LoggingIntegration
|
|
|
|
|
2020-07-18 14:39:29 +02:00
|
|
|
import config
|
2020-07-30 15:33:28 +02:00
|
|
|
import uuid
|
2020-07-18 14:39:29 +02:00
|
|
|
from mqtt_helper import MQTT
|
2020-07-18 12:34:52 +02:00
|
|
|
|
|
|
|
"""
|
|
|
|
Main entry point
|
|
|
|
"""
|
|
|
|
|
|
|
|
__author__ = "@tormakris"
|
|
|
|
__copyright__ = "Copyright 2020, Birbnetes Team"
|
|
|
|
__module_name__ = "app"
|
|
|
|
__version__text__ = "1"
|
|
|
|
|
2020-07-18 14:39:29 +02:00
|
|
|
if config.SENTRY_DSN:
|
2020-07-30 15:33:28 +02:00
|
|
|
sentry_logging = LoggingIntegration(
|
|
|
|
level=logging.DEBUG, # Capture info and above as breadcrumbs
|
|
|
|
event_level=logging.ERROR # Send errors as events
|
|
|
|
)
|
2020-07-18 12:34:52 +02:00
|
|
|
sentry_sdk.init(
|
2020-07-18 14:39:29 +02:00
|
|
|
dsn=config.SENTRY_DSN,
|
2020-07-18 12:34:52 +02:00
|
|
|
send_default_pii=True,
|
2020-07-30 15:33:28 +02:00
|
|
|
integrations=[sentry_logging],
|
2020-07-18 14:39:29 +02:00
|
|
|
release=config.RELEASE_ID,
|
|
|
|
environment=config.RELEASEMODE
|
2020-07-18 12:34:52 +02:00
|
|
|
)
|
|
|
|
|
2020-07-18 14:39:29 +02:00
|
|
|
|
|
|
|
def setup_rabbit() -> None:
|
|
|
|
credentials = pika.PlainCredentials(config.RABBIT_USERNAME, config.RABBIT_PASSWORD)
|
|
|
|
connection = pika.BlockingConnection(pika.ConnectionParameters(host=config.RABBIT_HOSTNAME,
|
|
|
|
credentials=credentials,
|
|
|
|
heartbeat=0,
|
|
|
|
socket_timeout=5))
|
|
|
|
channel = connection.channel()
|
2020-07-30 15:33:28 +02:00
|
|
|
channel.exchange_declare(exchange=config.RABBIT_EXCHANGE,
|
|
|
|
exchange_type='fanout',
|
|
|
|
durable=True,
|
|
|
|
auto_delete=False)
|
2020-07-30 15:39:02 +02:00
|
|
|
queue = channel.queue_declare(durable=True, auto_delete=True, queue=uuid.uuid4().urn.split(':')[2],
|
2020-07-30 15:33:28 +02:00
|
|
|
exclusive=True).method.queue
|
|
|
|
channel.queue_bind(exchange=config.RABBIT_EXCHANGE, queue=queue)
|
|
|
|
channel.basic_consume(queue=queue, on_message_callback=on_message, auto_ack=True)
|
2020-07-18 14:39:29 +02:00
|
|
|
|
|
|
|
|
|
|
|
def on_message(channel, method_frame, header_frame, body):
|
|
|
|
msg_json = json.loads(body)
|
2020-07-30 15:36:09 +02:00
|
|
|
if 'probability' not in msg_json:
|
|
|
|
logging.error("Malformed message from broker")
|
2020-07-18 14:39:29 +02:00
|
|
|
if msg_json['probability'] > 0.5:
|
|
|
|
r = requests.get(f"http://{config.INPUT_HOSTNAME}/sample/{msg_json['tag']}")
|
|
|
|
r.raise_for_status()
|
2020-07-30 15:36:09 +02:00
|
|
|
if 'device_id' not in r.json():
|
|
|
|
logging.error("Input-service response invalid")
|
2020-07-26 19:38:10 +02:00
|
|
|
mqtt.publish(subtopic=r.json()['device_id'],
|
|
|
|
message=json.dumps({"command": "doAlert"}))
|
2020-07-18 14:39:29 +02:00
|
|
|
|
|
|
|
|
2020-07-18 12:34:52 +02:00
|
|
|
if __name__ == "__main__":
|
2020-07-18 14:39:29 +02:00
|
|
|
mqtt = MQTT()
|
2020-07-18 15:18:19 +02:00
|
|
|
mqtt.topic = config.MQTT_TOPIC
|
2020-07-18 14:39:29 +02:00
|
|
|
mqtt.connect()
|
|
|
|
setup_rabbit()
|