diff --git a/src/config.py b/src/config.py index 6c6a8ba..51746bf 100644 --- a/src/config.py +++ b/src/config.py @@ -19,7 +19,8 @@ class Config: RELEASE_ID = os.environ.get("RELEASE_ID", "test") RELEASEMODE = os.environ.get("INPUT_SERVICE_RELEASEMODE", "dev") - EXCHANGE_NAME = os.getenv("INPUT_RABBITMQ_EXCHANGE", "dev") + EXCHANGE_NAME_META = os.getenv("INPUT_RABBITMQ_EXCHANGE_META", "sample-meta") + EXCHANGE_NAME_CACHE = os.getenv("INPUT_RABBITMQ_EXCHANGE_CACHE", "sample-cache") FLASK_PIKA_PARAMS = { 'host': os.getenv("INPUT_RABBITMQ_HOSTNAME", "localhost"), diff --git a/src/magic_amqp.py b/src/magic_amqp.py index d550aea..de91dd6 100644 --- a/src/magic_amqp.py +++ b/src/magic_amqp.py @@ -26,7 +26,8 @@ class MagicAMQP: def init_app(self, app: Flask): self.app = app self.app.config.setdefault('FLASK_PIKA_PARAMS', {}) - self.app.config.setdefault('EXCHANGE_NAME', None) + self.app.config.setdefault('EXCHANGE_NAME_META', None) + self.app.config.setdefault('EXCHANGE_NAME_CACHE', None) self._credentials = pika.PlainCredentials( app.config['FLASK_PIKA_PARAMS']['username'], @@ -45,7 +46,11 @@ class MagicAMQP: ) self._pika_channel = self._pika_connection.channel() self._pika_channel.exchange_declare( - exchange=self.app.config['EXCHANGE_NAME'], + exchange=self.app.config['EXCHANGE_NAME_META'], + exchange_type='direct' + ) + self._pika_channel.exchange_declare( + exchange=self.app.config['EXCHANGE_NAME_CACHE'], exchange_type='direct' ) @@ -70,7 +75,7 @@ class MagicAMQP: if total_time > 1: self.app.logger.warning(f"Loop: Total loop took {total_time:5f} sec") - def publish(self, payload=None): + def _publish(self, exchange: str, payload=None): """ Publish a simple json serialized message to the configured queue. If the connection is broken, then this call will block until the connection is restored @@ -88,7 +93,7 @@ class MagicAMQP: while True: try: self._pika_channel.basic_publish( - exchange=self.app.config['EXCHANGE_NAME'], + exchange=exchange, routing_key='feature', body=json.dumps(payload).encode('UTF-8') ) @@ -119,6 +124,12 @@ class MagicAMQP: if total_time > 0.4: self.app.logger.warning(f"Publish: Total publish took {total_time:5f} sec") + def publish_cache(self, payload=None): + return self._publish(self.app.config['EXCHANGE_NAME_CACHE'], payload) + + def publish_meta(self, payload=None): + return self._publish(self.app.config['EXCHANGE_NAME_META'], payload) + def is_healthy(self) -> bool: with self._lock: if not self._pika_channel: diff --git a/src/resources.py b/src/resources.py index 035ec91..1deaebb 100644 --- a/src/resources.py +++ b/src/resources.py @@ -62,8 +62,8 @@ class SampleResource(Resource): break with opentracing.tracer.start_active_span('publishMetaMessage'): - try: # TODO change exchange - magic_amqp.publish( + try: + magic_amqp.publish_meta( { 'tag': generated_tag, 'timestamp': datetime.now().isoformat(), @@ -84,8 +84,11 @@ class SampleResource(Resource): # Announce only after the data is successfully committed with opentracing.tracer.start_active_span('publishInCacheMessage'): - try: # TODO change exchange - magic_amqp.publish({'tag': generated_tag}) + try: + magic_amqp.publish_cache({ + 'tag': generated_tag, + 'mime_type': soundfile.mimetype + }) except Exception as e: current_app.logger.exception(e) return abort(500, f"AMQP Publish error: {str(e)}")