Compare commits
No commits in common. "2c1113baabe10b11ccec648862a2d528a177802c" and "cbaf2f298145d3ae8f513259ee1e117703a5cd8f" have entirely different histories.
2c1113baab
...
cbaf2f2981
@ -4,9 +4,13 @@ Flask~=2.0.1
|
||||
Flask-RESTful~=0.3.9
|
||||
requests~=2.26.0
|
||||
werkzeug
|
||||
sqlalchemy~=1.4.22
|
||||
flask_sqlalchemy~=2.5.1
|
||||
xeger~=0.3.5
|
||||
pika~=1.2.0
|
||||
psycopg2-binary
|
||||
marshmallow~=3.13.0
|
||||
marshmallow-sqlalchemy~=0.26.1
|
||||
flask-marshmallow
|
||||
py-healthcheck
|
||||
Flask-InfluxDB
|
||||
@ -14,8 +18,7 @@ tzdata
|
||||
tzlocal
|
||||
apscheduler~=3.7.0
|
||||
|
||||
flask-redis~=0.4.0
|
||||
|
||||
opentracing~=2.4.0
|
||||
jaeger-client
|
||||
requests-opentracing
|
||||
Flask-Opentracing
|
17
src/app.py
17
src/app.py
@ -3,14 +3,15 @@ from flask import Flask
|
||||
from flask_restful import Api
|
||||
import sentry_sdk
|
||||
from sentry_sdk.integrations.flask import FlaskIntegration
|
||||
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
|
||||
from healthcheck import HealthCheck
|
||||
|
||||
from redis_client import redis_client
|
||||
from config import Config
|
||||
from db import db
|
||||
from marshm import ma
|
||||
from influxus import influx_db
|
||||
from resources import SampleResource
|
||||
from healthchecks import amqp_connection_status
|
||||
from resources import SampleResource, SampleParameterResource
|
||||
from healthchecks import health_database_status, amqp_connection_status
|
||||
|
||||
import atexit
|
||||
|
||||
@ -34,7 +35,7 @@ __version__text__ = "1"
|
||||
if Config.SENTRY_DSN:
|
||||
sentry_sdk.init(
|
||||
dsn=Config.SENTRY_DSN,
|
||||
integrations=[FlaskIntegration()],
|
||||
integrations=[FlaskIntegration(), SqlalchemyIntegration()],
|
||||
traces_sample_rate=0.0,
|
||||
send_default_pii=True,
|
||||
release=Config.RELEASE_ID,
|
||||
@ -47,10 +48,9 @@ app.config.from_object(Config)
|
||||
|
||||
api = Api(app)
|
||||
health = HealthCheck()
|
||||
db.init_app(app)
|
||||
ma.init_app(app)
|
||||
|
||||
redis_client.init_app(app)
|
||||
|
||||
# amqp magic stuff
|
||||
magic_amqp.init_app(app)
|
||||
|
||||
@ -68,7 +68,7 @@ if Config.ENABLE_INFLUXDB:
|
||||
def init_db():
|
||||
if Config.ENABLE_INFLUXDB:
|
||||
influx_db.database.create(Config.INFLUXDB_DATABASE)
|
||||
|
||||
db.create_all()
|
||||
|
||||
# Setup tracing
|
||||
def initialize_tracer():
|
||||
@ -81,7 +81,9 @@ def initialize_tracer():
|
||||
tracing = FlaskTracing(initialize_tracer, True, app)
|
||||
|
||||
api.add_resource(SampleResource, "/input")
|
||||
api.add_resource(SampleParameterResource, '/input/<tag>')
|
||||
|
||||
health.add_check(health_database_status)
|
||||
health.add_check(amqp_connection_status)
|
||||
|
||||
register_all_error_handlers(app)
|
||||
@ -90,7 +92,6 @@ app.add_url_rule("/healthz", "healthcheck", view_func=lambda: health.run())
|
||||
|
||||
if __name__ != '__main__':
|
||||
import logging
|
||||
|
||||
gunicorn_logger = logging.getLogger('gunicorn.error')
|
||||
app.logger.handlers = gunicorn_logger.handlers
|
||||
app.logger.setLevel(gunicorn_logger.level)
|
||||
|
@ -10,6 +10,12 @@ __copyright__ = "Copyright 2020, Birbnetes Team"
|
||||
__module_name__ = "app"
|
||||
__version__text__ = "1"
|
||||
|
||||
_POSTGRES_HOSTNAME = os.getenv("INPUT_POSTGRES_HOSTNAME", "localhost")
|
||||
_POSTGRES_USERNAME = os.getenv("INPUT_POSTGRES_USERNAME", "input-service")
|
||||
_POSTGRES_PASSWORD = os.getenv("INPUT_POSTGRES_PASSWORD", "input-service")
|
||||
_POSTGRES_DB = os.getenv("INPUT_POSTGRES_DB", "input-service")
|
||||
_POSTGRES_OPTS = os.getenv("INPUT_POSTGRES_OPTS", "")
|
||||
|
||||
|
||||
class Config:
|
||||
PORT = 8080
|
||||
@ -19,10 +25,8 @@ class Config:
|
||||
RELEASE_ID = os.environ.get("RELEASE_ID", "test")
|
||||
RELEASEMODE = os.environ.get("INPUT_SERVICE_RELEASEMODE", "dev")
|
||||
|
||||
REDIS_URL = os.environ['CACHE_REDIS_URL']
|
||||
|
||||
EXCHANGE_NAME_META = os.getenv("INPUT_RABBITMQ_EXCHANGE_META", "sample-meta")
|
||||
EXCHANGE_NAME_CACHE = os.getenv("INPUT_RABBITMQ_EXCHANGE_CACHE", "sample-cache")
|
||||
EXCHANGE_NAME = os.getenv("INPUT_RABBITMQ_EXCHANGE", "dev")
|
||||
RABBITMQ_QUEUE = os.getenv("INPUT_RABBITMQ_QUEUE", "wave-extract")
|
||||
|
||||
FLASK_PIKA_PARAMS = {
|
||||
'host': os.getenv("INPUT_RABBITMQ_HOSTNAME", "localhost"),
|
||||
@ -32,6 +36,10 @@ class Config:
|
||||
'virtual_host': '/'
|
||||
}
|
||||
|
||||
SQLALCHEMY_DATABASE_URI = f"postgresql://{_POSTGRES_USERNAME}:{_POSTGRES_PASSWORD}@{_POSTGRES_HOSTNAME}:5432/{_POSTGRES_DB}{_POSTGRES_OPTS}"
|
||||
|
||||
STORAGE_HOSTNAME = os.getenv("INPUT_STORAGE_HOSTNAME", "localhost:8042")
|
||||
|
||||
ENABLE_INFLUXDB = os.environ.get("INPUT_ENABLE_INFLUX", "true").lower() in ["true", "yes", "1"]
|
||||
INFLUXDB_HOST = os.getenv("INFLUX_HOST", "input-influx")
|
||||
INFLUXDB_PORT = os.getenv("INFLUX_PORT", "8086")
|
||||
|
13
src/db.py
Normal file
13
src/db.py
Normal file
@ -0,0 +1,13 @@
|
||||
#!/usr/bin/env python3
"""
Database api
"""
from flask_sqlalchemy import SQLAlchemy

__author__ = '@tormakris'
__copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "db"
__version__text__ = "1"

# Shared Flask-SQLAlchemy extension object. It is created without an
# application here; the application attaches itself with db.init_app(app).
db = SQLAlchemy()
|
@ -1,4 +1,6 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from db import db
|
||||
from magic_amqp import magic_amqp
|
||||
|
||||
"""
|
||||
@ -11,6 +13,17 @@ __module_name__ = "healthchecks"
|
||||
__version__text__ = "1"
|
||||
|
||||
|
||||
def health_database_status():
    """
    Health check probe: verify that the database answers a trivial query.

    :return: tuple ``(is_database_working, output)``. On success this is
        ``(True, 'database is ok')``; on failure it is ``False`` together
        with the string form of the exception that was raised.
    """
    is_database_working = True
    output = 'database is ok'
    try:
        # Textual SQL has to be wrapped in text(): handing a bare string to
        # Session.execute() is deprecated in SQLAlchemy 1.4 and rejected in 2.0.
        db.session.execute(db.text('SELECT 1'))
    except Exception as e:  # a probe reports every failure instead of raising
        output = str(e)
        is_database_working = False
    return is_database_working, output
|
||||
|
||||
|
||||
def amqp_connection_status():
|
||||
if magic_amqp.is_healthy():
|
||||
result = True
|
||||
|
@ -26,8 +26,8 @@ class MagicAMQP:
|
||||
def init_app(self, app: Flask):
|
||||
self.app = app
|
||||
self.app.config.setdefault('FLASK_PIKA_PARAMS', {})
|
||||
self.app.config.setdefault('EXCHANGE_NAME_META', None)
|
||||
self.app.config.setdefault('EXCHANGE_NAME_CACHE', None)
|
||||
self.app.config.setdefault('EXCHANGE_NAME', None)
|
||||
self.app.config.setdefault('RABBITMQ_QUEUE', None)
|
||||
|
||||
self._credentials = pika.PlainCredentials(
|
||||
app.config['FLASK_PIKA_PARAMS']['username'],
|
||||
@ -46,11 +46,7 @@ class MagicAMQP:
|
||||
)
|
||||
self._pika_channel = self._pika_connection.channel()
|
||||
self._pika_channel.exchange_declare(
|
||||
exchange=self.app.config['EXCHANGE_NAME_META'],
|
||||
exchange_type='direct'
|
||||
)
|
||||
self._pika_channel.exchange_declare(
|
||||
exchange=self.app.config['EXCHANGE_NAME_CACHE'],
|
||||
exchange=self.app.config['EXCHANGE_NAME'],
|
||||
exchange_type='direct'
|
||||
)
|
||||
|
||||
@ -75,7 +71,7 @@ class MagicAMQP:
|
||||
if total_time > 1:
|
||||
self.app.logger.warning(f"Loop: Total loop took {total_time:5f} sec")
|
||||
|
||||
def _publish(self, exchange: str, payload=None):
|
||||
def publish(self, payload=None):
|
||||
"""
|
||||
Publish a simple json serialized message to the configured queue.
|
||||
If the connection is broken, then this call will block until the connection is restored
|
||||
@ -93,7 +89,7 @@ class MagicAMQP:
|
||||
while True:
|
||||
try:
|
||||
self._pika_channel.basic_publish(
|
||||
exchange=exchange,
|
||||
exchange=self.app.config['EXCHANGE_NAME'],
|
||||
routing_key='feature',
|
||||
body=json.dumps(payload).encode('UTF-8')
|
||||
)
|
||||
@ -124,12 +120,6 @@ class MagicAMQP:
|
||||
if total_time > 0.4:
|
||||
self.app.logger.warning(f"Publish: Total publish took {total_time:5f} sec")
|
||||
|
||||
def publish_cache(self, payload=None):
|
||||
return self._publish(self.app.config['EXCHANGE_NAME_CACHE'], payload)
|
||||
|
||||
def publish_meta(self, payload=None):
|
||||
return self._publish(self.app.config['EXCHANGE_NAME_META'], payload)
|
||||
|
||||
def is_healthy(self) -> bool:
|
||||
with self._lock:
|
||||
if not self._pika_channel:
|
||||
|
25
src/models.py
Normal file
25
src/models.py
Normal file
@ -0,0 +1,25 @@
|
||||
#!/usr/bin/env python3
|
||||
from db import db
|
||||
from sqlalchemy.sql import func
|
||||
|
||||
"""
|
||||
SQLAlchemy database models
|
||||
"""
|
||||
|
||||
__author__ = '@tormakris'
|
||||
__copyright__ = "Copyright 2020, Birbnetes Team"
|
||||
__module_name__ = "models"
|
||||
__version__text__ = "1"
|
||||
|
||||
|
||||
class SampleMetadata(db.Model):
    """
    SQLAlchemy model of metadata entries
    """
    # Surrogate primary key. The Column keyword is spelled ``autoincrement``;
    # ``auto_increment`` is not a Column argument and is not applied.
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    # Filled in by the database server (now()) when the row is inserted.
    timestamp = db.Column(db.TIMESTAMP, nullable=False, server_default=func.now())

    # Values supplied with the uploaded sample; both are mandatory.
    device_id = db.Column(db.Integer, nullable=False)
    device_date = db.Column(db.DateTime, nullable=False)

    # Mandatory, unique tag of the sample, at most 32 characters.
    tag = db.Column(db.String(32), nullable=False, unique=True)
|
@ -1,4 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
from flask_redis import FlaskRedis
|
||||
|
||||
redis_client = FlaskRedis()
|
174
src/resources.py
174
src/resources.py
@ -1,14 +1,18 @@
|
||||
#!/usr/bin/env python3
|
||||
import io
|
||||
import json
|
||||
import time
|
||||
from datetime import datetime
|
||||
import tzlocal
|
||||
from xeger import Xeger
|
||||
from flask_restful import Resource
|
||||
from flask import request, current_app, abort
|
||||
import requests
|
||||
from magic_amqp import magic_amqp
|
||||
from db import db
|
||||
from influxus import influx_db
|
||||
from schemas import SampleSchema
|
||||
from redis_client import redis_client
|
||||
from models import SampleMetadata
|
||||
from schemas import SampleSchema, SampleMetadataSchema
|
||||
from requests_opentracing import SessionTracing
|
||||
import opentracing
|
||||
|
||||
"""
|
||||
@ -28,6 +32,7 @@ class SampleResource(Resource):
|
||||
"""
|
||||
|
||||
sampleschema = SampleSchema(many=False)
|
||||
samplemetadataschema = SampleMetadataSchema(many=True)
|
||||
|
||||
def post(self):
|
||||
"""
|
||||
@ -43,13 +48,13 @@ class SampleResource(Resource):
|
||||
if 'description' not in request.form:
|
||||
return abort(400, "no description found")
|
||||
else:
|
||||
description_raw = request.form.get("description")
|
||||
description = request.form.get("description")
|
||||
|
||||
if soundfile.content_type != 'audio/wave':
|
||||
current_app.logger.info(f"Input file was not WAV.")
|
||||
return abort(415, 'Input file not a wave file.')
|
||||
try:
|
||||
desc = self.sampleschema.loads(description_raw)
|
||||
desc = self.sampleschema.loads(description)
|
||||
except Exception as e:
|
||||
current_app.logger.exception(e)
|
||||
return abort(417, 'Input JSON schema invalid')
|
||||
@ -61,34 +66,62 @@ class SampleResource(Resource):
|
||||
if len(generated_tag) > 2: # Ensure minimum length
|
||||
break
|
||||
|
||||
with opentracing.tracer.start_active_span('publishMetaMessage'):
|
||||
try:
|
||||
magic_amqp.publish_meta(
|
||||
{
|
||||
'tag': generated_tag,
|
||||
'timestamp': datetime.now().isoformat(),
|
||||
'device_id': desc['device_id'],
|
||||
'device_date': desc['date'].isoformat()
|
||||
}
|
||||
# Handle clients that do not send a Content-Length for the sound file
|
||||
soundfile_content_length = soundfile.content_length
|
||||
if soundfile_content_length <= 0: # BRUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUH
|
||||
with opentracing.tracer.start_active_span(
|
||||
'calculateContentLength'): # In an ideal scenario this span is missing
|
||||
current_app.logger.debug(
|
||||
"The uploader did not provide content-length for the sound file... Calculating manually..."
|
||||
)
|
||||
except Exception as e:
|
||||
current_app.logger.exception(e)
|
||||
return abort(500, f"AMQP Publish error: {str(e)}")
|
||||
# So, this is a seekable stream, so we just seek to the end
|
||||
old_ptr = soundfile.tell()
|
||||
soundfile.seek(0, 2)
|
||||
# Check where is the end (= content length)
|
||||
soundfile_content_length = soundfile.tell()
|
||||
# Seek back to where the stream was
|
||||
soundfile.seek(old_ptr, 0)
|
||||
|
||||
with opentracing.tracer.start_active_span('readSampleToMemory'):
|
||||
buf = io.BytesIO()
|
||||
soundfile.save(buf)
|
||||
# It's insane, that you can not set this field in curl
|
||||
|
||||
with opentracing.tracer.start_active_span('putToCache'):
|
||||
redis_client.set(generated_tag, buf.getbuffer()) # getbuffer is quick as it does not copy like getvalue
|
||||
with opentracing.tracer.start_active_span('sqlalchemy.create'):
|
||||
record = SampleMetadata(
|
||||
device_id=desc['device_id'],
|
||||
device_date=desc['date'],
|
||||
tag=generated_tag
|
||||
)
|
||||
db.session.add(record)
|
||||
|
||||
with opentracing.tracer.start_active_span('uploadToStorageService'):
|
||||
files = {
|
||||
'description': (None, json.dumps({'tag': generated_tag}), 'application/json'),
|
||||
'soundFile': (
|
||||
'wave.wav',
|
||||
soundfile,
|
||||
soundfile.content_type,
|
||||
{'Content-Length': soundfile_content_length})}
|
||||
|
||||
upload_started = time.time()
|
||||
r = SessionTracing(propagate=True).post(
|
||||
f"http://{current_app.config.get('STORAGE_HOSTNAME')}/object",
|
||||
files=files
|
||||
)
|
||||
upload_time = time.time() - upload_started
|
||||
|
||||
if upload_time > 0.8:
|
||||
current_app.logger.warning(f"Uploading to storage-service took {upload_time:5} sec")
|
||||
|
||||
if r.status_code not in [200, 201]:
|
||||
return abort(500,
|
||||
f"Failed to upload sample to storage service. Upstream status: {r.status_code}: {r.text}")
|
||||
|
||||
with opentracing.tracer.start_active_span('sqlalchemy.commit'):
|
||||
db.session.commit()
|
||||
|
||||
# Announce only after the data is successfully committed
|
||||
with opentracing.tracer.start_active_span('publishInCacheMessage'):
|
||||
with opentracing.tracer.start_active_span('publishMessage'):
|
||||
try:
|
||||
magic_amqp.publish_cache({
|
||||
'tag': generated_tag,
|
||||
'mime_type': soundfile.mimetype
|
||||
})
|
||||
magic_amqp.publish({'tag': generated_tag})
|
||||
except Exception as e:
|
||||
current_app.logger.exception(e)
|
||||
return abort(500, f"AMQP Publish error: {str(e)}")
|
||||
@ -112,3 +145,90 @@ class SampleResource(Resource):
|
||||
)
|
||||
|
||||
return {"tag": generated_tag}, 200
|
||||
|
||||
def get(self):
    """
    Get all stored items

    Optional query parameters:

    * ``first``: only entries whose id is greater than or equal to this integer
    * ``after``: only entries whose timestamp is later than this ISO formatted datetime
    * ``before``: only entries whose timestamp is earlier than this ISO formatted datetime
    * ``limit``: maximum number of entries (non-negative integer)
    * ``count``: when present, only the number of matching entries is returned

    Parameters that are missing or can not be parsed are ignored.

    :return: ``{"count": rows}, 200`` when ``count`` is requested, otherwise
        the serialized list of matching entries and 200
    """

    def parse_arg(name, convert):
        # A missing parameter makes convert(None) raise TypeError, a malformed
        # one raises ValueError; both mean "filter not requested".
        try:
            return convert(request.args.get(name))
        except (ValueError, TypeError):
            return None

    with opentracing.tracer.start_active_span('compileQuery'):
        query = SampleMetadata.query

        ## Compile filters ##

        first = parse_arg('first', int)
        after = parse_arg('after', datetime.fromisoformat)
        before = parse_arg('before', datetime.fromisoformat)
        limit = parse_arg('limit', int)
        if limit is not None and limit < 0:
            # A negative LIMIT is rejected by the database and would fail the
            # whole request; treat it like any other invalid value.
            limit = None

        filters = []
        if first is not None:
            filters.append(SampleMetadata.id >= first)
        if after is not None:
            filters.append(SampleMetadata.timestamp > after)
        if before is not None:
            filters.append(SampleMetadata.timestamp < before)

        if filters:
            query = query.filter(db.and_(*filters))

        if limit is not None:
            query = query.limit(limit)

    ## Run query ##
    tags = {
        "first": first,
        "limit": limit,
        "after": after,
        "before": before
    }

    if "count" in request.args:
        with opentracing.tracer.start_active_span('sqlalchemy.count', tags=tags):
            rows = query.count()

        return {"count": rows}, 200

    with opentracing.tracer.start_active_span('sqlalchemy.select', tags=tags):
        samples = query.all()  # already a list, no copy needed

    return self.samplemetadataschema.dump(samples), 200
|
||||
|
||||
|
||||
class SampleParameterResource(Resource):
    """
    Sample endpoint addressed by the tag of a single entry
    """

    samplemetadataschema = SampleMetadataSchema(many=False)

    def get(self, tag: str):
        """
        Look up one stored item by its tag

        :param tag: tag of the requested entry
        :return: the serialized entry and 200; ``first_or_404`` aborts the
            request when no entry carries the tag
        """
        span_tags = {"tag": tag}
        with opentracing.tracer.start_active_span('sqlalchemy.select', tags=span_tags):
            matching = SampleMetadata.query.filter_by(tag=tag)
            sample = matching.first_or_404()

        serialized = self.samplemetadataschema.dump(sample)
        return serialized, 200
|
||||
|
@ -1,4 +1,7 @@
|
||||
#!/usr/bin/env python3
|
||||
from flask_marshmallow.sqla import auto_field
|
||||
|
||||
from models import SampleMetadata
|
||||
from marshm import ma
|
||||
from marshmallow import fields
|
||||
|
||||
@ -23,3 +26,13 @@ class SampleSchema(ma.Schema):
|
||||
|
||||
date = fields.DateTime(required=True)
|
||||
device_id = fields.Integer(required=True)
|
||||
|
||||
|
||||
class SampleMetadataSchema(ma.SQLAlchemyAutoSchema):
    """
    Marshmallow schema generated from the SampleMetadata model
    """

    class Meta:
        # device_date is excluded under its own name and exposed again
        # through the ``date`` field declared on the schema.
        exclude = ('timestamp', 'id', 'device_date')
        model = SampleMetadata

    date = auto_field(column_name="device_date", dump_only=False)
|
||||
|
Loading…
Reference in New Issue
Block a user