From fdacc7a36dd02c2c10bb5d8d3e29048f9d80c3e7 Mon Sep 17 00:00:00 2001 From: marcsello Date: Thu, 19 Aug 2021 03:25:30 +0200 Subject: [PATCH] Added amqp --- requirements.txt | 2 + storage_service/app.py | 10 ++- storage_service/config.py | 3 + storage_service/utils/__init__.py | 1 + storage_service/utils/healthchecks.py | 14 ++- storage_service/utils/magic_amqp.py | 120 ++++++++++++++++++++++++++ 6 files changed, 148 insertions(+), 2 deletions(-) create mode 100644 storage_service/utils/magic_amqp.py diff --git a/requirements.txt b/requirements.txt index 608a19b..7e5adf2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,6 +9,8 @@ sentry_sdk flask_minio minio~=6.0.0 py-healthcheck +pika~=1.2.0 +apscheduler~=3.7.0 jaeger-client Flask-Opentracing diff --git a/storage_service/app.py b/storage_service/app.py index dd5710f..1f5ac86 100644 --- a/storage_service/app.py +++ b/storage_service/app.py @@ -4,9 +4,11 @@ from sentry_sdk.integrations.flask import FlaskIntegration from flask import Flask from werkzeug.middleware.proxy_fix import ProxyFix +import atexit +from apscheduler.schedulers.background import BackgroundScheduler # import stuff -from utils import register_all_error_handlers, storage, register_health_checks +from utils import register_all_error_handlers, storage, register_health_checks, magic_amqp # import views from views import ObjectView @@ -37,6 +39,12 @@ app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1) # init stuff storage.init_app(app) +magic_amqp.init_app(app) + +ampq_loop_scheduler = BackgroundScheduler() +ampq_loop_scheduler.add_job(func=lambda: magic_amqp.loop(), trigger="interval", seconds=5) +atexit.register(lambda: ampq_loop_scheduler.shutdown()) + # register error handlers register_all_error_handlers(app) diff --git a/storage_service/config.py b/storage_service/config.py index 3871366..587bbda 100644 --- a/storage_service/config.py +++ b/storage_service/config.py @@ -14,3 +14,6 @@ class Config: SENTRY_DSN = os.environ.get("SENTRY_DSN") RELEASE_ID = os.environ.get("RELEASE_ID", "test") RELEASEMODE = os.environ.get("RELEASEMODE", "dev") + + PIKA_URL = os.environ['PIKA_URL'] + EXCHANGE_NAME = os.environ.get("EXCHANGE_NAME", "sample-ready") diff --git a/storage_service/utils/__init__.py b/storage_service/utils/__init__.py index 2586551..58c9ed4 100644 --- a/storage_service/utils/__init__.py +++ b/storage_service/utils/__init__.py @@ -3,3 +3,4 @@ from .require_decorators import json_required from .error_handlers import register_all_error_handlers from .storage import storage from .healthchecks import register_health_checks +from .magic_amqp import magic_amqp diff --git a/storage_service/utils/healthchecks.py b/storage_service/utils/healthchecks.py index 6d049e4..8234c95 100644 --- a/storage_service/utils/healthchecks.py +++ b/storage_service/utils/healthchecks.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -from utils import storage +from utils import storage, magic_amqp from healthcheck import HealthCheck from flask import Flask @@ -15,7 +15,19 @@ def health_database_status(): return is_database_working, output +def amqp_connection_status(): + if magic_amqp.is_healthy(): + result = True + text = "amqp connection is ok" + else: + result = False + text = "amqp connection is unhealthy" + + return result, text + + def register_health_checks(app: Flask): health = HealthCheck() health.add_check(health_database_status) + health.add_check(amqp_connection_status) app.add_url_rule("/healthz", "healthcheck", view_func=lambda: health.run()) diff --git a/storage_service/utils/magic_amqp.py b/storage_service/utils/magic_amqp.py new file mode 100644 index 0000000..1c6db48 --- /dev/null +++ b/storage_service/utils/magic_amqp.py @@ -0,0 +1,120 @@ +from flask import Flask +from threading import Lock +import pika +import pika.exceptions +import json +import time + +import opentracing +from opentracing.ext import tags +from opentracing.propagation import Format + + +class MagicAMQP: + """ + This is my pathetic attempt to make RabbitMQ connection in a Flask app reliable and performant. + """ + + def __init__(self, app: Flask = None): + self.app = app + if app: + self.init_app(app) + + self._lock = Lock() + self._credentials = None + + def init_app(self, app: Flask): + self.app = app + self.app.config.setdefault('PIKA_URL', None) + self.app.config.setdefault('EXCHANGE_NAME', None) + + self._reconnect_ampq() + + def _reconnect_ampq(self): + self._pika_connection = pika.BlockingConnection(pika.connection.URLParameters(self.app.config['PIKA_URL'])) + self._pika_channel = self._pika_connection.channel() + self._pika_channel.exchange_declare( + exchange=self.app.config['EXCHANGE_NAME'], + exchange_type='direct' + ) + + def loop(self): + """ + This method should be called periodically to keep up the connection + """ + lock_start = time.time() + with self._lock: + lock_acquire_time = time.time() - lock_start + if lock_acquire_time >= 0.5: + self.app.logger.warning(f"Loop: Lock acquire took {lock_acquire_time:5f} sec") + + try: + self._pika_connection.process_data_events(0) + # We won't attempt retry if this fail + except pika.exceptions.AMQPConnectionError as e: + self.app.logger.warning(f"Connection error during process loop: {e} (attempting reconnect)") + self._reconnect_ampq() + + total_time = time.time() - lock_start + if total_time > 1: + self.app.logger.warning(f"Loop: Total loop took {total_time:5f} sec") + + def publish(self, payload=None): + """ + Publish a simple json serialized message to the configured queue. + If the connection is broken, then this call will block until the connection is restored + """ + span_tags = {tags.SPAN_KIND: tags.SPAN_KIND_PRODUCER} + with opentracing.tracer.start_active_span('magic_amqp.publish', tags=span_tags) as scope: + opentracing.tracer.inject(scope.span.context, Format.TEXT_MAP, payload) + lock_start = time.time() + with self._lock: + scope.span.log_kv({'event': 'lockAcquired'}) + lock_acquire_time = time.time() - lock_start + if lock_acquire_time >= 0.2: + self.app.logger.warning(f"Publish: Lock acquire took {lock_acquire_time:5f} sec") + tries = 0 + while True: + try: + self._pika_channel.basic_publish( + exchange=self.app.config['EXCHANGE_NAME'], + routing_key="sample", + body=json.dumps(payload).encode('UTF-8') + ) + self.app.logger.debug(f"Published: {payload}") + break # message sent successfully + except pika.exceptions.AMQPConnectionError as e: + scope.span.log_kv({'event': 'connectionError', 'error': str(e)}) + self.app.logger.warning(f"Connection error during publish: {e} (attempting reconnect)") + + if tries > 30: + raise # just give up + + while True: + try: + self._reconnect_ampq() + break + except pika.exceptions.AMQPConnectionError as e: + self.app.logger.warning( + f"Connection error during reconnection: {e} (attempting reconnect)") + tries += 1 + + if tries > 30: + raise # just give up + + if tries > 10: + time.sleep(2) + total_time = time.time() - lock_start + if total_time > 0.4: + self.app.logger.warning(f"Publish: Total publish took {total_time:5f} sec") + + def is_healthy(self) -> bool: + with self._lock: + if not self._pika_channel: + return False + + return self._pika_channel.is_open and self._pika_connection.is_open + + +# instance to be used in the flask app +magic_amqp = MagicAMQP()