#!/usr/bin/env python3 import logging import paho.mqtt.client as mqtt import config """ MQTT class """ __author__ = "@tormakris" __copyright__ = "Copyright 2020, Birbnetes Team" __module_name__ = "mqtt" __version__text__ = "1" class MQTT: """ MQTT class used to make sending mqtt messages nice and simple """ def __init__(self, host=config.MQTT_HOSTNAME, port=config.MQTT_PORT, client_id=config.CLIENT_ID, qos=2, retain=False, username=config.MQTT_USERNAME, password=config.MQTT_PASSWORD): """ Init variables :param host: :param port: :param client_id: :param qos: :param retain: """ self.client = None self.host = host self.port = port self.client_id = client_id self._topic = None self.qos = qos self.retain = retain self.username = username self.password = password def get_topic(self) -> str: """ Set topic :return: """ return self._topic def set_topic(self, value: str) -> None: """ Get current topic :param value: :return: """ self._topic = value topic = property(get_topic, set_topic) def connect(self) -> None: """ Setup client and connect to broker :return: """ logging.info("Connecting to MQTT") self.client = mqtt.Client(client_id=self.client_id, clean_session=True, userdata=None, protocol=mqtt.MQTTv311, transport="tcp") self.client.username_pw_set(username=self.username,password=self.password) self.port = int(self.port) self.client.connect(host=self.host, port=self.port, keepalive=60) def publish(self, message: str, subtopic: str = "") -> None: """ Publish a message :param message: :param subtopic: :return: """ logging.debug(f"MQTT Topic: {self.topic}/{subtopic} Message: {message} QOS: {self.qos} Retain: {self.retain}") if subtopic: self.client.publish(f"{self.topic}/{subtopic}", message, qos=self.qos, retain=self.retain) else: self.client.publish(self.topic, message, qos=self.qos, retain=self.retain)