From 00e9d02478c1b4cee5d3e6631c0363e02d598270 Mon Sep 17 00:00:00 2001 From: marcsello Date: Tue, 10 Aug 2021 14:40:57 +0200 Subject: [PATCH] Added more spans --- src/resources.py | 170 ++++++++++++++++++++++++++--------------------- 1 file changed, 93 insertions(+), 77 deletions(-) diff --git a/src/resources.py b/src/resources.py index dbcdd27..d8af357 100644 --- a/src/resources.py +++ b/src/resources.py @@ -12,6 +12,8 @@ from db import db from influxus import influx_db from models import SampleMetadata from schemas import SampleSchema, SampleMetadataSchema +from requests_opentracing import SessionTracing +import opentracing """ Flask Restful endpoints @@ -37,100 +39,110 @@ class SampleResource(Resource): Post request send to the endpoint :return: """ - if 'file' not in request.files: - return abort(400, "no file found") - else: - soundfile = request.files['file'] + with opentracing.tracer.start_active_span('parseAndValidate'): + if 'file' not in request.files: + return abort(400, "no file found") + else: + soundfile = request.files['file'] - if 'description' not in request.form: - return abort(400, "no description found") - else: - description = request.form.get("description") + if 'description' not in request.form: + return abort(400, "no description found") + else: + description = request.form.get("description") - if soundfile.content_type != 'audio/wave': - current_app.logger.info(f"Input file was not WAV.") - return abort(415, 'Input file not a wave file.') - try: - desc = self.sampleschema.loads(description) - except Exception as e: - current_app.logger.exception(e) - return abort(417, 'Input JSON schema invalid') + if soundfile.content_type != 'audio/wave': + current_app.logger.info(f"Input file was not WAV.") + return abort(415, 'Input file not a wave file.') + try: + desc = self.sampleschema.loads(description) + except Exception as e: + current_app.logger.exception(e) + return abort(417, 'Input JSON schema invalid') - xeger = Xeger(limit=30) - while True: - generated_tag = xeger.xeger(r'^[a-zA-Z]+[0-9a-zA-Z_]*$')[:32] - if len(generated_tag) > 2: # Ensure minimum length - break + with opentracing.tracer.start_active_span('generateTag'): + xeger = Xeger(limit=30) + while True: + generated_tag = xeger.xeger(r'^[a-zA-Z]+[0-9a-zA-Z_]*$')[:32] + if len(generated_tag) > 2: # Ensure minimum length + break # Handle mega-autismo-cliento soundfile_content_length = soundfile.content_length if soundfile_content_length <= 0: # BRUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUH - current_app.logger.debug( - "The uploader did not provide content-length for the sound file... Calculating manually..." + with opentracing.tracer.start_active_span( + 'calculateContentLength'): # In an ideal scenario this span is missing + current_app.logger.debug( + "The uploader did not provide content-length for the sound file... Calculating manually..." + ) + # So, this is a seekable stream, so we just seek to the end + old_ptr = soundfile.tell() + soundfile.seek(0, 2) + # Check where is the end (= content length) + soundfile_content_length = soundfile.tell() + # Seek back to where the stream was + soundfile.seek(old_ptr, 0) + + # It's insane, that you can not set this field in curl + + with opentracing.tracer.start_active_span('sqlalchemy.create'): + record = SampleMetadata( + device_id=desc['device_id'], + device_date=desc['date'], + tag=generated_tag ) - # So, this is a seekable stream, so we just seek to the end - old_ptr = soundfile.tell() - soundfile.seek(0, 2) - # Check where is the end (= content length) - soundfile_content_length = soundfile.tell() - # Seek back to where the stream was - soundfile.seek(old_ptr, 0) + db.session.add(record) - # It's insane, that you can not set this field in curl + with opentracing.tracer.start_active_span('uploadToStorageService'): + files = { + 'description': (None, json.dumps({'tag': generated_tag}), 'application/json'), + 'soundFile': ( + 'wave.wav', + soundfile, + soundfile.content_type, + {'Content-Length': soundfile_content_length})} - record = SampleMetadata( - device_id=desc['device_id'], - device_date=desc['date'], - tag=generated_tag - ) - db.session.add(record) + upload_started = time.time() + r = SessionTracing(propagate=True).post( + f"http://{current_app.config.get('STORAGE_HOSTNAME')}/object", + files=files + ) + upload_time = time.time() - upload_started - files = { - 'description': (None, json.dumps({'tag': generated_tag}), 'application/json'), - 'soundFile': ( - 'wave.wav', - soundfile, - soundfile.content_type, - {'Content-Length': soundfile_content_length})} + if upload_time > 0.8: + current_app.logger.warning(f"Uploading to storage-service took {upload_time:5} sec") - upload_started = time.time() - r = requests.post( - f"http://{current_app.config.get('STORAGE_HOSTNAME')}/object", - files=files - ) - upload_time = time.time() - upload_started + if r.status_code not in [200, 201]: + return abort(500, + f"Failed to upload sample to storage service. Upstream status: {r.status_code}: {r.text}") - if upload_time > 0.8: - current_app.logger.warning(f"Uploading to storage-service took {upload_time:5} sec") - - if r.status_code not in [200, 201]: - return abort(500, f"Failed to upload sample to storage service. Upstream status: {r.status_code}: {r.text}") - - db.session.commit() + with opentracing.tracer.start_active_span('sqlalchemy.commit'): + db.session.commit() # Announce only after the data is successfully committed - try: - magic_amqp.publish({'tag': generated_tag}) - except Exception as e: - current_app.logger.exception(e) - return abort(500, f"AMQP Publish error: {str(e)}") + with opentracing.tracer.start_active_span('amqp.publish'): + try: + magic_amqp.publish({'tag': generated_tag}) + except Exception as e: + current_app.logger.exception(e) + return abort(500, f"AMQP Publish error: {str(e)}") # metrics if current_app.config['ENABLE_INFLUXDB']: - influx_db.write_points( - [ - { - 'time': datetime.now(tz=tzlocal.get_localzone()), - 'measurement': 'cloudinput', - 'tags': { - 'device': desc['device_id'] - }, - 'fields': { - 'bruh': 1.0 + with opentracing.tracer.start_active_span('influxdb.write_points'): + influx_db.write_points( + [ + { + 'time': datetime.now(tz=tzlocal.get_localzone()), + 'measurement': 'cloudinput', + 'tags': { + 'device': desc['device_id'] + }, + 'fields': { + 'bruh': 1.0 + } } - } - ] - ) + ] + ) return {"tag": generated_tag}, 200 @@ -139,7 +151,9 @@ class SampleResource(Resource): Get all stored items :return: """ - samples = SampleMetadata.query.all() + with opentracing.tracer.start_active_span('sqlalchemy.select'): + samples = SampleMetadata.query.all() + return self.samplemetadataschema.dump(list(samples)), 200 @@ -156,5 +170,7 @@ class SampleParameterResource(Resource): :param tag: :return: """ - sample = SampleMetadata.query.filter_by(tag=tag).first_or_404() + with opentracing.tracer.start_active_span('sqlalchemy.select', tags={"tag": tag}): + sample = SampleMetadata.query.filter_by(tag=tag).first_or_404() + return self.samplemetadataschema.dump(sample), 200