diff --git a/requirements.txt b/requirements.txt index f9cf75d..308fa0f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,13 +4,9 @@ Flask~=2.0.1 Flask-RESTful~=0.3.9 requests~=2.26.0 werkzeug -sqlalchemy~=1.4.22 -flask_sqlalchemy~=2.5.1 xeger~=0.3.5 pika~=1.2.0 -psycopg2-binary marshmallow~=3.13.0 -marshmallow-sqlalchemy~=0.26.1 flask-marshmallow py-healthcheck Flask-InfluxDB @@ -18,7 +14,8 @@ tzdata tzlocal apscheduler~=3.7.0 +flask-redis~=0.4.0 + opentracing~=2.4.0 jaeger-client -requests-opentracing Flask-Opentracing \ No newline at end of file diff --git a/src/app.py b/src/app.py index b1db571..2324c8f 100644 --- a/src/app.py +++ b/src/app.py @@ -3,15 +3,14 @@ from flask import Flask from flask_restful import Api import sentry_sdk from sentry_sdk.integrations.flask import FlaskIntegration -from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration from healthcheck import HealthCheck +from redis_client import redis_client from config import Config -from db import db from marshm import ma from influxus import influx_db -from resources import SampleResource, SampleParameterResource -from healthchecks import health_database_status, amqp_connection_status +from resources import SampleResource +from healthchecks import amqp_connection_status import atexit @@ -35,7 +34,7 @@ __version__text__ = "1" if Config.SENTRY_DSN: sentry_sdk.init( dsn=Config.SENTRY_DSN, - integrations=[FlaskIntegration(), SqlalchemyIntegration()], + integrations=[FlaskIntegration()], traces_sample_rate=0.0, send_default_pii=True, release=Config.RELEASE_ID, @@ -48,9 +47,10 @@ app.config.from_object(Config) api = Api(app) health = HealthCheck() -db.init_app(app) ma.init_app(app) +redis_client.init_app(app) + # ampq magic stuff magic_amqp.init_app(app) @@ -68,7 +68,7 @@ if Config.ENABLE_INFLUXDB: def init_db(): if Config.ENABLE_INFLUXDB: influx_db.database.create(Config.INFLUXDB_DATABASE) - db.create_all() + # Setup tracing def initialize_tracer(): @@ -81,9 +81,7 @@ def initialize_tracer(): tracing = FlaskTracing(initialize_tracer, True, app) api.add_resource(SampleResource, "/input") -api.add_resource(SampleParameterResource, '/input/') -health.add_check(health_database_status) health.add_check(amqp_connection_status) register_all_error_handlers(app) @@ -92,6 +90,7 @@ app.add_url_rule("/healthz", "healthcheck", view_func=lambda: health.run()) if __name__ != '__main__': import logging + gunicorn_logger = logging.getLogger('gunicorn.error') app.logger.handlers = gunicorn_logger.handlers app.logger.setLevel(gunicorn_logger.level) diff --git a/src/config.py b/src/config.py index 1d3cc11..565ab4d 100644 --- a/src/config.py +++ b/src/config.py @@ -10,12 +10,6 @@ __copyright__ = "Copyright 2020, Birbnetes Team" __module_name__ = "app" __version__text__ = "1" -_POSTGRES_HOSTNAME = os.getenv("INPUT_POSTGRES_HOSTNAME", "localhost") -_POSTGRES_USERNAME = os.getenv("INPUT_POSTGRES_USERNAME", "input-service") -_POSTGRES_PASSWORD = os.getenv("INPUT_POSTGRES_PASSWORD", "input-service") -_POSTGRES_DB = os.getenv("INPUT_POSTGRES_DB", "input-service") -_POSTGRES_OPTS = os.getenv("INPUT_POSTGRES_OPTS", "") - class Config: PORT = 8080 @@ -25,8 +19,10 @@ class Config: RELEASE_ID = os.environ.get("RELEASE_ID", "test") RELEASEMODE = os.environ.get("INPUT_SERVICE_RELEASEMODE", "dev") - EXCHANGE_NAME = os.getenv("INPUT_RABBITMQ_EXCHANGE", "dev") - RABBITMQ_QUEUE = os.getenv("INPUT_RABBITMQ_QUEUE", "wave-extract") + REDIS_URL = os.environ['CACHE_REDIS_URL'] + + EXCHANGE_NAME_META = os.getenv("INPUT_RABBITMQ_EXCHANGE_META", "sample-meta") + EXCHANGE_NAME_CACHE = os.getenv("INPUT_RABBITMQ_EXCHANGE_CACHE", "sample-cache") FLASK_PIKA_PARAMS = { 'host': os.getenv("INPUT_RABBITMQ_HOSTNAME", "localhost"), @@ -36,10 +32,6 @@ class Config: 'virtual_host': '/' } - SQLALCHEMY_DATABASE_URI = f"postgresql://{_POSTGRES_USERNAME}:{_POSTGRES_PASSWORD}@{_POSTGRES_HOSTNAME}:5432/{_POSTGRES_DB}{_POSTGRES_OPTS}" - - STORAGE_HOSTNAME = os.getenv("INPUT_STORAGE_HOSTNAME", "localhost:8042") - ENABLE_INFLUXDB = os.environ.get("INPUT_ENABLE_INFLUX", "true").lower() in ["true", "yes", "1"] INFLUXDB_HOST = os.getenv("INFLUX_HOST", "input-influx") INFLUXDB_PORT = os.getenv("INFLUX_PORT", "8086") diff --git a/src/db.py b/src/db.py deleted file mode 100644 index 98196e0..0000000 --- a/src/db.py +++ /dev/null @@ -1,13 +0,0 @@ -#!/usr/bin/env python3 -from flask_sqlalchemy import SQLAlchemy - -""" -Database api -""" - -__author__ = '@tormakris' -__copyright__ = "Copyright 2020, Birbnetes Team" -__module_name__ = "db" -__version__text__ = "1" - -db = SQLAlchemy() diff --git a/src/healthchecks.py b/src/healthchecks.py index d97b19c..1681ed3 100644 --- a/src/healthchecks.py +++ b/src/healthchecks.py @@ -1,6 +1,4 @@ #!/usr/bin/env python3 - -from db import db from magic_amqp import magic_amqp """ @@ -13,17 +11,6 @@ __module_name__ = "healthchecks" __version__text__ = "1" -def health_database_status(): - is_database_working = True - output = 'database is ok' - try: - db.session.execute('SELECT 1') - except Exception as e: - output = str(e) - is_database_working = False - return is_database_working, output - - def amqp_connection_status(): if magic_amqp.is_healthy(): result = True diff --git a/src/magic_amqp.py b/src/magic_amqp.py index f719183..de91dd6 100644 --- a/src/magic_amqp.py +++ b/src/magic_amqp.py @@ -26,8 +26,8 @@ class MagicAMQP: def init_app(self, app: Flask): self.app = app self.app.config.setdefault('FLASK_PIKA_PARAMS', {}) - self.app.config.setdefault('EXCHANGE_NAME', None) - self.app.config.setdefault('RABBITMQ_QUEUE', None) + self.app.config.setdefault('EXCHANGE_NAME_META', None) + self.app.config.setdefault('EXCHANGE_NAME_CACHE', None) self._credentials = pika.PlainCredentials( app.config['FLASK_PIKA_PARAMS']['username'], @@ -46,7 +46,11 @@ class MagicAMQP: ) self._pika_channel = self._pika_connection.channel() self._pika_channel.exchange_declare( - exchange=self.app.config['EXCHANGE_NAME'], + exchange=self.app.config['EXCHANGE_NAME_META'], + exchange_type='direct' + ) + self._pika_channel.exchange_declare( + exchange=self.app.config['EXCHANGE_NAME_CACHE'], exchange_type='direct' ) @@ -71,7 +75,7 @@ class MagicAMQP: if total_time > 1: self.app.logger.warning(f"Loop: Total loop took {total_time:5f} sec") - def publish(self, payload=None): + def _publish(self, exchange: str, payload=None): """ Publish a simple json serialized message to the configured queue. If the connection is broken, then this call will block until the connection is restored @@ -89,7 +93,7 @@ class MagicAMQP: while True: try: self._pika_channel.basic_publish( - exchange=self.app.config['EXCHANGE_NAME'], + exchange=exchange, routing_key='feature', body=json.dumps(payload).encode('UTF-8') ) @@ -120,6 +124,12 @@ class MagicAMQP: if total_time > 0.4: self.app.logger.warning(f"Publish: Total publish took {total_time:5f} sec") + def publish_cache(self, payload=None): + return self._publish(self.app.config['EXCHANGE_NAME_CACHE'], payload) + + def publish_meta(self, payload=None): + return self._publish(self.app.config['EXCHANGE_NAME_META'], payload) + def is_healthy(self) -> bool: with self._lock: if not self._pika_channel: diff --git a/src/models.py b/src/models.py deleted file mode 100644 index 8a3cf36..0000000 --- a/src/models.py +++ /dev/null @@ -1,25 +0,0 @@ -#!/usr/bin/env python3 -from db import db -from sqlalchemy.sql import func - -""" -Flask Restful endpoints -""" - -__author__ = '@tormakris' -__copyright__ = "Copyright 2020, Birbnetes Team" -__module_name__ = "models" -__version__text__ = "1" - - -class SampleMetadata(db.Model): - """ - SQLAlchemy model of metadata entries - """ - id = db.Column(db.Integer, primary_key=True, auto_increment=True) - timestamp = db.Column(db.TIMESTAMP, nullable=False, server_default=func.now()) - - device_id = db.Column(db.Integer, nullable=False) - device_date = db.Column(db.DateTime, nullable=False) - - tag = db.Column(db.String(32), nullable=False, unique=True) diff --git a/src/redis_client.py b/src/redis_client.py new file mode 100644 index 0000000..3bba60d --- /dev/null +++ b/src/redis_client.py @@ -0,0 +1,4 @@ +#!/usr/bin/env python3 +from flask_redis import FlaskRedis + +redis_client = FlaskRedis() diff --git a/src/resources.py b/src/resources.py index 1fc24d7..1deaebb 100644 --- a/src/resources.py +++ b/src/resources.py @@ -1,18 +1,14 @@ #!/usr/bin/env python3 -import json -import time +import io from datetime import datetime import tzlocal from xeger import Xeger from flask_restful import Resource from flask import request, current_app, abort -import requests from magic_amqp import magic_amqp -from db import db from influxus import influx_db -from models import SampleMetadata -from schemas import SampleSchema, SampleMetadataSchema -from requests_opentracing import SessionTracing +from schemas import SampleSchema +from redis_client import redis_client import opentracing """ @@ -32,7 +28,6 @@ class SampleResource(Resource): """ sampleschema = SampleSchema(many=False) - samplemetadataschema = SampleMetadataSchema(many=True) def post(self): """ @@ -48,13 +43,13 @@ class SampleResource(Resource): if 'description' not in request.form: return abort(400, "no description found") else: - description = request.form.get("description") + description_raw = request.form.get("description") if soundfile.content_type != 'audio/wave': current_app.logger.info(f"Input file was not WAV.") return abort(415, 'Input file not a wave file.') try: - desc = self.sampleschema.loads(description) + desc = self.sampleschema.loads(description_raw) except Exception as e: current_app.logger.exception(e) return abort(417, 'Input JSON schema invalid') @@ -66,62 +61,34 @@ class SampleResource(Resource): if len(generated_tag) > 2: # Ensure minimum length break - # Handle mega-autismo-cliento - soundfile_content_length = soundfile.content_length - if soundfile_content_length <= 0: # BRUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUH - with opentracing.tracer.start_active_span( - 'calculateContentLength'): # In an ideal scenario this span is missing - current_app.logger.debug( - "The uploader did not provide content-length for the sound file... Calculating manually..." + with opentracing.tracer.start_active_span('publishMetaMessage'): + try: + magic_amqp.publish_meta( + { + 'tag': generated_tag, + 'timestamp': datetime.now().isoformat(), + 'device_id': desc['device_id'], + 'device_date': desc['date'].isoformat() + } ) - # So, this is a seekable stream, so we just seek to the end - old_ptr = soundfile.tell() - soundfile.seek(0, 2) - # Check where is the end (= content length) - soundfile_content_length = soundfile.tell() - # Seek back to where the stream was - soundfile.seek(old_ptr, 0) + except Exception as e: + current_app.logger.exception(e) + return abort(500, f"AMQP Publish error: {str(e)}") - # It's insane, that you can not set this field in curl + with opentracing.tracer.start_active_span('readSampleToMemory'): + buf = io.BytesIO() + soundfile.save(buf) - with opentracing.tracer.start_active_span('sqlalchemy.create'): - record = SampleMetadata( - device_id=desc['device_id'], - device_date=desc['date'], - tag=generated_tag - ) - db.session.add(record) - - with opentracing.tracer.start_active_span('uploadToStorageService'): - files = { - 'description': (None, json.dumps({'tag': generated_tag}), 'application/json'), - 'soundFile': ( - 'wave.wav', - soundfile, - soundfile.content_type, - {'Content-Length': soundfile_content_length})} - - upload_started = time.time() - r = SessionTracing(propagate=True).post( - f"http://{current_app.config.get('STORAGE_HOSTNAME')}/object", - files=files - ) - upload_time = time.time() - upload_started - - if upload_time > 0.8: - current_app.logger.warning(f"Uploading to storage-service took {upload_time:5} sec") - - if r.status_code not in [200, 201]: - return abort(500, - f"Failed to upload sample to storage service. Upstream status: {r.status_code}: {r.text}") - - with opentracing.tracer.start_active_span('sqlalchemy.commit'): - db.session.commit() + with opentracing.tracer.start_active_span('putToCache'): + redis_client.set(generated_tag, buf.getbuffer()) # getbuffer is quick as it does not copy like getvalue # Announce only after the data is successfully committed - with opentracing.tracer.start_active_span('publishMessage'): + with opentracing.tracer.start_active_span('publishInCacheMessage'): try: - magic_amqp.publish({'tag': generated_tag}) + magic_amqp.publish_cache({ + 'tag': generated_tag, + 'mime_type': soundfile.mimetype + }) except Exception as e: current_app.logger.exception(e) return abort(500, f"AMQP Publish error: {str(e)}") @@ -145,90 +112,3 @@ class SampleResource(Resource): ) return {"tag": generated_tag}, 200 - - def get(self): - """ - Get all stored items - :return: - """ - with opentracing.tracer.start_active_span('compileQuery'): - query = SampleMetadata.query - - ## Compile filters ## - - filters = [] - try: - first = int(request.args.get('first')) - except (ValueError, TypeError): - first = None - else: - filters.append( - SampleMetadata.id >= first - ) - - try: - after = datetime.fromisoformat(request.args.get('after')) - except (ValueError, TypeError): - after = None - else: - filters.append( - SampleMetadata.timestamp > after - ) - - try: - before = datetime.fromisoformat(request.args.get('before')) - except (ValueError, TypeError): - before = None - else: - filters.append( - SampleMetadata.timestamp < before - ) - - if filters: - query = query.filter(db.and_(*filters)) - - try: - limit = int(request.args.get('limit')) - except (ValueError, TypeError): - limit = None - else: - query = query.limit(limit) - - ## Run query ## - count = "count" in request.args - tags = { - "first": first, - "limit": limit, - "after": after, - "before": before - } - - if count: - with opentracing.tracer.start_active_span('sqlalchemy.count', tags=tags): - rows = query.count() - - return {"count": rows}, 200 - else: - with opentracing.tracer.start_active_span('sqlalchemy.select', tags=tags): - samples = query.all() - - return self.samplemetadataschema.dump(list(samples)), 200 - - -class SampleParameterResource(Resource): - """ - Sample endpoint with parameters - """ - - samplemetadataschema = SampleMetadataSchema(many=False) - - def get(self, tag: str): - """ - Get a specific item - :param tag: - :return: - """ - with opentracing.tracer.start_active_span('sqlalchemy.select', tags={"tag": tag}): - sample = SampleMetadata.query.filter_by(tag=tag).first_or_404() - - return self.samplemetadataschema.dump(sample), 200 diff --git a/src/schemas.py b/src/schemas.py index 3413902..f9b1cce 100644 --- a/src/schemas.py +++ b/src/schemas.py @@ -1,7 +1,4 @@ #!/usr/bin/env python3 -from flask_marshmallow.sqla import auto_field - -from models import SampleMetadata from marshm import ma from marshmallow import fields @@ -26,13 +23,3 @@ class SampleSchema(ma.Schema): date = fields.DateTime(required=True) device_id = fields.Integer(required=True) - - -class SampleMetadataSchema(ma.SQLAlchemyAutoSchema): - """ - Marshmallow schema generated - """ - class Meta: - model = SampleMetadata - exclude = ('timestamp', 'id', 'device_date') - date = auto_field("device_date", dump_only=False)