norbi-update #1

Merged
tormakris merged 9 commits from norbi-update into master 2021-08-18 13:47:03 +02:00
4 changed files with 36 additions and 47 deletions
Showing only changes of commit 7f987afa7a - Show all commits

View File

@@ -14,7 +14,8 @@ tzdata
tzlocal tzlocal
apscheduler~=3.7.0 apscheduler~=3.7.0
flask-redis~=0.4.0
opentracing~=2.4.0 opentracing~=2.4.0
jaeger-client jaeger-client
requests-opentracing
Flask-Opentracing Flask-Opentracing

View File

@@ -6,10 +6,11 @@ from sentry_sdk.integrations.flask import FlaskIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
from healthcheck import HealthCheck from healthcheck import HealthCheck
from redis_client import redis_client
from config import Config from config import Config
from marshm import ma from marshm import ma
from influxus import influx_db from influxus import influx_db
from resources import SampleResource, SampleParameterResource from resources import SampleResource
from healthchecks import amqp_connection_status from healthchecks import amqp_connection_status
import atexit import atexit
@@ -49,6 +50,8 @@ api = Api(app)
health = HealthCheck() health = HealthCheck()
ma.init_app(app) ma.init_app(app)
redis_client.init_app(app)
# ampq magic stuff # ampq magic stuff
magic_amqp.init_app(app) magic_amqp.init_app(app)
@@ -67,6 +70,7 @@ def init_db():
if Config.ENABLE_INFLUXDB: if Config.ENABLE_INFLUXDB:
influx_db.database.create(Config.INFLUXDB_DATABASE) influx_db.database.create(Config.INFLUXDB_DATABASE)
# Setup tracing # Setup tracing
def initialize_tracer(): def initialize_tracer():
app.logger.info("Initializing jaeger...") app.logger.info("Initializing jaeger...")
@@ -88,6 +92,7 @@ app.add_url_rule("/healthz", "healthcheck", view_func=lambda: health.run())
if __name__ != '__main__': if __name__ != '__main__':
import logging import logging
gunicorn_logger = logging.getLogger('gunicorn.error') gunicorn_logger = logging.getLogger('gunicorn.error')
app.logger.handlers = gunicorn_logger.handlers app.logger.handlers = gunicorn_logger.handlers
app.logger.setLevel(gunicorn_logger.level) app.logger.setLevel(gunicorn_logger.level)

4
src/redis_client.py Normal file
View File

@@ -0,0 +1,4 @@
#!/usr/bin/env python3
from flask_redis import FlaskRedis
redis_client = FlaskRedis()

View File

@@ -1,6 +1,5 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import json import io
import time
from datetime import datetime from datetime import datetime
import tzlocal import tzlocal
from xeger import Xeger from xeger import Xeger
@@ -9,7 +8,7 @@ from flask import request, current_app, abort
from magic_amqp import magic_amqp from magic_amqp import magic_amqp
from influxus import influx_db from influxus import influx_db
from schemas import SampleSchema from schemas import SampleSchema
from requests_opentracing import SessionTracing from redis_client import redis_client
import opentracing import opentracing
""" """
@@ -44,13 +43,13 @@ class SampleResource(Resource):
if 'description' not in request.form: if 'description' not in request.form:
return abort(400, "no description found") return abort(400, "no description found")
else: else:
description = request.form.get("description") description_raw = request.form.get("description")
if soundfile.content_type != 'audio/wave': if soundfile.content_type != 'audio/wave':
current_app.logger.info(f"Input file was not WAV.") current_app.logger.info(f"Input file was not WAV.")
return abort(415, 'Input file not a wave file.') return abort(415, 'Input file not a wave file.')
try: try:
desc = self.sampleschema.loads(description) desc = self.sampleschema.loads(description_raw)
except Exception as e: except Exception as e:
current_app.logger.exception(e) current_app.logger.exception(e)
return abort(417, 'Input JSON schema invalid') return abort(417, 'Input JSON schema invalid')
@@ -62,50 +61,30 @@ class SampleResource(Resource):
if len(generated_tag) > 2: # Ensure minimum length if len(generated_tag) > 2: # Ensure minimum length
break break
# Handle mega-autismo-cliento with opentracing.tracer.start_active_span('publishMetaMessage'):
soundfile_content_length = soundfile.content_length try: # TODO change exchange
if soundfile_content_length <= 0: # BRUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUH magic_amqp.publish(
with opentracing.tracer.start_active_span( {
'calculateContentLength'): # In an ideal scenario this span is missing 'tag': generated_tag,
current_app.logger.debug( 'timestamp': datetime.now().isoformat(),
"The uploader did not provide content-length for the sound file... Calculating manually..." 'device_id': desc['device_id'],
'device_date': desc['date'].isoformat()
}
) )
# So, this is a seekable stream, so we just seek to the end except Exception as e:
old_ptr = soundfile.tell() current_app.logger.exception(e)
soundfile.seek(0, 2) return abort(500, f"AMQP Publish error: {str(e)}")
# Check where is the end (= content length)
soundfile_content_length = soundfile.tell()
# Seek back to where the stream was
soundfile.seek(old_ptr, 0)
# It's insane, that you can not set this field in curl with opentracing.tracer.start_active_span('readSampleToMemory'):
buf = io.BytesIO()
soundfile.save(buf)
with opentracing.tracer.start_active_span('uploadToStorageService'): with opentracing.tracer.start_active_span('putToCache'):
files = { redis_client.set(generated_tag, buf.getbuffer()) # getbuffer is quick as it does not copy like getvalue
'description': (None, json.dumps({'tag': generated_tag}), 'application/json'),
'soundFile': (
'wave.wav',
soundfile,
soundfile.content_type,
{'Content-Length': soundfile_content_length})}
upload_started = time.time()
r = SessionTracing(propagate=True).post(
f"http://{current_app.config.get('STORAGE_HOSTNAME')}/object",
files=files
)
upload_time = time.time() - upload_started
if upload_time > 0.8:
current_app.logger.warning(f"Uploading to storage-service took {upload_time:5} sec")
if r.status_code not in [200, 201]:
return abort(500,
f"Failed to upload sample to storage service. Upstream status: {r.status_code}: {r.text}")
# Announce only after the data is successfully committed # Announce only after the data is successfully committed
with opentracing.tracer.start_active_span('publishMessage'): with opentracing.tracer.start_active_span('publishInCacheMessage'):
try: try: # TODO change exchange
magic_amqp.publish({'tag': generated_tag}) magic_amqp.publish({'tag': generated_tag})
except Exception as e: except Exception as e:
current_app.logger.exception(e) current_app.logger.exception(e)
@@ -129,4 +108,4 @@ class SampleResource(Resource):
] ]
) )
return {"tag": generated_tag}, 200 return {"tag": generated_tag}, 200