Compare commits
10 Commits
cbaf2f2981
...
2c1113baab
Author | SHA1 | Date | |
---|---|---|---|
2c1113baab | |||
3cdacc6720 | |||
04bb2722ad | |||
2431812f09 | |||
10f57913f3 | |||
738eea1da3 | |||
a118b79512 | |||
2c0e6ec7d7 | |||
7f987afa7a | |||
ca548f0863 |
@ -4,13 +4,9 @@ Flask~=2.0.1
|
|||||||
Flask-RESTful~=0.3.9
|
Flask-RESTful~=0.3.9
|
||||||
requests~=2.26.0
|
requests~=2.26.0
|
||||||
werkzeug
|
werkzeug
|
||||||
sqlalchemy~=1.4.22
|
|
||||||
flask_sqlalchemy~=2.5.1
|
|
||||||
xeger~=0.3.5
|
xeger~=0.3.5
|
||||||
pika~=1.2.0
|
pika~=1.2.0
|
||||||
psycopg2-binary
|
|
||||||
marshmallow~=3.13.0
|
marshmallow~=3.13.0
|
||||||
marshmallow-sqlalchemy~=0.26.1
|
|
||||||
flask-marshmallow
|
flask-marshmallow
|
||||||
py-healthcheck
|
py-healthcheck
|
||||||
Flask-InfluxDB
|
Flask-InfluxDB
|
||||||
@ -18,7 +14,8 @@ tzdata
|
|||||||
tzlocal
|
tzlocal
|
||||||
apscheduler~=3.7.0
|
apscheduler~=3.7.0
|
||||||
|
|
||||||
|
flask-redis~=0.4.0
|
||||||
|
|
||||||
opentracing~=2.4.0
|
opentracing~=2.4.0
|
||||||
jaeger-client
|
jaeger-client
|
||||||
requests-opentracing
|
|
||||||
Flask-Opentracing
|
Flask-Opentracing
|
17
src/app.py
17
src/app.py
@ -3,15 +3,14 @@ from flask import Flask
|
|||||||
from flask_restful import Api
|
from flask_restful import Api
|
||||||
import sentry_sdk
|
import sentry_sdk
|
||||||
from sentry_sdk.integrations.flask import FlaskIntegration
|
from sentry_sdk.integrations.flask import FlaskIntegration
|
||||||
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
|
|
||||||
from healthcheck import HealthCheck
|
from healthcheck import HealthCheck
|
||||||
|
|
||||||
|
from redis_client import redis_client
|
||||||
from config import Config
|
from config import Config
|
||||||
from db import db
|
|
||||||
from marshm import ma
|
from marshm import ma
|
||||||
from influxus import influx_db
|
from influxus import influx_db
|
||||||
from resources import SampleResource, SampleParameterResource
|
from resources import SampleResource
|
||||||
from healthchecks import health_database_status, amqp_connection_status
|
from healthchecks import amqp_connection_status
|
||||||
|
|
||||||
import atexit
|
import atexit
|
||||||
|
|
||||||
@ -35,7 +34,7 @@ __version__text__ = "1"
|
|||||||
if Config.SENTRY_DSN:
|
if Config.SENTRY_DSN:
|
||||||
sentry_sdk.init(
|
sentry_sdk.init(
|
||||||
dsn=Config.SENTRY_DSN,
|
dsn=Config.SENTRY_DSN,
|
||||||
integrations=[FlaskIntegration(), SqlalchemyIntegration()],
|
integrations=[FlaskIntegration()],
|
||||||
traces_sample_rate=0.0,
|
traces_sample_rate=0.0,
|
||||||
send_default_pii=True,
|
send_default_pii=True,
|
||||||
release=Config.RELEASE_ID,
|
release=Config.RELEASE_ID,
|
||||||
@ -48,9 +47,10 @@ app.config.from_object(Config)
|
|||||||
|
|
||||||
api = Api(app)
|
api = Api(app)
|
||||||
health = HealthCheck()
|
health = HealthCheck()
|
||||||
db.init_app(app)
|
|
||||||
ma.init_app(app)
|
ma.init_app(app)
|
||||||
|
|
||||||
|
redis_client.init_app(app)
|
||||||
|
|
||||||
# ampq magic stuff
|
# ampq magic stuff
|
||||||
magic_amqp.init_app(app)
|
magic_amqp.init_app(app)
|
||||||
|
|
||||||
@ -68,7 +68,7 @@ if Config.ENABLE_INFLUXDB:
|
|||||||
def init_db():
|
def init_db():
|
||||||
if Config.ENABLE_INFLUXDB:
|
if Config.ENABLE_INFLUXDB:
|
||||||
influx_db.database.create(Config.INFLUXDB_DATABASE)
|
influx_db.database.create(Config.INFLUXDB_DATABASE)
|
||||||
db.create_all()
|
|
||||||
|
|
||||||
# Setup tracing
|
# Setup tracing
|
||||||
def initialize_tracer():
|
def initialize_tracer():
|
||||||
@ -81,9 +81,7 @@ def initialize_tracer():
|
|||||||
tracing = FlaskTracing(initialize_tracer, True, app)
|
tracing = FlaskTracing(initialize_tracer, True, app)
|
||||||
|
|
||||||
api.add_resource(SampleResource, "/input")
|
api.add_resource(SampleResource, "/input")
|
||||||
api.add_resource(SampleParameterResource, '/input/<tag>')
|
|
||||||
|
|
||||||
health.add_check(health_database_status)
|
|
||||||
health.add_check(amqp_connection_status)
|
health.add_check(amqp_connection_status)
|
||||||
|
|
||||||
register_all_error_handlers(app)
|
register_all_error_handlers(app)
|
||||||
@ -92,6 +90,7 @@ app.add_url_rule("/healthz", "healthcheck", view_func=lambda: health.run())
|
|||||||
|
|
||||||
if __name__ != '__main__':
|
if __name__ != '__main__':
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
gunicorn_logger = logging.getLogger('gunicorn.error')
|
gunicorn_logger = logging.getLogger('gunicorn.error')
|
||||||
app.logger.handlers = gunicorn_logger.handlers
|
app.logger.handlers = gunicorn_logger.handlers
|
||||||
app.logger.setLevel(gunicorn_logger.level)
|
app.logger.setLevel(gunicorn_logger.level)
|
||||||
|
@ -10,12 +10,6 @@ __copyright__ = "Copyright 2020, Birbnetes Team"
|
|||||||
__module_name__ = "app"
|
__module_name__ = "app"
|
||||||
__version__text__ = "1"
|
__version__text__ = "1"
|
||||||
|
|
||||||
_POSTGRES_HOSTNAME = os.getenv("INPUT_POSTGRES_HOSTNAME", "localhost")
|
|
||||||
_POSTGRES_USERNAME = os.getenv("INPUT_POSTGRES_USERNAME", "input-service")
|
|
||||||
_POSTGRES_PASSWORD = os.getenv("INPUT_POSTGRES_PASSWORD", "input-service")
|
|
||||||
_POSTGRES_DB = os.getenv("INPUT_POSTGRES_DB", "input-service")
|
|
||||||
_POSTGRES_OPTS = os.getenv("INPUT_POSTGRES_OPTS", "")
|
|
||||||
|
|
||||||
|
|
||||||
class Config:
|
class Config:
|
||||||
PORT = 8080
|
PORT = 8080
|
||||||
@ -25,8 +19,10 @@ class Config:
|
|||||||
RELEASE_ID = os.environ.get("RELEASE_ID", "test")
|
RELEASE_ID = os.environ.get("RELEASE_ID", "test")
|
||||||
RELEASEMODE = os.environ.get("INPUT_SERVICE_RELEASEMODE", "dev")
|
RELEASEMODE = os.environ.get("INPUT_SERVICE_RELEASEMODE", "dev")
|
||||||
|
|
||||||
EXCHANGE_NAME = os.getenv("INPUT_RABBITMQ_EXCHANGE", "dev")
|
REDIS_URL = os.environ['CACHE_REDIS_URL']
|
||||||
RABBITMQ_QUEUE = os.getenv("INPUT_RABBITMQ_QUEUE", "wave-extract")
|
|
||||||
|
EXCHANGE_NAME_META = os.getenv("INPUT_RABBITMQ_EXCHANGE_META", "sample-meta")
|
||||||
|
EXCHANGE_NAME_CACHE = os.getenv("INPUT_RABBITMQ_EXCHANGE_CACHE", "sample-cache")
|
||||||
|
|
||||||
FLASK_PIKA_PARAMS = {
|
FLASK_PIKA_PARAMS = {
|
||||||
'host': os.getenv("INPUT_RABBITMQ_HOSTNAME", "localhost"),
|
'host': os.getenv("INPUT_RABBITMQ_HOSTNAME", "localhost"),
|
||||||
@ -36,10 +32,6 @@ class Config:
|
|||||||
'virtual_host': '/'
|
'virtual_host': '/'
|
||||||
}
|
}
|
||||||
|
|
||||||
SQLALCHEMY_DATABASE_URI = f"postgresql://{_POSTGRES_USERNAME}:{_POSTGRES_PASSWORD}@{_POSTGRES_HOSTNAME}:5432/{_POSTGRES_DB}{_POSTGRES_OPTS}"
|
|
||||||
|
|
||||||
STORAGE_HOSTNAME = os.getenv("INPUT_STORAGE_HOSTNAME", "localhost:8042")
|
|
||||||
|
|
||||||
ENABLE_INFLUXDB = os.environ.get("INPUT_ENABLE_INFLUX", "true").lower() in ["true", "yes", "1"]
|
ENABLE_INFLUXDB = os.environ.get("INPUT_ENABLE_INFLUX", "true").lower() in ["true", "yes", "1"]
|
||||||
INFLUXDB_HOST = os.getenv("INFLUX_HOST", "input-influx")
|
INFLUXDB_HOST = os.getenv("INFLUX_HOST", "input-influx")
|
||||||
INFLUXDB_PORT = os.getenv("INFLUX_PORT", "8086")
|
INFLUXDB_PORT = os.getenv("INFLUX_PORT", "8086")
|
||||||
|
13
src/db.py
13
src/db.py
@ -1,13 +0,0 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
from flask_sqlalchemy import SQLAlchemy
|
|
||||||
|
|
||||||
"""
|
|
||||||
Database api
|
|
||||||
"""
|
|
||||||
|
|
||||||
__author__ = '@tormakris'
|
|
||||||
__copyright__ = "Copyright 2020, Birbnetes Team"
|
|
||||||
__module_name__ = "db"
|
|
||||||
__version__text__ = "1"
|
|
||||||
|
|
||||||
db = SQLAlchemy()
|
|
@ -1,6 +1,4 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
from db import db
|
|
||||||
from magic_amqp import magic_amqp
|
from magic_amqp import magic_amqp
|
||||||
|
|
||||||
"""
|
"""
|
||||||
@ -13,17 +11,6 @@ __module_name__ = "healthchecks"
|
|||||||
__version__text__ = "1"
|
__version__text__ = "1"
|
||||||
|
|
||||||
|
|
||||||
def health_database_status():
|
|
||||||
is_database_working = True
|
|
||||||
output = 'database is ok'
|
|
||||||
try:
|
|
||||||
db.session.execute('SELECT 1')
|
|
||||||
except Exception as e:
|
|
||||||
output = str(e)
|
|
||||||
is_database_working = False
|
|
||||||
return is_database_working, output
|
|
||||||
|
|
||||||
|
|
||||||
def amqp_connection_status():
|
def amqp_connection_status():
|
||||||
if magic_amqp.is_healthy():
|
if magic_amqp.is_healthy():
|
||||||
result = True
|
result = True
|
||||||
|
@ -26,8 +26,8 @@ class MagicAMQP:
|
|||||||
def init_app(self, app: Flask):
|
def init_app(self, app: Flask):
|
||||||
self.app = app
|
self.app = app
|
||||||
self.app.config.setdefault('FLASK_PIKA_PARAMS', {})
|
self.app.config.setdefault('FLASK_PIKA_PARAMS', {})
|
||||||
self.app.config.setdefault('EXCHANGE_NAME', None)
|
self.app.config.setdefault('EXCHANGE_NAME_META', None)
|
||||||
self.app.config.setdefault('RABBITMQ_QUEUE', None)
|
self.app.config.setdefault('EXCHANGE_NAME_CACHE', None)
|
||||||
|
|
||||||
self._credentials = pika.PlainCredentials(
|
self._credentials = pika.PlainCredentials(
|
||||||
app.config['FLASK_PIKA_PARAMS']['username'],
|
app.config['FLASK_PIKA_PARAMS']['username'],
|
||||||
@ -46,7 +46,11 @@ class MagicAMQP:
|
|||||||
)
|
)
|
||||||
self._pika_channel = self._pika_connection.channel()
|
self._pika_channel = self._pika_connection.channel()
|
||||||
self._pika_channel.exchange_declare(
|
self._pika_channel.exchange_declare(
|
||||||
exchange=self.app.config['EXCHANGE_NAME'],
|
exchange=self.app.config['EXCHANGE_NAME_META'],
|
||||||
|
exchange_type='direct'
|
||||||
|
)
|
||||||
|
self._pika_channel.exchange_declare(
|
||||||
|
exchange=self.app.config['EXCHANGE_NAME_CACHE'],
|
||||||
exchange_type='direct'
|
exchange_type='direct'
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -71,7 +75,7 @@ class MagicAMQP:
|
|||||||
if total_time > 1:
|
if total_time > 1:
|
||||||
self.app.logger.warning(f"Loop: Total loop took {total_time:5f} sec")
|
self.app.logger.warning(f"Loop: Total loop took {total_time:5f} sec")
|
||||||
|
|
||||||
def publish(self, payload=None):
|
def _publish(self, exchange: str, payload=None):
|
||||||
"""
|
"""
|
||||||
Publish a simple json serialized message to the configured queue.
|
Publish a simple json serialized message to the configured queue.
|
||||||
If the connection is broken, then this call will block until the connection is restored
|
If the connection is broken, then this call will block until the connection is restored
|
||||||
@ -89,7 +93,7 @@ class MagicAMQP:
|
|||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
self._pika_channel.basic_publish(
|
self._pika_channel.basic_publish(
|
||||||
exchange=self.app.config['EXCHANGE_NAME'],
|
exchange=exchange,
|
||||||
routing_key='feature',
|
routing_key='feature',
|
||||||
body=json.dumps(payload).encode('UTF-8')
|
body=json.dumps(payload).encode('UTF-8')
|
||||||
)
|
)
|
||||||
@ -120,6 +124,12 @@ class MagicAMQP:
|
|||||||
if total_time > 0.4:
|
if total_time > 0.4:
|
||||||
self.app.logger.warning(f"Publish: Total publish took {total_time:5f} sec")
|
self.app.logger.warning(f"Publish: Total publish took {total_time:5f} sec")
|
||||||
|
|
||||||
|
def publish_cache(self, payload=None):
|
||||||
|
return self._publish(self.app.config['EXCHANGE_NAME_CACHE'], payload)
|
||||||
|
|
||||||
|
def publish_meta(self, payload=None):
|
||||||
|
return self._publish(self.app.config['EXCHANGE_NAME_META'], payload)
|
||||||
|
|
||||||
def is_healthy(self) -> bool:
|
def is_healthy(self) -> bool:
|
||||||
with self._lock:
|
with self._lock:
|
||||||
if not self._pika_channel:
|
if not self._pika_channel:
|
||||||
|
@ -1,25 +0,0 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
from db import db
|
|
||||||
from sqlalchemy.sql import func
|
|
||||||
|
|
||||||
"""
|
|
||||||
Flask Restful endpoints
|
|
||||||
"""
|
|
||||||
|
|
||||||
__author__ = '@tormakris'
|
|
||||||
__copyright__ = "Copyright 2020, Birbnetes Team"
|
|
||||||
__module_name__ = "models"
|
|
||||||
__version__text__ = "1"
|
|
||||||
|
|
||||||
|
|
||||||
class SampleMetadata(db.Model):
|
|
||||||
"""
|
|
||||||
SQLAlchemy model of metadata entries
|
|
||||||
"""
|
|
||||||
id = db.Column(db.Integer, primary_key=True, auto_increment=True)
|
|
||||||
timestamp = db.Column(db.TIMESTAMP, nullable=False, server_default=func.now())
|
|
||||||
|
|
||||||
device_id = db.Column(db.Integer, nullable=False)
|
|
||||||
device_date = db.Column(db.DateTime, nullable=False)
|
|
||||||
|
|
||||||
tag = db.Column(db.String(32), nullable=False, unique=True)
|
|
4
src/redis_client.py
Normal file
4
src/redis_client.py
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
from flask_redis import FlaskRedis
|
||||||
|
|
||||||
|
redis_client = FlaskRedis()
|
174
src/resources.py
174
src/resources.py
@ -1,18 +1,14 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
import json
|
import io
|
||||||
import time
|
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
import tzlocal
|
import tzlocal
|
||||||
from xeger import Xeger
|
from xeger import Xeger
|
||||||
from flask_restful import Resource
|
from flask_restful import Resource
|
||||||
from flask import request, current_app, abort
|
from flask import request, current_app, abort
|
||||||
import requests
|
|
||||||
from magic_amqp import magic_amqp
|
from magic_amqp import magic_amqp
|
||||||
from db import db
|
|
||||||
from influxus import influx_db
|
from influxus import influx_db
|
||||||
from models import SampleMetadata
|
from schemas import SampleSchema
|
||||||
from schemas import SampleSchema, SampleMetadataSchema
|
from redis_client import redis_client
|
||||||
from requests_opentracing import SessionTracing
|
|
||||||
import opentracing
|
import opentracing
|
||||||
|
|
||||||
"""
|
"""
|
||||||
@ -32,7 +28,6 @@ class SampleResource(Resource):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
sampleschema = SampleSchema(many=False)
|
sampleschema = SampleSchema(many=False)
|
||||||
samplemetadataschema = SampleMetadataSchema(many=True)
|
|
||||||
|
|
||||||
def post(self):
|
def post(self):
|
||||||
"""
|
"""
|
||||||
@ -48,13 +43,13 @@ class SampleResource(Resource):
|
|||||||
if 'description' not in request.form:
|
if 'description' not in request.form:
|
||||||
return abort(400, "no description found")
|
return abort(400, "no description found")
|
||||||
else:
|
else:
|
||||||
description = request.form.get("description")
|
description_raw = request.form.get("description")
|
||||||
|
|
||||||
if soundfile.content_type != 'audio/wave':
|
if soundfile.content_type != 'audio/wave':
|
||||||
current_app.logger.info(f"Input file was not WAV.")
|
current_app.logger.info(f"Input file was not WAV.")
|
||||||
return abort(415, 'Input file not a wave file.')
|
return abort(415, 'Input file not a wave file.')
|
||||||
try:
|
try:
|
||||||
desc = self.sampleschema.loads(description)
|
desc = self.sampleschema.loads(description_raw)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
current_app.logger.exception(e)
|
current_app.logger.exception(e)
|
||||||
return abort(417, 'Input JSON schema invalid')
|
return abort(417, 'Input JSON schema invalid')
|
||||||
@ -66,62 +61,34 @@ class SampleResource(Resource):
|
|||||||
if len(generated_tag) > 2: # Ensure minimum length
|
if len(generated_tag) > 2: # Ensure minimum length
|
||||||
break
|
break
|
||||||
|
|
||||||
# Handle mega-autismo-cliento
|
with opentracing.tracer.start_active_span('publishMetaMessage'):
|
||||||
soundfile_content_length = soundfile.content_length
|
try:
|
||||||
if soundfile_content_length <= 0: # BRUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUH
|
magic_amqp.publish_meta(
|
||||||
with opentracing.tracer.start_active_span(
|
{
|
||||||
'calculateContentLength'): # In an ideal scenario this span is missing
|
'tag': generated_tag,
|
||||||
current_app.logger.debug(
|
'timestamp': datetime.now().isoformat(),
|
||||||
"The uploader did not provide content-length for the sound file... Calculating manually..."
|
'device_id': desc['device_id'],
|
||||||
|
'device_date': desc['date'].isoformat()
|
||||||
|
}
|
||||||
)
|
)
|
||||||
# So, this is a seekable stream, so we just seek to the end
|
except Exception as e:
|
||||||
old_ptr = soundfile.tell()
|
current_app.logger.exception(e)
|
||||||
soundfile.seek(0, 2)
|
return abort(500, f"AMQP Publish error: {str(e)}")
|
||||||
# Check where is the end (= content length)
|
|
||||||
soundfile_content_length = soundfile.tell()
|
|
||||||
# Seek back to where the stream was
|
|
||||||
soundfile.seek(old_ptr, 0)
|
|
||||||
|
|
||||||
# It's insane, that you can not set this field in curl
|
with opentracing.tracer.start_active_span('readSampleToMemory'):
|
||||||
|
buf = io.BytesIO()
|
||||||
|
soundfile.save(buf)
|
||||||
|
|
||||||
with opentracing.tracer.start_active_span('sqlalchemy.create'):
|
with opentracing.tracer.start_active_span('putToCache'):
|
||||||
record = SampleMetadata(
|
redis_client.set(generated_tag, buf.getbuffer()) # getbuffer is quick as it does not copy like getvalue
|
||||||
device_id=desc['device_id'],
|
|
||||||
device_date=desc['date'],
|
|
||||||
tag=generated_tag
|
|
||||||
)
|
|
||||||
db.session.add(record)
|
|
||||||
|
|
||||||
with opentracing.tracer.start_active_span('uploadToStorageService'):
|
|
||||||
files = {
|
|
||||||
'description': (None, json.dumps({'tag': generated_tag}), 'application/json'),
|
|
||||||
'soundFile': (
|
|
||||||
'wave.wav',
|
|
||||||
soundfile,
|
|
||||||
soundfile.content_type,
|
|
||||||
{'Content-Length': soundfile_content_length})}
|
|
||||||
|
|
||||||
upload_started = time.time()
|
|
||||||
r = SessionTracing(propagate=True).post(
|
|
||||||
f"http://{current_app.config.get('STORAGE_HOSTNAME')}/object",
|
|
||||||
files=files
|
|
||||||
)
|
|
||||||
upload_time = time.time() - upload_started
|
|
||||||
|
|
||||||
if upload_time > 0.8:
|
|
||||||
current_app.logger.warning(f"Uploading to storage-service took {upload_time:5} sec")
|
|
||||||
|
|
||||||
if r.status_code not in [200, 201]:
|
|
||||||
return abort(500,
|
|
||||||
f"Failed to upload sample to storage service. Upstream status: {r.status_code}: {r.text}")
|
|
||||||
|
|
||||||
with opentracing.tracer.start_active_span('sqlalchemy.commit'):
|
|
||||||
db.session.commit()
|
|
||||||
|
|
||||||
# Announce only after the data is successfully committed
|
# Announce only after the data is successfully committed
|
||||||
with opentracing.tracer.start_active_span('publishMessage'):
|
with opentracing.tracer.start_active_span('publishInCacheMessage'):
|
||||||
try:
|
try:
|
||||||
magic_amqp.publish({'tag': generated_tag})
|
magic_amqp.publish_cache({
|
||||||
|
'tag': generated_tag,
|
||||||
|
'mime_type': soundfile.mimetype
|
||||||
|
})
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
current_app.logger.exception(e)
|
current_app.logger.exception(e)
|
||||||
return abort(500, f"AMQP Publish error: {str(e)}")
|
return abort(500, f"AMQP Publish error: {str(e)}")
|
||||||
@ -145,90 +112,3 @@ class SampleResource(Resource):
|
|||||||
)
|
)
|
||||||
|
|
||||||
return {"tag": generated_tag}, 200
|
return {"tag": generated_tag}, 200
|
||||||
|
|
||||||
def get(self):
|
|
||||||
"""
|
|
||||||
Get all stored items
|
|
||||||
:return:
|
|
||||||
"""
|
|
||||||
with opentracing.tracer.start_active_span('compileQuery'):
|
|
||||||
query = SampleMetadata.query
|
|
||||||
|
|
||||||
## Compile filters ##
|
|
||||||
|
|
||||||
filters = []
|
|
||||||
try:
|
|
||||||
first = int(request.args.get('first'))
|
|
||||||
except (ValueError, TypeError):
|
|
||||||
first = None
|
|
||||||
else:
|
|
||||||
filters.append(
|
|
||||||
SampleMetadata.id >= first
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
|
||||||
after = datetime.fromisoformat(request.args.get('after'))
|
|
||||||
except (ValueError, TypeError):
|
|
||||||
after = None
|
|
||||||
else:
|
|
||||||
filters.append(
|
|
||||||
SampleMetadata.timestamp > after
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
|
||||||
before = datetime.fromisoformat(request.args.get('before'))
|
|
||||||
except (ValueError, TypeError):
|
|
||||||
before = None
|
|
||||||
else:
|
|
||||||
filters.append(
|
|
||||||
SampleMetadata.timestamp < before
|
|
||||||
)
|
|
||||||
|
|
||||||
if filters:
|
|
||||||
query = query.filter(db.and_(*filters))
|
|
||||||
|
|
||||||
try:
|
|
||||||
limit = int(request.args.get('limit'))
|
|
||||||
except (ValueError, TypeError):
|
|
||||||
limit = None
|
|
||||||
else:
|
|
||||||
query = query.limit(limit)
|
|
||||||
|
|
||||||
## Run query ##
|
|
||||||
count = "count" in request.args
|
|
||||||
tags = {
|
|
||||||
"first": first,
|
|
||||||
"limit": limit,
|
|
||||||
"after": after,
|
|
||||||
"before": before
|
|
||||||
}
|
|
||||||
|
|
||||||
if count:
|
|
||||||
with opentracing.tracer.start_active_span('sqlalchemy.count', tags=tags):
|
|
||||||
rows = query.count()
|
|
||||||
|
|
||||||
return {"count": rows}, 200
|
|
||||||
else:
|
|
||||||
with opentracing.tracer.start_active_span('sqlalchemy.select', tags=tags):
|
|
||||||
samples = query.all()
|
|
||||||
|
|
||||||
return self.samplemetadataschema.dump(list(samples)), 200
|
|
||||||
|
|
||||||
|
|
||||||
class SampleParameterResource(Resource):
|
|
||||||
"""
|
|
||||||
Sample endpoint with parameters
|
|
||||||
"""
|
|
||||||
|
|
||||||
samplemetadataschema = SampleMetadataSchema(many=False)
|
|
||||||
|
|
||||||
def get(self, tag: str):
|
|
||||||
"""
|
|
||||||
Get a specific item
|
|
||||||
:param tag:
|
|
||||||
:return:
|
|
||||||
"""
|
|
||||||
with opentracing.tracer.start_active_span('sqlalchemy.select', tags={"tag": tag}):
|
|
||||||
sample = SampleMetadata.query.filter_by(tag=tag).first_or_404()
|
|
||||||
|
|
||||||
return self.samplemetadataschema.dump(sample), 200
|
|
||||||
|
@ -1,7 +1,4 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
from flask_marshmallow.sqla import auto_field
|
|
||||||
|
|
||||||
from models import SampleMetadata
|
|
||||||
from marshm import ma
|
from marshm import ma
|
||||||
from marshmallow import fields
|
from marshmallow import fields
|
||||||
|
|
||||||
@ -26,13 +23,3 @@ class SampleSchema(ma.Schema):
|
|||||||
|
|
||||||
date = fields.DateTime(required=True)
|
date = fields.DateTime(required=True)
|
||||||
device_id = fields.Integer(required=True)
|
device_id = fields.Integer(required=True)
|
||||||
|
|
||||||
|
|
||||||
class SampleMetadataSchema(ma.SQLAlchemyAutoSchema):
|
|
||||||
"""
|
|
||||||
Marshmallow schema generated
|
|
||||||
"""
|
|
||||||
class Meta:
|
|
||||||
model = SampleMetadata
|
|
||||||
exclude = ('timestamp', 'id', 'device_date')
|
|
||||||
date = auto_field("device_date", dump_only=False)
|
|
||||||
|
Loading…
Reference in New Issue
Block a user