diff --git a/src/app.py b/src/app.py index b7a1e75..96f037d 100644 --- a/src/app.py +++ b/src/app.py @@ -8,6 +8,7 @@ from sentry_sdk.integrations.flask import FlaskIntegration from config import * from db import db from marshm import ma +from rabbit_broker_instance import mq from resources import SampleResource, SampleParameterResource """ @@ -31,9 +32,16 @@ if SENTRY_DSN: app = Flask(__name__) app.config['SQLALCHEMY_DATABASE_URI'] = f"postgresql://{POSTGRES_USERNAME}:{POSTGRES_PASSWORD}@{POSTGRES_HOSTNAME}:5432/{POSTGRES_DB}" +app.config['RABBIT_USERNAME'] = RABBITMQ_USERNAME +app.config['RABBIT_PASSWORD'] = RABBITMQ_PASSWORD +app.config['RABBIT_HOST'] = RABBITMQ_HOST +app.config['RABBIT_ROUTING_KEY'] = "feature" +app.config['EXCHANGE_NAME'] = RABBITMQ_EXCHANGE + api = Api(app) db.init_app(app) ma.init_app(app) +mq.init_app(app) with app.app_context(): db.create_all() diff --git a/src/flask_rabbit_broker.py b/src/flask_rabbit_broker.py new file mode 100644 index 0000000..01fcc6a --- /dev/null +++ b/src/flask_rabbit_broker.py @@ -0,0 +1,87 @@ +#!/usr/bin/env python3 +import pika + +""" +Flask Rabbit Broker +""" + +__author__ = '@tormakris' +__copyright__ = "Copyright 2020, Birbnetes Team" +__module_name__ = "flask_rabbit_broker" +__version__text__ = "1" + + +class FlaskRabbitBroker: + """Message Broker using RabbitMQ middleware""" + + def __init__(self, app=None): + """ + Create a new instance of Broker Rabbit by using + the given parameters to connect to RabbitMQ. + """ + self.app = app + self.exchange_name = None + self.username = None + self.password = None + self.rabbitmq_host = None + self.routing_key = None + self.connection = None + self.channel = None + + def init_app(self, app) -> None: + """ + Init the broker with the current application context + :param app: application context + :return: + """ + self.username = app.context.get('RABBIT_USERNAME') + self.password = app.context.get('RABBIT_PASSWORD') + self.rabbitmq_host = app.context.get('RABBIT_HOST') + self.exchange_name = app.context.get('EXCHANGE_NAME') + self.routing_key = app.context.get('RABBIT_ROUTING_KEY') + self.init_connection(timeout=5) + self.init_exchange() + + def init_connection(self, timeout: int = 5) -> None: + """" + Init RabbitMQ connection + :param timeout: timeout of connection + :return: + """ + credentials = pika.PlainCredentials(self.username, self.password) + self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.rabbitmq_host, + credentials=credentials, + heartbeat=0, + socket_timeout=timeout)) + + def close_connection(self) -> None: + self.connection.close() + + def init_exchange(self) -> None: + """ + Init the exchange use to send messages + :return: + """ + channel = self.connection.channel() + try: + channel.exchange_declare(exchange=self.exchange_name, + exchange_type='fanout', + durable=True, + auto_delete=False) + finally: + channel.close() + + def send(self, message: str) -> None: + """ + Sends a message to the declared exchange. + :param message: + :return: + """ + channel = self.connection.channel() + try: + channel.basic_publish( + exchange=self.exchange_name, + routing_key=self.routing_key, + body=message.encode('utf-8')) + finally: + channel.close() diff --git a/src/rabbit_broker_instance.py b/src/rabbit_broker_instance.py new file mode 100644 index 0000000..9c43ace --- /dev/null +++ b/src/rabbit_broker_instance.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python3 + +from flask_rabbit_broker import FlaskRabbitBroker + +""" +Rabbit Broker instance +""" + +__author__ = '@tormakris' +__copyright__ = "Copyright 2020, Birbnetes Team" +__module_name__ = "rabbit_broker_instance" +__version__text__ = "1" + +mq = FlaskRabbitBroker() diff --git a/src/resources.py b/src/resources.py index 2ee8ec7..39d3fa1 100644 --- a/src/resources.py +++ b/src/resources.py @@ -10,6 +10,7 @@ from db import db from models import SampleMetadata from schemas import SampleSchema, SampleMetadataSchema from config import * +from rabbit_broker_instance import mq """ Flask Restful endpoints @@ -76,14 +77,7 @@ class SampleResource(Resource): soundfile, soundfile.content_type, {'Content-Length': soundfile.content_length})}).raise_for_status() - credentials = pika.PlainCredentials(RABBITMQ_USERNAME, RABBITMQ_PASSWORD) - rabbitmq = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST, credentials=credentials)) - rabbitmq_channel = rabbitmq.channel() - rabbitmq_channel.exchange_declare(exchange=RABBITMQ_EXCHANGE, exchange_type='fanout') - rabbitmq_channel.basic_publish( - exchange=RABBITMQ_EXCHANGE, - routing_key='feature', - body=json.dumps({'tag': generated_tag}).encode('utf-8')) + mq.send(json.dumps({'tag': generated_tag})) except Exception as e: LOGGER.exception(e) db.session.rollback()