modify mqtt api
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Torma Kristóf 2020-07-26 17:45:19 +02:00
parent 08f652d189
commit 040c9f2399
Signed by: tormakris
GPG Key ID: DC83C4F2C41B1047
3 changed files with 69 additions and 26 deletions

View File

@ -5,6 +5,7 @@ from flask_restful import Api
import sentry_sdk import sentry_sdk
from sentry_sdk.integrations.flask import FlaskIntegration from sentry_sdk.integrations.flask import FlaskIntegration
from resources import *
from config import * from config import *
from db import db from db import db
from marshm import ma from marshm import ma
@ -28,14 +29,14 @@ if SENTRY_DSN:
environment=RELEASEMODE environment=RELEASEMODE
) )
app = Flask(__name__) app = Flask(__name__)
app.config['MQTT_BROKER_URL'] = MQTT_HOSTNAME app.config['MQTT_BROKER_URL'] = MQTT_HOSTNAME
app.config['MQTT_BROKER_PORT'] = MQTT_PORT app.config['MQTT_BROKER_PORT'] = MQTT_PORT
app.config['MQTT_USERNAME'] = MQTT_USERNAME app.config['MQTT_USERNAME'] = MQTT_USERNAME
app.config['MQTT_PASSWORD'] = MQTT_PASSWORD app.config['MQTT_PASSWORD'] = MQTT_PASSWORD
app.config['MQTT_REFRESH_TIME'] = 1.0 # refresh time in seconds app.config['MQTT_REFRESH_TIME'] = 1.0 # refresh time in seconds
app.config['SQLALCHEMY_DATABASE_URI'] = f"postgresql://{POSTGRES_USERNAME}:{POSTGRES_PASSWORD}@{POSTGRES_HOSTNAME}:5432/{POSTGRES_DB}" app.config[
'SQLALCHEMY_DATABASE_URI'] = f"postgresql://{POSTGRES_USERNAME}:{POSTGRES_PASSWORD}@{POSTGRES_HOSTNAME}:5432/{POSTGRES_DB}"
api = Api(app) api = Api(app)
db.init_app(app) db.init_app(app)
@ -64,11 +65,18 @@ def handle_logging(client, userdata, level, buf):
@mqtt.on_connect() @mqtt.on_connect()
def handle_connect(client, userdata, flags, rc): def handle_connect(client, userdata, flags, rc):
mqtt.subscribe(MQTT_STATUS_TOPIC) mqtt.subscribe(f"{MQTT_STATUS_TOPIC}/#")
# api.add_resource(SampleResource, "/sample") api.add_resource(AllDevicesResource, "/devices")
# api.add_resource(SampleParameterResource, '/sample/<tag>') api.add_resource(AllDevicesOfflineResource, "/devices/offline")
api.add_resource(AllDevicesOnlineResource, "/devices/online")
api.add_resource(DeviceResource, "/devices/{deviceid}")
api.add_resource(DeviceOfflineResrouce, "/devices/{deviceid}/offline")
api.add_resource(DeviceOnlineResrouce, "/devices/{deviceid}/online")
api.add_resource(SensorResource, "/devices/{deviceid}/{sensorid}")
api.add_resource(SensorOfflineResource, "/devices/{deviceid}/{sensorid}/offline")
api.add_resource(SensorOnlineResource, "/devices/{deviceid}/{sensorid}/online")
if __name__ == "__main__": if __name__ == "__main__":
app.run( app.run(

View File

@ -1,8 +1,9 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import logging import logging
from mqtt_flask_instance import mqtt from mqtt_flask_instance import mqtt
import config
from db import db from db import db
from schemas import DeviceSchema from schemas import DeviceSchema, SensorSchema
""" """
MQTT Stuff MQTT Stuff
@ -14,7 +15,7 @@ __module_name__ = "mqtt_methods"
__version__text__ = "1" __version__text__ = "1"
deviceschema = DeviceSchema(many=False) deviceschema = DeviceSchema(many=False)
sensorschema = SensorSchema(many=False)
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
@ -26,9 +27,19 @@ def handle_status_message(client, userdata, message):
payload=message.payload.decode() payload=message.payload.decode()
) )
try: try:
status_message = deviceschema.load(data['payload'], session=db.session, transient=True).data ids = data['topic'].replace(f"{config.MQTT_STATUS_TOPIC}/", "").split("/")
LOGGER.info(f"Recieved status message from {data['payload']['id']}, persisting to db.") if len(ids) == 1:
db.session.merge(status_message) data['payload']['deviceID'] = ids[0]
status_message = deviceschema.load(data['payload'], session=db.session, transient=True).data
LOGGER.info(f"Recieved status message from {data['payload']['deviceID']}, persisting to db.")
db.session.merge(status_message)
else:
if len(ids) == 2:
data['payload']['deviceID'] = ids[0]
data['payload']['sensorID'] = ids[1]
status_message = sensorschema.load(data['payload'], session=db.session, transient=True).data
LOGGER.info(f"Recieved status message from sensor {data['payload']['sensorID']}, persisting to db.")
db.session.merge(status_message)
except Exception as e: except Exception as e:
db.session.rollback() db.session.rollback()
LOGGER.exception(e) LOGGER.exception(e)

View File

@ -21,7 +21,7 @@ LOGGER = logging.getLogger(__name__)
class AllDevicesResource(Resource): class AllDevicesResource(Resource):
""" """
Query and control all known devices Query all known devices
""" """
alldeviceschema = schemas.DeviceSchema(many=True) alldeviceschema = schemas.DeviceSchema(many=True)
@ -33,25 +33,29 @@ class AllDevicesResource(Resource):
alldevices = models.Device.query.all() alldevices = models.Device.query.all()
return self.alldeviceschema.dump(list(alldevices)), 200 return self.alldeviceschema.dump(list(alldevices)), 200
def delete(self):
class AllDevicesOfflineResource(Resource):
"""
Shut down all devices
"""
def post(self):
""" """
Shut down every device Shut down every device
:return: :return:
""" """
query_ids = db.session.query(models.Device.id).filter(models.Device.status != models.DeviceStatusEnum.offline) mqtt.publish(f"{config.MQTT_COMMAND_TOPIC}/+", {"command": "offline"})
ids = [value for value, in query_ids]
for d_id in ids:
mqtt.publish(config.MQTT_COMMAND_TOPIC, {"deviceID": d_id, "command": "offline"})
class AllDevicesOnlineResource(Resource):
"""
Bring every device online
"""
def post(self): def post(self):
""" """
Bring every device online Bring every device online
:return: :return:
""" """
query_ids = db.session.query(models.Device.id).filter(models.Device.status != models.DeviceStatusEnum.online) mqtt.publish(f"{config.MQTT_COMMAND_TOPIC}/+", {"command": "online"})
ids = [value for value, in query_ids]
for d_id in ids:
mqtt.publish(config.MQTT_COMMAND_TOPIC, {"deviceID": d_id, "command": "online"})
class DeviceResource(Resource): class DeviceResource(Resource):
@ -69,15 +73,25 @@ class DeviceResource(Resource):
device = models.Device.query.filter_by(id=deviceid).first_or_404() device = models.Device.query.filter_by(id=deviceid).first_or_404()
return self.deviceschema.dump(device), 200 return self.deviceschema.dump(device), 200
def delete(self, deviceid: str):
class DeviceOfflineResrouce(Resource):
"""
Bring a device offline
"""
def post(self, deviceid: str):
""" """
Shut down a device Shut down a device
:param deviceid: UUID of device :param deviceid: UUID of device
:return: :return:
""" """
device = db.session.query(models.Device.id).filter(str(models.Device.id) == deviceid).first_or_404()[0] device = db.session.query(models.Device.id).filter(str(models.Device.id) == deviceid).first_or_404()[0]
mqtt.publish(config.MQTT_COMMAND_TOPIC, {"deviceID": device, "command": "offline"}) mqtt.publish(f"{config.MQTT_COMMAND_TOPIC}/{device}", {"command": "offline"})
class DeviceOnlineResrouce(Resource):
"""
Bring a device online
"""
def post(self, deviceid: str): def post(self, deviceid: str):
""" """
Bring a device online Bring a device online
@ -85,7 +99,7 @@ class DeviceResource(Resource):
:return: :return:
""" """
device = db.session.query(models.Device.id).filter(str(models.Device.id) == deviceid).first_or_404()[0] device = db.session.query(models.Device.id).filter(str(models.Device.id) == deviceid).first_or_404()[0]
mqtt.publish(config.MQTT_COMMAND_TOPIC, {"deviceID": device, "command": "offline"}) mqtt.publish(f"{config.MQTT_COMMAND_TOPIC}/{device}", {"command": "online"})
class SensorResource(Resource): class SensorResource(Resource):
@ -104,7 +118,12 @@ class SensorResource(Resource):
sensor = models.Sensor.query.filter_by(device_id=deviceid, sensorid=sensorid).first_or_404() sensor = models.Sensor.query.filter_by(device_id=deviceid, sensorid=sensorid).first_or_404()
return self.sensorschema.dump(sensor) return self.sensorschema.dump(sensor)
def delete(self, deviceid: str, sensorid: str):
class SensorOfflineResource(Resource):
"""
Bring a sensor offline
"""
def post(self, deviceid: str, sensorid: str):
""" """
Shut down a sensor of a device Shut down a sensor of a device
:param deviceid: UUID of device :param deviceid: UUID of device
@ -113,8 +132,13 @@ class SensorResource(Resource):
""" """
sensor = db.session.query(models.Sensor.device_id, models.Sensor.id).filter( sensor = db.session.query(models.Sensor.device_id, models.Sensor.id).filter(
str(models.Sensor.device_id) == deviceid and str(models.Sensor.id) == sensorid).first_or_404() str(models.Sensor.device_id) == deviceid and str(models.Sensor.id) == sensorid).first_or_404()
mqtt.publish(config.MQTT_COMMAND_TOPIC, {"deviceID": sensor[0], "sensorID": sensor[1], "command": "offline"}) mqtt.publish(f"{config.MQTT_COMMAND_TOPIC}/{sensor[0]}/{sensor[1]}", {"command": "offline"})
class SensorOnlineResource(Resource):
"""
Bring a sensor online
"""
def post(self, deviceid: str, sensorid: str): def post(self, deviceid: str, sensorid: str):
""" """
Bring a sensor online Bring a sensor online
@ -124,4 +148,4 @@ class SensorResource(Resource):
""" """
sensor = db.session.query(models.Sensor.device_id, models.Sensor.id).filter( sensor = db.session.query(models.Sensor.device_id, models.Sensor.id).filter(
str(models.Sensor.device_id) == deviceid and str(models.Sensor.id) == sensorid).first_or_404() str(models.Sensor.device_id) == deviceid and str(models.Sensor.id) == sensorid).first_or_404()
mqtt.publish(config.MQTT_COMMAND_TOPIC, {"deviceID": sensor[0], "sensorID": sensor[1], "command": "online"}) mqtt.publish(f"{config.MQTT_COMMAND_TOPIC}/{sensor[0]}/{sensor[1]}", {"command": "online"})