guard-service/src/mqtt_helper.py

82 lines
2.3 KiB
Python

#!/usr/bin/env python3
import logging
import paho.mqtt.client as mqtt
import config
"""
MQTT helper: a small wrapper class around the paho-mqtt client for publishing messages
"""
# Module metadata: author, copyright notice, module name and version string.
__author__ = "@tormakris"
__copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "mqtt"
__version__text__ = "1"
class MQTT:
    """
    MQTT class used to make sending mqtt messages nice and simple

    Holds the broker connection settings, creates the paho-mqtt client in
    connect() and publishes messages to a configurable topic in publish().
    """

    def __init__(self, host=config.MQTT_HOSTNAME, port=config.MQTT_PORT, client_id=config.CLIENT_ID, qos=2,
                 retain=False, username=config.MQTT_USERNAME, password=config.MQTT_PASSWORD) -> None:
        """
        Store the connection settings; no network activity happens here
        :param host: broker host handed to the client in connect()
        :param port: broker port; converted with int() in connect()
        :param client_id: client id handed to the paho client in connect()
        :param qos: QoS value passed to every publish call
        :param retain: retain flag passed to every publish call
        :param username: username set on the client in connect()
        :param password: password set on the client in connect()
        """
        self.client = None  # created by connect()
        self.host = host
        self.port = port
        self.client_id = client_id
        self._topic = None  # set through the `topic` property
        self.qos = qos
        self.retain = retain
        self.username = username
        self.password = password

    def get_topic(self) -> str:
        """
        Get current topic
        :return: the topic messages are published to (None until it is set)
        """
        return self._topic

    def set_topic(self, value: str) -> None:
        """
        Set topic
        :param value: the topic messages should be published to
        :return:
        """
        self._topic = value

    # Kept as explicit getter/setter functions plus a property so that both
    # `obj.topic` and `obj.get_topic()` / `obj.set_topic()` keep working.
    topic = property(get_topic, set_topic)

    def connect(self) -> None:
        """
        Setup client and connect to broker
        :return:
        """
        logging.info("Connecting to MQTT")
        self.client = mqtt.Client(client_id=self.client_id, clean_session=True, userdata=None,
                                  protocol=mqtt.MQTTv311, transport="tcp")
        self.client.username_pw_set(username=self.username, password=self.password)
        # The port may have been supplied as a string, so normalise it to an int.
        self.port = int(self.port)
        # NOTE(review): no network loop (loop_start()/loop_forever()) is started here;
        # paho needs one to complete QoS > 0 handshakes - confirm the caller runs it.
        self.client.connect(host=self.host, port=self.port, keepalive=60)

    def publish(self, message: str, subtopic: str = "") -> None:
        """
        Publish a message to the current topic, or to "<topic>/<subtopic>" if a subtopic is given
        :param message: payload to publish
        :param subtopic: optional suffix appended to the topic with a "/"
        :return:
        :raises RuntimeError: if connect() has not been called yet
        :raises ValueError: if no topic has been set
        """
        if self.client is None:
            raise RuntimeError("MQTT client is not connected; call connect() before publish()")
        if self.topic is None:
            # Without this guard a subtopic publish would silently go to "None/<subtopic>".
            raise ValueError("MQTT topic is not set")

        # Build the destination once so the log line and the publish call always agree.
        full_topic = f"{self.topic}/{subtopic}" if subtopic else self.topic
        logging.debug("MQTT Topic: %s Message: %s QOS: %s Retain: %s",
                      full_topic, message, self.qos, self.retain)
        self.client.publish(full_topic, message, qos=self.qos, retain=self.retain)