From 80d115e488d2098f8cc051a72432ab153c9ae9ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torma=20Krist=C3=B3f?= Date: Tue, 4 Aug 2020 03:23:46 +0200 Subject: [PATCH] use flask-pika --- requirements.txt | 1 + src/app.py | 20 ++++--- src/flask_rabbit_broker.py | 102 ---------------------------------- src/fpika.py | 14 +++++ src/rabbit_broker_instance.py | 14 ----- src/rabbitmqqueue.py | 18 ------ src/resources.py | 8 ++- 7 files changed, 31 insertions(+), 146 deletions(-) delete mode 100644 src/flask_rabbit_broker.py create mode 100644 src/fpika.py delete mode 100644 src/rabbit_broker_instance.py delete mode 100644 src/rabbitmqqueue.py diff --git a/requirements.txt b/requirements.txt index 9448263..7ce7b88 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,3 +12,4 @@ psycopg2-binary marshmallow marshmallow-sqlalchemy flask-marshmallow +flask-pika diff --git a/src/app.py b/src/app.py index ff7ab45..7859aa1 100644 --- a/src/app.py +++ b/src/app.py @@ -8,7 +8,7 @@ from sentry_sdk.integrations.flask import FlaskIntegration from config import * from db import db from marshm import ma -from rabbit_broker_instance import mq +from fpika import fpika from resources import SampleResource, SampleParameterResource """ @@ -31,19 +31,21 @@ if SENTRY_DSN: _experiments={"auto_enabling_integrations": True} ) - app = Flask(__name__) -app.config['SQLALCHEMY_DATABASE_URI'] = f"postgresql://{POSTGRES_USERNAME}:{POSTGRES_PASSWORD}@{POSTGRES_HOSTNAME}:5432/{POSTGRES_DB}" -app.config['RABBIT_USERNAME'] = RABBITMQ_USERNAME -app.config['RABBIT_PASSWORD'] = RABBITMQ_PASSWORD -app.config['RABBIT_HOST'] = RABBITMQ_HOST -app.config['RABBIT_ROUTING_KEY'] = "feature" +app.config[ + 'SQLALCHEMY_DATABASE_URI'] = f"postgresql://{POSTGRES_USERNAME}:{POSTGRES_PASSWORD}@{POSTGRES_HOSTNAME}:5432/{POSTGRES_DB}" app.config['EXCHANGE_NAME'] = RABBITMQ_EXCHANGE - +app.config['FLASK_PIKA_PARAMS'] = {'host': RABBITMQ_HOST, + 'username': RABBITMQ_USERNAME, + 'password': RABBITMQ_PASSWORD, + 'port': 5672, + 'virtual_host': '/'} +app.config['FLASK_PIKA_POOL_PARAMS'] = {'pool_size': 4, + 'pool_recycle': 60} api = Api(app) db.init_app(app) ma.init_app(app) -mq.init_app(app) +fpika.init_app(app) with app.app_context(): db.create_all() diff --git a/src/flask_rabbit_broker.py b/src/flask_rabbit_broker.py deleted file mode 100644 index 5eff525..0000000 --- a/src/flask_rabbit_broker.py +++ /dev/null @@ -1,102 +0,0 @@ -#!/usr/bin/env python3 -import uuid -import pika - -""" -Flask Rabbit Broker -""" - -__author__ = '@tormakris' -__copyright__ = "Copyright 2020, Birbnetes Team" -__module_name__ = "flask_rabbit_broker" -__version__text__ = "1" - - -class FlaskRabbitBroker: - """Message Broker using RabbitMQ middleware""" - - def __init__(self, app=None): - """ - Create a new instance of Broker Rabbit by using - the given parameters to connect to RabbitMQ. - """ - self.app = app - self.exchange_name = None - self.username = None - self.password = None - self.rabbitmq_host = None - self.routing_key = None - self.connection = None - self.channel = None - self.exchange = None - self.exchange_type = "fanout" - - def init_app(self, app) -> None: - """ - Init the broker with the current application context - :param app: application context - :return: - """ - self.username = app.config.get('RABBIT_USERNAME') - self.password = app.config.get('RABBIT_PASSWORD') - self.rabbitmq_host = app.config.get('RABBIT_HOST') - self.exchange_name = app.config.get('EXCHANGE_NAME') - self.routing_key = app.config.get('RABBIT_ROUTING_KEY') - self.init_connection(timeout=5) - self.init_exchange() - - def init_connection(self, timeout: int = 5) -> None: - """" - Init RabbitMQ connection - :param timeout: timeout of connection - :return: - """ - credentials = pika.PlainCredentials(self.username, self.password) - self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.rabbitmq_host, - credentials=credentials, - heartbeat=0, - socket_timeout=timeout)) - - def close_connection(self) -> None: - self.connection.close() - - def init_exchange(self) -> None: - """ - Init the exchange use to send messages - :return: - """ - channel = self.connection.channel() - try: - channel.exchange_declare(exchange=self.exchange_name, - exchange_type=self.exchange_type, - durable=True, - auto_delete=False) - finally: - channel.close() - - def register_callback(self, callback) -> None: - """ - Register a callback. - :param callback: - :return: - """ - channel = self.connection.channel() - queue = channel.queue_declare(durable=True, auto_delete=False, exclusive=True, - queue=uuid.uuid4().urn.split(':')[2]).method.queue - channel.bind(exchange=self.exchange_name, queue=queue) - channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=True) - - def send(self, message: str) -> None: - """ - Sends a message to the declared exchange. - :param message: - :return: - """ - channel = self.connection.channel() - try: - channel.basic_publish( - exchange=self.exchange_name, - routing_key=self.routing_key, - body=message.encode('utf-8')) - finally: - channel.close() diff --git a/src/fpika.py b/src/fpika.py new file mode 100644 index 0000000..23b7175 --- /dev/null +++ b/src/fpika.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python3 + +""" +Flask-Pika API +""" + +__author__ = '@tormakris' +__copyright__ = "Copyright 2020, Birbnetes Team" +__module_name__ = "fpika" +__version__text__ = "1" + +from flask_pika import Pika as Fpika + +fpika = Fpika() diff --git a/src/rabbit_broker_instance.py b/src/rabbit_broker_instance.py deleted file mode 100644 index 9c43ace..0000000 --- a/src/rabbit_broker_instance.py +++ /dev/null @@ -1,14 +0,0 @@ -#!/usr/bin/env python3 - -from flask_rabbit_broker import FlaskRabbitBroker - -""" -Rabbit Broker instance -""" - -__author__ = '@tormakris' -__copyright__ = "Copyright 2020, Birbnetes Team" -__module_name__ = "rabbit_broker_instance" -__version__text__ = "1" - -mq = FlaskRabbitBroker() diff --git a/src/rabbitmqqueue.py b/src/rabbitmqqueue.py deleted file mode 100644 index 1ad8dcf..0000000 --- a/src/rabbitmqqueue.py +++ /dev/null @@ -1,18 +0,0 @@ -#!/usr/bin/env python3 - -import pika -from config import * - -""" -Rabbitmq setup -""" - -__author__ = '@tormakris' -__copyright__ = "Copyright 2020, Birbnetes Team" -__module_name__ = "endpoints" -__version__text__ = "1" - -credentials = pika.PlainCredentials(RABBITMQ_USERNAME, RABBITMQ_PASSWORD) -rabbitmq = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST, credentials=credentials)) -rabbitmq_channel = rabbitmq.channel() -rabbitmq_channel.exchange_declare(exchange=RABBITMQ_EXCHANGE, exchange_type='fanout') diff --git a/src/resources.py b/src/resources.py index 39d3fa1..338bd93 100644 --- a/src/resources.py +++ b/src/resources.py @@ -5,12 +5,11 @@ from xeger import Xeger from flask_restful import Resource from flask import request import requests -import pika from db import db from models import SampleMetadata from schemas import SampleSchema, SampleMetadataSchema from config import * -from rabbit_broker_instance import mq +from fpika import fpika """ Flask Restful endpoints @@ -77,7 +76,10 @@ class SampleResource(Resource): soundfile, soundfile.content_type, {'Content-Length': soundfile.content_length})}).raise_for_status() - mq.send(json.dumps({'tag': generated_tag})) + ch = fpika.channel() + ch.basic_publish(exchange=RABBITMQ_EXCHANGE, routing_key='feature', + body=json.dumps({'tag': generated_tag}).encode('UTF-8')) + fpika.return_channel(ch) except Exception as e: LOGGER.exception(e) db.session.rollback()