Compare commits

..

No commits in common. "2c1113baabe10b11ccec648862a2d528a177802c" and "cbaf2f298145d3ae8f513259ee1e117703a5cd8f" have entirely different histories.

10 changed files with 242 additions and 60 deletions

View File

@ -4,9 +4,13 @@ Flask~=2.0.1
Flask-RESTful~=0.3.9
requests~=2.26.0
werkzeug
sqlalchemy~=1.4.22
flask_sqlalchemy~=2.5.1
xeger~=0.3.5
pika~=1.2.0
psycopg2-binary
marshmallow~=3.13.0
marshmallow-sqlalchemy~=0.26.1
flask-marshmallow
py-healthcheck
Flask-InfluxDB
@ -14,8 +18,7 @@ tzdata
tzlocal
apscheduler~=3.7.0
flask-redis~=0.4.0
opentracing~=2.4.0
jaeger-client
requests-opentracing
Flask-Opentracing

View File

@ -3,14 +3,15 @@ from flask import Flask
from flask_restful import Api
import sentry_sdk
from sentry_sdk.integrations.flask import FlaskIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
from healthcheck import HealthCheck
from redis_client import redis_client
from config import Config
from db import db
from marshm import ma
from influxus import influx_db
from resources import SampleResource
from healthchecks import amqp_connection_status
from resources import SampleResource, SampleParameterResource
from healthchecks import health_database_status, amqp_connection_status
import atexit
@ -34,7 +35,7 @@ __version__text__ = "1"
if Config.SENTRY_DSN:
sentry_sdk.init(
dsn=Config.SENTRY_DSN,
integrations=[FlaskIntegration()],
integrations=[FlaskIntegration(), SqlalchemyIntegration()],
traces_sample_rate=0.0,
send_default_pii=True,
release=Config.RELEASE_ID,
@ -47,10 +48,9 @@ app.config.from_object(Config)
api = Api(app)
health = HealthCheck()
db.init_app(app)
ma.init_app(app)
redis_client.init_app(app)
# ampq magic stuff
magic_amqp.init_app(app)
@ -68,7 +68,7 @@ if Config.ENABLE_INFLUXDB:
def init_db():
if Config.ENABLE_INFLUXDB:
influx_db.database.create(Config.INFLUXDB_DATABASE)
db.create_all()
# Setup tracing
def initialize_tracer():
@ -81,7 +81,9 @@ def initialize_tracer():
tracing = FlaskTracing(initialize_tracer, True, app)
api.add_resource(SampleResource, "/input")
api.add_resource(SampleParameterResource, '/input/<tag>')
health.add_check(health_database_status)
health.add_check(amqp_connection_status)
register_all_error_handlers(app)
@ -90,7 +92,6 @@ app.add_url_rule("/healthz", "healthcheck", view_func=lambda: health.run())
if __name__ != '__main__':
import logging
gunicorn_logger = logging.getLogger('gunicorn.error')
app.logger.handlers = gunicorn_logger.handlers
app.logger.setLevel(gunicorn_logger.level)

View File

@ -10,6 +10,12 @@ __copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "app"
__version__text__ = "1"
_POSTGRES_HOSTNAME = os.getenv("INPUT_POSTGRES_HOSTNAME", "localhost")
_POSTGRES_USERNAME = os.getenv("INPUT_POSTGRES_USERNAME", "input-service")
_POSTGRES_PASSWORD = os.getenv("INPUT_POSTGRES_PASSWORD", "input-service")
_POSTGRES_DB = os.getenv("INPUT_POSTGRES_DB", "input-service")
_POSTGRES_OPTS = os.getenv("INPUT_POSTGRES_OPTS", "")
class Config:
PORT = 8080
@ -19,10 +25,8 @@ class Config:
RELEASE_ID = os.environ.get("RELEASE_ID", "test")
RELEASEMODE = os.environ.get("INPUT_SERVICE_RELEASEMODE", "dev")
REDIS_URL = os.environ['CACHE_REDIS_URL']
EXCHANGE_NAME_META = os.getenv("INPUT_RABBITMQ_EXCHANGE_META", "sample-meta")
EXCHANGE_NAME_CACHE = os.getenv("INPUT_RABBITMQ_EXCHANGE_CACHE", "sample-cache")
EXCHANGE_NAME = os.getenv("INPUT_RABBITMQ_EXCHANGE", "dev")
RABBITMQ_QUEUE = os.getenv("INPUT_RABBITMQ_QUEUE", "wave-extract")
FLASK_PIKA_PARAMS = {
'host': os.getenv("INPUT_RABBITMQ_HOSTNAME", "localhost"),
@ -32,6 +36,10 @@ class Config:
'virtual_host': '/'
}
SQLALCHEMY_DATABASE_URI = f"postgresql://{_POSTGRES_USERNAME}:{_POSTGRES_PASSWORD}@{_POSTGRES_HOSTNAME}:5432/{_POSTGRES_DB}{_POSTGRES_OPTS}"
STORAGE_HOSTNAME = os.getenv("INPUT_STORAGE_HOSTNAME", "localhost:8042")
ENABLE_INFLUXDB = os.environ.get("INPUT_ENABLE_INFLUX", "true").lower() in ["true", "yes", "1"]
INFLUXDB_HOST = os.getenv("INFLUX_HOST", "input-influx")
INFLUXDB_PORT = os.getenv("INFLUX_PORT", "8086")

13
src/db.py Normal file
View File

@ -0,0 +1,13 @@
#!/usr/bin/env python3
from flask_sqlalchemy import SQLAlchemy
"""
Database api
"""
__author__ = '@tormakris'
__copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "db"
__version__text__ = "1"
db = SQLAlchemy()

View File

@ -1,4 +1,6 @@
#!/usr/bin/env python3
from db import db
from magic_amqp import magic_amqp
"""
@ -11,6 +13,17 @@ __module_name__ = "healthchecks"
__version__text__ = "1"
def health_database_status():
is_database_working = True
output = 'database is ok'
try:
db.session.execute('SELECT 1')
except Exception as e:
output = str(e)
is_database_working = False
return is_database_working, output
def amqp_connection_status():
if magic_amqp.is_healthy():
result = True

View File

@ -26,8 +26,8 @@ class MagicAMQP:
def init_app(self, app: Flask):
self.app = app
self.app.config.setdefault('FLASK_PIKA_PARAMS', {})
self.app.config.setdefault('EXCHANGE_NAME_META', None)
self.app.config.setdefault('EXCHANGE_NAME_CACHE', None)
self.app.config.setdefault('EXCHANGE_NAME', None)
self.app.config.setdefault('RABBITMQ_QUEUE', None)
self._credentials = pika.PlainCredentials(
app.config['FLASK_PIKA_PARAMS']['username'],
@ -46,11 +46,7 @@ class MagicAMQP:
)
self._pika_channel = self._pika_connection.channel()
self._pika_channel.exchange_declare(
exchange=self.app.config['EXCHANGE_NAME_META'],
exchange_type='direct'
)
self._pika_channel.exchange_declare(
exchange=self.app.config['EXCHANGE_NAME_CACHE'],
exchange=self.app.config['EXCHANGE_NAME'],
exchange_type='direct'
)
@ -75,7 +71,7 @@ class MagicAMQP:
if total_time > 1:
self.app.logger.warning(f"Loop: Total loop took {total_time:5f} sec")
def _publish(self, exchange: str, payload=None):
def publish(self, payload=None):
"""
Publish a simple json serialized message to the configured queue.
If the connection is broken, then this call will block until the connection is restored
@ -93,7 +89,7 @@ class MagicAMQP:
while True:
try:
self._pika_channel.basic_publish(
exchange=exchange,
exchange=self.app.config['EXCHANGE_NAME'],
routing_key='feature',
body=json.dumps(payload).encode('UTF-8')
)
@ -124,12 +120,6 @@ class MagicAMQP:
if total_time > 0.4:
self.app.logger.warning(f"Publish: Total publish took {total_time:5f} sec")
def publish_cache(self, payload=None):
return self._publish(self.app.config['EXCHANGE_NAME_CACHE'], payload)
def publish_meta(self, payload=None):
return self._publish(self.app.config['EXCHANGE_NAME_META'], payload)
def is_healthy(self) -> bool:
with self._lock:
if not self._pika_channel:

25
src/models.py Normal file
View File

@ -0,0 +1,25 @@
#!/usr/bin/env python3
from db import db
from sqlalchemy.sql import func
"""
Flask Restful endpoints
"""
__author__ = '@tormakris'
__copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "models"
__version__text__ = "1"
class SampleMetadata(db.Model):
"""
SQLAlchemy model of metadata entries
"""
id = db.Column(db.Integer, primary_key=True, auto_increment=True)
timestamp = db.Column(db.TIMESTAMP, nullable=False, server_default=func.now())
device_id = db.Column(db.Integer, nullable=False)
device_date = db.Column(db.DateTime, nullable=False)
tag = db.Column(db.String(32), nullable=False, unique=True)

View File

@ -1,4 +0,0 @@
#!/usr/bin/env python3
from flask_redis import FlaskRedis
redis_client = FlaskRedis()

View File

@ -1,14 +1,18 @@
#!/usr/bin/env python3
import io
import json
import time
from datetime import datetime
import tzlocal
from xeger import Xeger
from flask_restful import Resource
from flask import request, current_app, abort
import requests
from magic_amqp import magic_amqp
from db import db
from influxus import influx_db
from schemas import SampleSchema
from redis_client import redis_client
from models import SampleMetadata
from schemas import SampleSchema, SampleMetadataSchema
from requests_opentracing import SessionTracing
import opentracing
"""
@ -28,6 +32,7 @@ class SampleResource(Resource):
"""
sampleschema = SampleSchema(many=False)
samplemetadataschema = SampleMetadataSchema(many=True)
def post(self):
"""
@ -43,13 +48,13 @@ class SampleResource(Resource):
if 'description' not in request.form:
return abort(400, "no description found")
else:
description_raw = request.form.get("description")
description = request.form.get("description")
if soundfile.content_type != 'audio/wave':
current_app.logger.info(f"Input file was not WAV.")
return abort(415, 'Input file not a wave file.')
try:
desc = self.sampleschema.loads(description_raw)
desc = self.sampleschema.loads(description)
except Exception as e:
current_app.logger.exception(e)
return abort(417, 'Input JSON schema invalid')
@ -61,34 +66,62 @@ class SampleResource(Resource):
if len(generated_tag) > 2: # Ensure minimum length
break
with opentracing.tracer.start_active_span('publishMetaMessage'):
try:
magic_amqp.publish_meta(
{
'tag': generated_tag,
'timestamp': datetime.now().isoformat(),
'device_id': desc['device_id'],
'device_date': desc['date'].isoformat()
}
# Handle mega-autismo-cliento
soundfile_content_length = soundfile.content_length
if soundfile_content_length <= 0: # BRUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUH
with opentracing.tracer.start_active_span(
'calculateContentLength'): # In an ideal scenario this span is missing
current_app.logger.debug(
"The uploader did not provide content-length for the sound file... Calculating manually..."
)
except Exception as e:
current_app.logger.exception(e)
return abort(500, f"AMQP Publish error: {str(e)}")
# So, this is a seekable stream, so we just seek to the end
old_ptr = soundfile.tell()
soundfile.seek(0, 2)
# Check where is the end (= content length)
soundfile_content_length = soundfile.tell()
# Seek back to where the stream was
soundfile.seek(old_ptr, 0)
with opentracing.tracer.start_active_span('readSampleToMemory'):
buf = io.BytesIO()
soundfile.save(buf)
# It's insane, that you can not set this field in curl
with opentracing.tracer.start_active_span('putToCache'):
redis_client.set(generated_tag, buf.getbuffer()) # getbuffer is quick as it does not copy like getvalue
with opentracing.tracer.start_active_span('sqlalchemy.create'):
record = SampleMetadata(
device_id=desc['device_id'],
device_date=desc['date'],
tag=generated_tag
)
db.session.add(record)
with opentracing.tracer.start_active_span('uploadToStorageService'):
files = {
'description': (None, json.dumps({'tag': generated_tag}), 'application/json'),
'soundFile': (
'wave.wav',
soundfile,
soundfile.content_type,
{'Content-Length': soundfile_content_length})}
upload_started = time.time()
r = SessionTracing(propagate=True).post(
f"http://{current_app.config.get('STORAGE_HOSTNAME')}/object",
files=files
)
upload_time = time.time() - upload_started
if upload_time > 0.8:
current_app.logger.warning(f"Uploading to storage-service took {upload_time:5} sec")
if r.status_code not in [200, 201]:
return abort(500,
f"Failed to upload sample to storage service. Upstream status: {r.status_code}: {r.text}")
with opentracing.tracer.start_active_span('sqlalchemy.commit'):
db.session.commit()
# Announce only after the data is successfully committed
with opentracing.tracer.start_active_span('publishInCacheMessage'):
with opentracing.tracer.start_active_span('publishMessage'):
try:
magic_amqp.publish_cache({
'tag': generated_tag,
'mime_type': soundfile.mimetype
})
magic_amqp.publish({'tag': generated_tag})
except Exception as e:
current_app.logger.exception(e)
return abort(500, f"AMQP Publish error: {str(e)}")
@ -112,3 +145,90 @@ class SampleResource(Resource):
)
return {"tag": generated_tag}, 200
def get(self):
"""
Get all stored items
:return:
"""
with opentracing.tracer.start_active_span('compileQuery'):
query = SampleMetadata.query
## Compile filters ##
filters = []
try:
first = int(request.args.get('first'))
except (ValueError, TypeError):
first = None
else:
filters.append(
SampleMetadata.id >= first
)
try:
after = datetime.fromisoformat(request.args.get('after'))
except (ValueError, TypeError):
after = None
else:
filters.append(
SampleMetadata.timestamp > after
)
try:
before = datetime.fromisoformat(request.args.get('before'))
except (ValueError, TypeError):
before = None
else:
filters.append(
SampleMetadata.timestamp < before
)
if filters:
query = query.filter(db.and_(*filters))
try:
limit = int(request.args.get('limit'))
except (ValueError, TypeError):
limit = None
else:
query = query.limit(limit)
## Run query ##
count = "count" in request.args
tags = {
"first": first,
"limit": limit,
"after": after,
"before": before
}
if count:
with opentracing.tracer.start_active_span('sqlalchemy.count', tags=tags):
rows = query.count()
return {"count": rows}, 200
else:
with opentracing.tracer.start_active_span('sqlalchemy.select', tags=tags):
samples = query.all()
return self.samplemetadataschema.dump(list(samples)), 200
class SampleParameterResource(Resource):
"""
Sample endpoint with parameters
"""
samplemetadataschema = SampleMetadataSchema(many=False)
def get(self, tag: str):
"""
Get a specific item
:param tag:
:return:
"""
with opentracing.tracer.start_active_span('sqlalchemy.select', tags={"tag": tag}):
sample = SampleMetadata.query.filter_by(tag=tag).first_or_404()
return self.samplemetadataschema.dump(sample), 200

View File

@ -1,4 +1,7 @@
#!/usr/bin/env python3
from flask_marshmallow.sqla import auto_field
from models import SampleMetadata
from marshm import ma
from marshmallow import fields
@ -23,3 +26,13 @@ class SampleSchema(ma.Schema):
date = fields.DateTime(required=True)
device_id = fields.Integer(required=True)
class SampleMetadataSchema(ma.SQLAlchemyAutoSchema):
"""
Marshmallow schema generated
"""
class Meta:
model = SampleMetadata
exclude = ('timestamp', 'id', 'device_date')
date = auto_field("device_date", dump_only=False)