Compare commits

63 Commits

Author SHA1 Message Date
947ac144b1 Updated k8s stuff
All checks were successful
continuous-integration/drone/push Build is passing
2021-08-18 14:05:27 +02:00
3cdacc6720 Fixed bad import
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing
2021-08-17 18:26:08 +02:00
04bb2722ad Removed unused resource
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing
2021-08-17 18:20:20 +02:00
2431812f09 Removed sentry sqlalchemy integration
All checks were successful
continuous-integration/drone/push Build is passing
2021-08-17 18:19:30 +02:00
10f57913f3 the missing link
All checks were successful
continuous-integration/drone/push Build is passing
2021-08-17 18:18:09 +02:00
738eea1da3 removed unused config 2021-08-17 18:15:02 +02:00
a118b79512 Exchanges are magic
All checks were successful
continuous-integration/drone/push Build is passing
2021-08-17 18:05:32 +02:00
2c0e6ec7d7 Removed queue as we do not scrubscribe 2021-08-17 17:56:52 +02:00
7f987afa7a Added redis stuffs 2021-08-17 17:53:16 +02:00
ca548f0863 Removed db stuffs 2021-08-17 17:16:01 +02:00
cbaf2f2981 change path
All checks were successful
continuous-integration/drone/push Build is passing
2021-08-16 15:19:51 +02:00
3d21e3a543 jomáhát
All checks were successful
continuous-integration/drone/push Build is passing
2021-08-11 13:22:49 +02:00
2030230258 oopsie woopsie
All checks were successful
continuous-integration/drone/push Build is passing
2021-08-11 13:07:29 +02:00
f90571afc3 Fixed stuff
All checks were successful
continuous-integration/drone/push Build is passing
2021-08-11 12:56:56 +02:00
459b3fa6df Added pagination features
All checks were successful
continuous-integration/drone/push Build is passing
2021-08-11 12:10:42 +02:00
9c3f8c65fb Added injetion to outgoing messages
All checks were successful
continuous-integration/drone/push Build is passing
2021-08-10 15:13:23 +02:00
00e9d02478 Added more spans 2021-08-10 14:40:57 +02:00
98234f0e8a Merge branch 'master' of ssh://git.kmlabz.com:2222/birbnetes/input-service
All checks were successful
continuous-integration/drone/push Build is passing
2021-08-10 14:16:38 +02:00
4e3efb7295 Added basic tracing 2021-08-10 14:16:32 +02:00
9bfedf0090 remove workers and threads
All checks were successful
continuous-integration/drone/push Build is passing
2021-08-09 11:34:23 +02:00
3d423c71c6 Changed the order of things
All checks were successful
continuous-integration/drone/push Build is passing
also fixed spelling of amqp
2021-07-28 15:13:44 +02:00
414330b3d5 Lowered warning tresholds
All checks were successful
continuous-integration/drone/push Build is passing
2021-07-26 17:07:55 +02:00
67c5d723ca Fixed log handler
All checks were successful
continuous-integration/drone/push Build is passing
2021-07-26 17:06:15 +02:00
a844a13608 Added more error handling and reporting
All checks were successful
continuous-integration/drone/push Build is passing
2021-07-26 17:01:10 +02:00
ba69b9c2b1 Fixed indentation
All checks were successful
continuous-integration/drone/push Build is passing
2021-07-26 16:19:05 +02:00
eb7f6498ab eh?
All checks were successful
continuous-integration/drone/push Build is passing
2021-07-26 15:36:17 +02:00
57b757cb41 Added amqp to health check
All checks were successful
continuous-integration/drone/push Build is passing
2021-07-26 15:24:17 +02:00
e64137ca56 Did stuff with rabbitmq
All checks were successful
continuous-integration/drone/push Build is passing
2021-07-26 15:18:08 +02:00
f15517af62 made InfluxDB optional
All checks were successful
continuous-integration/drone/push Build is passing
2021-07-26 12:51:52 +02:00
3c10a351ba Merge branch 'influx' 2021-07-26 12:45:01 +02:00
30525ac967 Small code cleanups
Some checks failed
continuous-integration/drone/push Build is failing
2021-07-26 12:32:36 +02:00
25aac079d1 add opts
All checks were successful
continuous-integration/drone/push Build is passing
2021-07-19 16:21:53 +02:00
7e61ee8a6e update postgresql opts
All checks were successful
continuous-integration/drone/push Build is passing
2021-07-19 16:21:17 +02:00
95b557e8ba remove sentry
All checks were successful
continuous-integration/drone/push Build is passing
2021-07-19 16:03:19 +02:00
0c4b036b47 do not create influxdb db
Some checks failed
continuous-integration/drone/push Build is failing
2021-07-19 16:00:27 +02:00
8e0b252c92 refine influx stuff
All checks were successful
continuous-integration/drone/push Build is passing
2021-06-13 20:07:21 +02:00
211be6cf96 add influx capability
All checks were successful
continuous-integration/drone/push Build is passing
2021-06-13 20:01:23 +02:00
92884b6760 Update '.drone.yml'
All checks were successful
continuous-integration/drone/push Build is passing
2021-04-14 20:37:27 +02:00
a08528fd99 update python
All checks were successful
continuous-integration/drone/push Build is passing
2020-12-06 11:25:46 +01:00
a1ee937f32 update helathcheck
All checks were successful
continuous-integration/drone/push Build is passing
2020-12-06 11:16:09 +01:00
5d5bb9cd92 add health endpoint
All checks were successful
continuous-integration/drone/push Build is passing
2020-11-19 00:30:26 +01:00
5b736b2844 use direct exchange
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-24 00:28:25 +02:00
29eae6bef0 Added handling missing content-length field
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-23 19:22:11 +02:00
b22236ece3 fix get
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-21 02:01:45 +02:00
5e29de0c44 remove so many transactions
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-20 00:15:37 +02:00
5a64c72cc9 do code analysis via sonar
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-19 23:50:59 +02:00
edc22a4e0a oops
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-19 23:48:59 +02:00
b47d9df26d more sessions
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-19 23:35:41 +02:00
f6273afe7b add more transactions to sentry
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-19 23:31:40 +02:00
dadb6508b3 connect on demand
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-19 23:26:59 +02:00
483c97e980 low reuse time
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-19 22:54:52 +02:00
c78d987c72 device id should be integer
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-19 22:34:41 +02:00
3563234d70 change schema
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-03 12:47:44 +02:00
099d6adffa remove cancer
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-03 12:31:19 +02:00
ccfd09f239 remove cache
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-02 13:40:38 +02:00
6da9abf3af auto_delete should be false as well
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-02 13:39:18 +02:00
8364b84450 durabe should be false
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-02 13:15:33 +02:00
202d065116 Fixed circular dependency
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-02 04:31:20 +02:00
9fb474d3be Ensure minimum length
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-02 04:21:33 +02:00
9a7258cb4d use flask logging
All checks were successful
continuous-integration/drone/push Build is passing
2020-09-20 02:19:14 +02:00
d3a8057820 ALWAYS
All checks were successful
continuous-integration/drone/push Build is passing
2020-08-06 14:56:40 +02:00
f244721cd1 declare exchange
All checks were successful
continuous-integration/drone/push Build is passing
2020-08-04 03:33:47 +02:00
80d115e488 use flask-pika
All checks were successful
continuous-integration/drone/push Build is passing
2020-08-04 03:23:46 +02:00
19 changed files with 415 additions and 385 deletions

View File

@ -3,30 +3,6 @@ type: docker
name: default
steps:
- name: restore-cache-with-filesystem
image: meltwater/drone-cache
settings:
backend: "filesystem"
restore: true
cache_key: "{{ .Repo.Name }}"
archive_format: "gzip"
filesystem_cache_root: "/tmp/cache"
mount:
- '.pipcache'
volumes:
- name: cache
path: /tmp/cache
- name: static_analysis
image: "python:3.8"
commands:
- pip3 install --cache-dir='./.pipcache' pylint bandit mccabe
- pip3 install --cache-dir='./.pipcache' -r requirements.txt
- find . -name "*.py" -exec python3 -m py_compile '{}' \;
- find . -name "*.py" -exec pylint '{}' + || if [ $? -eq 1 ]; then echo "you fail"; fi
- find . -name "*.py" -exec python3 -m mccabe --min 3 '{}' + || if [ $? -eq 1 ]; then echo "you fail"; fi
- bandit -r . + || if [ $? -eq 1 ]; then echo "you fail"; fi
- name: code-analysis
image: aosapps/drone-sonar-plugin
settings:
@ -35,21 +11,6 @@ steps:
sonar_token:
from_secret: SONAR_CODE
- name: rebuild-cache-with-filesystem
image: meltwater/drone-cache:dev
pull: true
settings:
backend: "filesystem"
rebuild: true
cache_key: "{{ .Repo.Name }}"
archive_format: "gzip"
filesystem_cache_root: "/tmp/cache"
mount:
- '.pipcache'
volumes:
- name: cache
path: /tmp/cache
- name: kaniko
image: banzaicloud/drone-kaniko
settings:
@ -63,14 +24,6 @@ steps:
- latest
- ${DRONE_BUILD_NUMBER}
- name: sentry
image: tormachris/drone-sentry
settings:
sentry_project: ${DRONE_REPO_NAME}
sentry_domain: sentry.kmlabz.com
sentry_token:
from_secret: SENTRY_TOKEN
- name: ms-teams
image: kuperiu/drone-teams
settings:
@ -78,8 +31,3 @@ steps:
from_secret: TEAMS_WEBHOOK
when:
status: [ failure ]
volumes:
- name: cache
host:
path: "/tmp/cache"

View File

@ -1,4 +1,4 @@
FROM python:3.8-slim
FROM python:3.9-slim
ENV TZ Europe/Budapest
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
@ -16,4 +16,4 @@ COPY ./src .
EXPOSE 8080
ENTRYPOINT ["gunicorn", "-b", "0.0.0.0:8080", "--workers", "1", "--threads", "1", "app:app"]
ENTRYPOINT ["gunicorn", "-b", "0.0.0.0:8080", "app:app"]

View File

@ -6,16 +6,17 @@ metadata:
app: input-service
namespace: birbnetes
data:
SENTRY_DSN: https://b181edf362e549f4967c6cd42629693d@sentry.kmlabz.com/3
RELEASE_ID: luna-k8s
SENTRY_DSN: https://fce9e078e1494bf4b959b45d0b435386@sentry.kmlabz.com/2
RELEASE_ID: kmlabz-k8s
INPUT_SERVICE_RELEASEMODE: release
INPUT_RABBITMQ_HOSTNAME: birb-rabbitmq
INPUT_RABBITMQ_EXCHANGE: "wave"
INPUT_RABBITMQ_QUEUE: wave-ready
INPUT_RABBITMQ_EXCHANGE_META: "sample-meta"
INPUT_RABBITMQ_EXCHANGE_CACHE: "sample-cache"
INPUT_RABBITMQ_USERNAME: user
INPUT_RABBITMQ_PASSWORD: 1wZVQnP5vy
INPUT_POSTGRES_HOSTNAME: input-postgres
INPUT_POSTGRES_USERNAME: input-service
INPUT_POSTGRES_PASSWORD: input-service-supersecret
INPUT_POSTGRES_DB: input-service
INPUT_STORAGE_HOSTNAME: storage-service
INPUT_RABBITMQ_PASSWORD: ZgCiSiSO8t
INFLUX_HOST: input-influx
INFLUX_PORT: "8086"
INFLUX_USERNAME: input-service
INFLUX_PASSWORD: input-service-supersecret
INFLUX_DB: input-service
CACHE_REDIS_URL: "redis://input-redis:6379/0"

View File

@ -19,11 +19,34 @@ spec:
spec:
containers:
- name: input-service
image: registry.kmlabz.com/birbnetesgit/input-service
image: registry.kmlabz.com/birbnetes/input-service
imagePullPolicy: Always
envFrom:
- configMapRef:
name: input-service
ports:
- containerPort: 8080
- name: jaeger-agent
image: jaegertracing/jaeger-agent:latest
imagePullPolicy: IfNotPresent
ports:
- containerPort: 5775
name: zk-compact-trft
protocol: UDP
- containerPort: 5778
name: config-rest
protocol: TCP
- containerPort: 6831
name: jg-compact-trft
protocol: UDP
- containerPort: 6832
name: jg-binary-trft
protocol: UDP
- containerPort: 14271
name: admin-http
protocol: TCP
args:
- --reporter.grpc.host-port=dns:///woolsey.tormakristof.eu:14250
- --reporter.type=grpc
imagePullSecrets:
- name: regcred

View File

@ -1,14 +1,21 @@
sentry_sdk[flask]
gunicorn
Flask
Flask-RESTful
requests
gunicorn~=20.1.0
Flask~=2.0.1
Flask-RESTful~=0.3.9
requests~=2.26.0
werkzeug
sqlalchemy
flask_sqlalchemy
xeger
pika
psycopg2-binary
marshmallow
marshmallow-sqlalchemy
xeger~=0.3.5
pika~=1.2.0
marshmallow~=3.13.0
flask-marshmallow
py-healthcheck
Flask-InfluxDB
tzdata
tzlocal
apscheduler~=3.7.0
flask-redis~=0.4.0
opentracing~=2.4.0
jaeger-client
Flask-Opentracing

View File

@ -1,15 +1,26 @@
#!/usr/bin/env python3
import logging
from flask import Flask
from flask_restful import Api
import sentry_sdk
from sentry_sdk.integrations.flask import FlaskIntegration
from healthcheck import HealthCheck
from config import *
from db import db
from redis_client import redis_client
from config import Config
from marshm import ma
from rabbit_broker_instance import mq
from resources import SampleResource, SampleParameterResource
from influxus import influx_db
from resources import SampleResource
from healthchecks import amqp_connection_status
import atexit
from apscheduler.schedulers.background import BackgroundScheduler
from magic_amqp import magic_amqp
from error_handlers import register_all_error_handlers
import jaeger_client
import opentracing
from flask_opentracing import FlaskTracing
"""
Main Flask RESTful API
@ -20,51 +31,66 @@ __copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "app"
__version__text__ = "1"
if SENTRY_DSN:
if Config.SENTRY_DSN:
sentry_sdk.init(
dsn=SENTRY_DSN,
dsn=Config.SENTRY_DSN,
integrations=[FlaskIntegration()],
traces_sample_rate=1.0,
traces_sample_rate=0.0,
send_default_pii=True,
release=RELEASE_ID,
environment=RELEASEMODE,
release=Config.RELEASE_ID,
environment=Config.RELEASEMODE,
_experiments={"auto_enabling_integrations": True}
)
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = f"postgresql://{POSTGRES_USERNAME}:{POSTGRES_PASSWORD}@{POSTGRES_HOSTNAME}:5432/{POSTGRES_DB}"
app.config['RABBIT_USERNAME'] = RABBITMQ_USERNAME
app.config['RABBIT_PASSWORD'] = RABBITMQ_PASSWORD
app.config['RABBIT_HOST'] = RABBITMQ_HOST
app.config['RABBIT_ROUTING_KEY'] = "feature"
app.config['EXCHANGE_NAME'] = RABBITMQ_EXCHANGE
app.config.from_object(Config)
api = Api(app)
db.init_app(app)
health = HealthCheck()
ma.init_app(app)
mq.init_app(app)
with app.app_context():
db.create_all()
redis_client.init_app(app)
formatter = logging.Formatter(
fmt="%(asctime)s - %(levelname)s - %(module)s - %(message)s"
)
# ampq magic stuff
magic_amqp.init_app(app)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
ampq_loop_scheduler = BackgroundScheduler()
ampq_loop_scheduler.add_job(func=lambda: magic_amqp.loop(), trigger="interval", seconds=5)
atexit.register(lambda: ampq_loop_scheduler.shutdown())
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
ampq_loop_scheduler.start()
api.add_resource(SampleResource, "/sample")
api.add_resource(SampleParameterResource, '/sample/<tag>')
if Config.ENABLE_INFLUXDB:
influx_db.init_app(app)
if __name__ == "__main__":
app.run(
debug=bool(DEBUG),
host="0.0.0.0",
port=int(PORT),
)
@app.before_first_request
def init_db():
if Config.ENABLE_INFLUXDB:
influx_db.database.create(Config.INFLUXDB_DATABASE)
# Setup tracing
def initialize_tracer():
app.logger.info("Initializing jaeger...")
jaeger_cfg = jaeger_client.Config(config={}, service_name='input-service', validate=True)
tracer = jaeger_cfg.initialize_tracer()
return tracer
tracing = FlaskTracing(initialize_tracer, True, app)
api.add_resource(SampleResource, "/input")
health.add_check(amqp_connection_status)
register_all_error_handlers(app)
app.add_url_rule("/healthz", "healthcheck", view_func=lambda: health.run())
if __name__ != '__main__':
import logging
gunicorn_logger = logging.getLogger('gunicorn.error')
app.logger.handlers = gunicorn_logger.handlers
app.logger.setLevel(gunicorn_logger.level)

View File

@ -1,35 +1,40 @@
#!/usr/bin/env python3
import os
"""
Main Flask RESTful API
"""
__author__ = "@tormakris"
__copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "app"
__version__text__ = "1"
PORT = os.environ.get("INPUT_SERVICE_PORT", 8080)
DEBUG = os.environ.get("INPUT_SERVICE_DEBUG", True)
class Config:
PORT = 8080
DEBUG = os.environ.get("INPUT_SERVICE_DEBUG", "true").lower() in ["true", "yes", "1"]
SENTRY_DSN = os.environ.get("SENTRY_DSN")
RELEASE_ID = os.environ.get("RELEASE_ID", "test")
RELEASEMODE = os.environ.get("INPUT_SERVICE_RELEASEMODE", "dev")
SENTRY_DSN = os.environ.get("SENTRY_DSN")
RELEASE_ID = os.environ.get("RELEASE_ID", "test")
RELEASEMODE = os.environ.get("INPUT_SERVICE_RELEASEMODE", "dev")
REDIS_URL = os.environ['CACHE_REDIS_URL']
RABBITMQ_HOST = os.getenv("INPUT_RABBITMQ_HOSTNAME", "localhost")
RABBITMQ_EXCHANGE = os.getenv("INPUT_RABBITMQ_EXCHANGE", "dev")
RABBITMQ_QUEUE = os.getenv("INPUT_RABBITMQ_QUEUE", "wave-extract")
RABBITMQ_USERNAME = os.getenv("INPUT_RABBITMQ_USERNAME", "rabbitmq")
RABBITMQ_PASSWORD = os.getenv("INPUT_RABBITMQ_PASSWORD", "rabbitmq")
EXCHANGE_NAME_META = os.getenv("INPUT_RABBITMQ_EXCHANGE_META", "sample-meta")
EXCHANGE_NAME_CACHE = os.getenv("INPUT_RABBITMQ_EXCHANGE_CACHE", "sample-cache")
POSTGRES_HOSTNAME = os.getenv("INPUT_POSTGRES_HOSTNAME", "localhost")
POSTGRES_USERNAME = os.getenv("INPUT_POSTGRES_USERNAME", "input-service")
POSTGRES_PASSWORD = os.getenv("INPUT_POSTGRES_PASSWORD", "input-service")
POSTGRES_DB = os.getenv("INPUT_POSTGRES_DB", "input-service")
FLASK_PIKA_PARAMS = {
'host': os.getenv("INPUT_RABBITMQ_HOSTNAME", "localhost"),
'username': os.getenv("INPUT_RABBITMQ_USERNAME", "rabbitmq"),
'password': os.getenv("INPUT_RABBITMQ_PASSWORD", "rabbitmq"),
'port': int(os.getenv("INPUT_RABBITMQ_PORT", 5672)),
'virtual_host': '/'
}
STORAGE_HOSTNAME = os.getenv("INPUT_STORAGE_HOSTNAME", "localhost:8042")
ENABLE_INFLUXDB = os.environ.get("INPUT_ENABLE_INFLUX", "true").lower() in ["true", "yes", "1"]
INFLUXDB_HOST = os.getenv("INFLUX_HOST", "input-influx")
INFLUXDB_PORT = os.getenv("INFLUX_PORT", "8086")
INFLUXDB_USER = os.getenv("INFLUX_USERNAME", "input-service")
INFLUXDB_PASSWORD = os.getenv("INFLUX_PASSWORD", "input-service-supersecret")
INFLUXDB_DATABASE = os.getenv("INFLUX_DB", "input-service")

18
src/error_handlers.py Normal file
View File

@ -0,0 +1,18 @@
#!/usr/bin/env python3
def get_standard_error_handler(code: int):
    """Return an error handler that renders any exception as JSON with *code*.

    The returned callable is suitable for Flask's ``register_error_handler``:
    it stringifies the error into a ``{"msg": ...}`` body and pairs it with
    the fixed HTTP status ``code``.
    """
    def error_handler(err):
        return {"msg": str(err)}, code

    return error_handler


# function to register all handlers
def register_all_error_handlers(app):
    """Install the standard JSON error handler for the common HTTP error codes on *app*."""
    for status in (404, 403, 401, 405, 400, 409, 422, 500):
        app.register_error_handler(status, get_standard_error_handler(status))

View File

@ -1,102 +0,0 @@
#!/usr/bin/env python3
import uuid
import pika
"""
Flask Rabbit Broker
"""
__author__ = '@tormakris'
__copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "flask_rabbit_broker"
__version__text__ = "1"
class FlaskRabbitBroker:
    """Message Broker using RabbitMQ middleware.

    Flask-extension-style wrapper around a blocking pika connection:
    ``init_app`` reads connection settings from the Flask config, opens the
    connection and declares a fanout exchange that ``send`` publishes to.
    """

    def __init__(self, app=None):
        """
        Create a new instance of Broker Rabbit by using
        the given parameters to connect to RabbitMQ.

        :param app: optional Flask application; connection settings are read
                    from its config by init_app()
        """
        self.app = app
        self.exchange_name = None
        self.username = None
        self.password = None
        self.rabbitmq_host = None
        self.routing_key = None
        self.connection = None
        self.channel = None
        self.exchange = None
        # Fanout: every queue bound to the exchange receives each message.
        self.exchange_type = "fanout"

    def init_app(self, app) -> None:
        """
        Init the broker with the current application context.

        :param app: application context (reads RABBIT_USERNAME, RABBIT_PASSWORD,
                    RABBIT_HOST, EXCHANGE_NAME and RABBIT_ROUTING_KEY from its config)
        :return:
        """
        self.username = app.config.get('RABBIT_USERNAME')
        self.password = app.config.get('RABBIT_PASSWORD')
        self.rabbitmq_host = app.config.get('RABBIT_HOST')
        self.exchange_name = app.config.get('EXCHANGE_NAME')
        self.routing_key = app.config.get('RABBIT_ROUTING_KEY')
        self.init_connection(timeout=5)
        self.init_exchange()

    def init_connection(self, timeout: int = 5) -> None:
        """
        Init RabbitMQ connection.

        :param timeout: socket timeout of the connection in seconds
        :return:
        """
        credentials = pika.PlainCredentials(self.username, self.password)
        # heartbeat=0 disables AMQP heartbeats so long-idle connections are
        # not torn down by the broker.
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=self.rabbitmq_host,
                                      credentials=credentials,
                                      heartbeat=0,
                                      socket_timeout=timeout))

    def close_connection(self) -> None:
        """Close the underlying AMQP connection."""
        self.connection.close()

    def init_exchange(self) -> None:
        """
        Init the exchange used to send messages.

        :return:
        """
        # Use a throwaway channel; close it even if the declare raises.
        channel = self.connection.channel()
        try:
            channel.exchange_declare(exchange=self.exchange_name,
                                     exchange_type=self.exchange_type,
                                     durable=True,
                                     auto_delete=False)
        finally:
            channel.close()

    def register_callback(self, callback) -> None:
        """
        Register a callback to consume messages from a fresh exclusive queue.

        :param callback: pika on_message callback
        :return:
        """
        channel = self.connection.channel()
        # Random queue name derived from a UUID URN ("urn:uuid:<hex>" -> "<hex>").
        # NOTE(review): durable=True combined with exclusive=True is pointless —
        # an exclusive queue is deleted when the connection closes; confirm intent.
        queue = channel.queue_declare(durable=True, auto_delete=False, exclusive=True,
                                      queue=uuid.uuid4().urn.split(':')[2]).method.queue
        # BUG FIX: pika's channel API has no ``bind`` method; binding a queue
        # to an exchange is done with ``queue_bind``. The original call raised
        # AttributeError at runtime.
        channel.queue_bind(exchange=self.exchange_name, queue=queue)
        channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=True)

    def send(self, message: str) -> None:
        """
        Sends a message to the declared exchange.

        :param message: UTF-8 encodable message body
        :return:
        """
        # One short-lived channel per publish; always closed, even on error.
        channel = self.connection.channel()
        try:
            channel.basic_publish(
                exchange=self.exchange_name,
                routing_key=self.routing_key,
                body=message.encode('utf-8'))
        finally:
            channel.close()

22
src/healthchecks.py Normal file
View File

@ -0,0 +1,22 @@
#!/usr/bin/env python3
from magic_amqp import magic_amqp
"""
Healthchek functions
"""
__author__ = "@tormakris"
__copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "healthchecks"
__version__text__ = "1"
def amqp_connection_status():
    """Health-check probe for the AMQP connection.

    Returns a ``(healthy, message)`` pair in the shape expected by
    py-healthcheck: a boolean status plus a human-readable explanation.
    """
    healthy = magic_amqp.is_healthy()
    message = "amqp connection is ok" if healthy else "amqp connection is unhealthy"
    return healthy, message

View File

@ -1,13 +1,15 @@
#!/usr/bin/env python3
from flask_sqlalchemy import SQLAlchemy
from flask_influxdb import InfluxDB
"""
Flask Restful endpoints
Influx api
"""
__author__ = '@tormakris'
__copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "db"
__module_name__ = "influxus"
__version__text__ = "1"
db = SQLAlchemy()
influx_db = InfluxDB()

142
src/magic_amqp.py Normal file
View File

@ -0,0 +1,142 @@
from flask import Flask
from threading import Lock
import pika
import pika.exceptions
import json
import time
import opentracing
from opentracing.ext import tags
from opentracing.propagation import Format
class MagicAMQP:
    """
    This is my pathetic attempt to make RabbitMQ connection in a Flask app reliable and performant.

    A single blocking pika connection is shared behind a Lock so that request
    handlers and the periodic keep-alive ``loop()`` never touch the channel
    concurrently; ``_publish()`` transparently reconnects on connection errors.
    """

    def __init__(self, app: "Flask" = None):
        # NOTE: "Flask" is a string (lazy) annotation so the class can be
        # defined even where flask is not importable; runtime behavior is
        # identical to the bare-name annotation.
        #
        # BUG FIX: initialize all internal state *before* optionally calling
        # init_app(). The original ran init_app(app) first and only then set
        # self._credentials = None, clobbering the credentials init_app had
        # just created whenever an app was passed to the constructor.
        self.app = app
        self._lock = Lock()
        self._credentials = None
        # Explicit None so is_healthy() reports False (instead of raising
        # AttributeError) when init_app() has not been called yet.
        self._pika_connection = None
        self._pika_channel = None
        if app:
            self.init_app(app)

    def init_app(self, app: "Flask"):
        """Read AMQP settings from the Flask config, connect and declare the exchanges."""
        self.app = app
        self.app.config.setdefault('FLASK_PIKA_PARAMS', {})
        self.app.config.setdefault('EXCHANGE_NAME_META', None)
        self.app.config.setdefault('EXCHANGE_NAME_CACHE', None)
        self._credentials = pika.PlainCredentials(
            app.config['FLASK_PIKA_PARAMS']['username'],
            app.config['FLASK_PIKA_PARAMS']['password']
        )
        self._reconnect_ampq()

    def _reconnect_ampq(self):
        """(Re)open the blocking connection and re-declare both direct exchanges."""
        self._pika_connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                host=self.app.config['FLASK_PIKA_PARAMS']['host'],
                credentials=self._credentials,
                heartbeat=10,
                socket_timeout=5)
        )
        self._pika_channel = self._pika_connection.channel()
        self._pika_channel.exchange_declare(
            exchange=self.app.config['EXCHANGE_NAME_META'],
            exchange_type='direct'
        )
        self._pika_channel.exchange_declare(
            exchange=self.app.config['EXCHANGE_NAME_CACHE'],
            exchange_type='direct'
        )

    def loop(self):
        """
        This method should be called periodically to keep up the connection
        (it is scheduled from app.py via APScheduler every few seconds).
        """
        lock_start = time.time()
        with self._lock:
            lock_acquire_time = time.time() - lock_start
            if lock_acquire_time >= 0.5:
                self.app.logger.warning(f"Loop: Lock acquire took {lock_acquire_time:5f} sec")
            try:
                # Zero timeout: just service heartbeats / pending events.
                self._pika_connection.process_data_events(0)
            # We won't attempt retry if this fail
            except pika.exceptions.AMQPConnectionError as e:
                self.app.logger.warning(f"Connection error during process loop: {e} (attempting reconnect)")
                self._reconnect_ampq()
        total_time = time.time() - lock_start
        if total_time > 1:
            self.app.logger.warning(f"Loop: Total loop took {total_time:5f} sec")

    def _publish(self, exchange: str, payload=None):
        """
        Publish a simple json serialized message to the configured queue.
        If the connection is broken, then this call will block until the connection is restored
        (up to ~30 reconnect attempts, after which the error is re-raised).
        """
        span_tags = {tags.SPAN_KIND: tags.SPAN_KIND_PRODUCER}
        with opentracing.tracer.start_active_span('magic_amqp.publish', tags=span_tags) as scope:
            # Propagate the trace context inside the message payload itself
            # (payload therefore must be a dict-like carrier).
            opentracing.tracer.inject(scope.span.context, Format.TEXT_MAP, payload)
            lock_start = time.time()
            with self._lock:
                scope.span.log_kv({'event': 'lockAcquired'})
                lock_acquire_time = time.time() - lock_start
                if lock_acquire_time >= 0.2:
                    self.app.logger.warning(f"Publish: Lock acquire took {lock_acquire_time:5f} sec")
                tries = 0
                while True:
                    try:
                        self._pika_channel.basic_publish(
                            exchange=exchange,
                            routing_key='feature',
                            body=json.dumps(payload).encode('UTF-8')
                        )
                        self.app.logger.debug(f"Published: {payload}")
                        break  # message sent successfully
                    except pika.exceptions.AMQPConnectionError as e:
                        scope.span.log_kv({'event': 'connectionError', 'error': str(e)})
                        self.app.logger.warning(f"Connection error during publish: {e} (attempting reconnect)")
                        if tries > 30:
                            raise  # just give up
                        # Keep reconnecting until it succeeds (or we give up);
                        # back off once the attempt count gets high.
                        while True:
                            try:
                                self._reconnect_ampq()
                                break
                            except pika.exceptions.AMQPConnectionError as e:
                                self.app.logger.warning(
                                    f"Connection error during reconnection: {e} (attempting reconnect)")
                                tries += 1
                                if tries > 30:
                                    raise  # just give up
                                if tries > 10:
                                    time.sleep(2)
            total_time = time.time() - lock_start
            if total_time > 0.4:
                self.app.logger.warning(f"Publish: Total publish took {total_time:5f} sec")

    def publish_cache(self, payload=None):
        """Publish *payload* to the cache exchange (EXCHANGE_NAME_CACHE)."""
        return self._publish(self.app.config['EXCHANGE_NAME_CACHE'], payload)

    def publish_meta(self, payload=None):
        """Publish *payload* to the metadata exchange (EXCHANGE_NAME_META)."""
        return self._publish(self.app.config['EXCHANGE_NAME_META'], payload)

    def is_healthy(self) -> bool:
        """Return True when both the channel and the connection are open."""
        with self._lock:
            if not self._pika_channel:
                return False
            return self._pika_channel.is_open and self._pika_connection.is_open


# instance to be used in the flask app
magic_amqp = MagicAMQP()

View File

@ -1,25 +0,0 @@
#!/usr/bin/env python3
from db import db
from sqlalchemy.sql import func
"""
Flask Restful endpoints
"""
__author__ = '@tormakris'
__copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "models"
__version__text__ = "1"
class SampleMetadata(db.Model):
    """
    SQLAlchemy model of metadata entries
    """
    # Surrogate primary key. BUG FIX: SQLAlchemy's Column keyword is
    # ``autoincrement`` — the original ``auto_increment`` is an unknown
    # argument and raises ArgumentError when the model class is created.
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    # Row creation time, filled in server-side by the database (func.now()).
    timestamp = db.Column(db.TIMESTAMP, nullable=False, server_default=func.now())
    # Numeric id of the recording device.
    device_id = db.Column(db.Integer, nullable=False)
    # Timestamp reported by the device itself (as opposed to our insert time).
    device_date = db.Column(db.DateTime, nullable=False)
    # Generated public identifier of the sample; unique per row, max 32 chars.
    tag = db.Column(db.String(32), nullable=False, unique=True)

View File

@ -1,14 +0,0 @@
#!/usr/bin/env python3
from flask_rabbit_broker import FlaskRabbitBroker

"""
Rabbit Broker instance
"""

__author__ = '@tormakris'
__copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "rabbit_broker_instance"
__version__text__ = "1"

# Single shared broker instance (Flask-extension pattern): created here
# without an app and bound later via mq.init_app(app) during app start-up.
mq = FlaskRabbitBroker()

View File

@ -1,18 +0,0 @@
#!/usr/bin/env python3
import pika
from config import *

"""
Rabbitmq setup
"""

__author__ = '@tormakris'
__copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "endpoints"  # NOTE(review): looks copy-pasted; this module is the RabbitMQ setup, not "endpoints"
__version__text__ = "1"

# Module-level connection: opened once at import time and shared by importers.
# NOTE(review): import-time network I/O means the whole app fails to import
# whenever RabbitMQ is unreachable, and a single BlockingConnection is shared
# without any locking — confirm this is acceptable for the deployment model.
credentials = pika.PlainCredentials(RABBITMQ_USERNAME, RABBITMQ_PASSWORD)
rabbitmq = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST, credentials=credentials))
rabbitmq_channel = rabbitmq.channel()
# Fanout exchange: every queue bound to it receives a copy of each message.
rabbitmq_channel.exchange_declare(exchange=RABBITMQ_EXCHANGE, exchange_type='fanout')

4
src/redis_client.py Normal file
View File

@ -0,0 +1,4 @@
#!/usr/bin/env python3
from flask_redis import FlaskRedis

# Single shared FlaskRedis client (Flask-extension pattern); created without
# an app here and bound later via redis_client.init_app(app) in app.py.
redis_client = FlaskRedis()

View File

@ -1,16 +1,15 @@
#!/usr/bin/env python3
import logging
import json
import io
from datetime import datetime
import tzlocal
from xeger import Xeger
from flask_restful import Resource
from flask import request
import requests
import pika
from db import db
from models import SampleMetadata
from schemas import SampleSchema, SampleMetadataSchema
from config import *
from rabbit_broker_instance import mq
from flask import request, current_app, abort
from magic_amqp import magic_amqp
from influxus import influx_db
from schemas import SampleSchema
from redis_client import redis_client
import opentracing
"""
Flask Restful endpoints
@ -21,8 +20,6 @@ __copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "endpoints"
__version__text__ = "1"
LOGGER = logging.getLogger(__name__)
class SampleResource(Resource):
"""
@ -31,83 +28,87 @@ class SampleResource(Resource):
"""
sampleschema = SampleSchema(many=False)
samplemetadataschema = SampleMetadataSchema(many=True)
def post(self):
"""
Post request send to the endpoint
:return:
"""
with opentracing.tracer.start_active_span('parseAndValidate'):
if 'file' not in request.files:
return {"err_msg": "no file found"}, 469
return abort(400, "no file found")
else:
soundfile = request.files['file']
if 'description' not in request.form:
return {"err_msg": "no description found"}, 470
return abort(400, "no description found")
else:
description = request.form.get("description")
description_raw = request.form.get("description")
if soundfile.content_type != 'audio/wave':
LOGGER.info(
f"Input file was not WAV.")
return {'err_msg': 'Input file not a wave file.'}, 415
current_app.logger.info(f"Input file was not WAV.")
return abort(415, 'Input file not a wave file.')
try:
desc = self.sampleschema.loads(description)
desc = self.sampleschema.loads(description_raw)
except Exception as e:
LOGGER.exception(e)
return {'err_msg': 'Input JSON schema invalid'}, 417
current_app.logger.exception(e)
return abort(417, 'Input JSON schema invalid')
with opentracing.tracer.start_active_span('generateTag'):
xeger = Xeger(limit=30)
while True:
generated_tag = xeger.xeger(r'^[a-zA-Z]+[0-9a-zA-Z_]*$')[:32]
if len(generated_tag) > 2: # Ensure minimum length
break
record = SampleMetadata(
device_id=desc['device_id'],
device_date=desc['date'],
tag=generated_tag)
with opentracing.tracer.start_active_span('publishMetaMessage'):
try:
db.session.add(record)
requests.post(
f"http://{STORAGE_HOSTNAME}/object",
files={
'description': (None, json.dumps({'tag': generated_tag}), 'application/json'),
'soundFile': (
'wave.wav',
soundfile,
soundfile.content_type,
{'Content-Length': soundfile.content_length})}).raise_for_status()
mq.send(json.dumps({'tag': generated_tag}))
magic_amqp.publish_meta(
{
'tag': generated_tag,
'timestamp': datetime.now().isoformat(),
'device_id': desc['device_id'],
'device_date': desc['date'].isoformat()
}
)
except Exception as e:
LOGGER.exception(e)
db.session.rollback()
return {"err_msg": str(
e), "hint": "DB or downstream service error"}, 569
current_app.logger.exception(e)
return abort(500, f"AMQP Publish error: {str(e)}")
with opentracing.tracer.start_active_span('readSampleToMemory'):
buf = io.BytesIO()
soundfile.save(buf)
with opentracing.tracer.start_active_span('putToCache'):
redis_client.set(generated_tag, buf.getbuffer()) # getbuffer is quick as it does not copy like getvalue
# Announce only after the data is successfully committed
with opentracing.tracer.start_active_span('publishInCacheMessage'):
try:
magic_amqp.publish_cache({
'tag': generated_tag,
'mime_type': soundfile.mimetype
})
except Exception as e:
current_app.logger.exception(e)
return abort(500, f"AMQP Publish error: {str(e)}")
# metrics
if current_app.config['ENABLE_INFLUXDB']:
with opentracing.tracer.start_active_span('influxdb.write_points'):
influx_db.write_points(
[
{
'time': datetime.now(tz=tzlocal.get_localzone()),
'measurement': 'cloudinput',
'tags': {
'device': desc['device_id']
},
'fields': {
'bruh': 1.0
}
}
]
)
db.session.commit()
return {"tag": generated_tag}, 200
def get(self):
"""
Get all stored items
:return:
"""
samples = SampleMetadata.query.all()
return self.samplemetadataschema.dump(list(samples)), 200
class SampleParameterResource(Resource):
    """
    Sample endpoint with parameters
    """
    # Serializer for a single SampleMetadata row.
    samplemetadataschema = SampleMetadataSchema(many=False)

    def get(self, tag: str):
        """
        Get a specific item.

        :param tag: unique tag identifying the stored sample
        :return: serialized metadata and HTTP 200; aborts with 404 when no row matches
        """
        sample = SampleMetadata.query.filter_by(tag=tag).first_or_404()
        return self.samplemetadataschema.dump(sample), 200

View File

@ -1,5 +1,4 @@
#!/usr/bin/env python3
from models import SampleMetadata
from marshm import ma
from marshmallow import fields
@ -19,17 +18,8 @@ class SampleSchema(ma.Schema):
"""
Parameters:
- date (date)
- device_id (str)
- device_id (int)
"""
date = fields.DateTime(required=True)
device_id = fields.Str(required=True)
class SampleMetadataSchema(ma.SQLAlchemyAutoSchema):
    """
    Marshmallow schema generated
    """
    class Meta:
        # Fields are auto-generated from the SampleMetadata SQLAlchemy model.
        model = SampleMetadata
device_id = fields.Integer(required=True)