diff --git a/cnn_classification_service/main.py b/cnn_classification_service/main.py index c9c4d52..13dcd75 100644 --- a/cnn_classification_service/main.py +++ b/cnn_classification_service/main.py @@ -3,9 +3,10 @@ import time import jaeger_client import opentracing +from opentracing.ext import tags +from opentracing.propagation import Format import logging -import os import sys import pika import json @@ -19,15 +20,21 @@ from magic_doer import MagicDoer def message_callback(channel, method, properties, body): - with opentracing.tracer.start_active_span('main.handleMessage', finish_on_close=True) as scope: - try: - msg = json.loads(body.decode('utf-8')) - except (UnicodeDecodeError, json.JSONDecodeError) as e: - logging.warning(f"Invalid message recieved: {e}") - channel.basic_ack(delivery_tag=method.delivery_tag) # We don't want this to be requeue - return + try: + msg = json.loads(body.decode('utf-8')) + except (UnicodeDecodeError, json.JSONDecodeError) as e: + logging.warning(f"Invalid message recieved: {e}") + channel.basic_ack(delivery_tag=method.delivery_tag) # We don't want this to be requeue + return - scope.span.log_kv({'event': 'messageParsed', 'sampleTag': msg['tag']}) + logging.debug(f"Handling message: {msg}") + + span_ctx = opentracing.tracer.extract(Format.TEXT_MAP, msg) + span_tags = {tags.SPAN_KIND: tags.SPAN_KIND_CONSUMER} + + with opentracing.tracer.start_active_span( + 'main.handleMessage', finish_on_close=True, child_of=span_ctx, tags=span_tags + ) as scope: with opentracing.tracer.start_active_span('magicDoer.runEverything'): try: @@ -39,6 +46,8 @@ def message_callback(channel, method, properties, body): return if results: + opentracing.tracer.inject(scope.span.context, Format.TEXT_MAP, results) + logging.debug(f"Publishing message: {results}") channel.basic_publish( exchange=Config.PIKA_OUTPUT_EXCHANGE, routing_key='classification-result',