norbi-update #1
@@ -4,13 +4,9 @@ Flask~=2.0.1
 | 
			
		||||
Flask-RESTful~=0.3.9
 | 
			
		||||
requests~=2.26.0
 | 
			
		||||
werkzeug
 | 
			
		||||
sqlalchemy~=1.4.22
 | 
			
		||||
flask_sqlalchemy~=2.5.1
 | 
			
		||||
xeger~=0.3.5
 | 
			
		||||
pika~=1.2.0
 | 
			
		||||
psycopg2-binary
 | 
			
		||||
marshmallow~=3.13.0
 | 
			
		||||
marshmallow-sqlalchemy~=0.26.1
 | 
			
		||||
flask-marshmallow
 | 
			
		||||
py-healthcheck
 | 
			
		||||
Flask-InfluxDB
 | 
			
		||||
 
 | 
			
		||||
@@ -7,11 +7,10 @@ from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
 | 
			
		||||
from healthcheck import HealthCheck
 | 
			
		||||
 | 
			
		||||
from config import Config
 | 
			
		||||
from db import db
 | 
			
		||||
from marshm import ma
 | 
			
		||||
from influxus import influx_db
 | 
			
		||||
from resources import SampleResource, SampleParameterResource
 | 
			
		||||
from healthchecks import health_database_status, amqp_connection_status
 | 
			
		||||
from healthchecks import amqp_connection_status
 | 
			
		||||
 | 
			
		||||
import atexit
 | 
			
		||||
 | 
			
		||||
@@ -48,7 +47,6 @@ app.config.from_object(Config)
 | 
			
		||||
 | 
			
		||||
api = Api(app)
 | 
			
		||||
health = HealthCheck()
 | 
			
		||||
db.init_app(app)
 | 
			
		||||
ma.init_app(app)
 | 
			
		||||
 | 
			
		||||
# ampq magic stuff
 | 
			
		||||
@@ -68,7 +66,6 @@ if Config.ENABLE_INFLUXDB:
 | 
			
		||||
def init_db():
 | 
			
		||||
    if Config.ENABLE_INFLUXDB:
 | 
			
		||||
        influx_db.database.create(Config.INFLUXDB_DATABASE)
 | 
			
		||||
    db.create_all()
 | 
			
		||||
 | 
			
		||||
# Setup tracing
 | 
			
		||||
def initialize_tracer():
 | 
			
		||||
@@ -83,7 +80,6 @@ tracing = FlaskTracing(initialize_tracer, True, app)
 | 
			
		||||
api.add_resource(SampleResource, "/input")
 | 
			
		||||
api.add_resource(SampleParameterResource, '/input/<tag>')
 | 
			
		||||
 | 
			
		||||
health.add_check(health_database_status)
 | 
			
		||||
health.add_check(amqp_connection_status)
 | 
			
		||||
 | 
			
		||||
register_all_error_handlers(app)
 | 
			
		||||
 
 | 
			
		||||
@@ -10,12 +10,6 @@ __copyright__ = "Copyright 2020, Birbnetes Team"
 | 
			
		||||
__module_name__ = "app"
 | 
			
		||||
__version__text__ = "1"
 | 
			
		||||
 | 
			
		||||
_POSTGRES_HOSTNAME = os.getenv("INPUT_POSTGRES_HOSTNAME", "localhost")
 | 
			
		||||
_POSTGRES_USERNAME = os.getenv("INPUT_POSTGRES_USERNAME", "input-service")
 | 
			
		||||
_POSTGRES_PASSWORD = os.getenv("INPUT_POSTGRES_PASSWORD", "input-service")
 | 
			
		||||
_POSTGRES_DB = os.getenv("INPUT_POSTGRES_DB", "input-service")
 | 
			
		||||
_POSTGRES_OPTS = os.getenv("INPUT_POSTGRES_OPTS", "")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Config:
 | 
			
		||||
    PORT = 8080
 | 
			
		||||
@@ -36,8 +30,6 @@ class Config:
 | 
			
		||||
        'virtual_host': '/'
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    SQLALCHEMY_DATABASE_URI = f"postgresql://{_POSTGRES_USERNAME}:{_POSTGRES_PASSWORD}@{_POSTGRES_HOSTNAME}:5432/{_POSTGRES_DB}{_POSTGRES_OPTS}"
 | 
			
		||||
 | 
			
		||||
    STORAGE_HOSTNAME = os.getenv("INPUT_STORAGE_HOSTNAME", "localhost:8042")
 | 
			
		||||
 | 
			
		||||
    ENABLE_INFLUXDB = os.environ.get("INPUT_ENABLE_INFLUX", "true").lower() in ["true", "yes", "1"]
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										13
									
								
								src/db.py
									
									
									
									
									
								
							
							
						
						
									
										13
									
								
								src/db.py
									
									
									
									
									
								
							@@ -1,13 +0,0 @@
 | 
			
		||||
#!/usr/bin/env python3
 | 
			
		||||
from flask_sqlalchemy import SQLAlchemy
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
Database api
 | 
			
		||||
"""
 | 
			
		||||
 | 
			
		||||
__author__ = '@tormakris'
 | 
			
		||||
__copyright__ = "Copyright 2020, Birbnetes Team"
 | 
			
		||||
__module_name__ = "db"
 | 
			
		||||
__version__text__ = "1"
 | 
			
		||||
 | 
			
		||||
db = SQLAlchemy()
 | 
			
		||||
@@ -1,6 +1,4 @@
 | 
			
		||||
#!/usr/bin/env python3
 | 
			
		||||
 | 
			
		||||
from db import db
 | 
			
		||||
from magic_amqp import magic_amqp
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
@@ -13,17 +11,6 @@ __module_name__ = "healthchecks"
 | 
			
		||||
__version__text__ = "1"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def health_database_status():
 | 
			
		||||
    is_database_working = True
 | 
			
		||||
    output = 'database is ok'
 | 
			
		||||
    try:
 | 
			
		||||
        db.session.execute('SELECT 1')
 | 
			
		||||
    except Exception as e:
 | 
			
		||||
        output = str(e)
 | 
			
		||||
        is_database_working = False
 | 
			
		||||
    return is_database_working, output
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def amqp_connection_status():
 | 
			
		||||
    if magic_amqp.is_healthy():
 | 
			
		||||
        result = True
 | 
			
		||||
 
 | 
			
		||||
@@ -1,25 +0,0 @@
 | 
			
		||||
#!/usr/bin/env python3
 | 
			
		||||
from db import db
 | 
			
		||||
from sqlalchemy.sql import func
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
Flask Restful endpoints
 | 
			
		||||
"""
 | 
			
		||||
 | 
			
		||||
__author__ = '@tormakris'
 | 
			
		||||
__copyright__ = "Copyright 2020, Birbnetes Team"
 | 
			
		||||
__module_name__ = "models"
 | 
			
		||||
__version__text__ = "1"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class SampleMetadata(db.Model):
 | 
			
		||||
    """
 | 
			
		||||
    SQLAlchemy model of metadata entries
 | 
			
		||||
    """
 | 
			
		||||
    id = db.Column(db.Integer, primary_key=True, auto_increment=True)
 | 
			
		||||
    timestamp = db.Column(db.TIMESTAMP, nullable=False, server_default=func.now())
 | 
			
		||||
 | 
			
		||||
    device_id = db.Column(db.Integer, nullable=False)
 | 
			
		||||
    device_date = db.Column(db.DateTime, nullable=False)
 | 
			
		||||
 | 
			
		||||
    tag = db.Column(db.String(32), nullable=False, unique=True)
 | 
			
		||||
							
								
								
									
										106
									
								
								src/resources.py
									
									
									
									
									
								
							
							
						
						
									
										106
									
								
								src/resources.py
									
									
									
									
									
								
							@@ -6,12 +6,9 @@ import tzlocal
 | 
			
		||||
from xeger import Xeger
 | 
			
		||||
from flask_restful import Resource
 | 
			
		||||
from flask import request, current_app, abort
 | 
			
		||||
import requests
 | 
			
		||||
from magic_amqp import magic_amqp
 | 
			
		||||
from db import db
 | 
			
		||||
from influxus import influx_db
 | 
			
		||||
from models import SampleMetadata
 | 
			
		||||
from schemas import SampleSchema, SampleMetadataSchema
 | 
			
		||||
from schemas import SampleSchema
 | 
			
		||||
from requests_opentracing import SessionTracing
 | 
			
		||||
import opentracing
 | 
			
		||||
 | 
			
		||||
@@ -32,7 +29,6 @@ class SampleResource(Resource):
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    sampleschema = SampleSchema(many=False)
 | 
			
		||||
    samplemetadataschema = SampleMetadataSchema(many=True)
 | 
			
		||||
 | 
			
		||||
    def post(self):
 | 
			
		||||
        """
 | 
			
		||||
@@ -84,14 +80,6 @@ class SampleResource(Resource):
 | 
			
		||||
 | 
			
		||||
                # It's insane, that you can not set this field in curl
 | 
			
		||||
 | 
			
		||||
        with opentracing.tracer.start_active_span('sqlalchemy.create'):
 | 
			
		||||
            record = SampleMetadata(
 | 
			
		||||
                device_id=desc['device_id'],
 | 
			
		||||
                device_date=desc['date'],
 | 
			
		||||
                tag=generated_tag
 | 
			
		||||
            )
 | 
			
		||||
            db.session.add(record)
 | 
			
		||||
 | 
			
		||||
        with opentracing.tracer.start_active_span('uploadToStorageService'):
 | 
			
		||||
            files = {
 | 
			
		||||
                'description': (None, json.dumps({'tag': generated_tag}), 'application/json'),
 | 
			
		||||
@@ -115,9 +103,6 @@ class SampleResource(Resource):
 | 
			
		||||
                return abort(500,
 | 
			
		||||
                             f"Failed to upload sample to storage service. Upstream status: {r.status_code}: {r.text}")
 | 
			
		||||
 | 
			
		||||
        with opentracing.tracer.start_active_span('sqlalchemy.commit'):
 | 
			
		||||
            db.session.commit()
 | 
			
		||||
 | 
			
		||||
        # Announce only after the data is successfully committed
 | 
			
		||||
        with opentracing.tracer.start_active_span('publishMessage'):
 | 
			
		||||
            try:
 | 
			
		||||
@@ -144,91 +129,4 @@ class SampleResource(Resource):
 | 
			
		||||
                    ]
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
        return {"tag": generated_tag}, 200
 | 
			
		||||
 | 
			
		||||
    def get(self):
 | 
			
		||||
        """
 | 
			
		||||
        Get all stored items
 | 
			
		||||
        :return:
 | 
			
		||||
        """
 | 
			
		||||
        with opentracing.tracer.start_active_span('compileQuery'):
 | 
			
		||||
            query = SampleMetadata.query
 | 
			
		||||
 | 
			
		||||
            ## Compile filters ##
 | 
			
		||||
 | 
			
		||||
            filters = []
 | 
			
		||||
            try:
 | 
			
		||||
                first = int(request.args.get('first'))
 | 
			
		||||
            except (ValueError, TypeError):
 | 
			
		||||
                first = None
 | 
			
		||||
            else:
 | 
			
		||||
                filters.append(
 | 
			
		||||
                    SampleMetadata.id >= first
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
            try:
 | 
			
		||||
                after = datetime.fromisoformat(request.args.get('after'))
 | 
			
		||||
            except (ValueError, TypeError):
 | 
			
		||||
                after = None
 | 
			
		||||
            else:
 | 
			
		||||
                filters.append(
 | 
			
		||||
                    SampleMetadata.timestamp > after
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
            try:
 | 
			
		||||
                before = datetime.fromisoformat(request.args.get('before'))
 | 
			
		||||
            except (ValueError, TypeError):
 | 
			
		||||
                before = None
 | 
			
		||||
            else:
 | 
			
		||||
                filters.append(
 | 
			
		||||
                    SampleMetadata.timestamp < before
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
            if filters:
 | 
			
		||||
                query = query.filter(db.and_(*filters))
 | 
			
		||||
 | 
			
		||||
            try:
 | 
			
		||||
                limit = int(request.args.get('limit'))
 | 
			
		||||
            except (ValueError, TypeError):
 | 
			
		||||
                limit = None
 | 
			
		||||
            else:
 | 
			
		||||
                query = query.limit(limit)
 | 
			
		||||
 | 
			
		||||
        ## Run query ##
 | 
			
		||||
        count = "count" in request.args
 | 
			
		||||
        tags = {
 | 
			
		||||
            "first": first,
 | 
			
		||||
            "limit": limit,
 | 
			
		||||
            "after": after,
 | 
			
		||||
            "before": before
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if count:
 | 
			
		||||
            with opentracing.tracer.start_active_span('sqlalchemy.count', tags=tags):
 | 
			
		||||
                rows = query.count()
 | 
			
		||||
 | 
			
		||||
            return {"count": rows}, 200
 | 
			
		||||
        else:
 | 
			
		||||
            with opentracing.tracer.start_active_span('sqlalchemy.select', tags=tags):
 | 
			
		||||
                samples = query.all()
 | 
			
		||||
 | 
			
		||||
            return self.samplemetadataschema.dump(list(samples)), 200
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class SampleParameterResource(Resource):
 | 
			
		||||
    """
 | 
			
		||||
    Sample endpoint with parameters
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    samplemetadataschema = SampleMetadataSchema(many=False)
 | 
			
		||||
 | 
			
		||||
    def get(self, tag: str):
 | 
			
		||||
        """
 | 
			
		||||
        Get a specific item
 | 
			
		||||
        :param tag:
 | 
			
		||||
        :return:
 | 
			
		||||
        """
 | 
			
		||||
        with opentracing.tracer.start_active_span('sqlalchemy.select', tags={"tag": tag}):
 | 
			
		||||
            sample = SampleMetadata.query.filter_by(tag=tag).first_or_404()
 | 
			
		||||
 | 
			
		||||
        return self.samplemetadataschema.dump(sample), 200
 | 
			
		||||
        return {"tag": generated_tag}, 200
 | 
			
		||||
@@ -26,13 +26,3 @@ class SampleSchema(ma.Schema):
 | 
			
		||||
 | 
			
		||||
    date = fields.DateTime(required=True)
 | 
			
		||||
    device_id = fields.Integer(required=True)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class SampleMetadataSchema(ma.SQLAlchemyAutoSchema):
 | 
			
		||||
    """
 | 
			
		||||
    Marshmallow schema generated
 | 
			
		||||
    """
 | 
			
		||||
    class Meta:
 | 
			
		||||
        model = SampleMetadata
 | 
			
		||||
        exclude = ('timestamp', 'id', 'device_date')
 | 
			
		||||
    date = auto_field("device_date", dump_only=False)
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user