Added extracting and injecting context
continuous-integration/drone/push Build is passing Details

This commit is contained in:
Pünkösd Marcell 2021-08-04 16:17:06 +02:00
parent 39d2f0300b
commit b094ab43fa
1 changed files with 18 additions and 9 deletions

View File

@ -3,9 +3,10 @@ import time
import jaeger_client import jaeger_client
import opentracing import opentracing
from opentracing.ext import tags
from opentracing.propagation import Format
import logging import logging
import os
import sys import sys
import pika import pika
import json import json
@ -19,15 +20,21 @@ from magic_doer import MagicDoer
def message_callback(channel, method, properties, body): def message_callback(channel, method, properties, body):
with opentracing.tracer.start_active_span('main.handleMessage', finish_on_close=True) as scope: try:
try: msg = json.loads(body.decode('utf-8'))
msg = json.loads(body.decode('utf-8')) except (UnicodeDecodeError, json.JSONDecodeError) as e:
except (UnicodeDecodeError, json.JSONDecodeError) as e: logging.warning(f"Invalid message recieved: {e}")
logging.warning(f"Invalid message recieved: {e}") channel.basic_ack(delivery_tag=method.delivery_tag) # We don't want this to be requeue
channel.basic_ack(delivery_tag=method.delivery_tag) # We don't want this to be requeue return
return
scope.span.log_kv({'event': 'messageParsed', 'sampleTag': msg['tag']}) logging.debug(f"Handling message: {msg}")
span_ctx = opentracing.tracer.extract(Format.TEXT_MAP, msg)
span_tags = {tags.SPAN_KIND: tags.SPAN_KIND_CONSUMER}
with opentracing.tracer.start_active_span(
'main.handleMessage', finish_on_close=True, child_of=span_ctx, tags=span_tags
) as scope:
with opentracing.tracer.start_active_span('magicDoer.runEverything'): with opentracing.tracer.start_active_span('magicDoer.runEverything'):
try: try:
@ -39,6 +46,8 @@ def message_callback(channel, method, properties, body):
return return
if results: if results:
opentracing.tracer.inject(scope.span.context, Format.TEXT_MAP, results)
logging.debug(f"Publishing message: {results}")
channel.basic_publish( channel.basic_publish(
exchange=Config.PIKA_OUTPUT_EXCHANGE, exchange=Config.PIKA_OUTPUT_EXCHANGE,
routing_key='classification-result', routing_key='classification-result',