This commit is contained in:
parent
d76b0b8a9e
commit
23197e0ba8
@ -8,6 +8,7 @@ from sentry_sdk.integrations.flask import FlaskIntegration
|
||||
from config import *
|
||||
from db import db
|
||||
from marshm import ma
|
||||
from rabbit_broker_instance import mq
|
||||
from resources import SampleResource, SampleParameterResource
|
||||
|
||||
"""
|
||||
@ -31,9 +32,16 @@ if SENTRY_DSN:
|
||||
|
||||
app = Flask(__name__)
|
||||
app.config['SQLALCHEMY_DATABASE_URI'] = f"postgresql://{POSTGRES_USERNAME}:{POSTGRES_PASSWORD}@{POSTGRES_HOSTNAME}:5432/{POSTGRES_DB}"
|
||||
app.config['RABBIT_USERNAME'] = RABBITMQ_USERNAME
|
||||
app.config['RABBIT_PASSWORD'] = RABBITMQ_PASSWORD
|
||||
app.config['RABBIT_HOST'] = RABBITMQ_HOST
|
||||
app.config['RABBIT_ROUTING_KEY'] = "feature"
|
||||
app.config['EXCHANGE_NAME'] = RABBITMQ_EXCHANGE
|
||||
|
||||
api = Api(app)
|
||||
db.init_app(app)
|
||||
ma.init_app(app)
|
||||
mq.init_app(app)
|
||||
|
||||
with app.app_context():
|
||||
db.create_all()
|
||||
|
87
src/flask_rabbit_broker.py
Normal file
87
src/flask_rabbit_broker.py
Normal file
@ -0,0 +1,87 @@
|
||||
#!/usr/bin/env python3
|
||||
import pika
|
||||
|
||||
"""
|
||||
Flask Rabbit Broker
|
||||
"""
|
||||
|
||||
__author__ = '@tormakris'
|
||||
__copyright__ = "Copyright 2020, Birbnetes Team"
|
||||
__module_name__ = "flask_rabbit_broker"
|
||||
__version__text__ = "1"
|
||||
|
||||
|
||||
class FlaskRabbitBroker:
|
||||
"""Message Broker using RabbitMQ middleware"""
|
||||
|
||||
def __init__(self, app=None):
|
||||
"""
|
||||
Create a new instance of Broker Rabbit by using
|
||||
the given parameters to connect to RabbitMQ.
|
||||
"""
|
||||
self.app = app
|
||||
self.exchange_name = None
|
||||
self.username = None
|
||||
self.password = None
|
||||
self.rabbitmq_host = None
|
||||
self.routing_key = None
|
||||
self.connection = None
|
||||
self.channel = None
|
||||
|
||||
def init_app(self, app) -> None:
|
||||
"""
|
||||
Init the broker with the current application context
|
||||
:param app: application context
|
||||
:return:
|
||||
"""
|
||||
self.username = app.context.get('RABBIT_USERNAME')
|
||||
self.password = app.context.get('RABBIT_PASSWORD')
|
||||
self.rabbitmq_host = app.context.get('RABBIT_HOST')
|
||||
self.exchange_name = app.context.get('EXCHANGE_NAME')
|
||||
self.routing_key = app.context.get('RABBIT_ROUTING_KEY')
|
||||
self.init_connection(timeout=5)
|
||||
self.init_exchange()
|
||||
|
||||
def init_connection(self, timeout: int = 5) -> None:
|
||||
""""
|
||||
Init RabbitMQ connection
|
||||
:param timeout: timeout of connection
|
||||
:return:
|
||||
"""
|
||||
credentials = pika.PlainCredentials(self.username, self.password)
|
||||
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.rabbitmq_host,
|
||||
credentials=credentials,
|
||||
heartbeat=0,
|
||||
socket_timeout=timeout))
|
||||
|
||||
def close_connection(self) -> None:
|
||||
self.connection.close()
|
||||
|
||||
def init_exchange(self) -> None:
|
||||
"""
|
||||
Init the exchange use to send messages
|
||||
:return:
|
||||
"""
|
||||
channel = self.connection.channel()
|
||||
try:
|
||||
channel.exchange_declare(exchange=self.exchange_name,
|
||||
exchange_type='fanout',
|
||||
durable=True,
|
||||
auto_delete=False)
|
||||
finally:
|
||||
channel.close()
|
||||
|
||||
def send(self, message: str) -> None:
|
||||
"""
|
||||
Sends a message to the declared exchange.
|
||||
:param message:
|
||||
:return:
|
||||
"""
|
||||
channel = self.connection.channel()
|
||||
try:
|
||||
channel.basic_publish(
|
||||
exchange=self.exchange_name,
|
||||
routing_key=self.routing_key,
|
||||
body=message.encode('utf-8'))
|
||||
finally:
|
||||
channel.close()
|
14
src/rabbit_broker_instance.py
Normal file
14
src/rabbit_broker_instance.py
Normal file
@ -0,0 +1,14 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from flask_rabbit_broker import FlaskRabbitBroker
|
||||
|
||||
"""
|
||||
Rabbit Broker instance
|
||||
"""
|
||||
|
||||
__author__ = '@tormakris'
|
||||
__copyright__ = "Copyright 2020, Birbnetes Team"
|
||||
__module_name__ = "rabbit_broker_instance"
|
||||
__version__text__ = "1"
|
||||
|
||||
mq = FlaskRabbitBroker()
|
@ -10,6 +10,7 @@ from db import db
|
||||
from models import SampleMetadata
|
||||
from schemas import SampleSchema, SampleMetadataSchema
|
||||
from config import *
|
||||
from rabbit_broker_instance import mq
|
||||
|
||||
"""
|
||||
Flask Restful endpoints
|
||||
@ -76,14 +77,7 @@ class SampleResource(Resource):
|
||||
soundfile,
|
||||
soundfile.content_type,
|
||||
{'Content-Length': soundfile.content_length})}).raise_for_status()
|
||||
credentials = pika.PlainCredentials(RABBITMQ_USERNAME, RABBITMQ_PASSWORD)
|
||||
rabbitmq = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST, credentials=credentials))
|
||||
rabbitmq_channel = rabbitmq.channel()
|
||||
rabbitmq_channel.exchange_declare(exchange=RABBITMQ_EXCHANGE, exchange_type='fanout')
|
||||
rabbitmq_channel.basic_publish(
|
||||
exchange=RABBITMQ_EXCHANGE,
|
||||
routing_key='feature',
|
||||
body=json.dumps({'tag': generated_tag}).encode('utf-8'))
|
||||
mq.send(json.dumps({'tag': generated_tag}))
|
||||
except Exception as e:
|
||||
LOGGER.exception(e)
|
||||
db.session.rollback()
|
||||
|
Loading…
Reference in New Issue
Block a user