diff --git a/src/app.py b/src/app.py index e7c26b9..cc9b01a 100644 --- a/src/app.py +++ b/src/app.py @@ -35,7 +35,7 @@ if config.SENTRY_DSN: ) -def setup_rabbit() -> None: +def setup_rabbit(mqtt_: MQTT) -> None: logging.info("Connecting to RabbitMQ") credentials = pika.PlainCredentials(config.RABBIT_USERNAME, config.RABBIT_PASSWORD) connection = pika.BlockingConnection(pika.ConnectionParameters(host=config.RABBIT_HOSTNAME, @@ -48,23 +48,30 @@ def setup_rabbit() -> None: queue = channel.queue_declare(durable=True, auto_delete=True, queue=uuid.uuid4().urn.split(':')[2], exclusive=True).method.queue channel.queue_bind(exchange=config.RABBIT_EXCHANGE, queue=queue) - channel.basic_consume(queue=queue, on_message_callback=on_message, auto_ack=True) + channel.basic_consume(queue=queue, on_message_callback=on_message_creator(mqtt_), auto_ack=True) logging.debug("Starting consumption") channel.start_consuming() -def on_message(channel, method_frame, header_frame, body): - msg_json = json.loads(body) - if 'probability' not in msg_json: - logging.error("Malformed message from broker") - if msg_json['probability'] > 0.5: - r = requests.get(f"http://{config.INPUT_HOSTNAME}/sample/{msg_json['tag']}") - r.raise_for_status() - if 'device_id' not in r.json(): - logging.error("Input-service response invalid") - logging.info(f"Sending alert command to device {r.json()['device_id']}") - mqtt.publish(subtopic=r.json()['device_id'], - message=json.dumps({"command": "doAlert"})) +def on_message_creator(mqtt_: MQTT): + """ + This generator is used, so that the mqtt object can be injected just when the callback is registered + """ + + def on_message(channel, method_frame, header_frame, body): + msg_json = json.loads(body) + if 'probability' not in msg_json: + logging.error("Malformed message from broker") + if msg_json['probability'] > 0.5: + r = requests.get(f"http://{config.INPUT_HOSTNAME}/sample/{msg_json['tag']}") + r.raise_for_status() + if 'device_id' not in r.json(): + logging.error("Input-service response invalid") + logging.info(f"Sending alert command to device {r.json()['device_id']}") + mqtt_.publish(subtopic=r.json()['device_id'], + message=json.dumps({"command": "doAlert"})) + + return on_message if __name__ == "__main__": @@ -74,4 +81,4 @@ if __name__ == "__main__": mqtt = MQTT() mqtt.topic = config.MQTT_TOPIC mqtt.connect() - setup_rabbit() + setup_rabbit(mqtt)