From 1acdd6d21c3bb0f7e4e3244e5876390e7a549ea4 Mon Sep 17 00:00:00 2001 From: marcsello Date: Fri, 30 Jul 2021 15:24:57 +0200 Subject: [PATCH] Added tracing to classification --- cnn_classification_service/cnn_classifier.py | 10 ++++-- cnn_classification_service/magic_doer.py | 34 ++++++++++++-------- cnn_classification_service/main.py | 30 +++++++++-------- 3 files changed, 44 insertions(+), 30 deletions(-) diff --git a/cnn_classification_service/cnn_classifier.py b/cnn_classification_service/cnn_classifier.py index 35a3acd..7d35684 100644 --- a/cnn_classification_service/cnn_classifier.py +++ b/cnn_classification_service/cnn_classifier.py @@ -4,6 +4,7 @@ import tempfile import os import os.path import shutil +import opentracing import librosa import librosa.display @@ -83,10 +84,13 @@ class Classifier(object): return predicted_class_name, labeled_predictions - def predict(self, wav_filename: str) -> Tuple[str, dict]: - directory, _ = self.create_spectrogram(wav_filename) + def predict(self, wav_filename: str, span: opentracing.span.Span) -> Tuple[str, dict]: + with opentracing.tracer.start_span('createSpectrogram', child_of=span): + directory, _ = self.create_spectrogram(wav_filename) + + with opentracing.tracer.start_span('runPredictor', child_of=span): + result = self._run_predictor(directory) - result = self._run_predictor(directory) shutil.rmtree(directory) # The image is no longer needed return result diff --git a/cnn_classification_service/magic_doer.py b/cnn_classification_service/magic_doer.py index 4a191da..a858050 100644 --- a/cnn_classification_service/magic_doer.py +++ b/cnn_classification_service/magic_doer.py @@ -18,29 +18,33 @@ class MagicDoer: requests_session = SessionTracing(propagate=True) @classmethod - def run_everything(cls, parameters: dict) -> dict: + def run_everything(cls, parameters: dict, span: opentracing.span.Span) -> dict: tag = parameters['tag'] sample_file_handle, sample_file_path = tempfile.mkstemp(prefix=f"{tag}_", suffix=".wav", dir="/dev/shm") + span.log_kv({'event': 'sampleFileOpened', 'sampleTag': tag}) response = None try: - # Download Sample - object_path = urljoin(Config.STORAGE_SERVICE_URL, f"object/{tag}") + with opentracing.tracer.start_span('downloadSample', child_of=span): + # Download Sample + object_path = urljoin(Config.STORAGE_SERVICE_URL, f"object/{tag}") - logging.info(f"Downloading sample: {tag} from {object_path}") - r = cls.requests_session.get(object_path) - with open(sample_file_handle, 'wb') as f: - f.write(r.content) + logging.info(f"Downloading sample: {tag} from {object_path}") + r = cls.requests_session.get(object_path) + with open(sample_file_handle, 'wb') as f: + f.write(r.content) - logging.debug(f"Downloaded sample to {sample_file_path}") + logging.debug(f"Downloaded sample to {sample_file_path}") - # Get a classifier that uses the default model - model_details, classifier = cls.classifier_cache.get_default_classifier() + with opentracing.tracer.start_span('loadClassifier', child_of=span): + # Get a classifier that uses the default model + model_details, classifier = cls.classifier_cache.get_default_classifier() - # do the majic - classification_start_time = time.time() - predicted_class_name, labeled_predictions = classifier.predict(sample_file_path) - classification_duration = time.time() - classification_start_time + with opentracing.tracer.start_span('runClassifier', child_of=span) as child_span: + # do the majic + classification_start_time = time.time() + predicted_class_name, labeled_predictions = classifier.predict(sample_file_path, child_span) + classification_duration = time.time() - classification_start_time response = { "tag": tag, @@ -57,6 +61,8 @@ class MagicDoer: except FileNotFoundError: pass + span.log_kv({'event': 'sampleFileDeleted', 'sampleTag': tag}) + if not response: logging.error("Something went wrong during classification!") else: diff --git a/cnn_classification_service/main.py b/cnn_classification_service/main.py index 6930dac..efa51e5 100644 --- a/cnn_classification_service/main.py +++ b/cnn_classification_service/main.py @@ -17,21 +17,25 @@ from magic_doer import MagicDoer def message_callback(channel, method, properties, body): - try: - msg = json.loads(body.decode('utf-8')) - except (UnicodeDecodeError, json.JSONDecodeError) as e: - logging.warning(f"Invalid message recieved: {e}") - return + with opentracing.tracer.start_span('messageHandling') as span: + try: + msg = json.loads(body.decode('utf-8')) + except (UnicodeDecodeError, json.JSONDecodeError) as e: + logging.warning(f"Invalid message recieved: {e}") + return - results = MagicDoer.run_everything(msg) # <- This is where the magic happens + span.log_kv({'event': 'messageParsed', 'sampleTag': msg['tag']}) - if results: - channel.basic_publish( - exchange=Config.PIKA_OUTPUT_EXCHANGE, - routing_key='classification-result', - body=json.dumps(results).encode("utf-8") - ) - channel.basic_ack(delivery_tag=method.delivery_tag) + with opentracing.tracer.start_span('runAlgorithm', child_of=span) as child_span: + results = MagicDoer.run_everything(msg, child_span) # <- This is where the magic happens + + if results: + channel.basic_publish( + exchange=Config.PIKA_OUTPUT_EXCHANGE, + routing_key='classification-result', + body=json.dumps(results).encode("utf-8") + ) + channel.basic_ack(delivery_tag=method.delivery_tag) def main():