Added tracing to classification
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Pünkösd Marcell 2021-07-30 15:24:57 +02:00
parent 0245cd7b6a
commit 1acdd6d21c
3 changed files with 44 additions and 30 deletions

View File

@ -4,6 +4,7 @@ import tempfile
import os import os
import os.path import os.path
import shutil import shutil
import opentracing
import librosa import librosa
import librosa.display import librosa.display
@ -83,10 +84,13 @@ class Classifier(object):
return predicted_class_name, labeled_predictions return predicted_class_name, labeled_predictions
def predict(self, wav_filename: str) -> Tuple[str, dict]: def predict(self, wav_filename: str, span: opentracing.span.Span) -> Tuple[str, dict]:
directory, _ = self.create_spectrogram(wav_filename) with opentracing.tracer.start_span('createSpectrogram', child_of=span):
directory, _ = self.create_spectrogram(wav_filename)
with opentracing.tracer.start_span('runPredictor', child_of=span):
result = self._run_predictor(directory)
result = self._run_predictor(directory)
shutil.rmtree(directory) # The image is no longer needed shutil.rmtree(directory) # The image is no longer needed
return result return result

View File

@ -18,29 +18,33 @@ class MagicDoer:
requests_session = SessionTracing(propagate=True) requests_session = SessionTracing(propagate=True)
@classmethod @classmethod
def run_everything(cls, parameters: dict) -> dict: def run_everything(cls, parameters: dict, span: opentracing.span.Span) -> dict:
tag = parameters['tag'] tag = parameters['tag']
sample_file_handle, sample_file_path = tempfile.mkstemp(prefix=f"{tag}_", suffix=".wav", dir="/dev/shm") sample_file_handle, sample_file_path = tempfile.mkstemp(prefix=f"{tag}_", suffix=".wav", dir="/dev/shm")
span.log_kv({'event': 'sampleFileOpened', 'sampleTag': tag})
response = None response = None
try: try:
# Download Sample with opentracing.tracer.start_span('downloadSample', child_of=span):
object_path = urljoin(Config.STORAGE_SERVICE_URL, f"object/{tag}") # Download Sample
object_path = urljoin(Config.STORAGE_SERVICE_URL, f"object/{tag}")
logging.info(f"Downloading sample: {tag} from {object_path}") logging.info(f"Downloading sample: {tag} from {object_path}")
r = cls.requests_session.get(object_path) r = cls.requests_session.get(object_path)
with open(sample_file_handle, 'wb') as f: with open(sample_file_handle, 'wb') as f:
f.write(r.content) f.write(r.content)
logging.debug(f"Downloaded sample to {sample_file_path}") logging.debug(f"Downloaded sample to {sample_file_path}")
# Get a classifier that uses the default model with opentracing.tracer.start_span('loadClassifier', child_of=span):
model_details, classifier = cls.classifier_cache.get_default_classifier() # Get a classifier that uses the default model
model_details, classifier = cls.classifier_cache.get_default_classifier()
# do the majic with opentracing.tracer.start_span('runClassifier', child_of=span) as child_span:
classification_start_time = time.time() # do the majic
predicted_class_name, labeled_predictions = classifier.predict(sample_file_path) classification_start_time = time.time()
classification_duration = time.time() - classification_start_time predicted_class_name, labeled_predictions = classifier.predict(sample_file_path, child_span)
classification_duration = time.time() - classification_start_time
response = { response = {
"tag": tag, "tag": tag,
@ -57,6 +61,8 @@ class MagicDoer:
except FileNotFoundError: except FileNotFoundError:
pass pass
span.log_kv({'event': 'sampleFileDeleted', 'sampleTag': tag})
if not response: if not response:
logging.error("Something went wrong during classification!") logging.error("Something went wrong during classification!")
else: else:

View File

@ -17,21 +17,25 @@ from magic_doer import MagicDoer
def message_callback(channel, method, properties, body): def message_callback(channel, method, properties, body):
try: with opentracing.tracer.start_span('messageHandling') as span:
msg = json.loads(body.decode('utf-8')) try:
except (UnicodeDecodeError, json.JSONDecodeError) as e: msg = json.loads(body.decode('utf-8'))
logging.warning(f"Invalid message recieved: {e}") except (UnicodeDecodeError, json.JSONDecodeError) as e:
return logging.warning(f"Invalid message recieved: {e}")
return
results = MagicDoer.run_everything(msg) # <- This is where the magic happens span.log_kv({'event': 'messageParsed', 'sampleTag': msg['tag']})
if results: with opentracing.tracer.start_span('runAlgorithm', child_of=span) as child_span:
channel.basic_publish( results = MagicDoer.run_everything(msg, child_span) # <- This is where the magic happens
exchange=Config.PIKA_OUTPUT_EXCHANGE,
routing_key='classification-result', if results:
body=json.dumps(results).encode("utf-8") channel.basic_publish(
) exchange=Config.PIKA_OUTPUT_EXCHANGE,
channel.basic_ack(delivery_tag=method.delivery_tag) routing_key='classification-result',
body=json.dumps(results).encode("utf-8")
)
channel.basic_ack(delivery_tag=method.delivery_tag)
def main(): def main():