Added amqp

This commit is contained in:
Pünkösd Marcell 2021-08-19 03:25:30 +02:00
parent 5a5c510fa0
commit fdacc7a36d
6 changed files with 148 additions and 2 deletions

View File

@ -9,6 +9,8 @@ sentry_sdk
flask_minio flask_minio
minio~=6.0.0 minio~=6.0.0
py-healthcheck py-healthcheck
pika~=1.2.0
apscheduler~=3.7.0
jaeger-client jaeger-client
Flask-Opentracing Flask-Opentracing

View File

@ -4,9 +4,11 @@ from sentry_sdk.integrations.flask import FlaskIntegration
from flask import Flask from flask import Flask
from werkzeug.middleware.proxy_fix import ProxyFix from werkzeug.middleware.proxy_fix import ProxyFix
import atexit
from apscheduler.schedulers.background import BackgroundScheduler
# import stuff # import stuff
from utils import register_all_error_handlers, storage, register_health_checks from utils import register_all_error_handlers, storage, register_health_checks, magic_amqp
# import views # import views
from views import ObjectView from views import ObjectView
@ -37,6 +39,12 @@ app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1)
# init stuff # init stuff
storage.init_app(app) storage.init_app(app)
magic_amqp.init_app(app)
ampq_loop_scheduler = BackgroundScheduler()
ampq_loop_scheduler.add_job(func=lambda: magic_amqp.loop(), trigger="interval", seconds=5)
atexit.register(lambda: ampq_loop_scheduler.shutdown())
# register error handlers # register error handlers
register_all_error_handlers(app) register_all_error_handlers(app)

View File

@ -14,3 +14,6 @@ class Config:
SENTRY_DSN = os.environ.get("SENTRY_DSN") SENTRY_DSN = os.environ.get("SENTRY_DSN")
RELEASE_ID = os.environ.get("RELEASE_ID", "test") RELEASE_ID = os.environ.get("RELEASE_ID", "test")
RELEASEMODE = os.environ.get("RELEASEMODE", "dev") RELEASEMODE = os.environ.get("RELEASEMODE", "dev")
PIKA_URL = os.environ['PIKA_URL']
EXCHANGE_NAME = os.environ.get("EXCHANGE_NAME", "sample-ready")

View File

@ -3,3 +3,4 @@ from .require_decorators import json_required
from .error_handlers import register_all_error_handlers from .error_handlers import register_all_error_handlers
from .storage import storage from .storage import storage
from .healthchecks import register_health_checks from .healthchecks import register_health_checks
from .magic_amqp import magic_amqp

View File

@ -1,5 +1,5 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from utils import storage from utils import storage, magic_amqp
from healthcheck import HealthCheck from healthcheck import HealthCheck
from flask import Flask from flask import Flask
@ -15,7 +15,19 @@ def health_database_status():
return is_database_working, output return is_database_working, output
def amqp_connection_status():
if magic_amqp.is_healthy():
result = True
text = "amqp connection is ok"
else:
result = False
text = "amqp connection is unhealthy"
return result, text
def register_health_checks(app: Flask): def register_health_checks(app: Flask):
health = HealthCheck() health = HealthCheck()
health.add_check(health_database_status) health.add_check(health_database_status)
health.add_check(amqp_connection_status)
app.add_url_rule("/healthz", "healthcheck", view_func=lambda: health.run()) app.add_url_rule("/healthz", "healthcheck", view_func=lambda: health.run())

View File

@ -0,0 +1,120 @@
from flask import Flask
from threading import Lock
import pika
import pika.exceptions
import json
import time
import opentracing
from opentracing.ext import tags
from opentracing.propagation import Format
class MagicAMQP:
"""
This is my pathetic attempt to make RabbitMQ connection in a Flask app reliable and performant.
"""
def __init__(self, app: Flask = None):
self.app = app
if app:
self.init_app(app)
self._lock = Lock()
self._credentials = None
def init_app(self, app: Flask):
self.app = app
self.app.config.setdefault('PIKA_URL', None)
self.app.config.setdefault('EXCHANGE_NAME', None)
self._reconnect_ampq()
def _reconnect_ampq(self):
self._pika_connection = pika.BlockingConnection(pika.connection.URLParameters(self.app.config['PIKA_URL']))
self._pika_channel = self._pika_connection.channel()
self._pika_channel.exchange_declare(
exchange=self.app.config['EXCHANGE_NAME'],
exchange_type='direct'
)
def loop(self):
"""
This method should be called periodically to keep up the connection
"""
lock_start = time.time()
with self._lock:
lock_acquire_time = time.time() - lock_start
if lock_acquire_time >= 0.5:
self.app.logger.warning(f"Loop: Lock acquire took {lock_acquire_time:5f} sec")
try:
self._pika_connection.process_data_events(0)
# We won't attempt retry if this fail
except pika.exceptions.AMQPConnectionError as e:
self.app.logger.warning(f"Connection error during process loop: {e} (attempting reconnect)")
self._reconnect_ampq()
total_time = time.time() - lock_start
if total_time > 1:
self.app.logger.warning(f"Loop: Total loop took {total_time:5f} sec")
def publish(self, payload=None):
"""
Publish a simple json serialized message to the configured queue.
If the connection is broken, then this call will block until the connection is restored
"""
span_tags = {tags.SPAN_KIND: tags.SPAN_KIND_PRODUCER}
with opentracing.tracer.start_active_span('magic_amqp.publish', tags=span_tags) as scope:
opentracing.tracer.inject(scope.span.context, Format.TEXT_MAP, payload)
lock_start = time.time()
with self._lock:
scope.span.log_kv({'event': 'lockAcquired'})
lock_acquire_time = time.time() - lock_start
if lock_acquire_time >= 0.2:
self.app.logger.warning(f"Publish: Lock acquire took {lock_acquire_time:5f} sec")
tries = 0
while True:
try:
self._pika_channel.basic_publish(
exchange=self.app.config['EXCHANGE_NAME'],
routing_key="sample",
body=json.dumps(payload).encode('UTF-8')
)
self.app.logger.debug(f"Published: {payload}")
break # message sent successfully
except pika.exceptions.AMQPConnectionError as e:
scope.span.log_kv({'event': 'connectionError', 'error': str(e)})
self.app.logger.warning(f"Connection error during publish: {e} (attempting reconnect)")
if tries > 30:
raise # just give up
while True:
try:
self._reconnect_ampq()
break
except pika.exceptions.AMQPConnectionError as e:
self.app.logger.warning(
f"Connection error during reconnection: {e} (attempting reconnect)")
tries += 1
if tries > 30:
raise # just give up
if tries > 10:
time.sleep(2)
total_time = time.time() - lock_start
if total_time > 0.4:
self.app.logger.warning(f"Publish: Total publish took {total_time:5f} sec")
def is_healthy(self) -> bool:
with self._lock:
if not self._pika_channel:
return False
return self._pika_channel.is_open and self._pika_connection.is_open
# instance to be used in the flask app
magic_amqp = MagicAMQP()