Added injetion to outgoing messages
continuous-integration/drone/push Build is passing Details

This commit is contained in:
Pünkösd Marcell 2021-08-10 15:13:23 +02:00
parent 00e9d02478
commit 9c3f8c65fb
2 changed files with 44 additions and 33 deletions

View File

@ -5,6 +5,10 @@ import pika.exceptions
import json import json
import time import time
import opentracing
from opentracing.ext import tags
from opentracing.propagation import Format
class MagicAMQP: class MagicAMQP:
""" """
@ -72,42 +76,49 @@ class MagicAMQP:
Publish a simple json serialized message to the configured queue. Publish a simple json serialized message to the configured queue.
If the connection is broken, then this call will block until the connection is restored If the connection is broken, then this call will block until the connection is restored
""" """
lock_start = time.time() span_tags = {tags.SPAN_KIND: tags.SPAN_KIND_PRODUCER}
with self._lock: with opentracing.tracer.start_active_span('magic_amqp.publish', tags=span_tags) as scope:
lock_acquire_time = time.time() - lock_start opentracing.tracer.inject(scope.span.context, Format.TEXT_MAP, payload)
if lock_acquire_time >= 0.2: lock_start = time.time()
self.app.logger.warning(f"Publish: Lock acquire took {lock_acquire_time:5f} sec") with self._lock:
tries = 0 scope.span.log_kv({'event': 'lockAcquired'})
while True: lock_acquire_time = time.time() - lock_start
try: if lock_acquire_time >= 0.2:
self._pika_channel.basic_publish( self.app.logger.warning(f"Publish: Lock acquire took {lock_acquire_time:5f} sec")
exchange=self.app.config['EXCHANGE_NAME'], tries = 0
routing_key='feature', while True:
body=json.dumps(payload).encode('UTF-8') try:
) self._pika_channel.basic_publish(
break # message sent successfully exchange=self.app.config['EXCHANGE_NAME'],
except pika.exceptions.AMQPConnectionError as e: routing_key='feature',
self.app.logger.warning(f"Connection error during publish: {e} (attempting reconnect)") body=json.dumps(payload).encode('UTF-8')
)
self.app.logger.debug(f"Published: {payload}")
break # message sent successfully
except pika.exceptions.AMQPConnectionError as e:
scope.span.log_kv({'event': 'connectionError', 'error': str(e)})
self.app.logger.warning(f"Connection error during publish: {e} (attempting reconnect)")
if tries > 30: if tries > 30:
raise # just give up raise # just give up
while True: while True:
try: try:
self._reconnect_ampq() self._reconnect_ampq()
break break
except pika.exceptions.AMQPConnectionError as e: except pika.exceptions.AMQPConnectionError as e:
self.app.logger.warning(f"Connection error during reconnection: {e} (attempting reconnect)") self.app.logger.warning(
tries += 1 f"Connection error during reconnection: {e} (attempting reconnect)")
tries += 1
if tries > 30: if tries > 30:
raise # just give up raise # just give up
if tries > 10: if tries > 10:
time.sleep(2) time.sleep(2)
total_time = time.time() - lock_start total_time = time.time() - lock_start
if total_time > 0.4: if total_time > 0.4:
self.app.logger.warning(f"Publish: Total publish took {total_time:5f} sec") self.app.logger.warning(f"Publish: Total publish took {total_time:5f} sec")
def is_healthy(self) -> bool: def is_healthy(self) -> bool:
with self._lock: with self._lock:

View File

@ -119,7 +119,7 @@ class SampleResource(Resource):
db.session.commit() db.session.commit()
# Announce only after the data is successfully committed # Announce only after the data is successfully committed
with opentracing.tracer.start_active_span('amqp.publish'): with opentracing.tracer.start_active_span('publishMessage'):
try: try:
magic_amqp.publish({'tag': generated_tag}) magic_amqp.publish({'tag': generated_tag})
except Exception as e: except Exception as e: