This commit is contained in:
		
							
								
								
									
										45
									
								
								src/app.py
									
									
									
									
									
								
							
							
						
						
									
										45
									
								
								src/app.py
									
									
									
									
									
								
							@@ -1,8 +1,10 @@
 | 
			
		||||
#!/usr/bin/env python3
 | 
			
		||||
import logging
 | 
			
		||||
import json
 | 
			
		||||
import sentry_sdk
 | 
			
		||||
 | 
			
		||||
from config import *
 | 
			
		||||
import pika
 | 
			
		||||
import requests
 | 
			
		||||
import config
 | 
			
		||||
from mqtt_helper import MQTT
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
Main entry point
 | 
			
		||||
@@ -13,13 +15,40 @@ __copyright__ = "Copyright 2020, Birbnetes Team"
 | 
			
		||||
__module_name__ = "app"
 | 
			
		||||
__version__text__ = "1"
 | 
			
		||||
 | 
			
		||||
if SENTRY_DSN:
 | 
			
		||||
if config.SENTRY_DSN:
 | 
			
		||||
    sentry_sdk.init(
 | 
			
		||||
        dsn=SENTRY_DSN,
 | 
			
		||||
        dsn=config.SENTRY_DSN,
 | 
			
		||||
        send_default_pii=True,
 | 
			
		||||
        release=RELEASE_ID,
 | 
			
		||||
        environment=RELEASEMODE
 | 
			
		||||
        release=config.RELEASE_ID,
 | 
			
		||||
        environment=config.RELEASEMODE
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def setup_rabbit() -> None:
 | 
			
		||||
    credentials = pika.PlainCredentials(config.RABBIT_USERNAME, config.RABBIT_PASSWORD)
 | 
			
		||||
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=config.RABBIT_HOSTNAME,
 | 
			
		||||
                                                                   credentials=credentials,
 | 
			
		||||
                                                                   heartbeat=0,
 | 
			
		||||
                                                                   socket_timeout=5))
 | 
			
		||||
    channel = connection.channel()
 | 
			
		||||
    exchange = channel.exchange_declare(exchange=config.RABBIT_EXCHANGE,
 | 
			
		||||
                                        exchange_type='fanout',
 | 
			
		||||
                                        durable=True,
 | 
			
		||||
                                        auto_delete=False)
 | 
			
		||||
    queue = channel.queue_declare(durable=True, auto_delete=False)
 | 
			
		||||
    queue.bind(exchange)
 | 
			
		||||
    queue.basic_consume(on_message, no_ack=True)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def on_message(channel, method_frame, header_frame, body):
 | 
			
		||||
    msg_json = json.loads(body)
 | 
			
		||||
    if msg_json['probability'] > 0.5:
 | 
			
		||||
        r = requests.get(f"http://{config.INPUT_HOSTNAME}/sample/{msg_json['tag']}")
 | 
			
		||||
        r.raise_for_status()
 | 
			
		||||
        mqtt.publish(json.dumps({"deviceID": r.json()['device_id'], "sensorID": "", "command": "doAlert"}))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if __name__ == "__main__":
 | 
			
		||||
    pass
 | 
			
		||||
    mqtt = MQTT()
 | 
			
		||||
    mqtt.connect()
 | 
			
		||||
    setup_rabbit()
 | 
			
		||||
 
 | 
			
		||||
@@ -12,6 +12,7 @@ __copyright__ = "Copyright 2020, Birbnetes Team"
 | 
			
		||||
__module_name__ = "config"
 | 
			
		||||
__version__text__ = "1"
 | 
			
		||||
 | 
			
		||||
CLIENT_ID = os.environ.get("GUARD_CLIENT_ID", "guard-service")
 | 
			
		||||
 | 
			
		||||
SENTRY_DSN = os.environ.get("SENTRY_DSN")
 | 
			
		||||
RELEASE_ID = os.environ.get("RELEASE_ID", "test")
 | 
			
		||||
@@ -23,6 +24,9 @@ RABBIT_PASSWORD = os.getenv("GUARD_RABBITMQ_PASSWORD", "guard-service")
 | 
			
		||||
RABBIT_EXCHANGE = os.getenv("GUARD_RABBITMQ_EXCHANGE", "guard-service")
 | 
			
		||||
 | 
			
		||||
MQTT_HOSTNAME = os.getenv("GUARD_MQTT_HOSTNAME", "localhost")
 | 
			
		||||
MQTT_PORT = os.getenv("GUARD_MQTT_PORT", "1883")
 | 
			
		||||
MQTT_USERNAME = os.getenv("GUARD_MQTT_USERNAME", "guard-service")
 | 
			
		||||
MQTT_PASSWORD = os.getenv("GUARD_MQTT_PASSWORD", "guard-service")
 | 
			
		||||
MQTT_EXCHANGE = os.getenv("GUARD_MQTT_EXCHANGE", "guard-service")
 | 
			
		||||
 | 
			
		||||
INPUT_HOSTNAME = os.getenv("INPUT_SVC_HOSTNAME", "input-service")
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										70
									
								
								src/mqtt_helper.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										70
									
								
								src/mqtt_helper.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,70 @@
 | 
			
		||||
#!/usr/bin/env python3
 | 
			
		||||
import paho.mqtt.client as mqtt
 | 
			
		||||
import config
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
MQTT class
 | 
			
		||||
"""
 | 
			
		||||
 | 
			
		||||
__author__ = "@tormakris"
 | 
			
		||||
__copyright__ = "Copyright 2020, Birbnetes Team"
 | 
			
		||||
__module_name__ = "mqtt"
 | 
			
		||||
__version__text__ = "1"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class MQTT:
 | 
			
		||||
    """
 | 
			
		||||
    MQTT class used to make sending mqtt messages nice and simple
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    def __init__(self, host=config.MQTT_HOSTNAME, port=config.MQTT_PORT, client_id=config.CLIENT_ID, qos=2,
 | 
			
		||||
                 retain=True):
 | 
			
		||||
        """
 | 
			
		||||
        Init variables
 | 
			
		||||
        :param host:
 | 
			
		||||
        :param port:
 | 
			
		||||
        :param client_id:
 | 
			
		||||
        :param qos:
 | 
			
		||||
        :param retain:
 | 
			
		||||
        """
 | 
			
		||||
        self.client = None
 | 
			
		||||
        self.host = host
 | 
			
		||||
        self.port = port
 | 
			
		||||
        self.client_id = client_id
 | 
			
		||||
        self.topic = None
 | 
			
		||||
        self.qos = qos
 | 
			
		||||
        self.retain = retain
 | 
			
		||||
 | 
			
		||||
    def get_topic(self) -> str:
 | 
			
		||||
        """
 | 
			
		||||
        Set topic
 | 
			
		||||
        :return:
 | 
			
		||||
        """
 | 
			
		||||
        return self.topic
 | 
			
		||||
 | 
			
		||||
    def set_topic(self, topic: str) -> None:
 | 
			
		||||
        """
 | 
			
		||||
        Get current topic
 | 
			
		||||
        :param topic:
 | 
			
		||||
        :return:
 | 
			
		||||
        """
 | 
			
		||||
        self.topic = topic
 | 
			
		||||
 | 
			
		||||
    topic = property(get_topic, set_topic)
 | 
			
		||||
 | 
			
		||||
    def connect(self) -> None:
 | 
			
		||||
        """
 | 
			
		||||
        Setup client and connect to broker
 | 
			
		||||
        :return:
 | 
			
		||||
        """
 | 
			
		||||
        self.client = mqtt.Client(client_id=self.client_id, clean_session=True, userdata=None, protocol=mqtt.MQTTv311,
 | 
			
		||||
                                  transport="tcp")
 | 
			
		||||
        self.client.connect(host=self.host, port=self.port, keepalive=60)
 | 
			
		||||
 | 
			
		||||
    def publish(self, message: str) -> None:
 | 
			
		||||
        """
 | 
			
		||||
        Publish a message
 | 
			
		||||
        :param message:
 | 
			
		||||
        :return:
 | 
			
		||||
        """
 | 
			
		||||
        self.client.publish(self.topic, message, qos=self.qos, retain=self.retain)
 | 
			
		||||
		Reference in New Issue
	
	Block a user