diff --git a/requirements.txt b/requirements.txt
index da2506c..308fa0f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -14,7 +14,8 @@
 tzdata
 tzlocal
 apscheduler~=3.7.0
+flask-redis~=0.4.0
+
 opentracing~=2.4.0
 jaeger-client
-requests-opentracing
 Flask-Opentracing
\ No newline at end of file
diff --git a/src/app.py b/src/app.py
index 7399b1b..4a8e5a5 100644
--- a/src/app.py
+++ b/src/app.py
@@ -6,10 +6,11 @@ from sentry_sdk.integrations.flask import FlaskIntegration
 from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
 from healthcheck import HealthCheck
+from redis_client import redis_client
 
 from config import Config
 from marshm import ma
 from influxus import influx_db
-from resources import SampleResource, SampleParameterResource
+from resources import SampleResource
 from healthchecks import amqp_connection_status
 
 import atexit
@@ -49,6 +50,8 @@ api = Api(app)
 health = HealthCheck()
 
 ma.init_app(app)
+redis_client.init_app(app)
+
 
 # ampq magic stuff
 magic_amqp.init_app(app)
@@ -67,6 +70,7 @@ def init_db():
     if Config.ENABLE_INFLUXDB:
         influx_db.database.create(Config.INFLUXDB_DATABASE)
+
 
 # Setup tracing
 def initialize_tracer():
     app.logger.info("Initializing jaeger...")
@@ -88,6 +92,7 @@ app.add_url_rule("/healthz", "healthcheck", view_func=lambda: health.run())
 
 if __name__ != '__main__':
     import logging
+
     gunicorn_logger = logging.getLogger('gunicorn.error')
     app.logger.handlers = gunicorn_logger.handlers
     app.logger.setLevel(gunicorn_logger.level)
diff --git a/src/redis_client.py b/src/redis_client.py
new file mode 100644
index 0000000..3bba60d
--- /dev/null
+++ b/src/redis_client.py
@@ -0,0 +1,4 @@
+#!/usr/bin/env python3
+from flask_redis import FlaskRedis
+
+redis_client = FlaskRedis()
diff --git a/src/resources.py b/src/resources.py
index 52241da..035ec91 100644
--- a/src/resources.py
+++ b/src/resources.py
@@ -1,6 +1,5 @@
 #!/usr/bin/env python3
-import json
-import time
+import io
 from datetime import datetime
 import tzlocal
 from xeger import Xeger
@@ -9,7 +8,7 @@ from flask import request, current_app, abort
 from magic_amqp import magic_amqp
 from influxus import influx_db
 from schemas import SampleSchema
-from requests_opentracing import SessionTracing
+from redis_client import redis_client
 import opentracing
 
 """
@@ -44,13 +43,13 @@ class SampleResource(Resource):
         if 'description' not in request.form:
             return abort(400, "no description found")
         else:
-            description = request.form.get("description")
+            description_raw = request.form.get("description")
 
         if soundfile.content_type != 'audio/wave':
             current_app.logger.info(f"Input file was not WAV.")
             return abort(415, 'Input file not a wave file.')
         try:
-            desc = self.sampleschema.loads(description)
+            desc = self.sampleschema.loads(description_raw)
         except Exception as e:
             current_app.logger.exception(e)
             return abort(417, 'Input JSON schema invalid')
@@ -62,50 +61,30 @@
             if len(generated_tag) > 2:  # Ensure minimum length
                 break
 
-        # Handle mega-autismo-cliento
-        soundfile_content_length = soundfile.content_length
-        if soundfile_content_length <= 0:  # BRUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUH
-            with opentracing.tracer.start_active_span(
-                    'calculateContentLength'):  # In an ideal scenario this span is missing
-                current_app.logger.debug(
-                    "The uploader did not provide content-length for the sound file... Calculating manually..."
+        with opentracing.tracer.start_active_span('publishMetaMessage'):
+            try:  # TODO change exchange
+                magic_amqp.publish(
+                    {
+                        'tag': generated_tag,
+                        'timestamp': datetime.now().isoformat(),
+                        'device_id': desc['device_id'],
+                        'device_date': desc['date'].isoformat()
+                    }
                 )
-                # So, this is a seekable stream, so we just seek to the end
-                old_ptr = soundfile.tell()
-                soundfile.seek(0, 2)
-                # Check where is the end (= content length)
-                soundfile_content_length = soundfile.tell()
-                # Seek back to where the stream was
-                soundfile.seek(old_ptr, 0)
+            except Exception as e:
+                current_app.logger.exception(e)
+                return abort(500, f"AMQP Publish error: {str(e)}")
 
-        # It's insane, that you can not set this field in curl
+        with opentracing.tracer.start_active_span('readSampleToMemory'):
+            buf = io.BytesIO()
+            soundfile.save(buf)
 
-        with opentracing.tracer.start_active_span('uploadToStorageService'):
-            files = {
-                'description': (None, json.dumps({'tag': generated_tag}), 'application/json'),
-                'soundFile': (
-                    'wave.wav',
-                    soundfile,
-                    soundfile.content_type,
-                    {'Content-Length': soundfile_content_length})}
-
-            upload_started = time.time()
-            r = SessionTracing(propagate=True).post(
-                f"http://{current_app.config.get('STORAGE_HOSTNAME')}/object",
-                files=files
-            )
-            upload_time = time.time() - upload_started
-
-            if upload_time > 0.8:
-                current_app.logger.warning(f"Uploading to storage-service took {upload_time:5} sec")
-
-            if r.status_code not in [200, 201]:
-                return abort(500,
-                             f"Failed to upload sample to storage service. Upstream status: {r.status_code}: {r.text}")
+        with opentracing.tracer.start_active_span('putToCache'):
+            redis_client.set(generated_tag, buf.getbuffer())  # getbuffer is quick as it does not copy like getvalue
 
         # Announce only after the data is successfully committed
-        with opentracing.tracer.start_active_span('publishMessage'):
-            try:
+        with opentracing.tracer.start_active_span('publishInCacheMessage'):
+            try:  # TODO change exchange
                 magic_amqp.publish({'tag': generated_tag})
             except Exception as e:
                 current_app.logger.exception(e)
@@ -129,4 +108,4 @@
                 ]
             )
 
-        return {"tag": generated_tag}, 200
\ No newline at end of file
+        return {"tag": generated_tag}, 200
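Note: the change wires up flask-redis via redis_client.init_app(app) but does not touch config.py, so the Redis connection string is not visible in this diff. FlaskRedis.init_app() resolves the REDIS_URL key from the Flask config and silently falls back to redis://localhost:6379/0 when the key is absent. A minimal sketch of the assumed configuration entry follows; the "redis" hostname and the environment-variable override are assumptions for illustration, not part of this change:

    # config.py (sketch only, not part of the diff above)
    import os

    class Config:
        # flask-redis looks up REDIS_URL in app.config during init_app();
        # if it is missing, it defaults to redis://localhost:6379/0
        REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0")

With that key in place, a consumer of the publishInCacheMessage event would presumably fetch the cached WAV with redis_client.get(tag), which returns the stored bytes, or None if the key does not exist.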