Merge pull request 'norbi-update' (#1) from norbi-update into master
	
		
			
	
		
	
	
		
	
		
			All checks were successful
		
		
	
	
		
			
				
	
				continuous-integration/drone/push Build is passing
				
			
		
		
	
	
				
					
				
			
		
			All checks were successful
		
		
	
	continuous-integration/drone/push Build is passing
				
			Reviewed-on: #1
This commit is contained in:
		@@ -4,13 +4,9 @@ Flask~=2.0.1
 | 
				
			|||||||
Flask-RESTful~=0.3.9
 | 
					Flask-RESTful~=0.3.9
 | 
				
			||||||
requests~=2.26.0
 | 
					requests~=2.26.0
 | 
				
			||||||
werkzeug
 | 
					werkzeug
 | 
				
			||||||
sqlalchemy~=1.4.22
 | 
					 | 
				
			||||||
flask_sqlalchemy~=2.5.1
 | 
					 | 
				
			||||||
xeger~=0.3.5
 | 
					xeger~=0.3.5
 | 
				
			||||||
pika~=1.2.0
 | 
					pika~=1.2.0
 | 
				
			||||||
psycopg2-binary
 | 
					 | 
				
			||||||
marshmallow~=3.13.0
 | 
					marshmallow~=3.13.0
 | 
				
			||||||
marshmallow-sqlalchemy~=0.26.1
 | 
					 | 
				
			||||||
flask-marshmallow
 | 
					flask-marshmallow
 | 
				
			||||||
py-healthcheck
 | 
					py-healthcheck
 | 
				
			||||||
Flask-InfluxDB
 | 
					Flask-InfluxDB
 | 
				
			||||||
@@ -18,7 +14,8 @@ tzdata
 | 
				
			|||||||
tzlocal
 | 
					tzlocal
 | 
				
			||||||
apscheduler~=3.7.0
 | 
					apscheduler~=3.7.0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					flask-redis~=0.4.0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
opentracing~=2.4.0
 | 
					opentracing~=2.4.0
 | 
				
			||||||
jaeger-client
 | 
					jaeger-client
 | 
				
			||||||
requests-opentracing
 | 
					 | 
				
			||||||
Flask-Opentracing
 | 
					Flask-Opentracing
 | 
				
			||||||
							
								
								
									
										17
									
								
								src/app.py
									
									
									
									
									
								
							
							
						
						
									
										17
									
								
								src/app.py
									
									
									
									
									
								
							@@ -3,15 +3,14 @@ from flask import Flask
 | 
				
			|||||||
from flask_restful import Api
 | 
					from flask_restful import Api
 | 
				
			||||||
import sentry_sdk
 | 
					import sentry_sdk
 | 
				
			||||||
from sentry_sdk.integrations.flask import FlaskIntegration
 | 
					from sentry_sdk.integrations.flask import FlaskIntegration
 | 
				
			||||||
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
 | 
					 | 
				
			||||||
from healthcheck import HealthCheck
 | 
					from healthcheck import HealthCheck
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from redis_client import redis_client
 | 
				
			||||||
from config import Config
 | 
					from config import Config
 | 
				
			||||||
from db import db
 | 
					 | 
				
			||||||
from marshm import ma
 | 
					from marshm import ma
 | 
				
			||||||
from influxus import influx_db
 | 
					from influxus import influx_db
 | 
				
			||||||
from resources import SampleResource, SampleParameterResource
 | 
					from resources import SampleResource
 | 
				
			||||||
from healthchecks import health_database_status, amqp_connection_status
 | 
					from healthchecks import amqp_connection_status
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import atexit
 | 
					import atexit
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -35,7 +34,7 @@ __version__text__ = "1"
 | 
				
			|||||||
if Config.SENTRY_DSN:
 | 
					if Config.SENTRY_DSN:
 | 
				
			||||||
    sentry_sdk.init(
 | 
					    sentry_sdk.init(
 | 
				
			||||||
        dsn=Config.SENTRY_DSN,
 | 
					        dsn=Config.SENTRY_DSN,
 | 
				
			||||||
        integrations=[FlaskIntegration(), SqlalchemyIntegration()],
 | 
					        integrations=[FlaskIntegration()],
 | 
				
			||||||
        traces_sample_rate=0.0,
 | 
					        traces_sample_rate=0.0,
 | 
				
			||||||
        send_default_pii=True,
 | 
					        send_default_pii=True,
 | 
				
			||||||
        release=Config.RELEASE_ID,
 | 
					        release=Config.RELEASE_ID,
 | 
				
			||||||
@@ -48,9 +47,10 @@ app.config.from_object(Config)
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
api = Api(app)
 | 
					api = Api(app)
 | 
				
			||||||
health = HealthCheck()
 | 
					health = HealthCheck()
 | 
				
			||||||
db.init_app(app)
 | 
					 | 
				
			||||||
ma.init_app(app)
 | 
					ma.init_app(app)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					redis_client.init_app(app)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# ampq magic stuff
 | 
					# ampq magic stuff
 | 
				
			||||||
magic_amqp.init_app(app)
 | 
					magic_amqp.init_app(app)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -68,7 +68,7 @@ if Config.ENABLE_INFLUXDB:
 | 
				
			|||||||
def init_db():
 | 
					def init_db():
 | 
				
			||||||
    if Config.ENABLE_INFLUXDB:
 | 
					    if Config.ENABLE_INFLUXDB:
 | 
				
			||||||
        influx_db.database.create(Config.INFLUXDB_DATABASE)
 | 
					        influx_db.database.create(Config.INFLUXDB_DATABASE)
 | 
				
			||||||
    db.create_all()
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# Setup tracing
 | 
					# Setup tracing
 | 
				
			||||||
def initialize_tracer():
 | 
					def initialize_tracer():
 | 
				
			||||||
@@ -81,9 +81,7 @@ def initialize_tracer():
 | 
				
			|||||||
tracing = FlaskTracing(initialize_tracer, True, app)
 | 
					tracing = FlaskTracing(initialize_tracer, True, app)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
api.add_resource(SampleResource, "/input")
 | 
					api.add_resource(SampleResource, "/input")
 | 
				
			||||||
api.add_resource(SampleParameterResource, '/input/<tag>')
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
health.add_check(health_database_status)
 | 
					 | 
				
			||||||
health.add_check(amqp_connection_status)
 | 
					health.add_check(amqp_connection_status)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
register_all_error_handlers(app)
 | 
					register_all_error_handlers(app)
 | 
				
			||||||
@@ -92,6 +90,7 @@ app.add_url_rule("/healthz", "healthcheck", view_func=lambda: health.run())
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
if __name__ != '__main__':
 | 
					if __name__ != '__main__':
 | 
				
			||||||
    import logging
 | 
					    import logging
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    gunicorn_logger = logging.getLogger('gunicorn.error')
 | 
					    gunicorn_logger = logging.getLogger('gunicorn.error')
 | 
				
			||||||
    app.logger.handlers = gunicorn_logger.handlers
 | 
					    app.logger.handlers = gunicorn_logger.handlers
 | 
				
			||||||
    app.logger.setLevel(gunicorn_logger.level)
 | 
					    app.logger.setLevel(gunicorn_logger.level)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -10,12 +10,6 @@ __copyright__ = "Copyright 2020, Birbnetes Team"
 | 
				
			|||||||
__module_name__ = "app"
 | 
					__module_name__ = "app"
 | 
				
			||||||
__version__text__ = "1"
 | 
					__version__text__ = "1"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
_POSTGRES_HOSTNAME = os.getenv("INPUT_POSTGRES_HOSTNAME", "localhost")
 | 
					 | 
				
			||||||
_POSTGRES_USERNAME = os.getenv("INPUT_POSTGRES_USERNAME", "input-service")
 | 
					 | 
				
			||||||
_POSTGRES_PASSWORD = os.getenv("INPUT_POSTGRES_PASSWORD", "input-service")
 | 
					 | 
				
			||||||
_POSTGRES_DB = os.getenv("INPUT_POSTGRES_DB", "input-service")
 | 
					 | 
				
			||||||
_POSTGRES_OPTS = os.getenv("INPUT_POSTGRES_OPTS", "")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
class Config:
 | 
					class Config:
 | 
				
			||||||
    PORT = 8080
 | 
					    PORT = 8080
 | 
				
			||||||
@@ -25,8 +19,10 @@ class Config:
 | 
				
			|||||||
    RELEASE_ID = os.environ.get("RELEASE_ID", "test")
 | 
					    RELEASE_ID = os.environ.get("RELEASE_ID", "test")
 | 
				
			||||||
    RELEASEMODE = os.environ.get("INPUT_SERVICE_RELEASEMODE", "dev")
 | 
					    RELEASEMODE = os.environ.get("INPUT_SERVICE_RELEASEMODE", "dev")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    EXCHANGE_NAME = os.getenv("INPUT_RABBITMQ_EXCHANGE", "dev")
 | 
					    REDIS_URL = os.environ['CACHE_REDIS_URL']
 | 
				
			||||||
    RABBITMQ_QUEUE = os.getenv("INPUT_RABBITMQ_QUEUE", "wave-extract")
 | 
					
 | 
				
			||||||
 | 
					    EXCHANGE_NAME_META = os.getenv("INPUT_RABBITMQ_EXCHANGE_META", "sample-meta")
 | 
				
			||||||
 | 
					    EXCHANGE_NAME_CACHE = os.getenv("INPUT_RABBITMQ_EXCHANGE_CACHE", "sample-cache")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    FLASK_PIKA_PARAMS = {
 | 
					    FLASK_PIKA_PARAMS = {
 | 
				
			||||||
        'host': os.getenv("INPUT_RABBITMQ_HOSTNAME", "localhost"),
 | 
					        'host': os.getenv("INPUT_RABBITMQ_HOSTNAME", "localhost"),
 | 
				
			||||||
@@ -36,10 +32,6 @@ class Config:
 | 
				
			|||||||
        'virtual_host': '/'
 | 
					        'virtual_host': '/'
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    SQLALCHEMY_DATABASE_URI = f"postgresql://{_POSTGRES_USERNAME}:{_POSTGRES_PASSWORD}@{_POSTGRES_HOSTNAME}:5432/{_POSTGRES_DB}{_POSTGRES_OPTS}"
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    STORAGE_HOSTNAME = os.getenv("INPUT_STORAGE_HOSTNAME", "localhost:8042")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    ENABLE_INFLUXDB = os.environ.get("INPUT_ENABLE_INFLUX", "true").lower() in ["true", "yes", "1"]
 | 
					    ENABLE_INFLUXDB = os.environ.get("INPUT_ENABLE_INFLUX", "true").lower() in ["true", "yes", "1"]
 | 
				
			||||||
    INFLUXDB_HOST = os.getenv("INFLUX_HOST", "input-influx")
 | 
					    INFLUXDB_HOST = os.getenv("INFLUX_HOST", "input-influx")
 | 
				
			||||||
    INFLUXDB_PORT = os.getenv("INFLUX_PORT", "8086")
 | 
					    INFLUXDB_PORT = os.getenv("INFLUX_PORT", "8086")
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										13
									
								
								src/db.py
									
									
									
									
									
								
							
							
						
						
									
										13
									
								
								src/db.py
									
									
									
									
									
								
							@@ -1,13 +0,0 @@
 | 
				
			|||||||
#!/usr/bin/env python3
 | 
					 | 
				
			||||||
from flask_sqlalchemy import SQLAlchemy
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
"""
 | 
					 | 
				
			||||||
Database api
 | 
					 | 
				
			||||||
"""
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
__author__ = '@tormakris'
 | 
					 | 
				
			||||||
__copyright__ = "Copyright 2020, Birbnetes Team"
 | 
					 | 
				
			||||||
__module_name__ = "db"
 | 
					 | 
				
			||||||
__version__text__ = "1"
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
db = SQLAlchemy()
 | 
					 | 
				
			||||||
@@ -1,6 +1,4 @@
 | 
				
			|||||||
#!/usr/bin/env python3
 | 
					#!/usr/bin/env python3
 | 
				
			||||||
 | 
					 | 
				
			||||||
from db import db
 | 
					 | 
				
			||||||
from magic_amqp import magic_amqp
 | 
					from magic_amqp import magic_amqp
 | 
				
			||||||
 | 
					
 | 
				
			||||||
"""
 | 
					"""
 | 
				
			||||||
@@ -13,17 +11,6 @@ __module_name__ = "healthchecks"
 | 
				
			|||||||
__version__text__ = "1"
 | 
					__version__text__ = "1"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def health_database_status():
 | 
					 | 
				
			||||||
    is_database_working = True
 | 
					 | 
				
			||||||
    output = 'database is ok'
 | 
					 | 
				
			||||||
    try:
 | 
					 | 
				
			||||||
        db.session.execute('SELECT 1')
 | 
					 | 
				
			||||||
    except Exception as e:
 | 
					 | 
				
			||||||
        output = str(e)
 | 
					 | 
				
			||||||
        is_database_working = False
 | 
					 | 
				
			||||||
    return is_database_working, output
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
def amqp_connection_status():
 | 
					def amqp_connection_status():
 | 
				
			||||||
    if magic_amqp.is_healthy():
 | 
					    if magic_amqp.is_healthy():
 | 
				
			||||||
        result = True
 | 
					        result = True
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -26,8 +26,8 @@ class MagicAMQP:
 | 
				
			|||||||
    def init_app(self, app: Flask):
 | 
					    def init_app(self, app: Flask):
 | 
				
			||||||
        self.app = app
 | 
					        self.app = app
 | 
				
			||||||
        self.app.config.setdefault('FLASK_PIKA_PARAMS', {})
 | 
					        self.app.config.setdefault('FLASK_PIKA_PARAMS', {})
 | 
				
			||||||
        self.app.config.setdefault('EXCHANGE_NAME', None)
 | 
					        self.app.config.setdefault('EXCHANGE_NAME_META', None)
 | 
				
			||||||
        self.app.config.setdefault('RABBITMQ_QUEUE', None)
 | 
					        self.app.config.setdefault('EXCHANGE_NAME_CACHE', None)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self._credentials = pika.PlainCredentials(
 | 
					        self._credentials = pika.PlainCredentials(
 | 
				
			||||||
            app.config['FLASK_PIKA_PARAMS']['username'],
 | 
					            app.config['FLASK_PIKA_PARAMS']['username'],
 | 
				
			||||||
@@ -46,7 +46,11 @@ class MagicAMQP:
 | 
				
			|||||||
        )
 | 
					        )
 | 
				
			||||||
        self._pika_channel = self._pika_connection.channel()
 | 
					        self._pika_channel = self._pika_connection.channel()
 | 
				
			||||||
        self._pika_channel.exchange_declare(
 | 
					        self._pika_channel.exchange_declare(
 | 
				
			||||||
            exchange=self.app.config['EXCHANGE_NAME'],
 | 
					            exchange=self.app.config['EXCHANGE_NAME_META'],
 | 
				
			||||||
 | 
					            exchange_type='direct'
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        self._pika_channel.exchange_declare(
 | 
				
			||||||
 | 
					            exchange=self.app.config['EXCHANGE_NAME_CACHE'],
 | 
				
			||||||
            exchange_type='direct'
 | 
					            exchange_type='direct'
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -71,7 +75,7 @@ class MagicAMQP:
 | 
				
			|||||||
            if total_time > 1:
 | 
					            if total_time > 1:
 | 
				
			||||||
                self.app.logger.warning(f"Loop: Total loop took {total_time:5f} sec")
 | 
					                self.app.logger.warning(f"Loop: Total loop took {total_time:5f} sec")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def publish(self, payload=None):
 | 
					    def _publish(self, exchange: str, payload=None):
 | 
				
			||||||
        """
 | 
					        """
 | 
				
			||||||
        Publish a simple json serialized message to the configured queue.
 | 
					        Publish a simple json serialized message to the configured queue.
 | 
				
			||||||
        If the connection is broken, then this call will block until the connection is restored
 | 
					        If the connection is broken, then this call will block until the connection is restored
 | 
				
			||||||
@@ -89,7 +93,7 @@ class MagicAMQP:
 | 
				
			|||||||
                while True:
 | 
					                while True:
 | 
				
			||||||
                    try:
 | 
					                    try:
 | 
				
			||||||
                        self._pika_channel.basic_publish(
 | 
					                        self._pika_channel.basic_publish(
 | 
				
			||||||
                            exchange=self.app.config['EXCHANGE_NAME'],
 | 
					                            exchange=exchange,
 | 
				
			||||||
                            routing_key='feature',
 | 
					                            routing_key='feature',
 | 
				
			||||||
                            body=json.dumps(payload).encode('UTF-8')
 | 
					                            body=json.dumps(payload).encode('UTF-8')
 | 
				
			||||||
                        )
 | 
					                        )
 | 
				
			||||||
@@ -120,6 +124,12 @@ class MagicAMQP:
 | 
				
			|||||||
                if total_time > 0.4:
 | 
					                if total_time > 0.4:
 | 
				
			||||||
                    self.app.logger.warning(f"Publish: Total publish took {total_time:5f} sec")
 | 
					                    self.app.logger.warning(f"Publish: Total publish took {total_time:5f} sec")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def publish_cache(self, payload=None):
 | 
				
			||||||
 | 
					        return self._publish(self.app.config['EXCHANGE_NAME_CACHE'], payload)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def publish_meta(self, payload=None):
 | 
				
			||||||
 | 
					        return self._publish(self.app.config['EXCHANGE_NAME_META'], payload)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def is_healthy(self) -> bool:
 | 
					    def is_healthy(self) -> bool:
 | 
				
			||||||
        with self._lock:
 | 
					        with self._lock:
 | 
				
			||||||
            if not self._pika_channel:
 | 
					            if not self._pika_channel:
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -1,25 +0,0 @@
 | 
				
			|||||||
#!/usr/bin/env python3
 | 
					 | 
				
			||||||
from db import db
 | 
					 | 
				
			||||||
from sqlalchemy.sql import func
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
"""
 | 
					 | 
				
			||||||
Flask Restful endpoints
 | 
					 | 
				
			||||||
"""
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
__author__ = '@tormakris'
 | 
					 | 
				
			||||||
__copyright__ = "Copyright 2020, Birbnetes Team"
 | 
					 | 
				
			||||||
__module_name__ = "models"
 | 
					 | 
				
			||||||
__version__text__ = "1"
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
class SampleMetadata(db.Model):
 | 
					 | 
				
			||||||
    """
 | 
					 | 
				
			||||||
    SQLAlchemy model of metadata entries
 | 
					 | 
				
			||||||
    """
 | 
					 | 
				
			||||||
    id = db.Column(db.Integer, primary_key=True, auto_increment=True)
 | 
					 | 
				
			||||||
    timestamp = db.Column(db.TIMESTAMP, nullable=False, server_default=func.now())
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    device_id = db.Column(db.Integer, nullable=False)
 | 
					 | 
				
			||||||
    device_date = db.Column(db.DateTime, nullable=False)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    tag = db.Column(db.String(32), nullable=False, unique=True)
 | 
					 | 
				
			||||||
							
								
								
									
										4
									
								
								src/redis_client.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										4
									
								
								src/redis_client.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,4 @@
 | 
				
			|||||||
 | 
					#!/usr/bin/env python3
 | 
				
			||||||
 | 
					from flask_redis import FlaskRedis
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					redis_client = FlaskRedis()
 | 
				
			||||||
							
								
								
									
										174
									
								
								src/resources.py
									
									
									
									
									
								
							
							
						
						
									
										174
									
								
								src/resources.py
									
									
									
									
									
								
							@@ -1,18 +1,14 @@
 | 
				
			|||||||
#!/usr/bin/env python3
 | 
					#!/usr/bin/env python3
 | 
				
			||||||
import json
 | 
					import io
 | 
				
			||||||
import time
 | 
					 | 
				
			||||||
from datetime import datetime
 | 
					from datetime import datetime
 | 
				
			||||||
import tzlocal
 | 
					import tzlocal
 | 
				
			||||||
from xeger import Xeger
 | 
					from xeger import Xeger
 | 
				
			||||||
from flask_restful import Resource
 | 
					from flask_restful import Resource
 | 
				
			||||||
from flask import request, current_app, abort
 | 
					from flask import request, current_app, abort
 | 
				
			||||||
import requests
 | 
					 | 
				
			||||||
from magic_amqp import magic_amqp
 | 
					from magic_amqp import magic_amqp
 | 
				
			||||||
from db import db
 | 
					 | 
				
			||||||
from influxus import influx_db
 | 
					from influxus import influx_db
 | 
				
			||||||
from models import SampleMetadata
 | 
					from schemas import SampleSchema
 | 
				
			||||||
from schemas import SampleSchema, SampleMetadataSchema
 | 
					from redis_client import redis_client
 | 
				
			||||||
from requests_opentracing import SessionTracing
 | 
					 | 
				
			||||||
import opentracing
 | 
					import opentracing
 | 
				
			||||||
 | 
					
 | 
				
			||||||
"""
 | 
					"""
 | 
				
			||||||
@@ -32,7 +28,6 @@ class SampleResource(Resource):
 | 
				
			|||||||
    """
 | 
					    """
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    sampleschema = SampleSchema(many=False)
 | 
					    sampleschema = SampleSchema(many=False)
 | 
				
			||||||
    samplemetadataschema = SampleMetadataSchema(many=True)
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def post(self):
 | 
					    def post(self):
 | 
				
			||||||
        """
 | 
					        """
 | 
				
			||||||
@@ -48,13 +43,13 @@ class SampleResource(Resource):
 | 
				
			|||||||
            if 'description' not in request.form:
 | 
					            if 'description' not in request.form:
 | 
				
			||||||
                return abort(400, "no description found")
 | 
					                return abort(400, "no description found")
 | 
				
			||||||
            else:
 | 
					            else:
 | 
				
			||||||
                description = request.form.get("description")
 | 
					                description_raw = request.form.get("description")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            if soundfile.content_type != 'audio/wave':
 | 
					            if soundfile.content_type != 'audio/wave':
 | 
				
			||||||
                current_app.logger.info(f"Input file was not WAV.")
 | 
					                current_app.logger.info(f"Input file was not WAV.")
 | 
				
			||||||
                return abort(415, 'Input file not a wave file.')
 | 
					                return abort(415, 'Input file not a wave file.')
 | 
				
			||||||
            try:
 | 
					            try:
 | 
				
			||||||
                desc = self.sampleschema.loads(description)
 | 
					                desc = self.sampleschema.loads(description_raw)
 | 
				
			||||||
            except Exception as e:
 | 
					            except Exception as e:
 | 
				
			||||||
                current_app.logger.exception(e)
 | 
					                current_app.logger.exception(e)
 | 
				
			||||||
                return abort(417, 'Input JSON schema invalid')
 | 
					                return abort(417, 'Input JSON schema invalid')
 | 
				
			||||||
@@ -66,62 +61,34 @@ class SampleResource(Resource):
 | 
				
			|||||||
                if len(generated_tag) > 2:  # Ensure minimum length
 | 
					                if len(generated_tag) > 2:  # Ensure minimum length
 | 
				
			||||||
                    break
 | 
					                    break
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # Handle mega-autismo-cliento
 | 
					        with opentracing.tracer.start_active_span('publishMetaMessage'):
 | 
				
			||||||
        soundfile_content_length = soundfile.content_length
 | 
					            try:
 | 
				
			||||||
        if soundfile_content_length <= 0:  # BRUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUH
 | 
					                magic_amqp.publish_meta(
 | 
				
			||||||
            with opentracing.tracer.start_active_span(
 | 
					                    {
 | 
				
			||||||
                    'calculateContentLength'):  # In an ideal scenario this span is missing
 | 
					                        'tag': generated_tag,
 | 
				
			||||||
                current_app.logger.debug(
 | 
					                        'timestamp': datetime.now().isoformat(),
 | 
				
			||||||
                    "The uploader did not provide content-length for the sound file... Calculating manually..."
 | 
					                        'device_id': desc['device_id'],
 | 
				
			||||||
 | 
					                        'device_date': desc['date'].isoformat()
 | 
				
			||||||
 | 
					                    }
 | 
				
			||||||
                )
 | 
					                )
 | 
				
			||||||
                # So, this is a seekable stream, so we just seek to the end
 | 
					            except Exception as e:
 | 
				
			||||||
                old_ptr = soundfile.tell()
 | 
					                current_app.logger.exception(e)
 | 
				
			||||||
                soundfile.seek(0, 2)
 | 
					                return abort(500, f"AMQP Publish error: {str(e)}")
 | 
				
			||||||
                # Check where is the end (= content length)
 | 
					 | 
				
			||||||
                soundfile_content_length = soundfile.tell()
 | 
					 | 
				
			||||||
                # Seek back to where the stream was
 | 
					 | 
				
			||||||
                soundfile.seek(old_ptr, 0)
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
                # It's insane, that you can not set this field in curl
 | 
					        with opentracing.tracer.start_active_span('readSampleToMemory'):
 | 
				
			||||||
 | 
					            buf = io.BytesIO()
 | 
				
			||||||
 | 
					            soundfile.save(buf)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        with opentracing.tracer.start_active_span('sqlalchemy.create'):
 | 
					        with opentracing.tracer.start_active_span('putToCache'):
 | 
				
			||||||
            record = SampleMetadata(
 | 
					            redis_client.set(generated_tag, buf.getbuffer())  # getbuffer is quick as it does not copy like getvalue
 | 
				
			||||||
                device_id=desc['device_id'],
 | 
					 | 
				
			||||||
                device_date=desc['date'],
 | 
					 | 
				
			||||||
                tag=generated_tag
 | 
					 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
            db.session.add(record)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        with opentracing.tracer.start_active_span('uploadToStorageService'):
 | 
					 | 
				
			||||||
            files = {
 | 
					 | 
				
			||||||
                'description': (None, json.dumps({'tag': generated_tag}), 'application/json'),
 | 
					 | 
				
			||||||
                'soundFile': (
 | 
					 | 
				
			||||||
                    'wave.wav',
 | 
					 | 
				
			||||||
                    soundfile,
 | 
					 | 
				
			||||||
                    soundfile.content_type,
 | 
					 | 
				
			||||||
                    {'Content-Length': soundfile_content_length})}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            upload_started = time.time()
 | 
					 | 
				
			||||||
            r = SessionTracing(propagate=True).post(
 | 
					 | 
				
			||||||
                f"http://{current_app.config.get('STORAGE_HOSTNAME')}/object",
 | 
					 | 
				
			||||||
                files=files
 | 
					 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
            upload_time = time.time() - upload_started
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            if upload_time > 0.8:
 | 
					 | 
				
			||||||
                current_app.logger.warning(f"Uploading to storage-service took {upload_time:5} sec")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            if r.status_code not in [200, 201]:
 | 
					 | 
				
			||||||
                return abort(500,
 | 
					 | 
				
			||||||
                             f"Failed to upload sample to storage service. Upstream status: {r.status_code}: {r.text}")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        with opentracing.tracer.start_active_span('sqlalchemy.commit'):
 | 
					 | 
				
			||||||
            db.session.commit()
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # Announce only after the data is successfully committed
 | 
					        # Announce only after the data is successfully committed
 | 
				
			||||||
        with opentracing.tracer.start_active_span('publishMessage'):
 | 
					        with opentracing.tracer.start_active_span('publishInCacheMessage'):
 | 
				
			||||||
            try:
 | 
					            try:
 | 
				
			||||||
                magic_amqp.publish({'tag': generated_tag})
 | 
					                magic_amqp.publish_cache({
 | 
				
			||||||
 | 
					                    'tag': generated_tag,
 | 
				
			||||||
 | 
					                    'mime_type': soundfile.mimetype
 | 
				
			||||||
 | 
					                })
 | 
				
			||||||
            except Exception as e:
 | 
					            except Exception as e:
 | 
				
			||||||
                current_app.logger.exception(e)
 | 
					                current_app.logger.exception(e)
 | 
				
			||||||
                return abort(500, f"AMQP Publish error: {str(e)}")
 | 
					                return abort(500, f"AMQP Publish error: {str(e)}")
 | 
				
			||||||
@@ -145,90 +112,3 @@ class SampleResource(Resource):
 | 
				
			|||||||
                )
 | 
					                )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        return {"tag": generated_tag}, 200
 | 
					        return {"tag": generated_tag}, 200
 | 
				
			||||||
 | 
					 | 
				
			||||||
    def get(self):
 | 
					 | 
				
			||||||
        """
 | 
					 | 
				
			||||||
        Get all stored items
 | 
					 | 
				
			||||||
        :return:
 | 
					 | 
				
			||||||
        """
 | 
					 | 
				
			||||||
        with opentracing.tracer.start_active_span('compileQuery'):
 | 
					 | 
				
			||||||
            query = SampleMetadata.query
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            ## Compile filters ##
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            filters = []
 | 
					 | 
				
			||||||
            try:
 | 
					 | 
				
			||||||
                first = int(request.args.get('first'))
 | 
					 | 
				
			||||||
            except (ValueError, TypeError):
 | 
					 | 
				
			||||||
                first = None
 | 
					 | 
				
			||||||
            else:
 | 
					 | 
				
			||||||
                filters.append(
 | 
					 | 
				
			||||||
                    SampleMetadata.id >= first
 | 
					 | 
				
			||||||
                )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            try:
 | 
					 | 
				
			||||||
                after = datetime.fromisoformat(request.args.get('after'))
 | 
					 | 
				
			||||||
            except (ValueError, TypeError):
 | 
					 | 
				
			||||||
                after = None
 | 
					 | 
				
			||||||
            else:
 | 
					 | 
				
			||||||
                filters.append(
 | 
					 | 
				
			||||||
                    SampleMetadata.timestamp > after
 | 
					 | 
				
			||||||
                )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            try:
 | 
					 | 
				
			||||||
                before = datetime.fromisoformat(request.args.get('before'))
 | 
					 | 
				
			||||||
            except (ValueError, TypeError):
 | 
					 | 
				
			||||||
                before = None
 | 
					 | 
				
			||||||
            else:
 | 
					 | 
				
			||||||
                filters.append(
 | 
					 | 
				
			||||||
                    SampleMetadata.timestamp < before
 | 
					 | 
				
			||||||
                )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            if filters:
 | 
					 | 
				
			||||||
                query = query.filter(db.and_(*filters))
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            try:
 | 
					 | 
				
			||||||
                limit = int(request.args.get('limit'))
 | 
					 | 
				
			||||||
            except (ValueError, TypeError):
 | 
					 | 
				
			||||||
                limit = None
 | 
					 | 
				
			||||||
            else:
 | 
					 | 
				
			||||||
                query = query.limit(limit)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        ## Run query ##
 | 
					 | 
				
			||||||
        count = "count" in request.args
 | 
					 | 
				
			||||||
        tags = {
 | 
					 | 
				
			||||||
            "first": first,
 | 
					 | 
				
			||||||
            "limit": limit,
 | 
					 | 
				
			||||||
            "after": after,
 | 
					 | 
				
			||||||
            "before": before
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        if count:
 | 
					 | 
				
			||||||
            with opentracing.tracer.start_active_span('sqlalchemy.count', tags=tags):
 | 
					 | 
				
			||||||
                rows = query.count()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            return {"count": rows}, 200
 | 
					 | 
				
			||||||
        else:
 | 
					 | 
				
			||||||
            with opentracing.tracer.start_active_span('sqlalchemy.select', tags=tags):
 | 
					 | 
				
			||||||
                samples = query.all()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            return self.samplemetadataschema.dump(list(samples)), 200
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
class SampleParameterResource(Resource):
 | 
					 | 
				
			||||||
    """
 | 
					 | 
				
			||||||
    Sample endpoint with parameters
 | 
					 | 
				
			||||||
    """
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    samplemetadataschema = SampleMetadataSchema(many=False)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def get(self, tag: str):
 | 
					 | 
				
			||||||
        """
 | 
					 | 
				
			||||||
        Get a specific item
 | 
					 | 
				
			||||||
        :param tag:
 | 
					 | 
				
			||||||
        :return:
 | 
					 | 
				
			||||||
        """
 | 
					 | 
				
			||||||
        with opentracing.tracer.start_active_span('sqlalchemy.select', tags={"tag": tag}):
 | 
					 | 
				
			||||||
            sample = SampleMetadata.query.filter_by(tag=tag).first_or_404()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        return self.samplemetadataschema.dump(sample), 200
 | 
					 | 
				
			||||||
 
 | 
				
			|||||||
@@ -1,7 +1,4 @@
 | 
				
			|||||||
#!/usr/bin/env python3
 | 
					#!/usr/bin/env python3
 | 
				
			||||||
from flask_marshmallow.sqla import auto_field
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
from models import SampleMetadata
 | 
					 | 
				
			||||||
from marshm import ma
 | 
					from marshm import ma
 | 
				
			||||||
from marshmallow import fields
 | 
					from marshmallow import fields
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -26,13 +23,3 @@ class SampleSchema(ma.Schema):
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    date = fields.DateTime(required=True)
 | 
					    date = fields.DateTime(required=True)
 | 
				
			||||||
    device_id = fields.Integer(required=True)
 | 
					    device_id = fields.Integer(required=True)
 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
class SampleMetadataSchema(ma.SQLAlchemyAutoSchema):
 | 
					 | 
				
			||||||
    """
 | 
					 | 
				
			||||||
    Marshmallow schema generated
 | 
					 | 
				
			||||||
    """
 | 
					 | 
				
			||||||
    class Meta:
 | 
					 | 
				
			||||||
        model = SampleMetadata
 | 
					 | 
				
			||||||
        exclude = ('timestamp', 'id', 'device_date')
 | 
					 | 
				
			||||||
    date = auto_field("device_date", dump_only=False)
 | 
					 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user