use flask-pika
continuous-integration/drone/push Build is passing Details

This commit is contained in:
Torma Kristóf 2020-08-04 03:23:46 +02:00
parent 5b8d88339f
commit 80d115e488
Signed by: tormakris
GPG Key ID: DC83C4F2C41B1047
7 changed files with 31 additions and 146 deletions

View File

@ -12,3 +12,4 @@ psycopg2-binary
marshmallow
marshmallow-sqlalchemy
flask-marshmallow
flask-pika

View File

@ -8,7 +8,7 @@ from sentry_sdk.integrations.flask import FlaskIntegration
from config import *
from db import db
from marshm import ma
from rabbit_broker_instance import mq
from fpika import fpika
from resources import SampleResource, SampleParameterResource
"""
@ -31,19 +31,21 @@ if SENTRY_DSN:
_experiments={"auto_enabling_integrations": True}
)
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = f"postgresql://{POSTGRES_USERNAME}:{POSTGRES_PASSWORD}@{POSTGRES_HOSTNAME}:5432/{POSTGRES_DB}"
app.config['RABBIT_USERNAME'] = RABBITMQ_USERNAME
app.config['RABBIT_PASSWORD'] = RABBITMQ_PASSWORD
app.config['RABBIT_HOST'] = RABBITMQ_HOST
app.config['RABBIT_ROUTING_KEY'] = "feature"
app.config[
'SQLALCHEMY_DATABASE_URI'] = f"postgresql://{POSTGRES_USERNAME}:{POSTGRES_PASSWORD}@{POSTGRES_HOSTNAME}:5432/{POSTGRES_DB}"
app.config['EXCHANGE_NAME'] = RABBITMQ_EXCHANGE
app.config['FLASK_PIKA_PARAMS'] = {'host': RABBITMQ_HOST,
'username': RABBITMQ_USERNAME,
'password': RABBITMQ_PASSWORD,
'port': 5672,
'virtual_host': '/'}
app.config['FLASK_PIKA_POOL_PARAMS'] = {'pool_size': 4,
'pool_recycle': 60}
api = Api(app)
db.init_app(app)
ma.init_app(app)
mq.init_app(app)
fpika.init_app(app)
with app.app_context():
db.create_all()

View File

@ -1,102 +0,0 @@
#!/usr/bin/env python3
import uuid
import pika
"""
Flask Rabbit Broker
"""
__author__ = '@tormakris'
__copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "flask_rabbit_broker"
__version__text__ = "1"
class FlaskRabbitBroker:
"""Message Broker using RabbitMQ middleware"""
def __init__(self, app=None):
"""
Create a new instance of Broker Rabbit by using
the given parameters to connect to RabbitMQ.
"""
self.app = app
self.exchange_name = None
self.username = None
self.password = None
self.rabbitmq_host = None
self.routing_key = None
self.connection = None
self.channel = None
self.exchange = None
self.exchange_type = "fanout"
def init_app(self, app) -> None:
"""
Init the broker with the current application context
:param app: application context
:return:
"""
self.username = app.config.get('RABBIT_USERNAME')
self.password = app.config.get('RABBIT_PASSWORD')
self.rabbitmq_host = app.config.get('RABBIT_HOST')
self.exchange_name = app.config.get('EXCHANGE_NAME')
self.routing_key = app.config.get('RABBIT_ROUTING_KEY')
self.init_connection(timeout=5)
self.init_exchange()
def init_connection(self, timeout: int = 5) -> None:
""""
Init RabbitMQ connection
:param timeout: timeout of connection
:return:
"""
credentials = pika.PlainCredentials(self.username, self.password)
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.rabbitmq_host,
credentials=credentials,
heartbeat=0,
socket_timeout=timeout))
def close_connection(self) -> None:
self.connection.close()
def init_exchange(self) -> None:
"""
Init the exchange use to send messages
:return:
"""
channel = self.connection.channel()
try:
channel.exchange_declare(exchange=self.exchange_name,
exchange_type=self.exchange_type,
durable=True,
auto_delete=False)
finally:
channel.close()
def register_callback(self, callback) -> None:
"""
Register a callback.
:param callback:
:return:
"""
channel = self.connection.channel()
queue = channel.queue_declare(durable=True, auto_delete=False, exclusive=True,
queue=uuid.uuid4().urn.split(':')[2]).method.queue
channel.bind(exchange=self.exchange_name, queue=queue)
channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=True)
def send(self, message: str) -> None:
"""
Sends a message to the declared exchange.
:param message:
:return:
"""
channel = self.connection.channel()
try:
channel.basic_publish(
exchange=self.exchange_name,
routing_key=self.routing_key,
body=message.encode('utf-8'))
finally:
channel.close()

14
src/fpika.py Normal file
View File

@ -0,0 +1,14 @@
#!/usr/bin/env python3
"""
Flask-Pika API
"""
__author__ = '@tormakris'
__copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "fpika"
__version__text__ = "1"
from flask_pika import Pika as Fpika
fpika = Fpika()

View File

@ -1,14 +0,0 @@
#!/usr/bin/env python3
from flask_rabbit_broker import FlaskRabbitBroker
"""
Rabbit Broker instance
"""
__author__ = '@tormakris'
__copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "rabbit_broker_instance"
__version__text__ = "1"
mq = FlaskRabbitBroker()

View File

@ -1,18 +0,0 @@
#!/usr/bin/env python3
import pika
from config import *
"""
Rabbitmq setup
"""
__author__ = '@tormakris'
__copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "endpoints"
__version__text__ = "1"
credentials = pika.PlainCredentials(RABBITMQ_USERNAME, RABBITMQ_PASSWORD)
rabbitmq = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST, credentials=credentials))
rabbitmq_channel = rabbitmq.channel()
rabbitmq_channel.exchange_declare(exchange=RABBITMQ_EXCHANGE, exchange_type='fanout')

View File

@ -5,12 +5,11 @@ from xeger import Xeger
from flask_restful import Resource
from flask import request
import requests
import pika
from db import db
from models import SampleMetadata
from schemas import SampleSchema, SampleMetadataSchema
from config import *
from rabbit_broker_instance import mq
from fpika import fpika
"""
Flask Restful endpoints
@ -77,7 +76,10 @@ class SampleResource(Resource):
soundfile,
soundfile.content_type,
{'Content-Length': soundfile.content_length})}).raise_for_status()
mq.send(json.dumps({'tag': generated_tag}))
ch = fpika.channel()
ch.basic_publish(exchange=RABBITMQ_EXCHANGE, routing_key='feature',
body=json.dumps({'tag': generated_tag}).encode('UTF-8'))
fpika.return_channel(ch)
except Exception as e:
LOGGER.exception(e)
db.session.rollback()