From 040c9f23990996dbf94097db4509d73e85313994 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torma=20Krist=C3=B3f?= Date: Sun, 26 Jul 2020 17:45:19 +0200 Subject: [PATCH] modify mqtt api --- src/app.py | 18 +++++++++++---- src/mqtt_methods.py | 21 +++++++++++++---- src/resources.py | 56 ++++++++++++++++++++++++++++++++------------- 3 files changed, 69 insertions(+), 26 deletions(-) diff --git a/src/app.py b/src/app.py index a41b63c..df0300a 100644 --- a/src/app.py +++ b/src/app.py @@ -5,6 +5,7 @@ from flask_restful import Api import sentry_sdk from sentry_sdk.integrations.flask import FlaskIntegration +from resources import * from config import * from db import db from marshm import ma @@ -28,14 +29,14 @@ if SENTRY_DSN: environment=RELEASEMODE ) - app = Flask(__name__) app.config['MQTT_BROKER_URL'] = MQTT_HOSTNAME app.config['MQTT_BROKER_PORT'] = MQTT_PORT app.config['MQTT_USERNAME'] = MQTT_USERNAME app.config['MQTT_PASSWORD'] = MQTT_PASSWORD app.config['MQTT_REFRESH_TIME'] = 1.0 # refresh time in seconds -app.config['SQLALCHEMY_DATABASE_URI'] = f"postgresql://{POSTGRES_USERNAME}:{POSTGRES_PASSWORD}@{POSTGRES_HOSTNAME}:5432/{POSTGRES_DB}" +app.config[ + 'SQLALCHEMY_DATABASE_URI'] = f"postgresql://{POSTGRES_USERNAME}:{POSTGRES_PASSWORD}@{POSTGRES_HOSTNAME}:5432/{POSTGRES_DB}" api = Api(app) db.init_app(app) @@ -64,11 +65,18 @@ def handle_logging(client, userdata, level, buf): @mqtt.on_connect() def handle_connect(client, userdata, flags, rc): - mqtt.subscribe(MQTT_STATUS_TOPIC) + mqtt.subscribe(f"{MQTT_STATUS_TOPIC}/#") -# api.add_resource(SampleResource, "/sample") -# api.add_resource(SampleParameterResource, '/sample/') +api.add_resource(AllDevicesResource, "/devices") +api.add_resource(AllDevicesOfflineResource, "/devices/offline") +api.add_resource(AllDevicesOnlineResource, "/devices/online") +api.add_resource(DeviceResource, "/devices/{deviceid}") +api.add_resource(DeviceOfflineResrouce, "/devices/{deviceid}/offline") +api.add_resource(DeviceOnlineResrouce, "/devices/{deviceid}/online") +api.add_resource(SensorResource, "/devices/{deviceid}/{sensorid}") +api.add_resource(SensorOfflineResource, "/devices/{deviceid}/{sensorid}/offline") +api.add_resource(SensorOnlineResource, "/devices/{deviceid}/{sensorid}/online") if __name__ == "__main__": app.run( diff --git a/src/mqtt_methods.py b/src/mqtt_methods.py index 3e152a9..35a40af 100644 --- a/src/mqtt_methods.py +++ b/src/mqtt_methods.py @@ -1,8 +1,9 @@ #!/usr/bin/env python3 import logging from mqtt_flask_instance import mqtt +import config from db import db -from schemas import DeviceSchema +from schemas import DeviceSchema, SensorSchema """ MQTT Stuff @@ -14,7 +15,7 @@ __module_name__ = "mqtt_methods" __version__text__ = "1" deviceschema = DeviceSchema(many=False) - +sensorschema = SensorSchema(many=False) LOGGER = logging.getLogger(__name__) @@ -26,9 +27,19 @@ def handle_status_message(client, userdata, message): payload=message.payload.decode() ) try: - status_message = deviceschema.load(data['payload'], session=db.session, transient=True).data - LOGGER.info(f"Recieved status message from {data['payload']['id']}, persisting to db.") - db.session.merge(status_message) + ids = data['topic'].replace(f"{config.MQTT_STATUS_TOPIC}/", "").split("/") + if len(ids) == 1: + data['payload']['deviceID'] = ids[0] + status_message = deviceschema.load(data['payload'], session=db.session, transient=True).data + LOGGER.info(f"Recieved status message from {data['payload']['deviceID']}, persisting to db.") + db.session.merge(status_message) + else: + if len(ids) == 2: + data['payload']['deviceID'] = ids[0] + data['payload']['sensorID'] = ids[1] + status_message = sensorschema.load(data['payload'], session=db.session, transient=True).data + LOGGER.info(f"Recieved status message from sensor {data['payload']['sensorID']}, persisting to db.") + db.session.merge(status_message) except Exception as e: db.session.rollback() LOGGER.exception(e) diff --git a/src/resources.py b/src/resources.py index 0920586..909d955 100644 --- a/src/resources.py +++ b/src/resources.py @@ -21,7 +21,7 @@ LOGGER = logging.getLogger(__name__) class AllDevicesResource(Resource): """ - Query and control all known devices + Query all known devices """ alldeviceschema = schemas.DeviceSchema(many=True) @@ -33,25 +33,29 @@ class AllDevicesResource(Resource): alldevices = models.Device.query.all() return self.alldeviceschema.dump(list(alldevices)), 200 - def delete(self): + +class AllDevicesOfflineResource(Resource): + """ + Shut down all devices + """ + def post(self): """ Shut down every device :return: """ - query_ids = db.session.query(models.Device.id).filter(models.Device.status != models.DeviceStatusEnum.offline) - ids = [value for value, in query_ids] - for d_id in ids: - mqtt.publish(config.MQTT_COMMAND_TOPIC, {"deviceID": d_id, "command": "offline"}) + mqtt.publish(f"{config.MQTT_COMMAND_TOPIC}/+", {"command": "offline"}) + +class AllDevicesOnlineResource(Resource): + """ + Bring every device online + """ def post(self): """ Bring every device online :return: """ - query_ids = db.session.query(models.Device.id).filter(models.Device.status != models.DeviceStatusEnum.online) - ids = [value for value, in query_ids] - for d_id in ids: - mqtt.publish(config.MQTT_COMMAND_TOPIC, {"deviceID": d_id, "command": "online"}) + mqtt.publish(f"{config.MQTT_COMMAND_TOPIC}/+", {"command": "online"}) class DeviceResource(Resource): @@ -69,15 +73,25 @@ class DeviceResource(Resource): device = models.Device.query.filter_by(id=deviceid).first_or_404() return self.deviceschema.dump(device), 200 - def delete(self, deviceid: str): + +class DeviceOfflineResrouce(Resource): + """ + Bring a device offline + """ + def post(self, deviceid: str): """ Shut down a device :param deviceid: UUID of device :return: """ device = db.session.query(models.Device.id).filter(str(models.Device.id) == deviceid).first_or_404()[0] - mqtt.publish(config.MQTT_COMMAND_TOPIC, {"deviceID": device, "command": "offline"}) + mqtt.publish(f"{config.MQTT_COMMAND_TOPIC}/{device}", {"command": "offline"}) + +class DeviceOnlineResrouce(Resource): + """ + Bring a device online + """ def post(self, deviceid: str): """ Bring a device online @@ -85,7 +99,7 @@ class DeviceResource(Resource): :return: """ device = db.session.query(models.Device.id).filter(str(models.Device.id) == deviceid).first_or_404()[0] - mqtt.publish(config.MQTT_COMMAND_TOPIC, {"deviceID": device, "command": "offline"}) + mqtt.publish(f"{config.MQTT_COMMAND_TOPIC}/{device}", {"command": "online"}) class SensorResource(Resource): @@ -104,7 +118,12 @@ class SensorResource(Resource): sensor = models.Sensor.query.filter_by(device_id=deviceid, sensorid=sensorid).first_or_404() return self.sensorschema.dump(sensor) - def delete(self, deviceid: str, sensorid: str): + +class SensorOfflineResource(Resource): + """ + Bring a sensor offline + """ + def post(self, deviceid: str, sensorid: str): """ Shut down a sensor of a device :param deviceid: UUID of device @@ -113,8 +132,13 @@ class SensorResource(Resource): """ sensor = db.session.query(models.Sensor.device_id, models.Sensor.id).filter( str(models.Sensor.device_id) == deviceid and str(models.Sensor.id) == sensorid).first_or_404() - mqtt.publish(config.MQTT_COMMAND_TOPIC, {"deviceID": sensor[0], "sensorID": sensor[1], "command": "offline"}) + mqtt.publish(f"{config.MQTT_COMMAND_TOPIC}/{sensor[0]}/{sensor[1]}", {"command": "offline"}) + +class SensorOnlineResource(Resource): + """ + Bring a sensor online + """ def post(self, deviceid: str, sensorid: str): """ Bring a sensor online @@ -124,4 +148,4 @@ class SensorResource(Resource): """ sensor = db.session.query(models.Sensor.device_id, models.Sensor.id).filter( str(models.Sensor.device_id) == deviceid and str(models.Sensor.id) == sensorid).first_or_404() - mqtt.publish(config.MQTT_COMMAND_TOPIC, {"deviceID": sensor[0], "sensorID": sensor[1], "command": "online"}) + mqtt.publish(f"{config.MQTT_COMMAND_TOPIC}/{sensor[0]}/{sensor[1]}", {"command": "online"})