Compare commits
32 Commits
5b8d88339f
...
influx
Author | SHA1 | Date | |
---|---|---|---|
25aac079d1 | |||
7e61ee8a6e | |||
95b557e8ba | |||
0c4b036b47 | |||
8e0b252c92
|
|||
211be6cf96
|
|||
92884b6760 | |||
a08528fd99 | |||
a1ee937f32 | |||
5d5bb9cd92 | |||
5b736b2844 | |||
29eae6bef0 | |||
b22236ece3
|
|||
5e29de0c44
|
|||
5a64c72cc9
|
|||
edc22a4e0a
|
|||
b47d9df26d
|
|||
f6273afe7b
|
|||
dadb6508b3
|
|||
483c97e980
|
|||
c78d987c72 | |||
3563234d70
|
|||
099d6adffa | |||
ccfd09f239 | |||
6da9abf3af | |||
8364b84450 | |||
202d065116 | |||
9fb474d3be | |||
9a7258cb4d
|
|||
d3a8057820 | |||
f244721cd1
|
|||
80d115e488
|
52
.drone.yml
52
.drone.yml
@ -3,30 +3,6 @@ type: docker
|
||||
name: default
|
||||
|
||||
steps:
|
||||
- name: restore-cache-with-filesystem
|
||||
image: meltwater/drone-cache
|
||||
settings:
|
||||
backend: "filesystem"
|
||||
restore: true
|
||||
cache_key: "{{ .Repo.Name }}"
|
||||
archive_format: "gzip"
|
||||
filesystem_cache_root: "/tmp/cache"
|
||||
mount:
|
||||
- '.pipcache'
|
||||
volumes:
|
||||
- name: cache
|
||||
path: /tmp/cache
|
||||
|
||||
- name: static_analysis
|
||||
image: "python:3.8"
|
||||
commands:
|
||||
- pip3 install --cache-dir='./.pipcache' pylint bandit mccabe
|
||||
- pip3 install --cache-dir='./.pipcache' -r requirements.txt
|
||||
- find . -name "*.py" -exec python3 -m py_compile '{}' \;
|
||||
- find . -name "*.py" -exec pylint '{}' + || if [ $? -eq 1 ]; then echo "you fail"; fi
|
||||
- find . -name "*.py" -exec python3 -m mccabe --min 3 '{}' + || if [ $? -eq 1 ]; then echo "you fail"; fi
|
||||
- bandit -r . + || if [ $? -eq 1 ]; then echo "you fail"; fi
|
||||
|
||||
- name: code-analysis
|
||||
image: aosapps/drone-sonar-plugin
|
||||
settings:
|
||||
@ -35,21 +11,6 @@ steps:
|
||||
sonar_token:
|
||||
from_secret: SONAR_CODE
|
||||
|
||||
- name: rebuild-cache-with-filesystem
|
||||
image: meltwater/drone-cache:dev
|
||||
pull: true
|
||||
settings:
|
||||
backend: "filesystem"
|
||||
rebuild: true
|
||||
cache_key: "{{ .Repo.Name }}"
|
||||
archive_format: "gzip"
|
||||
filesystem_cache_root: "/tmp/cache"
|
||||
mount:
|
||||
- '.pipcache'
|
||||
volumes:
|
||||
- name: cache
|
||||
path: /tmp/cache
|
||||
|
||||
- name: kaniko
|
||||
image: banzaicloud/drone-kaniko
|
||||
settings:
|
||||
@ -63,14 +24,6 @@ steps:
|
||||
- latest
|
||||
- ${DRONE_BUILD_NUMBER}
|
||||
|
||||
- name: sentry
|
||||
image: tormachris/drone-sentry
|
||||
settings:
|
||||
sentry_project: ${DRONE_REPO_NAME}
|
||||
sentry_domain: sentry.kmlabz.com
|
||||
sentry_token:
|
||||
from_secret: SENTRY_TOKEN
|
||||
|
||||
- name: ms-teams
|
||||
image: kuperiu/drone-teams
|
||||
settings:
|
||||
@ -78,8 +31,3 @@ steps:
|
||||
from_secret: TEAMS_WEBHOOK
|
||||
when:
|
||||
status: [ failure ]
|
||||
|
||||
volumes:
|
||||
- name: cache
|
||||
host:
|
||||
path: "/tmp/cache"
|
||||
|
@ -1,4 +1,4 @@
|
||||
FROM python:3.8-slim
|
||||
FROM python:3.9-slim
|
||||
|
||||
ENV TZ Europe/Budapest
|
||||
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
|
||||
|
@ -20,6 +20,7 @@ spec:
|
||||
containers:
|
||||
- name: input-service
|
||||
image: registry.kmlabz.com/birbnetesgit/input-service
|
||||
imagePullPolicy: Always
|
||||
envFrom:
|
||||
- configMapRef:
|
||||
name: input-service
|
||||
|
@ -12,3 +12,7 @@ psycopg2-binary
|
||||
marshmallow
|
||||
marshmallow-sqlalchemy
|
||||
flask-marshmallow
|
||||
py-healthcheck
|
||||
Flask-InfluxDB
|
||||
tzdata
|
||||
tzlocal
|
51
src/app.py
51
src/app.py
@ -1,18 +1,20 @@
|
||||
#!/usr/bin/env python3
|
||||
import logging
|
||||
from flask import Flask
|
||||
from flask_restful import Api
|
||||
import sentry_sdk
|
||||
from sentry_sdk.integrations.flask import FlaskIntegration
|
||||
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
|
||||
from healthcheck import HealthCheck
|
||||
|
||||
from config import *
|
||||
from db import db
|
||||
from marshm import ma
|
||||
from rabbit_broker_instance import mq
|
||||
from influxus import influx_db
|
||||
from resources import SampleResource, SampleParameterResource
|
||||
from healthchecks import health_database_status
|
||||
|
||||
"""
|
||||
Main Flask RESTful API
|
||||
Main Flask RESTful APIm
|
||||
"""
|
||||
|
||||
__author__ = "@tormakris"
|
||||
@ -23,7 +25,7 @@ __version__text__ = "1"
|
||||
if SENTRY_DSN:
|
||||
sentry_sdk.init(
|
||||
dsn=SENTRY_DSN,
|
||||
integrations=[FlaskIntegration()],
|
||||
integrations=[FlaskIntegration(), SqlalchemyIntegration()],
|
||||
traces_sample_rate=1.0,
|
||||
send_default_pii=True,
|
||||
release=RELEASE_ID,
|
||||
@ -31,40 +33,33 @@ if SENTRY_DSN:
|
||||
_experiments={"auto_enabling_integrations": True}
|
||||
)
|
||||
|
||||
|
||||
app = Flask(__name__)
|
||||
app.config['SQLALCHEMY_DATABASE_URI'] = f"postgresql://{POSTGRES_USERNAME}:{POSTGRES_PASSWORD}@{POSTGRES_HOSTNAME}:5432/{POSTGRES_DB}"
|
||||
app.config['RABBIT_USERNAME'] = RABBITMQ_USERNAME
|
||||
app.config['RABBIT_PASSWORD'] = RABBITMQ_PASSWORD
|
||||
app.config['RABBIT_HOST'] = RABBITMQ_HOST
|
||||
app.config['RABBIT_ROUTING_KEY'] = "feature"
|
||||
app.config[
|
||||
'SQLALCHEMY_DATABASE_URI'] = f"postgresql://{POSTGRES_USERNAME}:{POSTGRES_PASSWORD}@{POSTGRES_HOSTNAME}:5432/{POSTGRES_DB}{POSTGRES_OPTS}"
|
||||
app.config['EXCHANGE_NAME'] = RABBITMQ_EXCHANGE
|
||||
app.config['FLASK_PIKA_PARAMS'] = {'host': RABBITMQ_HOST,
|
||||
'username': RABBITMQ_USERNAME,
|
||||
'password': RABBITMQ_PASSWORD,
|
||||
'port': 5672,
|
||||
'virtual_host': '/'}
|
||||
app.config['INFLUXDB_HOST'] = INFLUXDB_HOST
|
||||
app.config['INFLUXDB_PORT'] = INFLUXDB_PORT
|
||||
app.config['INFLUXDB_USER'] = INFLUXDB_USERNAME
|
||||
app.config['INFLUXDB_PASSWORD'] = INFLUXDB_PASSWORD
|
||||
app.config['INFLUXDB_DATABASE'] = INFLUXDB_DB
|
||||
|
||||
api = Api(app)
|
||||
health = HealthCheck()
|
||||
db.init_app(app)
|
||||
ma.init_app(app)
|
||||
mq.init_app(app)
|
||||
influx_db.init_app(app)
|
||||
|
||||
with app.app_context():
|
||||
# influx_db.database.create(INFLUXDB_DB)
|
||||
db.create_all()
|
||||
|
||||
formatter = logging.Formatter(
|
||||
fmt="%(asctime)s - %(levelname)s - %(module)s - %(message)s"
|
||||
)
|
||||
|
||||
handler = logging.StreamHandler()
|
||||
handler.setFormatter(formatter)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
logger.addHandler(handler)
|
||||
|
||||
api.add_resource(SampleResource, "/sample")
|
||||
api.add_resource(SampleParameterResource, '/sample/<tag>')
|
||||
|
||||
if __name__ == "__main__":
|
||||
app.run(
|
||||
debug=bool(DEBUG),
|
||||
host="0.0.0.0",
|
||||
port=int(PORT),
|
||||
)
|
||||
health.add_check(health_database_status)
|
||||
app.add_url_rule("/healthz", "healthcheck", view_func=lambda: health.run())
|
||||
|
@ -31,5 +31,12 @@ POSTGRES_HOSTNAME = os.getenv("INPUT_POSTGRES_HOSTNAME", "localhost")
|
||||
POSTGRES_USERNAME = os.getenv("INPUT_POSTGRES_USERNAME", "input-service")
|
||||
POSTGRES_PASSWORD = os.getenv("INPUT_POSTGRES_PASSWORD", "input-service")
|
||||
POSTGRES_DB = os.getenv("INPUT_POSTGRES_DB", "input-service")
|
||||
POSTGRES_OPTS = os.getenv("INPUT_POSTGRES_OPTS", "")
|
||||
|
||||
STORAGE_HOSTNAME = os.getenv("INPUT_STORAGE_HOSTNAME", "localhost:8042")
|
||||
|
||||
INFLUXDB_HOST = os.getenv("INFLUX_HOST", "input-influx")
|
||||
INFLUXDB_PORT = os.getenv("INFLUX_PORT", "8086")
|
||||
INFLUXDB_USERNAME = os.getenv("INFLUX_USERNAME", "input-service")
|
||||
INFLUXDB_PASSWORD = os.getenv("INFLUX_PASSWORD", "input-service-supersecret")
|
||||
INFLUXDB_DB = os.getenv("INFLUX_DB", "input-service")
|
||||
|
@ -2,7 +2,7 @@
|
||||
from flask_sqlalchemy import SQLAlchemy
|
||||
|
||||
"""
|
||||
Flask Restful endpoints
|
||||
Database api
|
||||
"""
|
||||
|
||||
__author__ = '@tormakris'
|
||||
|
@ -1,102 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
import uuid
|
||||
import pika
|
||||
|
||||
"""
|
||||
Flask Rabbit Broker
|
||||
"""
|
||||
|
||||
__author__ = '@tormakris'
|
||||
__copyright__ = "Copyright 2020, Birbnetes Team"
|
||||
__module_name__ = "flask_rabbit_broker"
|
||||
__version__text__ = "1"
|
||||
|
||||
|
||||
class FlaskRabbitBroker:
|
||||
"""Message Broker using RabbitMQ middleware"""
|
||||
|
||||
def __init__(self, app=None):
|
||||
"""
|
||||
Create a new instance of Broker Rabbit by using
|
||||
the given parameters to connect to RabbitMQ.
|
||||
"""
|
||||
self.app = app
|
||||
self.exchange_name = None
|
||||
self.username = None
|
||||
self.password = None
|
||||
self.rabbitmq_host = None
|
||||
self.routing_key = None
|
||||
self.connection = None
|
||||
self.channel = None
|
||||
self.exchange = None
|
||||
self.exchange_type = "fanout"
|
||||
|
||||
def init_app(self, app) -> None:
|
||||
"""
|
||||
Init the broker with the current application context
|
||||
:param app: application context
|
||||
:return:
|
||||
"""
|
||||
self.username = app.config.get('RABBIT_USERNAME')
|
||||
self.password = app.config.get('RABBIT_PASSWORD')
|
||||
self.rabbitmq_host = app.config.get('RABBIT_HOST')
|
||||
self.exchange_name = app.config.get('EXCHANGE_NAME')
|
||||
self.routing_key = app.config.get('RABBIT_ROUTING_KEY')
|
||||
self.init_connection(timeout=5)
|
||||
self.init_exchange()
|
||||
|
||||
def init_connection(self, timeout: int = 5) -> None:
|
||||
""""
|
||||
Init RabbitMQ connection
|
||||
:param timeout: timeout of connection
|
||||
:return:
|
||||
"""
|
||||
credentials = pika.PlainCredentials(self.username, self.password)
|
||||
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.rabbitmq_host,
|
||||
credentials=credentials,
|
||||
heartbeat=0,
|
||||
socket_timeout=timeout))
|
||||
|
||||
def close_connection(self) -> None:
|
||||
self.connection.close()
|
||||
|
||||
def init_exchange(self) -> None:
|
||||
"""
|
||||
Init the exchange use to send messages
|
||||
:return:
|
||||
"""
|
||||
channel = self.connection.channel()
|
||||
try:
|
||||
channel.exchange_declare(exchange=self.exchange_name,
|
||||
exchange_type=self.exchange_type,
|
||||
durable=True,
|
||||
auto_delete=False)
|
||||
finally:
|
||||
channel.close()
|
||||
|
||||
def register_callback(self, callback) -> None:
|
||||
"""
|
||||
Register a callback.
|
||||
:param callback:
|
||||
:return:
|
||||
"""
|
||||
channel = self.connection.channel()
|
||||
queue = channel.queue_declare(durable=True, auto_delete=False, exclusive=True,
|
||||
queue=uuid.uuid4().urn.split(':')[2]).method.queue
|
||||
channel.bind(exchange=self.exchange_name, queue=queue)
|
||||
channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=True)
|
||||
|
||||
def send(self, message: str) -> None:
|
||||
"""
|
||||
Sends a message to the declared exchange.
|
||||
:param message:
|
||||
:return:
|
||||
"""
|
||||
channel = self.connection.channel()
|
||||
try:
|
||||
channel.basic_publish(
|
||||
exchange=self.exchange_name,
|
||||
routing_key=self.routing_key,
|
||||
body=message.encode('utf-8'))
|
||||
finally:
|
||||
channel.close()
|
23
src/healthchecks.py
Normal file
23
src/healthchecks.py
Normal file
@ -0,0 +1,23 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from db import db
|
||||
|
||||
"""
|
||||
Healthchek functions
|
||||
"""
|
||||
|
||||
__author__ = "@tormakris"
|
||||
__copyright__ = "Copyright 2020, Birbnetes Team"
|
||||
__module_name__ = "healthchecks"
|
||||
__version__text__ = "1"
|
||||
|
||||
|
||||
def health_database_status():
|
||||
is_database_working = True
|
||||
output = 'database is ok'
|
||||
try:
|
||||
db.session.execute('SELECT 1')
|
||||
except Exception as e:
|
||||
output = str(e)
|
||||
is_database_working = False
|
||||
return is_database_working, output
|
15
src/influxus.py
Normal file
15
src/influxus.py
Normal file
@ -0,0 +1,15 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from flask_influxdb import InfluxDB
|
||||
|
||||
|
||||
"""
|
||||
Influx api
|
||||
"""
|
||||
|
||||
__author__ = '@tormakris'
|
||||
__copyright__ = "Copyright 2020, Birbnetes Team"
|
||||
__module_name__ = "influxus"
|
||||
__version__text__ = "1"
|
||||
|
||||
influx_db = InfluxDB()
|
@ -1,14 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from flask_rabbit_broker import FlaskRabbitBroker
|
||||
|
||||
"""
|
||||
Rabbit Broker instance
|
||||
"""
|
||||
|
||||
__author__ = '@tormakris'
|
||||
__copyright__ = "Copyright 2020, Birbnetes Team"
|
||||
__module_name__ = "rabbit_broker_instance"
|
||||
__version__text__ = "1"
|
||||
|
||||
mq = FlaskRabbitBroker()
|
@ -1,18 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import pika
|
||||
from config import *
|
||||
|
||||
"""
|
||||
Rabbitmq setup
|
||||
"""
|
||||
|
||||
__author__ = '@tormakris'
|
||||
__copyright__ = "Copyright 2020, Birbnetes Team"
|
||||
__module_name__ = "endpoints"
|
||||
__version__text__ = "1"
|
||||
|
||||
credentials = pika.PlainCredentials(RABBITMQ_USERNAME, RABBITMQ_PASSWORD)
|
||||
rabbitmq = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST, credentials=credentials))
|
||||
rabbitmq_channel = rabbitmq.channel()
|
||||
rabbitmq_channel.exchange_declare(exchange=RABBITMQ_EXCHANGE, exchange_type='fanout')
|
@ -1,16 +1,17 @@
|
||||
#!/usr/bin/env python3
|
||||
import logging
|
||||
import json
|
||||
from datetime import datetime
|
||||
import tzlocal
|
||||
from xeger import Xeger
|
||||
from flask_restful import Resource
|
||||
from flask import request
|
||||
from flask import request, current_app
|
||||
import requests
|
||||
import pika
|
||||
from db import db
|
||||
from influxus import influx_db
|
||||
from models import SampleMetadata
|
||||
from schemas import SampleSchema, SampleMetadataSchema
|
||||
from config import *
|
||||
from rabbit_broker_instance import mq
|
||||
|
||||
"""
|
||||
Flask Restful endpoints
|
||||
@ -21,8 +22,6 @@ __copyright__ = "Copyright 2020, Birbnetes Team"
|
||||
__module_name__ = "endpoints"
|
||||
__version__text__ = "1"
|
||||
|
||||
LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SampleResource(Resource):
|
||||
"""
|
||||
@ -49,18 +48,37 @@ class SampleResource(Resource):
|
||||
description = request.form.get("description")
|
||||
|
||||
if soundfile.content_type != 'audio/wave':
|
||||
LOGGER.info(
|
||||
current_app.logger.info(
|
||||
f"Input file was not WAV.")
|
||||
return {'err_msg': 'Input file not a wave file.'}, 415
|
||||
|
||||
try:
|
||||
desc = self.sampleschema.loads(description)
|
||||
except Exception as e:
|
||||
LOGGER.exception(e)
|
||||
current_app.logger.exception(e)
|
||||
return {'err_msg': 'Input JSON schema invalid'}, 417
|
||||
|
||||
xeger = Xeger(limit=30)
|
||||
generated_tag = xeger.xeger(r'^[a-zA-Z]+[0-9a-zA-Z_]*$')[:32]
|
||||
while True:
|
||||
generated_tag = xeger.xeger(r'^[a-zA-Z]+[0-9a-zA-Z_]*$')[:32]
|
||||
if len(generated_tag) > 2: # Ensure minimum length
|
||||
break
|
||||
|
||||
# Handle mega-autismo-cliento
|
||||
soundfile_content_length = soundfile.content_length
|
||||
if soundfile_content_length <= 0: # BRUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUH
|
||||
current_app.logger.debug(
|
||||
"The uploader did not provide content-length for the sound file... Calculating manually..."
|
||||
)
|
||||
# So, this is a seekable stream, so we just seek to the end
|
||||
old_ptr = soundfile.tell()
|
||||
soundfile.seek(0, 2)
|
||||
# Check where is the end (= content length)
|
||||
soundfile_content_length = soundfile.tell()
|
||||
# Seek back to where the stream was
|
||||
soundfile.seek(old_ptr, 0)
|
||||
|
||||
# It's insane, that you can not set this field in curl
|
||||
|
||||
record = SampleMetadata(
|
||||
device_id=desc['device_id'],
|
||||
@ -76,10 +94,37 @@ class SampleResource(Resource):
|
||||
'wave.wav',
|
||||
soundfile,
|
||||
soundfile.content_type,
|
||||
{'Content-Length': soundfile.content_length})}).raise_for_status()
|
||||
mq.send(json.dumps({'tag': generated_tag}))
|
||||
{'Content-Length': soundfile_content_length})}).raise_for_status() # Anyádat curl am
|
||||
credentials = pika.PlainCredentials(current_app.config['FLASK_PIKA_PARAMS']['username'],
|
||||
current_app.config['FLASK_PIKA_PARAMS']['password'])
|
||||
connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host=current_app.config['FLASK_PIKA_PARAMS']['host'],
|
||||
credentials=credentials,
|
||||
heartbeat=0,
|
||||
socket_timeout=5))
|
||||
channel = connection.channel()
|
||||
channel.exchange_declare(exchange=current_app.config['EXCHANGE_NAME'],
|
||||
exchange_type='direct')
|
||||
channel.basic_publish(exchange=current_app.config['EXCHANGE_NAME'],
|
||||
routing_key='feature',
|
||||
body=json.dumps({'tag': generated_tag}).encode('UTF-8'))
|
||||
connection.close()
|
||||
influx_db.write_points(
|
||||
[
|
||||
{
|
||||
'time': datetime.now(tz=tzlocal.get_localzone()),
|
||||
'measurement': 'cloudinput',
|
||||
'tags': {
|
||||
'device': desc['device_id']
|
||||
},
|
||||
'fields': {
|
||||
'bruh': 1.0
|
||||
}
|
||||
}
|
||||
]
|
||||
)
|
||||
except Exception as e:
|
||||
LOGGER.exception(e)
|
||||
current_app.logger.exception(e)
|
||||
db.session.rollback()
|
||||
return {"err_msg": str(
|
||||
e), "hint": "DB or downstream service error"}, 569
|
||||
|
@ -1,4 +1,6 @@
|
||||
#!/usr/bin/env python3
|
||||
from flask_marshmallow.sqla import auto_field
|
||||
|
||||
from models import SampleMetadata
|
||||
from marshm import ma
|
||||
from marshmallow import fields
|
||||
@ -19,11 +21,11 @@ class SampleSchema(ma.Schema):
|
||||
"""
|
||||
Parameters:
|
||||
- date (date)
|
||||
- device_id (str)
|
||||
- device_id (int)
|
||||
"""
|
||||
|
||||
date = fields.DateTime(required=True)
|
||||
device_id = fields.Str(required=True)
|
||||
device_id = fields.Integer(required=True)
|
||||
|
||||
|
||||
class SampleMetadataSchema(ma.SQLAlchemyAutoSchema):
|
||||
@ -32,4 +34,5 @@ class SampleMetadataSchema(ma.SQLAlchemyAutoSchema):
|
||||
"""
|
||||
class Meta:
|
||||
model = SampleMetadata
|
||||
|
||||
exclude = ('timestamp', 'id', 'device_date',)
|
||||
date = auto_field("device_date", dump_only=False)
|
||||
|
Reference in New Issue
Block a user