diff --git a/requirements.txt b/requirements.txt index 98188e3..7edd6a5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,4 +15,5 @@ flask-marshmallow py-healthcheck Flask-InfluxDB tzdata -tzlocal \ No newline at end of file +tzlocal +apscheduler~=3.7.0 \ No newline at end of file diff --git a/src/app.py b/src/app.py index 0132fe0..1638ae1 100644 --- a/src/app.py +++ b/src/app.py @@ -13,6 +13,11 @@ from influxus import influx_db from resources import SampleResource, SampleParameterResource from healthchecks import health_database_status +import atexit + +from apscheduler.schedulers.background import BackgroundScheduler +from magic_ampq import magic_ampq + """ Main Flask RESTful API """ @@ -40,6 +45,16 @@ api = Api(app) health = HealthCheck() db.init_app(app) ma.init_app(app) + +# ampq magic stuff +magic_ampq.init_app(app) + +ampq_loop_scheduler = BackgroundScheduler() +ampq_loop_scheduler.add_job(func=lambda: magic_ampq.loop(), trigger="interval", seconds=5) +atexit.register(lambda: ampq_loop_scheduler.shutdown()) + +ampq_loop_scheduler.start() + if Config.ENABLE_INFLUXDB: influx_db.init_app(app) @@ -56,3 +71,4 @@ api.add_resource(SampleParameterResource, '/sample/') health.add_check(health_database_status) app.add_url_rule("/healthz", "healthcheck", view_func=lambda: health.run()) + diff --git a/src/magic_ampq.py b/src/magic_ampq.py new file mode 100644 index 0000000..97b2c70 --- /dev/null +++ b/src/magic_ampq.py @@ -0,0 +1,95 @@ +from flask import Flask +from threading import Lock +import pika +import pika.exceptions +import json +import time + + +class MagicAMPQ: + """ + This is my pathetic attempt to make RabbitMQ connection in a Flask app reliable and performant. + """ + + def __init__(self, app: Flask = None): + self.app = app + if app: + self.init_app(app) + + self._lock = Lock() + self._credentials = None + + def init_app(self, app: Flask): + self.app = app + self.app.config.setdefault('FLASK_PIKA_PARAMS', {}) + self.app.config.setdefault('EXCHANGE_NAME', None) + self.app.config.setdefault('RABBITMQ_QUEUE', None) + + self._credentials = pika.PlainCredentials( + app.config['FLASK_PIKA_PARAMS']['username'], + app.config['FLASK_PIKA_PARAMS']['password'] + ) + + self._reconnect_ampq() + + def _reconnect_ampq(self): + self._pika_connection = pika.BlockingConnection( + pika.ConnectionParameters( + host=self.app.config['FLASK_PIKA_PARAMS']['host'], + credentials=self._credentials, + heartbeat=10, + socket_timeout=5) + ) + self._pika_channel = self._pika_connection.channel() + self._pika_channel.exchange_declare( + exchange=self.app.config['EXCHANGE_NAME'], + exchange_type='direct' + ) + + def loop(self): + """ + This method should be called periodically to keep up the connection + """ + with self._lock: + try: + self._pika_connection.process_data_events(0) + # We won't attempt retry if this fail + except pika.exceptions.AMQPConnectionError: + self._reconnect_ampq() + + def publish(self, payload=None): + """ + Publish a simple json serialized message to the configured queue. + If the connection is broken, then this call will block until the connection is restored + """ + with self._lock: + tries = 0 + while True: + try: + self._pika_channel.basic_publish( + exchange=self.app.config['EXCHANGE_NAME'], + routing_key='feature', + body=json.dumps(payload).encode('UTF-8') + ) + break # message sent successfully + except pika.exceptions.AMQPConnectionError: + + if tries > 30: + raise # just give up + + while True: + try: + self._reconnect_ampq() + break + except pika.exceptions.AMQPConnectionError: + tries += 1 + + if tries > 30: + raise # just give up + + if tries > 10: + time.sleep(2) + + +# instance to be used in the flask app +magic_ampq = MagicAMPQ() diff --git a/src/resources.py b/src/resources.py index 9200e12..aca3fc8 100644 --- a/src/resources.py +++ b/src/resources.py @@ -6,7 +6,7 @@ from xeger import Xeger from flask_restful import Resource from flask import request, current_app, abort import requests -import pika +from magic_ampq import magic_ampq from db import db from influxus import influx_db from models import SampleMetadata @@ -94,26 +94,14 @@ class SampleResource(Resource): r = requests.post( f"http://{current_app.config.get('STORAGE_HOSTNAME')}/object", - files=files) + files=files + ) if r.status_code not in [200, 201]: return abort(500, f"Failed to upload sample to storage service. Upstream status: {r.status_code}: {r.text}") try: - credentials = pika.PlainCredentials(current_app.config['FLASK_PIKA_PARAMS']['username'], - current_app.config['FLASK_PIKA_PARAMS']['password']) - connection = pika.BlockingConnection( - pika.ConnectionParameters(host=current_app.config['FLASK_PIKA_PARAMS']['host'], - credentials=credentials, - heartbeat=0, - socket_timeout=5)) - channel = connection.channel() - channel.exchange_declare(exchange=current_app.config['EXCHANGE_NAME'], - exchange_type='direct') - channel.basic_publish(exchange=current_app.config['EXCHANGE_NAME'], - routing_key='feature', - body=json.dumps({'tag': generated_tag}).encode('UTF-8')) - connection.close() + magic_ampq.publish({'tag': generated_tag}) except Exception as e: current_app.logger.exception(e) return abort(569, "AMPQ Publish error")