From 5b15442373767a741b5ee85f52b091ca7ee5fc9a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torma=20Krist=C3=B3f?= Date: Sat, 18 Jul 2020 14:39:29 +0200 Subject: [PATCH] complete logic --- k8s/configmap.yaml | 5 +++- requirements.txt | 2 +- src/app.py | 45 +++++++++++++++++++++++------ src/config.py | 4 +++ src/mqtt_helper.py | 70 ++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 116 insertions(+), 10 deletions(-) create mode 100644 src/mqtt_helper.py diff --git a/k8s/configmap.yaml b/k8s/configmap.yaml index b55881d..a2efd81 100644 --- a/k8s/configmap.yaml +++ b/k8s/configmap.yaml @@ -8,12 +8,15 @@ metadata: data: SENTRY_DSN: "https://80d27db8c74f4556a19a1bf0180b373f@sentry.kmlabz.com/23" RELEASE_ID: birb-k8s + GUARD_CLIENT_ID: guard-b50d97f6-29f9-4de5-a96e-f9b69ca69f7f GUARD_SERVICE_RELEASEMODE: release GUARD_RABBITMQ_HOSTNAME: birb-rabbitmq GUARD_RABBITMQ_EXCHANGE: "sample" GUARD_RABBITMQ_USERNAME: user GUARD_RABBITMQ_PASSWORD: 1wZVQnP5vy GUARD_MQTT_HOSTNAME: guard-postgres + GUARD_MQTT_PORT: 1883 GUARD_MQTT_USERNAME: guard-service GUARD_MQTT_PASSWORD: guard-service-supersecret - GUARD_MQTT_TOPIC: guard-service \ No newline at end of file + GUARD_MQTT_TOPIC: guard-service + INPUT_SVC_HOSTNAME: input-service \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index adc26f8..f600499 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ sentry_sdk pika -marshmallow +requests paho-mqtt \ No newline at end of file diff --git a/src/app.py b/src/app.py index 5e98948..d845be6 100644 --- a/src/app.py +++ b/src/app.py @@ -1,8 +1,10 @@ #!/usr/bin/env python3 -import logging +import json import sentry_sdk - -from config import * +import pika +import requests +import config +from mqtt_helper import MQTT """ Main entry point @@ -13,13 +15,40 @@ __copyright__ = "Copyright 2020, Birbnetes Team" __module_name__ = "app" __version__text__ = "1" -if SENTRY_DSN: +if config.SENTRY_DSN: sentry_sdk.init( - dsn=SENTRY_DSN, + dsn=config.SENTRY_DSN, send_default_pii=True, - release=RELEASE_ID, - environment=RELEASEMODE + release=config.RELEASE_ID, + environment=config.RELEASEMODE ) + +def setup_rabbit() -> None: + credentials = pika.PlainCredentials(config.RABBIT_USERNAME, config.RABBIT_PASSWORD) + connection = pika.BlockingConnection(pika.ConnectionParameters(host=config.RABBIT_HOSTNAME, + credentials=credentials, + heartbeat=0, + socket_timeout=5)) + channel = connection.channel() + exchange = channel.exchange_declare(exchange=config.RABBIT_EXCHANGE, + exchange_type='fanout', + durable=True, + auto_delete=False) + queue = channel.queue_declare(durable=True, auto_delete=False) + queue.bind(exchange) + queue.basic_consume(on_message, no_ack=True) + + +def on_message(channel, method_frame, header_frame, body): + msg_json = json.loads(body) + if msg_json['probability'] > 0.5: + r = requests.get(f"http://{config.INPUT_HOSTNAME}/sample/{msg_json['tag']}") + r.raise_for_status() + mqtt.publish(json.dumps({"deviceID": r.json()['device_id'], "sensorID": "", "command": "doAlert"})) + + if __name__ == "__main__": - pass + mqtt = MQTT() + mqtt.connect() + setup_rabbit() diff --git a/src/config.py b/src/config.py index 19b0723..29c34f9 100644 --- a/src/config.py +++ b/src/config.py @@ -12,6 +12,7 @@ __copyright__ = "Copyright 2020, Birbnetes Team" __module_name__ = "config" __version__text__ = "1" +CLIENT_ID = os.environ.get("GUARD_CLIENT_ID", "guard-service") SENTRY_DSN = os.environ.get("SENTRY_DSN") RELEASE_ID = os.environ.get("RELEASE_ID", "test") @@ -23,6 +24,9 @@ RABBIT_PASSWORD = os.getenv("GUARD_RABBITMQ_PASSWORD", "guard-service") RABBIT_EXCHANGE = os.getenv("GUARD_RABBITMQ_EXCHANGE", "guard-service") MQTT_HOSTNAME = os.getenv("GUARD_MQTT_HOSTNAME", "localhost") +MQTT_PORT = os.getenv("GUARD_MQTT_PORT", "1883") MQTT_USERNAME = os.getenv("GUARD_MQTT_USERNAME", "guard-service") MQTT_PASSWORD = os.getenv("GUARD_MQTT_PASSWORD", "guard-service") MQTT_EXCHANGE = os.getenv("GUARD_MQTT_EXCHANGE", "guard-service") + +INPUT_HOSTNAME = os.getenv("INPUT_SVC_HOSTNAME", "input-service") diff --git a/src/mqtt_helper.py b/src/mqtt_helper.py new file mode 100644 index 0000000..19b1363 --- /dev/null +++ b/src/mqtt_helper.py @@ -0,0 +1,70 @@ +#!/usr/bin/env python3 +import paho.mqtt.client as mqtt +import config + +""" +MQTT class +""" + +__author__ = "@tormakris" +__copyright__ = "Copyright 2020, Birbnetes Team" +__module_name__ = "mqtt" +__version__text__ = "1" + + +class MQTT: + """ + MQTT class used to make sending mqtt messages nice and simple + """ + + def __init__(self, host=config.MQTT_HOSTNAME, port=config.MQTT_PORT, client_id=config.CLIENT_ID, qos=2, + retain=True): + """ + Init variables + :param host: + :param port: + :param client_id: + :param qos: + :param retain: + """ + self.client = None + self.host = host + self.port = port + self.client_id = client_id + self.topic = None + self.qos = qos + self.retain = retain + + def get_topic(self) -> str: + """ + Set topic + :return: + """ + return self.topic + + def set_topic(self, topic: str) -> None: + """ + Get current topic + :param topic: + :return: + """ + self.topic = topic + + topic = property(get_topic, set_topic) + + def connect(self) -> None: + """ + Setup client and connect to broker + :return: + """ + self.client = mqtt.Client(client_id=self.client_id, clean_session=True, userdata=None, protocol=mqtt.MQTTv311, + transport="tcp") + self.client.connect(host=self.host, port=self.port, keepalive=60) + + def publish(self, message: str) -> None: + """ + Publish a message + :param message: + :return: + """ + self.client.publish(self.topic, message, qos=self.qos, retain=self.retain)