Added more spans

This commit is contained in:
Pünkösd Marcell 2021-08-10 14:40:57 +02:00
parent 98234f0e8a
commit 00e9d02478
1 changed files with 93 additions and 77 deletions

View File

@ -12,6 +12,8 @@ from db import db
from influxus import influx_db from influxus import influx_db
from models import SampleMetadata from models import SampleMetadata
from schemas import SampleSchema, SampleMetadataSchema from schemas import SampleSchema, SampleMetadataSchema
from requests_opentracing import SessionTracing
import opentracing
""" """
Flask Restful endpoints Flask Restful endpoints
@ -37,100 +39,110 @@ class SampleResource(Resource):
Post request send to the endpoint Post request send to the endpoint
:return: :return:
""" """
if 'file' not in request.files: with opentracing.tracer.start_active_span('parseAndValidate'):
return abort(400, "no file found") if 'file' not in request.files:
else: return abort(400, "no file found")
soundfile = request.files['file'] else:
soundfile = request.files['file']
if 'description' not in request.form: if 'description' not in request.form:
return abort(400, "no description found") return abort(400, "no description found")
else: else:
description = request.form.get("description") description = request.form.get("description")
if soundfile.content_type != 'audio/wave': if soundfile.content_type != 'audio/wave':
current_app.logger.info(f"Input file was not WAV.") current_app.logger.info(f"Input file was not WAV.")
return abort(415, 'Input file not a wave file.') return abort(415, 'Input file not a wave file.')
try: try:
desc = self.sampleschema.loads(description) desc = self.sampleschema.loads(description)
except Exception as e: except Exception as e:
current_app.logger.exception(e) current_app.logger.exception(e)
return abort(417, 'Input JSON schema invalid') return abort(417, 'Input JSON schema invalid')
xeger = Xeger(limit=30) with opentracing.tracer.start_active_span('generateTag'):
while True: xeger = Xeger(limit=30)
generated_tag = xeger.xeger(r'^[a-zA-Z]+[0-9a-zA-Z_]*$')[:32] while True:
if len(generated_tag) > 2: # Ensure minimum length generated_tag = xeger.xeger(r'^[a-zA-Z]+[0-9a-zA-Z_]*$')[:32]
break if len(generated_tag) > 2: # Ensure minimum length
break
# Handle mega-autismo-cliento # Handle mega-autismo-cliento
soundfile_content_length = soundfile.content_length soundfile_content_length = soundfile.content_length
if soundfile_content_length <= 0: # BRUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUH if soundfile_content_length <= 0: # BRUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUH
current_app.logger.debug( with opentracing.tracer.start_active_span(
"The uploader did not provide content-length for the sound file... Calculating manually..." 'calculateContentLength'): # In an ideal scenario this span is missing
current_app.logger.debug(
"The uploader did not provide content-length for the sound file... Calculating manually..."
)
# So, this is a seekable stream, so we just seek to the end
old_ptr = soundfile.tell()
soundfile.seek(0, 2)
# Check where is the end (= content length)
soundfile_content_length = soundfile.tell()
# Seek back to where the stream was
soundfile.seek(old_ptr, 0)
# It's insane, that you can not set this field in curl
with opentracing.tracer.start_active_span('sqlalchemy.create'):
record = SampleMetadata(
device_id=desc['device_id'],
device_date=desc['date'],
tag=generated_tag
) )
# So, this is a seekable stream, so we just seek to the end db.session.add(record)
old_ptr = soundfile.tell()
soundfile.seek(0, 2)
# Check where is the end (= content length)
soundfile_content_length = soundfile.tell()
# Seek back to where the stream was
soundfile.seek(old_ptr, 0)
# It's insane, that you can not set this field in curl with opentracing.tracer.start_active_span('uploadToStorageService'):
files = {
'description': (None, json.dumps({'tag': generated_tag}), 'application/json'),
'soundFile': (
'wave.wav',
soundfile,
soundfile.content_type,
{'Content-Length': soundfile_content_length})}
record = SampleMetadata( upload_started = time.time()
device_id=desc['device_id'], r = SessionTracing(propagate=True).post(
device_date=desc['date'], f"http://{current_app.config.get('STORAGE_HOSTNAME')}/object",
tag=generated_tag files=files
) )
db.session.add(record) upload_time = time.time() - upload_started
files = { if upload_time > 0.8:
'description': (None, json.dumps({'tag': generated_tag}), 'application/json'), current_app.logger.warning(f"Uploading to storage-service took {upload_time:5} sec")
'soundFile': (
'wave.wav',
soundfile,
soundfile.content_type,
{'Content-Length': soundfile_content_length})}
upload_started = time.time() if r.status_code not in [200, 201]:
r = requests.post( return abort(500,
f"http://{current_app.config.get('STORAGE_HOSTNAME')}/object", f"Failed to upload sample to storage service. Upstream status: {r.status_code}: {r.text}")
files=files
)
upload_time = time.time() - upload_started
if upload_time > 0.8: with opentracing.tracer.start_active_span('sqlalchemy.commit'):
current_app.logger.warning(f"Uploading to storage-service took {upload_time:5} sec") db.session.commit()
if r.status_code not in [200, 201]:
return abort(500, f"Failed to upload sample to storage service. Upstream status: {r.status_code}: {r.text}")
db.session.commit()
# Announce only after the data is successfully committed # Announce only after the data is successfully committed
try: with opentracing.tracer.start_active_span('amqp.publish'):
magic_amqp.publish({'tag': generated_tag}) try:
except Exception as e: magic_amqp.publish({'tag': generated_tag})
current_app.logger.exception(e) except Exception as e:
return abort(500, f"AMQP Publish error: {str(e)}") current_app.logger.exception(e)
return abort(500, f"AMQP Publish error: {str(e)}")
# metrics # metrics
if current_app.config['ENABLE_INFLUXDB']: if current_app.config['ENABLE_INFLUXDB']:
influx_db.write_points( with opentracing.tracer.start_active_span('influxdb.write_points'):
[ influx_db.write_points(
{ [
'time': datetime.now(tz=tzlocal.get_localzone()), {
'measurement': 'cloudinput', 'time': datetime.now(tz=tzlocal.get_localzone()),
'tags': { 'measurement': 'cloudinput',
'device': desc['device_id'] 'tags': {
}, 'device': desc['device_id']
'fields': { },
'bruh': 1.0 'fields': {
'bruh': 1.0
}
} }
} ]
] )
)
return {"tag": generated_tag}, 200 return {"tag": generated_tag}, 200
@ -139,7 +151,9 @@ class SampleResource(Resource):
Get all stored items Get all stored items
:return: :return:
""" """
samples = SampleMetadata.query.all() with opentracing.tracer.start_active_span('sqlalchemy.select'):
samples = SampleMetadata.query.all()
return self.samplemetadataschema.dump(list(samples)), 200 return self.samplemetadataschema.dump(list(samples)), 200
@ -156,5 +170,7 @@ class SampleParameterResource(Resource):
:param tag: :param tag:
:return: :return:
""" """
sample = SampleMetadata.query.filter_by(tag=tag).first_or_404() with opentracing.tracer.start_active_span('sqlalchemy.select', tags={"tag": tag}):
sample = SampleMetadata.query.filter_by(tag=tag).first_or_404()
return self.samplemetadataschema.dump(sample), 200 return self.samplemetadataschema.dump(sample), 200