Did stuff with rabbitmq
continuous-integration/drone/push Build is passing Details

This commit is contained in:
Pünkösd Marcell 2021-07-26 15:18:08 +02:00
parent f15517af62
commit e64137ca56
4 changed files with 117 additions and 17 deletions

View File

@ -15,4 +15,5 @@ flask-marshmallow
py-healthcheck
Flask-InfluxDB
tzdata
tzlocal
tzlocal
apscheduler~=3.7.0

View File

@ -13,6 +13,11 @@ from influxus import influx_db
from resources import SampleResource, SampleParameterResource
from healthchecks import health_database_status
import atexit
from apscheduler.schedulers.background import BackgroundScheduler
from magic_ampq import magic_ampq
"""
Main Flask RESTful API
"""
@ -40,6 +45,16 @@ api = Api(app)
health = HealthCheck()
db.init_app(app)
ma.init_app(app)
# ampq magic stuff
magic_ampq.init_app(app)
ampq_loop_scheduler = BackgroundScheduler()
ampq_loop_scheduler.add_job(func=lambda: magic_ampq.loop(), trigger="interval", seconds=5)
atexit.register(lambda: ampq_loop_scheduler.shutdown())
ampq_loop_scheduler.start()
if Config.ENABLE_INFLUXDB:
influx_db.init_app(app)
@ -56,3 +71,4 @@ api.add_resource(SampleParameterResource, '/sample/<tag>')
health.add_check(health_database_status)
app.add_url_rule("/healthz", "healthcheck", view_func=lambda: health.run())

95
src/magic_ampq.py Normal file
View File

@ -0,0 +1,95 @@
from flask import Flask
from threading import Lock
import pika
import pika.exceptions
import json
import time
class MagicAMPQ:
"""
This is my pathetic attempt to make RabbitMQ connection in a Flask app reliable and performant.
"""
def __init__(self, app: Flask = None):
self.app = app
if app:
self.init_app(app)
self._lock = Lock()
self._credentials = None
def init_app(self, app: Flask):
self.app = app
self.app.config.setdefault('FLASK_PIKA_PARAMS', {})
self.app.config.setdefault('EXCHANGE_NAME', None)
self.app.config.setdefault('RABBITMQ_QUEUE', None)
self._credentials = pika.PlainCredentials(
app.config['FLASK_PIKA_PARAMS']['username'],
app.config['FLASK_PIKA_PARAMS']['password']
)
self._reconnect_ampq()
def _reconnect_ampq(self):
self._pika_connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=self.app.config['FLASK_PIKA_PARAMS']['host'],
credentials=self._credentials,
heartbeat=10,
socket_timeout=5)
)
self._pika_channel = self._pika_connection.channel()
self._pika_channel.exchange_declare(
exchange=self.app.config['EXCHANGE_NAME'],
exchange_type='direct'
)
def loop(self):
"""
This method should be called periodically to keep up the connection
"""
with self._lock:
try:
self._pika_connection.process_data_events(0)
# We won't attempt retry if this fail
except pika.exceptions.AMQPConnectionError:
self._reconnect_ampq()
def publish(self, payload=None):
"""
Publish a simple json serialized message to the configured queue.
If the connection is broken, then this call will block until the connection is restored
"""
with self._lock:
tries = 0
while True:
try:
self._pika_channel.basic_publish(
exchange=self.app.config['EXCHANGE_NAME'],
routing_key='feature',
body=json.dumps(payload).encode('UTF-8')
)
break # message sent successfully
except pika.exceptions.AMQPConnectionError:
if tries > 30:
raise # just give up
while True:
try:
self._reconnect_ampq()
break
except pika.exceptions.AMQPConnectionError:
tries += 1
if tries > 30:
raise # just give up
if tries > 10:
time.sleep(2)
# instance to be used in the flask app
magic_ampq = MagicAMPQ()

View File

@ -6,7 +6,7 @@ from xeger import Xeger
from flask_restful import Resource
from flask import request, current_app, abort
import requests
import pika
from magic_ampq import magic_ampq
from db import db
from influxus import influx_db
from models import SampleMetadata
@ -94,26 +94,14 @@ class SampleResource(Resource):
r = requests.post(
f"http://{current_app.config.get('STORAGE_HOSTNAME')}/object",
files=files)
files=files
)
if r.status_code not in [200, 201]:
return abort(500, f"Failed to upload sample to storage service. Upstream status: {r.status_code}: {r.text}")
try:
credentials = pika.PlainCredentials(current_app.config['FLASK_PIKA_PARAMS']['username'],
current_app.config['FLASK_PIKA_PARAMS']['password'])
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=current_app.config['FLASK_PIKA_PARAMS']['host'],
credentials=credentials,
heartbeat=0,
socket_timeout=5))
channel = connection.channel()
channel.exchange_declare(exchange=current_app.config['EXCHANGE_NAME'],
exchange_type='direct')
channel.basic_publish(exchange=current_app.config['EXCHANGE_NAME'],
routing_key='feature',
body=json.dumps({'tag': generated_tag}).encode('UTF-8'))
connection.close()
magic_ampq.publish({'tag': generated_tag})
except Exception as e:
current_app.logger.exception(e)
return abort(569, "AMPQ Publish error")