#!/usr/bin/env python3
"""Main Flask RESTful API.

Builds the Flask application object, wires up its extensions (database,
marshmallow, optional InfluxDB, optional Sentry), starts a background
scheduler that periodically drives ``magic_ampq.loop()``, and registers the
REST resources and the ``/healthz`` endpoint.

Everything here runs at import time, so importing this module has side
effects (a scheduler is started and an ``atexit`` hook is registered).
"""

import atexit

from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask
from flask_restful import Api
from healthcheck import HealthCheck
import sentry_sdk
from sentry_sdk.integrations.flask import FlaskIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration

from config import Config
from db import db
from marshm import ma
from influxus import influx_db
from resources import SampleResource, SampleParameterResource
from healthchecks import health_database_status, ampq_connection_status
from magic_ampq import magic_ampq

__author__ = "@tormakris"
__copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "app"
__version__text__ = "1"

# Error reporting is opt-in: Sentry is only initialised when a DSN is set.
if Config.SENTRY_DSN:
    sentry_sdk.init(
        dsn=Config.SENTRY_DSN,
        integrations=[FlaskIntegration(), SqlalchemyIntegration()],
        traces_sample_rate=1.0,
        send_default_pii=True,
        release=Config.RELEASE_ID,
        environment=Config.RELEASEMODE,
        _experiments={"auto_enabling_integrations": True},
    )

# Application object and the extensions bound to it.
app = Flask(__name__)
app.config.from_object(Config)
api = Api(app)
health = HealthCheck()
db.init_app(app)
ma.init_app(app)

# Bind magic_ampq to the app, then call its loop() every 5 seconds from a
# background scheduler.  The lambdas defer attribute lookup to call time.
magic_ampq.init_app(app)

ampq_loop_scheduler = BackgroundScheduler()
ampq_loop_scheduler.add_job(
    func=lambda: magic_ampq.loop(),
    trigger="interval",
    seconds=5,
)
# Register the shutdown hook before starting so the scheduler is always
# stopped when the interpreter exits.
atexit.register(lambda: ampq_loop_scheduler.shutdown())
ampq_loop_scheduler.start()

if Config.ENABLE_INFLUXDB:
    influx_db.init_app(app)


# NOTE(review): ``before_first_request`` is deprecated since Flask 2.2 and
# removed in 2.3 -- confirm the pinned Flask version before upgrading.
@app.before_first_request
def init_db():
    """Create the backing databases before the first request is served.

    When ``Config.ENABLE_INFLUXDB`` is truthy, the InfluxDB database named
    by ``Config.INFLUXDB_DATABASE`` is created first; ``db.create_all()`` is
    then called unconditionally.
    """
    if Config.ENABLE_INFLUXDB:
        influx_db.database.create(Config.INFLUXDB_DATABASE)
    db.create_all()


api.add_resource(SampleResource, "/sample")
# NOTE(review): this rule differs from the one above only by a trailing
# slash and carries no URL variable -- confirm whether a path parameter
# was intended for SampleParameterResource.
api.add_resource(SampleParameterResource, '/sample/')

# Health endpoint: runs the registered database and ampq connection checks.
health.add_check(health_database_status)
health.add_check(ampq_connection_status)
app.add_url_rule("/healthz", "healthcheck", view_func=lambda: health.run())