From 9c3f8c65fba260d485a6d247891b7580cc43b007 Mon Sep 17 00:00:00 2001 From: marcsello Date: Tue, 10 Aug 2021 15:13:23 +0200 Subject: [PATCH] Added injetion to outgoing messages --- src/magic_amqp.py | 75 +++++++++++++++++++++++++++-------------------- src/resources.py | 2 +- 2 files changed, 44 insertions(+), 33 deletions(-) diff --git a/src/magic_amqp.py b/src/magic_amqp.py index 36069ed..f719183 100644 --- a/src/magic_amqp.py +++ b/src/magic_amqp.py @@ -5,6 +5,10 @@ import pika.exceptions import json import time +import opentracing +from opentracing.ext import tags +from opentracing.propagation import Format + class MagicAMQP: """ @@ -72,42 +76,49 @@ class MagicAMQP: Publish a simple json serialized message to the configured queue. If the connection is broken, then this call will block until the connection is restored """ - lock_start = time.time() - with self._lock: - lock_acquire_time = time.time() - lock_start - if lock_acquire_time >= 0.2: - self.app.logger.warning(f"Publish: Lock acquire took {lock_acquire_time:5f} sec") - tries = 0 - while True: - try: - self._pika_channel.basic_publish( - exchange=self.app.config['EXCHANGE_NAME'], - routing_key='feature', - body=json.dumps(payload).encode('UTF-8') - ) - break # message sent successfully - except pika.exceptions.AMQPConnectionError as e: - self.app.logger.warning(f"Connection error during publish: {e} (attempting reconnect)") + span_tags = {tags.SPAN_KIND: tags.SPAN_KIND_PRODUCER} + with opentracing.tracer.start_active_span('magic_amqp.publish', tags=span_tags) as scope: + opentracing.tracer.inject(scope.span.context, Format.TEXT_MAP, payload) + lock_start = time.time() + with self._lock: + scope.span.log_kv({'event': 'lockAcquired'}) + lock_acquire_time = time.time() - lock_start + if lock_acquire_time >= 0.2: + self.app.logger.warning(f"Publish: Lock acquire took {lock_acquire_time:5f} sec") + tries = 0 + while True: + try: + self._pika_channel.basic_publish( + exchange=self.app.config['EXCHANGE_NAME'], + routing_key='feature', + body=json.dumps(payload).encode('UTF-8') + ) + self.app.logger.debug(f"Published: {payload}") + break # message sent successfully + except pika.exceptions.AMQPConnectionError as e: + scope.span.log_kv({'event': 'connectionError', 'error': str(e)}) + self.app.logger.warning(f"Connection error during publish: {e} (attempting reconnect)") - if tries > 30: - raise # just give up + if tries > 30: + raise # just give up - while True: - try: - self._reconnect_ampq() - break - except pika.exceptions.AMQPConnectionError as e: - self.app.logger.warning(f"Connection error during reconnection: {e} (attempting reconnect)") - tries += 1 + while True: + try: + self._reconnect_ampq() + break + except pika.exceptions.AMQPConnectionError as e: + self.app.logger.warning( + f"Connection error during reconnection: {e} (attempting reconnect)") + tries += 1 - if tries > 30: - raise # just give up + if tries > 30: + raise # just give up - if tries > 10: - time.sleep(2) - total_time = time.time() - lock_start - if total_time > 0.4: - self.app.logger.warning(f"Publish: Total publish took {total_time:5f} sec") + if tries > 10: + time.sleep(2) + total_time = time.time() - lock_start + if total_time > 0.4: + self.app.logger.warning(f"Publish: Total publish took {total_time:5f} sec") def is_healthy(self) -> bool: with self._lock: diff --git a/src/resources.py b/src/resources.py index d8af357..8f7a9ae 100644 --- a/src/resources.py +++ b/src/resources.py @@ -119,7 +119,7 @@ class SampleResource(Resource): db.session.commit() # Announce only after the data is successfully committed - with opentracing.tracer.start_active_span('amqp.publish'): + with opentracing.tracer.start_active_span('publishMessage'): try: magic_amqp.publish({'tag': generated_tag}) except Exception as e: