All checks were successful
continuous-integration/drone/push Build is passing
100 lines
3.2 KiB
Python
100 lines
3.2 KiB
Python
#!/usr/bin/env python3
|
|
import pika
|
|
|
|
"""
|
|
Flask Rabbit Broker
|
|
"""
|
|
|
|
# Module metadata: author handle, copyright notice, module name and version text.
__author__ = '@tormakris'
__copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "flask_rabbit_broker"
__version__text__ = "1"
|
|
|
|
|
|
class FlaskRabbitBroker:
    """
    Message Broker using RabbitMQ middleware.

    The broker is created unconfigured; ``init_app`` loads the connection
    settings from the application object, opens a blocking ``pika``
    connection and declares a durable fanout exchange. ``send`` publishes to
    that exchange and ``register_callback`` attaches a consumer to it.
    """

    def __init__(self, app=None):
        """
        Create a new, not yet connected, broker instance.

        No connection is opened here; call ``init_app`` to load the settings
        and connect to RabbitMQ.

        :param app: application object, only stored on the instance
        """
        self.app = app
        self.exchange_name = None
        self.username = None
        self.password = None
        self.rabbitmq_host = None
        self.routing_key = None
        self.connection = None
        # Channel kept open for the consumer created by register_callback().
        self.channel = None
        # Never populated by this class; kept so existing attribute access
        # on instances keeps working.
        self.exchange = None

    def init_app(self, app) -> None:
        """
        Init the broker with the given application and connect to RabbitMQ.

        Settings are read from ``app.context`` when the application object
        has that attribute, otherwise from ``app.config``. The keys used are
        ``RABBIT_USERNAME``, ``RABBIT_PASSWORD``, ``RABBIT_HOST``,
        ``EXCHANGE_NAME`` and ``RABBIT_ROUTING_KEY``; a missing key yields
        ``None``.

        :param app: application object carrying the settings mapping
        :return:
        """
        self.app = app

        # A plain Flask application exposes its settings as ``config`` and has
        # no ``context`` attribute, so fall back instead of raising.
        settings = getattr(app, 'context', None)
        if settings is None:
            settings = app.config

        self.username = settings.get('RABBIT_USERNAME')
        self.password = settings.get('RABBIT_PASSWORD')
        self.rabbitmq_host = settings.get('RABBIT_HOST')
        self.exchange_name = settings.get('EXCHANGE_NAME')
        self.routing_key = settings.get('RABBIT_ROUTING_KEY')

        self.init_connection(timeout=5)
        self.init_exchange()

    def init_connection(self, timeout: int = 5) -> None:
        """
        Init RabbitMQ connection.

        Opens a ``pika.BlockingConnection`` to ``self.rabbitmq_host`` using
        the stored username and password and keeps it in ``self.connection``.

        :param timeout: socket timeout passed to the connection parameters
        :return:
        """
        credentials = pika.PlainCredentials(self.username, self.password)
        parameters = pika.ConnectionParameters(
            host=self.rabbitmq_host,
            credentials=credentials,
            heartbeat=0,
            socket_timeout=timeout,
        )
        self.connection = pika.BlockingConnection(parameters)

    def close_connection(self) -> None:
        """
        Close the RabbitMQ connection if one is currently open.

        Calling this before a connection exists, or more than once, is a
        no-op instead of an error.

        :return:
        """
        if self.connection is not None and self.connection.is_open:
            self.connection.close()

    def init_exchange(self) -> None:
        """
        Init the exchange used to send messages.

        Declares ``self.exchange_name`` as a durable, non auto-deleting
        fanout exchange on a short-lived channel, which is closed again
        whether or not the declaration succeeds.

        :return:
        """
        channel = self.connection.channel()
        try:
            channel.exchange_declare(
                exchange=self.exchange_name,
                exchange_type='fanout',
                durable=True,
                auto_delete=False,
            )
        finally:
            channel.close()

    def register_callback(self, callback) -> None:
        """
        Register a callback consuming messages from the declared exchange.

        A server-named queue is declared, bound to ``self.exchange_name`` and
        ``callback`` is registered on it with automatic acknowledgement. The
        channel is stored in ``self.channel`` and left open, because closing
        it would cancel the consumer.

        NOTE(review): this method only registers the consumer. With a
        blocking connection, messages are presumably delivered only while
        the caller drives the connection (e.g. ``self.channel.start_consuming()``)
        -- confirm against the pika documentation.

        :param callback: callable handed to pika as ``on_message_callback``
        :return:
        """
        channel = self.connection.channel()

        # queue_declare returns a reply frame, not a queue object; the name
        # the server generated for the queue is carried in ``method.queue``.
        declared = channel.queue_declare(
            queue='',
            durable=True,
            auto_delete=False,
        )
        queue_name = declared.method.queue

        channel.queue_bind(queue=queue_name, exchange=self.exchange_name)
        channel.basic_consume(
            queue=queue_name,
            on_message_callback=callback,
            auto_ack=True,
        )
        self.channel = channel

    def send(self, message: str) -> None:
        """
        Sends a message to the declared exchange.

        The message is UTF-8 encoded and published on a short-lived channel,
        which is closed again whether or not publishing succeeds.

        :param message: text to publish
        :return:
        """
        channel = self.connection.channel()
        try:
            channel.basic_publish(
                exchange=self.exchange_name,
                # Fall back to an empty key when none was configured rather
                # than handing None to pika.
                routing_key=self.routing_key or '',
                body=message.encode('utf-8'),
            )
        finally:
            channel.close()
|