Compare commits

..

No commits in common. "2c1113baabe10b11ccec648862a2d528a177802c" and "cbaf2f298145d3ae8f513259ee1e117703a5cd8f" have entirely different histories.

10 changed files with 242 additions and 60 deletions

View File

@ -4,9 +4,13 @@ Flask~=2.0.1
Flask-RESTful~=0.3.9 Flask-RESTful~=0.3.9
requests~=2.26.0 requests~=2.26.0
werkzeug werkzeug
sqlalchemy~=1.4.22
flask_sqlalchemy~=2.5.1
xeger~=0.3.5 xeger~=0.3.5
pika~=1.2.0 pika~=1.2.0
psycopg2-binary
marshmallow~=3.13.0 marshmallow~=3.13.0
marshmallow-sqlalchemy~=0.26.1
flask-marshmallow flask-marshmallow
py-healthcheck py-healthcheck
Flask-InfluxDB Flask-InfluxDB
@ -14,8 +18,7 @@ tzdata
tzlocal tzlocal
apscheduler~=3.7.0 apscheduler~=3.7.0
flask-redis~=0.4.0
opentracing~=2.4.0 opentracing~=2.4.0
jaeger-client jaeger-client
requests-opentracing
Flask-Opentracing Flask-Opentracing

View File

@ -3,14 +3,15 @@ from flask import Flask
from flask_restful import Api from flask_restful import Api
import sentry_sdk import sentry_sdk
from sentry_sdk.integrations.flask import FlaskIntegration from sentry_sdk.integrations.flask import FlaskIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
from healthcheck import HealthCheck from healthcheck import HealthCheck
from redis_client import redis_client
from config import Config from config import Config
from db import db
from marshm import ma from marshm import ma
from influxus import influx_db from influxus import influx_db
from resources import SampleResource from resources import SampleResource, SampleParameterResource
from healthchecks import amqp_connection_status from healthchecks import health_database_status, amqp_connection_status
import atexit import atexit
@ -34,7 +35,7 @@ __version__text__ = "1"
if Config.SENTRY_DSN: if Config.SENTRY_DSN:
sentry_sdk.init( sentry_sdk.init(
dsn=Config.SENTRY_DSN, dsn=Config.SENTRY_DSN,
integrations=[FlaskIntegration()], integrations=[FlaskIntegration(), SqlalchemyIntegration()],
traces_sample_rate=0.0, traces_sample_rate=0.0,
send_default_pii=True, send_default_pii=True,
release=Config.RELEASE_ID, release=Config.RELEASE_ID,
@ -47,10 +48,9 @@ app.config.from_object(Config)
api = Api(app) api = Api(app)
health = HealthCheck() health = HealthCheck()
db.init_app(app)
ma.init_app(app) ma.init_app(app)
redis_client.init_app(app)
# ampq magic stuff # ampq magic stuff
magic_amqp.init_app(app) magic_amqp.init_app(app)
@ -68,7 +68,7 @@ if Config.ENABLE_INFLUXDB:
def init_db(): def init_db():
if Config.ENABLE_INFLUXDB: if Config.ENABLE_INFLUXDB:
influx_db.database.create(Config.INFLUXDB_DATABASE) influx_db.database.create(Config.INFLUXDB_DATABASE)
db.create_all()
# Setup tracing # Setup tracing
def initialize_tracer(): def initialize_tracer():
@ -81,7 +81,9 @@ def initialize_tracer():
tracing = FlaskTracing(initialize_tracer, True, app) tracing = FlaskTracing(initialize_tracer, True, app)
api.add_resource(SampleResource, "/input") api.add_resource(SampleResource, "/input")
api.add_resource(SampleParameterResource, '/input/<tag>')
health.add_check(health_database_status)
health.add_check(amqp_connection_status) health.add_check(amqp_connection_status)
register_all_error_handlers(app) register_all_error_handlers(app)
@ -90,7 +92,6 @@ app.add_url_rule("/healthz", "healthcheck", view_func=lambda: health.run())
if __name__ != '__main__': if __name__ != '__main__':
import logging import logging
gunicorn_logger = logging.getLogger('gunicorn.error') gunicorn_logger = logging.getLogger('gunicorn.error')
app.logger.handlers = gunicorn_logger.handlers app.logger.handlers = gunicorn_logger.handlers
app.logger.setLevel(gunicorn_logger.level) app.logger.setLevel(gunicorn_logger.level)

View File

@ -10,6 +10,12 @@ __copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "app" __module_name__ = "app"
__version__text__ = "1" __version__text__ = "1"
_POSTGRES_HOSTNAME = os.getenv("INPUT_POSTGRES_HOSTNAME", "localhost")
_POSTGRES_USERNAME = os.getenv("INPUT_POSTGRES_USERNAME", "input-service")
_POSTGRES_PASSWORD = os.getenv("INPUT_POSTGRES_PASSWORD", "input-service")
_POSTGRES_DB = os.getenv("INPUT_POSTGRES_DB", "input-service")
_POSTGRES_OPTS = os.getenv("INPUT_POSTGRES_OPTS", "")
class Config: class Config:
PORT = 8080 PORT = 8080
@ -19,10 +25,8 @@ class Config:
RELEASE_ID = os.environ.get("RELEASE_ID", "test") RELEASE_ID = os.environ.get("RELEASE_ID", "test")
RELEASEMODE = os.environ.get("INPUT_SERVICE_RELEASEMODE", "dev") RELEASEMODE = os.environ.get("INPUT_SERVICE_RELEASEMODE", "dev")
REDIS_URL = os.environ['CACHE_REDIS_URL'] EXCHANGE_NAME = os.getenv("INPUT_RABBITMQ_EXCHANGE", "dev")
RABBITMQ_QUEUE = os.getenv("INPUT_RABBITMQ_QUEUE", "wave-extract")
EXCHANGE_NAME_META = os.getenv("INPUT_RABBITMQ_EXCHANGE_META", "sample-meta")
EXCHANGE_NAME_CACHE = os.getenv("INPUT_RABBITMQ_EXCHANGE_CACHE", "sample-cache")
FLASK_PIKA_PARAMS = { FLASK_PIKA_PARAMS = {
'host': os.getenv("INPUT_RABBITMQ_HOSTNAME", "localhost"), 'host': os.getenv("INPUT_RABBITMQ_HOSTNAME", "localhost"),
@ -32,6 +36,10 @@ class Config:
'virtual_host': '/' 'virtual_host': '/'
} }
SQLALCHEMY_DATABASE_URI = f"postgresql://{_POSTGRES_USERNAME}:{_POSTGRES_PASSWORD}@{_POSTGRES_HOSTNAME}:5432/{_POSTGRES_DB}{_POSTGRES_OPTS}"
STORAGE_HOSTNAME = os.getenv("INPUT_STORAGE_HOSTNAME", "localhost:8042")
ENABLE_INFLUXDB = os.environ.get("INPUT_ENABLE_INFLUX", "true").lower() in ["true", "yes", "1"] ENABLE_INFLUXDB = os.environ.get("INPUT_ENABLE_INFLUX", "true").lower() in ["true", "yes", "1"]
INFLUXDB_HOST = os.getenv("INFLUX_HOST", "input-influx") INFLUXDB_HOST = os.getenv("INFLUX_HOST", "input-influx")
INFLUXDB_PORT = os.getenv("INFLUX_PORT", "8086") INFLUXDB_PORT = os.getenv("INFLUX_PORT", "8086")

13
src/db.py Normal file
View File

@ -0,0 +1,13 @@
#!/usr/bin/env python3
from flask_sqlalchemy import SQLAlchemy
"""
Database api
"""
__author__ = '@tormakris'
__copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "db"
__version__text__ = "1"
db = SQLAlchemy()

View File

@ -1,4 +1,6 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from db import db
from magic_amqp import magic_amqp from magic_amqp import magic_amqp
""" """
@ -11,6 +13,17 @@ __module_name__ = "healthchecks"
__version__text__ = "1" __version__text__ = "1"
def health_database_status():
is_database_working = True
output = 'database is ok'
try:
db.session.execute('SELECT 1')
except Exception as e:
output = str(e)
is_database_working = False
return is_database_working, output
def amqp_connection_status(): def amqp_connection_status():
if magic_amqp.is_healthy(): if magic_amqp.is_healthy():
result = True result = True

View File

@ -26,8 +26,8 @@ class MagicAMQP:
def init_app(self, app: Flask): def init_app(self, app: Flask):
self.app = app self.app = app
self.app.config.setdefault('FLASK_PIKA_PARAMS', {}) self.app.config.setdefault('FLASK_PIKA_PARAMS', {})
self.app.config.setdefault('EXCHANGE_NAME_META', None) self.app.config.setdefault('EXCHANGE_NAME', None)
self.app.config.setdefault('EXCHANGE_NAME_CACHE', None) self.app.config.setdefault('RABBITMQ_QUEUE', None)
self._credentials = pika.PlainCredentials( self._credentials = pika.PlainCredentials(
app.config['FLASK_PIKA_PARAMS']['username'], app.config['FLASK_PIKA_PARAMS']['username'],
@ -46,11 +46,7 @@ class MagicAMQP:
) )
self._pika_channel = self._pika_connection.channel() self._pika_channel = self._pika_connection.channel()
self._pika_channel.exchange_declare( self._pika_channel.exchange_declare(
exchange=self.app.config['EXCHANGE_NAME_META'], exchange=self.app.config['EXCHANGE_NAME'],
exchange_type='direct'
)
self._pika_channel.exchange_declare(
exchange=self.app.config['EXCHANGE_NAME_CACHE'],
exchange_type='direct' exchange_type='direct'
) )
@ -75,7 +71,7 @@ class MagicAMQP:
if total_time > 1: if total_time > 1:
self.app.logger.warning(f"Loop: Total loop took {total_time:5f} sec") self.app.logger.warning(f"Loop: Total loop took {total_time:5f} sec")
def _publish(self, exchange: str, payload=None): def publish(self, payload=None):
""" """
Publish a simple json serialized message to the configured queue. Publish a simple json serialized message to the configured queue.
If the connection is broken, then this call will block until the connection is restored If the connection is broken, then this call will block until the connection is restored
@ -93,7 +89,7 @@ class MagicAMQP:
while True: while True:
try: try:
self._pika_channel.basic_publish( self._pika_channel.basic_publish(
exchange=exchange, exchange=self.app.config['EXCHANGE_NAME'],
routing_key='feature', routing_key='feature',
body=json.dumps(payload).encode('UTF-8') body=json.dumps(payload).encode('UTF-8')
) )
@ -124,12 +120,6 @@ class MagicAMQP:
if total_time > 0.4: if total_time > 0.4:
self.app.logger.warning(f"Publish: Total publish took {total_time:5f} sec") self.app.logger.warning(f"Publish: Total publish took {total_time:5f} sec")
def publish_cache(self, payload=None):
return self._publish(self.app.config['EXCHANGE_NAME_CACHE'], payload)
def publish_meta(self, payload=None):
return self._publish(self.app.config['EXCHANGE_NAME_META'], payload)
def is_healthy(self) -> bool: def is_healthy(self) -> bool:
with self._lock: with self._lock:
if not self._pika_channel: if not self._pika_channel:

25
src/models.py Normal file
View File

@ -0,0 +1,25 @@
#!/usr/bin/env python3
from db import db
from sqlalchemy.sql import func
"""
Flask Restful endpoints
"""
__author__ = '@tormakris'
__copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "models"
__version__text__ = "1"
class SampleMetadata(db.Model):
"""
SQLAlchemy model of metadata entries
"""
id = db.Column(db.Integer, primary_key=True, auto_increment=True)
timestamp = db.Column(db.TIMESTAMP, nullable=False, server_default=func.now())
device_id = db.Column(db.Integer, nullable=False)
device_date = db.Column(db.DateTime, nullable=False)
tag = db.Column(db.String(32), nullable=False, unique=True)

View File

@ -1,4 +0,0 @@
#!/usr/bin/env python3
from flask_redis import FlaskRedis
redis_client = FlaskRedis()

View File

@ -1,14 +1,18 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import io import json
import time
from datetime import datetime from datetime import datetime
import tzlocal import tzlocal
from xeger import Xeger from xeger import Xeger
from flask_restful import Resource from flask_restful import Resource
from flask import request, current_app, abort from flask import request, current_app, abort
import requests
from magic_amqp import magic_amqp from magic_amqp import magic_amqp
from db import db
from influxus import influx_db from influxus import influx_db
from schemas import SampleSchema from models import SampleMetadata
from redis_client import redis_client from schemas import SampleSchema, SampleMetadataSchema
from requests_opentracing import SessionTracing
import opentracing import opentracing
""" """
@ -28,6 +32,7 @@ class SampleResource(Resource):
""" """
sampleschema = SampleSchema(many=False) sampleschema = SampleSchema(many=False)
samplemetadataschema = SampleMetadataSchema(many=True)
def post(self): def post(self):
""" """
@ -43,13 +48,13 @@ class SampleResource(Resource):
if 'description' not in request.form: if 'description' not in request.form:
return abort(400, "no description found") return abort(400, "no description found")
else: else:
description_raw = request.form.get("description") description = request.form.get("description")
if soundfile.content_type != 'audio/wave': if soundfile.content_type != 'audio/wave':
current_app.logger.info(f"Input file was not WAV.") current_app.logger.info(f"Input file was not WAV.")
return abort(415, 'Input file not a wave file.') return abort(415, 'Input file not a wave file.')
try: try:
desc = self.sampleschema.loads(description_raw) desc = self.sampleschema.loads(description)
except Exception as e: except Exception as e:
current_app.logger.exception(e) current_app.logger.exception(e)
return abort(417, 'Input JSON schema invalid') return abort(417, 'Input JSON schema invalid')
@ -61,34 +66,62 @@ class SampleResource(Resource):
if len(generated_tag) > 2: # Ensure minimum length if len(generated_tag) > 2: # Ensure minimum length
break break
with opentracing.tracer.start_active_span('publishMetaMessage'): # Handle mega-autismo-cliento
try: soundfile_content_length = soundfile.content_length
magic_amqp.publish_meta( if soundfile_content_length <= 0: # BRUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUH
{ with opentracing.tracer.start_active_span(
'tag': generated_tag, 'calculateContentLength'): # In an ideal scenario this span is missing
'timestamp': datetime.now().isoformat(), current_app.logger.debug(
'device_id': desc['device_id'], "The uploader did not provide content-length for the sound file... Calculating manually..."
'device_date': desc['date'].isoformat()
}
) )
except Exception as e: # So, this is a seekable stream, so we just seek to the end
current_app.logger.exception(e) old_ptr = soundfile.tell()
return abort(500, f"AMQP Publish error: {str(e)}") soundfile.seek(0, 2)
# Check where is the end (= content length)
soundfile_content_length = soundfile.tell()
# Seek back to where the stream was
soundfile.seek(old_ptr, 0)
with opentracing.tracer.start_active_span('readSampleToMemory'): # It's insane, that you can not set this field in curl
buf = io.BytesIO()
soundfile.save(buf)
with opentracing.tracer.start_active_span('putToCache'): with opentracing.tracer.start_active_span('sqlalchemy.create'):
redis_client.set(generated_tag, buf.getbuffer()) # getbuffer is quick as it does not copy like getvalue record = SampleMetadata(
device_id=desc['device_id'],
device_date=desc['date'],
tag=generated_tag
)
db.session.add(record)
with opentracing.tracer.start_active_span('uploadToStorageService'):
files = {
'description': (None, json.dumps({'tag': generated_tag}), 'application/json'),
'soundFile': (
'wave.wav',
soundfile,
soundfile.content_type,
{'Content-Length': soundfile_content_length})}
upload_started = time.time()
r = SessionTracing(propagate=True).post(
f"http://{current_app.config.get('STORAGE_HOSTNAME')}/object",
files=files
)
upload_time = time.time() - upload_started
if upload_time > 0.8:
current_app.logger.warning(f"Uploading to storage-service took {upload_time:5} sec")
if r.status_code not in [200, 201]:
return abort(500,
f"Failed to upload sample to storage service. Upstream status: {r.status_code}: {r.text}")
with opentracing.tracer.start_active_span('sqlalchemy.commit'):
db.session.commit()
# Announce only after the data is successfully committed # Announce only after the data is successfully committed
with opentracing.tracer.start_active_span('publishInCacheMessage'): with opentracing.tracer.start_active_span('publishMessage'):
try: try:
magic_amqp.publish_cache({ magic_amqp.publish({'tag': generated_tag})
'tag': generated_tag,
'mime_type': soundfile.mimetype
})
except Exception as e: except Exception as e:
current_app.logger.exception(e) current_app.logger.exception(e)
return abort(500, f"AMQP Publish error: {str(e)}") return abort(500, f"AMQP Publish error: {str(e)}")
@ -112,3 +145,90 @@ class SampleResource(Resource):
) )
return {"tag": generated_tag}, 200 return {"tag": generated_tag}, 200
def get(self):
"""
Get all stored items
:return:
"""
with opentracing.tracer.start_active_span('compileQuery'):
query = SampleMetadata.query
## Compile filters ##
filters = []
try:
first = int(request.args.get('first'))
except (ValueError, TypeError):
first = None
else:
filters.append(
SampleMetadata.id >= first
)
try:
after = datetime.fromisoformat(request.args.get('after'))
except (ValueError, TypeError):
after = None
else:
filters.append(
SampleMetadata.timestamp > after
)
try:
before = datetime.fromisoformat(request.args.get('before'))
except (ValueError, TypeError):
before = None
else:
filters.append(
SampleMetadata.timestamp < before
)
if filters:
query = query.filter(db.and_(*filters))
try:
limit = int(request.args.get('limit'))
except (ValueError, TypeError):
limit = None
else:
query = query.limit(limit)
## Run query ##
count = "count" in request.args
tags = {
"first": first,
"limit": limit,
"after": after,
"before": before
}
if count:
with opentracing.tracer.start_active_span('sqlalchemy.count', tags=tags):
rows = query.count()
return {"count": rows}, 200
else:
with opentracing.tracer.start_active_span('sqlalchemy.select', tags=tags):
samples = query.all()
return self.samplemetadataschema.dump(list(samples)), 200
class SampleParameterResource(Resource):
"""
Sample endpoint with parameters
"""
samplemetadataschema = SampleMetadataSchema(many=False)
def get(self, tag: str):
"""
Get a specific item
:param tag:
:return:
"""
with opentracing.tracer.start_active_span('sqlalchemy.select', tags={"tag": tag}):
sample = SampleMetadata.query.filter_by(tag=tag).first_or_404()
return self.samplemetadataschema.dump(sample), 200

View File

@ -1,4 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from flask_marshmallow.sqla import auto_field
from models import SampleMetadata
from marshm import ma from marshm import ma
from marshmallow import fields from marshmallow import fields
@ -23,3 +26,13 @@ class SampleSchema(ma.Schema):
date = fields.DateTime(required=True) date = fields.DateTime(required=True)
device_id = fields.Integer(required=True) device_id = fields.Integer(required=True)
class SampleMetadataSchema(ma.SQLAlchemyAutoSchema):
"""
Marshmallow schema generated
"""
class Meta:
model = SampleMetadata
exclude = ('timestamp', 'id', 'device_date')
date = auto_field("device_date", dump_only=False)