Compare commits

34 Commits

Author SHA1 Message Date
25aac079d1 add opts
All checks were successful
continuous-integration/drone/push Build is passing
2021-07-19 16:21:53 +02:00
7e61ee8a6e update postgresql opts
All checks were successful
continuous-integration/drone/push Build is passing
2021-07-19 16:21:17 +02:00
95b557e8ba remove sentry
All checks were successful
continuous-integration/drone/push Build is passing
2021-07-19 16:03:19 +02:00
0c4b036b47 do not create influxdb db
Some checks failed
continuous-integration/drone/push Build is failing
2021-07-19 16:00:27 +02:00
8e0b252c92 refine influx stuff
All checks were successful
continuous-integration/drone/push Build is passing
2021-06-13 20:07:21 +02:00
211be6cf96 add influx capability
All checks were successful
continuous-integration/drone/push Build is passing
2021-06-13 20:01:23 +02:00
92884b6760 Update '.drone.yml'
All checks were successful
continuous-integration/drone/push Build is passing
2021-04-14 20:37:27 +02:00
a08528fd99 update python
All checks were successful
continuous-integration/drone/push Build is passing
2020-12-06 11:25:46 +01:00
a1ee937f32 update helathcheck
All checks were successful
continuous-integration/drone/push Build is passing
2020-12-06 11:16:09 +01:00
5d5bb9cd92 add health endpoint
All checks were successful
continuous-integration/drone/push Build is passing
2020-11-19 00:30:26 +01:00
5b736b2844 use direct exchange
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-24 00:28:25 +02:00
29eae6bef0 Added handling missing content-length field
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-23 19:22:11 +02:00
b22236ece3 fix get
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-21 02:01:45 +02:00
5e29de0c44 remove so many transactions
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-20 00:15:37 +02:00
5a64c72cc9 do code analysis via sonar
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-19 23:50:59 +02:00
edc22a4e0a oops
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-19 23:48:59 +02:00
b47d9df26d more sessions
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-19 23:35:41 +02:00
f6273afe7b add more transactions to sentry
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-19 23:31:40 +02:00
dadb6508b3 connect on demand
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-19 23:26:59 +02:00
483c97e980 low reuse time
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-19 22:54:52 +02:00
c78d987c72 device id should be integer
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-19 22:34:41 +02:00
3563234d70 change schema
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-03 12:47:44 +02:00
099d6adffa remove cancer
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-03 12:31:19 +02:00
ccfd09f239 remove cache
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-02 13:40:38 +02:00
6da9abf3af auto_delete should be false as well
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-02 13:39:18 +02:00
8364b84450 durabe should be false
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-02 13:15:33 +02:00
202d065116 Fixed circular dependency
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-02 04:31:20 +02:00
9fb474d3be Ensure minimum length
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-02 04:21:33 +02:00
9a7258cb4d use flask logging
All checks were successful
continuous-integration/drone/push Build is passing
2020-09-20 02:19:14 +02:00
d3a8057820 ALWAYS
All checks were successful
continuous-integration/drone/push Build is passing
2020-08-06 14:56:40 +02:00
f244721cd1 declare exchange
All checks were successful
continuous-integration/drone/push Build is passing
2020-08-04 03:33:47 +02:00
80d115e488 use flask-pika
All checks were successful
continuous-integration/drone/push Build is passing
2020-08-04 03:23:46 +02:00
5b8d88339f Merge branch 'master' of gitea:birbnetes/input-service into master
All checks were successful
continuous-integration/drone/push Build is passing
2020-08-02 00:54:32 +02:00
434e63053b hotfix 2020-08-02 00:53:49 +02:00
18 changed files with 196 additions and 228 deletions

View File

@ -3,30 +3,6 @@ type: docker
name: default
steps:
- name: restore-cache-with-filesystem
image: meltwater/drone-cache
settings:
backend: "filesystem"
restore: true
cache_key: "{{ .Repo.Name }}"
archive_format: "gzip"
filesystem_cache_root: "/tmp/cache"
mount:
- '.pipcache'
volumes:
- name: cache
path: /tmp/cache
- name: static_analysis
image: "python:3.8"
commands:
- pip3 install --cache-dir='./.pipcache' pylint bandit mccabe
- pip3 install --cache-dir='./.pipcache' -r requirements.txt
- find . -name "*.py" -exec python3 -m py_compile '{}' \;
- find . -name "*.py" -exec pylint '{}' + || if [ $? -eq 1 ]; then echo "you fail"; fi
- find . -name "*.py" -exec python3 -m mccabe --min 3 '{}' + || if [ $? -eq 1 ]; then echo "you fail"; fi
- bandit -r . + || if [ $? -eq 1 ]; then echo "you fail"; fi
- name: code-analysis
image: aosapps/drone-sonar-plugin
settings:
@ -35,21 +11,6 @@ steps:
sonar_token:
from_secret: SONAR_CODE
- name: rebuild-cache-with-filesystem
image: meltwater/drone-cache:dev
pull: true
settings:
backend: "filesystem"
rebuild: true
cache_key: "{{ .Repo.Name }}"
archive_format: "gzip"
filesystem_cache_root: "/tmp/cache"
mount:
- '.pipcache'
volumes:
- name: cache
path: /tmp/cache
- name: kaniko
image: banzaicloud/drone-kaniko
settings:
@ -63,14 +24,6 @@ steps:
- latest
- ${DRONE_BUILD_NUMBER}
- name: sentry
image: tormachris/drone-sentry
settings:
sentry_project: ${DRONE_REPO_NAME}
sentry_domain: sentry.kmlabz.com
sentry_token:
from_secret: SENTRY_TOKEN
- name: ms-teams
image: kuperiu/drone-teams
settings:
@ -78,8 +31,3 @@ steps:
from_secret: TEAMS_WEBHOOK
when:
status: [ failure ]
volumes:
- name: cache
host:
path: "/tmp/cache"

View File

@ -1,4 +1,4 @@
FROM python:3.8-slim
FROM python:3.9-slim
ENV TZ Europe/Budapest
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone

45
conftest.py Normal file
View File

@ -0,0 +1,45 @@
#!/usr/bin/env python3
import pytest
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
@pytest.fixture(scope='session')
def database(request):
    """
    Create a Postgres database for the tests, and drop it when the tests are done.

    NOTE(review): the original looked DB_OPTS up with the connection *values*
    used as keys (``"postgresql"``, ``"5432"``, ``"input-service-test"``).
    DB_OPTS is conventionally produced by
    ``sqlalchemy.engine.url.make_url(DB_CONN).translate_connect_args()``,
    whose keys are the argument names used below — confirm against where
    DB_OPTS is defined.
    """
    pg_host = DB_OPTS.get("host")
    pg_port = DB_OPTS.get("port")
    pg_user = DB_OPTS.get("username")
    pg_db = DB_OPTS["database"]

    init_postgresql_database(pg_user, pg_host, pg_port, pg_db)

    @request.addfinalizer
    def drop_database():
        # Tear the test database down once the whole session is finished.
        drop_postgresql_database(pg_user, pg_host, pg_port, pg_db)
@pytest.fixture(scope='session')
def app(database):
    """
    Provide a Flask application configured against the test database
    for the whole test session.
    """
    flask_app = Flask(__name__)
    flask_app.config['SQLALCHEMY_DATABASE_URI'] = DB_CONN
    return flask_app
@pytest.fixture(scope='session')
def _db(app):
    """
    Give the transactional fixtures a Flask-SQLAlchemy handle bound to the
    session-scoped test application.
    """
    return SQLAlchemy(app=app)

View File

@ -20,6 +20,7 @@ spec:
containers:
- name: input-service
image: registry.kmlabz.com/birbnetesgit/input-service
imagePullPolicy: Always
envFrom:
- configMapRef:
name: input-service

4
requirements.test.txt Normal file
View File

@ -0,0 +1,4 @@
pytest
pytest-rabbitmq
pytest-httpserver
pytest-flask-sqlalchemy

View File

@ -12,3 +12,7 @@ psycopg2-binary
marshmallow
marshmallow-sqlalchemy
flask-marshmallow
py-healthcheck
Flask-InfluxDB
tzdata
tzlocal

View File

@ -1,18 +1,20 @@
#!/usr/bin/env python3
import logging
from flask import Flask
from flask_restful import Api
import sentry_sdk
from sentry_sdk.integrations.flask import FlaskIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
from healthcheck import HealthCheck
from config import *
from db import db
from marshm import ma
from rabbit_broker_instance import mq
from influxus import influx_db
from resources import SampleResource, SampleParameterResource
from healthchecks import health_database_status
"""
Main Flask RESTful API
Main Flask RESTful APIm
"""
__author__ = "@tormakris"
@ -23,46 +25,41 @@ __version__text__ = "1"
if SENTRY_DSN:
sentry_sdk.init(
dsn=SENTRY_DSN,
integrations=[FlaskIntegration()],
integrations=[FlaskIntegration(), SqlalchemyIntegration()],
traces_sample_rate=1.0,
send_default_pii=True,
release=RELEASE_ID,
environment=RELEASEMODE
environment=RELEASEMODE,
_experiments={"auto_enabling_integrations": True}
)
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = f"postgresql://{POSTGRES_USERNAME}:{POSTGRES_PASSWORD}@{POSTGRES_HOSTNAME}:5432/{POSTGRES_DB}"
app.config['RABBIT_USERNAME'] = RABBITMQ_USERNAME
app.config['RABBIT_PASSWORD'] = RABBITMQ_PASSWORD
app.config['RABBIT_HOST'] = RABBITMQ_HOST
app.config['RABBIT_ROUTING_KEY'] = "feature"
app.config[
'SQLALCHEMY_DATABASE_URI'] = f"postgresql://{POSTGRES_USERNAME}:{POSTGRES_PASSWORD}@{POSTGRES_HOSTNAME}:5432/{POSTGRES_DB}{POSTGRES_OPTS}"
app.config['EXCHANGE_NAME'] = RABBITMQ_EXCHANGE
app.config['FLASK_PIKA_PARAMS'] = {'host': RABBITMQ_HOST,
'username': RABBITMQ_USERNAME,
'password': RABBITMQ_PASSWORD,
'port': 5672,
'virtual_host': '/'}
app.config['INFLUXDB_HOST'] = INFLUXDB_HOST
app.config['INFLUXDB_PORT'] = INFLUXDB_PORT
app.config['INFLUXDB_USER'] = INFLUXDB_USERNAME
app.config['INFLUXDB_PASSWORD'] = INFLUXDB_PASSWORD
app.config['INFLUXDB_DATABASE'] = INFLUXDB_DB
api = Api(app)
health = HealthCheck()
db.init_app(app)
ma.init_app(app)
mq.init_app(app)
influx_db.init_app(app)
with app.app_context():
# influx_db.database.create(INFLUXDB_DB)
db.create_all()
formatter = logging.Formatter(
fmt="%(asctime)s - %(levelname)s - %(module)s - %(message)s"
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
api.add_resource(SampleResource, "/sample")
api.add_resource(SampleParameterResource, '/sample/<tag>')
if __name__ == "__main__":
app.run(
debug=bool(DEBUG),
host="0.0.0.0",
port=int(PORT),
)
health.add_check(health_database_status)
app.add_url_rule("/healthz", "healthcheck", view_func=lambda: health.run())

View File

@ -31,5 +31,12 @@ POSTGRES_HOSTNAME = os.getenv("INPUT_POSTGRES_HOSTNAME", "localhost")
POSTGRES_USERNAME = os.getenv("INPUT_POSTGRES_USERNAME", "input-service")
POSTGRES_PASSWORD = os.getenv("INPUT_POSTGRES_PASSWORD", "input-service")
POSTGRES_DB = os.getenv("INPUT_POSTGRES_DB", "input-service")
POSTGRES_OPTS = os.getenv("INPUT_POSTGRES_OPTS", "")
STORAGE_HOSTNAME = os.getenv("INPUT_STORAGE_HOSTNAME", "localhost:8042")
INFLUXDB_HOST = os.getenv("INFLUX_HOST", "input-influx")
INFLUXDB_PORT = os.getenv("INFLUX_PORT", "8086")
INFLUXDB_USERNAME = os.getenv("INFLUX_USERNAME", "input-service")
INFLUXDB_PASSWORD = os.getenv("INFLUX_PASSWORD", "input-service-supersecret")
INFLUXDB_DB = os.getenv("INFLUX_DB", "input-service")

View File

@ -2,7 +2,7 @@
from flask_sqlalchemy import SQLAlchemy
"""
Flask Restful endpoints
Database api
"""
__author__ = '@tormakris'

View File

@ -1,99 +0,0 @@
#!/usr/bin/env python3
import pika
"""
Flask Rabbit Broker
"""
__author__ = '@tormakris'
__copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "flask_rabbit_broker"
__version__text__ = "1"
class FlaskRabbitBroker:
    """Message broker using the RabbitMQ middleware (pika)."""

    def __init__(self, app=None):
        """
        Create a new, unconnected broker instance.

        :param app: optional Flask application; ``init_app()`` must still be
                    called to read configuration and connect.
        """
        self.app = app
        self.exchange_name = None
        self.username = None
        self.password = None
        self.rabbitmq_host = None
        self.routing_key = None
        self.connection = None
        self.channel = None
        self.exchange = None

    def init_app(self, app) -> None:
        """
        Init the broker with the current application context.

        Settings are read from ``app.config`` — a Flask application keeps its
        configuration in the ``config`` mapping; the original ``app.context``
        attribute does not exist and raised AttributeError.

        :param app: application context
        :return:
        """
        self.username = app.config.get('RABBIT_USERNAME')
        self.password = app.config.get('RABBIT_PASSWORD')
        self.rabbitmq_host = app.config.get('RABBIT_HOST')
        self.exchange_name = app.config.get('EXCHANGE_NAME')
        self.routing_key = app.config.get('RABBIT_ROUTING_KEY')
        self.init_connection(timeout=5)
        self.init_exchange()

    def init_connection(self, timeout: int = 5) -> None:
        """
        Init the RabbitMQ connection.

        :param timeout: socket timeout of the connection, in seconds
        :return:
        """
        credentials = pika.PlainCredentials(self.username, self.password)
        # heartbeat=0 disables broker heartbeats so idle connections are kept open
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=self.rabbitmq_host,
                                      credentials=credentials,
                                      heartbeat=0,
                                      socket_timeout=timeout))

    def close_connection(self) -> None:
        """Close the underlying RabbitMQ connection."""
        self.connection.close()

    def init_exchange(self) -> None:
        """
        Declare the exchange used to send messages.

        :return:
        """
        channel = self.connection.channel()
        try:
            self.exchange = channel.exchange_declare(exchange=self.exchange_name,
                                                     exchange_type='fanout',
                                                     durable=True,
                                                     auto_delete=False)
        finally:
            channel.close()

    def register_callback(self, callback) -> None:
        """
        Register a consumer callback on a fresh, broker-named queue bound to
        the exchange.

        Note: ``queue_declare()`` returns a Method frame, not a queue object;
        the generated queue name is in ``result.method.queue``. The original
        called ``.bind()``/``.basic_consume()`` on the frame, which the pika
        API does not provide.

        :param callback: callable invoked for every delivered message
        :return:
        """
        channel = self.connection.channel()
        result = channel.queue_declare(queue='', durable=True, auto_delete=False)
        queue_name = result.method.queue
        channel.queue_bind(queue=queue_name, exchange=self.exchange_name)
        channel.basic_consume(queue=queue_name,
                              on_message_callback=callback,
                              auto_ack=True)

    def send(self, message: str) -> None:
        """
        Send a message to the declared exchange.

        :param message: payload; UTF-8 encoded before publishing
        :return:
        """
        channel = self.connection.channel()
        try:
            channel.basic_publish(
                exchange=self.exchange_name,
                routing_key=self.routing_key,
                body=message.encode('utf-8'))
        finally:
            channel.close()

23
src/healthchecks.py Normal file
View File

@ -0,0 +1,23 @@
#!/usr/bin/env python3
from db import db
"""
Healthcheck functions
"""
__author__ = "@tormakris"
__copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "healthchecks"
__version__text__ = "1"
def health_database_status():
    """
    Health probe: verify that the database answers a trivial query.

    Returns a ``(healthy, message)`` tuple in the shape expected by
    py-healthcheck.
    """
    try:
        db.session.execute('SELECT 1')
    except Exception as e:  # any DB failure marks the check unhealthy
        return False, str(e)
    return True, 'database is ok'

15
src/influxus.py Normal file
View File

@ -0,0 +1,15 @@
#!/usr/bin/env python3
from flask_influxdb import InfluxDB
"""
Influx api
"""
__author__ = '@tormakris'
__copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "influxus"
__version__text__ = "1"
# Shared Flask-InfluxDB extension instance; bound to the app elsewhere via
# influx_db.init_app(app).
influx_db = InfluxDB()

View File

@ -1,14 +0,0 @@
#!/usr/bin/env python3
from flask_rabbit_broker import FlaskRabbitBroker
"""
Rabbit Broker instance
"""
__author__ = '@tormakris'
__copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "rabbit_broker_instance"
__version__text__ = "1"
# Shared broker instance; bound to the app elsewhere via mq.init_app(app).
mq = FlaskRabbitBroker()

View File

@ -1,18 +0,0 @@
#!/usr/bin/env python3
import pika
from config import *
"""
Rabbitmq setup
"""
__author__ = '@tormakris'
__copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "endpoints"
__version__text__ = "1"
# Module-level connection: established eagerly at import time.
credentials = pika.PlainCredentials(RABBITMQ_USERNAME, RABBITMQ_PASSWORD)
rabbitmq = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST, credentials=credentials))
rabbitmq_channel = rabbitmq.channel()
# Fanout exchange: every bound queue receives a copy of each message.
rabbitmq_channel.exchange_declare(exchange=RABBITMQ_EXCHANGE, exchange_type='fanout')

View File

@ -1,16 +1,17 @@
#!/usr/bin/env python3
import logging
import json
from datetime import datetime
import tzlocal
from xeger import Xeger
from flask_restful import Resource
from flask import request
from flask import request, current_app
import requests
import pika
from db import db
from influxus import influx_db
from models import SampleMetadata
from schemas import SampleSchema, SampleMetadataSchema
from config import *
from rabbit_broker_instance import mq
"""
Flask Restful endpoints
@ -21,8 +22,6 @@ __copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "endpoints"
__version__text__ = "1"
LOGGER = logging.getLogger(__name__)
class SampleResource(Resource):
"""
@ -49,18 +48,37 @@ class SampleResource(Resource):
description = request.form.get("description")
if soundfile.content_type != 'audio/wave':
LOGGER.info(
current_app.logger.info(
f"Input file was not WAV.")
return {'err_msg': 'Input file not a wave file.'}, 415
try:
desc = self.sampleschema.loads(description)
except Exception as e:
LOGGER.exception(e)
current_app.logger.exception(e)
return {'err_msg': 'Input JSON schema invalid'}, 417
xeger = Xeger(limit=30)
generated_tag = xeger.xeger(r'^[a-zA-Z]+[0-9a-zA-Z_]*$')[:32]
while True:
generated_tag = xeger.xeger(r'^[a-zA-Z]+[0-9a-zA-Z_]*$')[:32]
if len(generated_tag) > 2: # Ensure minimum length
break
# Handle mega-autismo-cliento
soundfile_content_length = soundfile.content_length
if soundfile_content_length <= 0: # BRUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUH
current_app.logger.debug(
"The uploader did not provide content-length for the sound file... Calculating manually..."
)
# So, this is a seekable stream, so we just seek to the end
old_ptr = soundfile.tell()
soundfile.seek(0, 2)
# Check where is the end (= content length)
soundfile_content_length = soundfile.tell()
# Seek back to where the stream was
soundfile.seek(old_ptr, 0)
# It's insane, that you can not set this field in curl
record = SampleMetadata(
device_id=desc['device_id'],
@ -76,10 +94,37 @@ class SampleResource(Resource):
'wave.wav',
soundfile,
soundfile.content_type,
{'Content-Length': soundfile.content_length})}).raise_for_status()
mq.send(json.dumps({'tag': generated_tag}))
{'Content-Length': soundfile_content_length})}).raise_for_status() # Anyádat curl am
credentials = pika.PlainCredentials(current_app.config['FLASK_PIKA_PARAMS']['username'],
current_app.config['FLASK_PIKA_PARAMS']['password'])
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=current_app.config['FLASK_PIKA_PARAMS']['host'],
credentials=credentials,
heartbeat=0,
socket_timeout=5))
channel = connection.channel()
channel.exchange_declare(exchange=current_app.config['EXCHANGE_NAME'],
exchange_type='direct')
channel.basic_publish(exchange=current_app.config['EXCHANGE_NAME'],
routing_key='feature',
body=json.dumps({'tag': generated_tag}).encode('UTF-8'))
connection.close()
influx_db.write_points(
[
{
'time': datetime.now(tz=tzlocal.get_localzone()),
'measurement': 'cloudinput',
'tags': {
'device': desc['device_id']
},
'fields': {
'bruh': 1.0
}
}
]
)
except Exception as e:
LOGGER.exception(e)
current_app.logger.exception(e)
db.session.rollback()
return {"err_msg": str(
e), "hint": "DB or downstream service error"}, 569

View File

@ -1,4 +1,6 @@
#!/usr/bin/env python3
from flask_marshmallow.sqla import auto_field
from models import SampleMetadata
from marshm import ma
from marshmallow import fields
@ -19,11 +21,11 @@ class SampleSchema(ma.Schema):
"""
Parameters:
- date (date)
- device_id (str)
- device_id (int)
"""
date = fields.DateTime(required=True)
device_id = fields.Str(required=True)
device_id = fields.Integer(required=True)
class SampleMetadataSchema(ma.SQLAlchemyAutoSchema):
@ -32,4 +34,5 @@ class SampleMetadataSchema(ma.SQLAlchemyAutoSchema):
"""
class Meta:
model = SampleMetadata
exclude = ('timestamp', 'id', 'device_date',)
date = auto_field("device_date", dump_only=False)

7
test.py Normal file
View File

@ -0,0 +1,7 @@
#!/usr/bin/env python3
"""
Unit tests
"""
import pytest