Exchanges are magic
continuous-integration/drone/push Build is passing Details

This commit is contained in:
Pünkösd Marcell 2021-08-17 18:05:32 +02:00
parent 2c0e6ec7d7
commit a118b79512
3 changed files with 24 additions and 9 deletions

View File

@ -19,7 +19,8 @@ class Config:
RELEASE_ID = os.environ.get("RELEASE_ID", "test") RELEASE_ID = os.environ.get("RELEASE_ID", "test")
RELEASEMODE = os.environ.get("INPUT_SERVICE_RELEASEMODE", "dev") RELEASEMODE = os.environ.get("INPUT_SERVICE_RELEASEMODE", "dev")
EXCHANGE_NAME = os.getenv("INPUT_RABBITMQ_EXCHANGE", "dev") EXCHANGE_NAME_META = os.getenv("INPUT_RABBITMQ_EXCHANGE_META", "sample-meta")
EXCHANGE_NAME_CACHE = os.getenv("INPUT_RABBITMQ_EXCHANGE_CACHE", "sample-cache")
FLASK_PIKA_PARAMS = { FLASK_PIKA_PARAMS = {
'host': os.getenv("INPUT_RABBITMQ_HOSTNAME", "localhost"), 'host': os.getenv("INPUT_RABBITMQ_HOSTNAME", "localhost"),

View File

@ -26,7 +26,8 @@ class MagicAMQP:
def init_app(self, app: Flask): def init_app(self, app: Flask):
self.app = app self.app = app
self.app.config.setdefault('FLASK_PIKA_PARAMS', {}) self.app.config.setdefault('FLASK_PIKA_PARAMS', {})
self.app.config.setdefault('EXCHANGE_NAME', None) self.app.config.setdefault('EXCHANGE_NAME_META', None)
self.app.config.setdefault('EXCHANGE_NAME_CACHE', None)
self._credentials = pika.PlainCredentials( self._credentials = pika.PlainCredentials(
app.config['FLASK_PIKA_PARAMS']['username'], app.config['FLASK_PIKA_PARAMS']['username'],
@ -45,7 +46,11 @@ class MagicAMQP:
) )
self._pika_channel = self._pika_connection.channel() self._pika_channel = self._pika_connection.channel()
self._pika_channel.exchange_declare( self._pika_channel.exchange_declare(
exchange=self.app.config['EXCHANGE_NAME'], exchange=self.app.config['EXCHANGE_NAME_META'],
exchange_type='direct'
)
self._pika_channel.exchange_declare(
exchange=self.app.config['EXCHANGE_NAME_CACHE'],
exchange_type='direct' exchange_type='direct'
) )
@ -70,7 +75,7 @@ class MagicAMQP:
if total_time > 1: if total_time > 1:
self.app.logger.warning(f"Loop: Total loop took {total_time:5f} sec") self.app.logger.warning(f"Loop: Total loop took {total_time:5f} sec")
def publish(self, payload=None): def _publish(self, exchange: str, payload=None):
""" """
Publish a simple json serialized message to the configured queue. Publish a simple json serialized message to the configured queue.
If the connection is broken, then this call will block until the connection is restored If the connection is broken, then this call will block until the connection is restored
@ -88,7 +93,7 @@ class MagicAMQP:
while True: while True:
try: try:
self._pika_channel.basic_publish( self._pika_channel.basic_publish(
exchange=self.app.config['EXCHANGE_NAME'], exchange=exchange,
routing_key='feature', routing_key='feature',
body=json.dumps(payload).encode('UTF-8') body=json.dumps(payload).encode('UTF-8')
) )
@ -119,6 +124,12 @@ class MagicAMQP:
if total_time > 0.4: if total_time > 0.4:
self.app.logger.warning(f"Publish: Total publish took {total_time:5f} sec") self.app.logger.warning(f"Publish: Total publish took {total_time:5f} sec")
def publish_cache(self, payload=None):
return self._publish(self.app.config['EXCHANGE_NAME_CACHE'], payload)
def publish_meta(self, payload=None):
return self._publish(self.app.config['EXCHANGE_NAME_META'], payload)
def is_healthy(self) -> bool: def is_healthy(self) -> bool:
with self._lock: with self._lock:
if not self._pika_channel: if not self._pika_channel:

View File

@ -62,8 +62,8 @@ class SampleResource(Resource):
break break
with opentracing.tracer.start_active_span('publishMetaMessage'): with opentracing.tracer.start_active_span('publishMetaMessage'):
try: # TODO change exchange try:
magic_amqp.publish( magic_amqp.publish_meta(
{ {
'tag': generated_tag, 'tag': generated_tag,
'timestamp': datetime.now().isoformat(), 'timestamp': datetime.now().isoformat(),
@ -84,8 +84,11 @@ class SampleResource(Resource):
# Announce only after the data is successfully committed # Announce only after the data is successfully committed
with opentracing.tracer.start_active_span('publishInCacheMessage'): with opentracing.tracer.start_active_span('publishInCacheMessage'):
try: # TODO change exchange try:
magic_amqp.publish({'tag': generated_tag}) magic_amqp.publish_cache({
'tag': generated_tag,
'mime_type': soundfile.mimetype
})
except Exception as e: except Exception as e:
current_app.logger.exception(e) current_app.logger.exception(e)
return abort(500, f"AMQP Publish error: {str(e)}") return abort(500, f"AMQP Publish error: {str(e)}")