#!/usr/bin/env python3
"""
Flask Restful endpoints
"""
import json
import time
from datetime import datetime
import tzlocal
from xeger import Xeger
from flask_restful import Resource
from flask import request, current_app, abort
import requests
from magic_amqp import magic_amqp
from db import db
from influxus import influx_db
from models import SampleMetadata
from schemas import SampleSchema, SampleMetadataSchema
from requests_opentracing import SessionTracing
import opentracing

__author__ = '@tormakris'
__copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "endpoints"
__version__text__ = "1"


class SampleResource(Resource):
    """
    Sample endpoint
    See: https://swagger.kmlabz.com/?urls.primaryName=Input%20Service
    """

    sampleschema = SampleSchema(many=False)
    samplemetadataschema = SampleMetadataSchema(many=True)

    @staticmethod
    def _generate_tag() -> str:
        """
        Generate a random tag for a new sample
        :return: string matching the tag pattern, 3 to 32 characters long
        """
        xeger = Xeger(limit=30)
        while True:
            candidate = xeger.xeger(r'^[a-zA-Z]+[0-9a-zA-Z_]*$')[:32]
            if len(candidate) > 2:  # Ensure minimum length
                return candidate

    @staticmethod
    def _resolve_content_length(soundfile) -> int:
        """
        Determine the size of the uploaded file
        :param soundfile: the uploaded file object taken from request.files
        :return: the content length reported by the uploader when it is
                 positive, otherwise the end offset of the stream
        """
        declared_length = soundfile.content_length
        if declared_length > 0:
            return declared_length

        # The uploader did not send a usable content-length for the file part
        # (curl, for example, offers no way to set it), so measure the stream.
        # In an ideal scenario this span is missing from the trace.
        with opentracing.tracer.start_active_span('calculateContentLength'):
            current_app.logger.debug(
                "The uploader did not provide content-length for the sound file... Calculating manually..."
            )
            # The stream is seekable: jump to the end and read the offset
            old_ptr = soundfile.tell()
            try:
                soundfile.seek(0, 2)
                return soundfile.tell()
            finally:
                # Always seek back, so the upload starts from where the
                # stream was, even if measuring failed
                soundfile.seek(old_ptr, 0)

    @staticmethod
    def _parse_arg(name: str, converter):
        """
        Read and convert an optional query string parameter
        :param name: name of the query parameter
        :param converter: callable turning the raw string into a value
        :return: the converted value, or None when the parameter is missing
                 or can not be converted
        """
        try:
            return converter(request.args.get(name))
        except (ValueError, TypeError):
            # TypeError: parameter missing (None); ValueError: malformed value
            return None

    def post(self):
        """
        Post request send to the endpoint

        Validates the upload, stores the metadata, forwards the sound file to
        the storage service, then announces the new tag over AMQP.

        Aborts with 400 when the 'file' or 'description' part is missing,
        415 when the file is not 'audio/wave', 417 when the description can
        not be loaded by the schema, and 500 when the storage service upload
        or the AMQP publish fails.
        :return: the generated tag with status code 200
        """
        with opentracing.tracer.start_active_span('parseAndValidate'):
            if 'file' not in request.files:
                return abort(400, "no file found")
            soundfile = request.files['file']

            if 'description' not in request.form:
                return abort(400, "no description found")
            description = request.form.get("description")

            if soundfile.content_type != 'audio/wave':
                current_app.logger.info("Input file was not WAV.")
                return abort(415, 'Input file not a wave file.')

            try:
                desc = self.sampleschema.loads(description)
            except Exception as e:
                current_app.logger.exception(e)
                return abort(417, 'Input JSON schema invalid')

        with opentracing.tracer.start_active_span('generateTag'):
            generated_tag = self._generate_tag()

        soundfile_content_length = self._resolve_content_length(soundfile)

        with opentracing.tracer.start_active_span('sqlalchemy.create'):
            record = SampleMetadata(
                device_id=desc['device_id'],
                device_date=desc['date'],
                tag=generated_tag
            )
            # Only added here; committed after the upload succeeded
            db.session.add(record)

        with opentracing.tracer.start_active_span('uploadToStorageService'):
            files = {
                'description': (None, json.dumps({'tag': generated_tag}), 'application/json'),
                'soundFile': (
                    'wave.wav',
                    soundfile,
                    soundfile.content_type,
                    {'Content-Length': soundfile_content_length})}

            upload_started = time.time()
            # Use the session as a context manager so it is closed after the
            # request instead of being leaked on every upload
            with SessionTracing(propagate=True) as session:
                r = session.post(
                    f"http://{current_app.config.get('STORAGE_HOSTNAME')}/object",
                    files=files
                )
            upload_time = time.time() - upload_started

            if upload_time > 0.8:
                current_app.logger.warning(f"Uploading to storage-service took {upload_time:5} sec")

            if r.status_code not in [200, 201]:
                return abort(
                    500,
                    f"Failed to upload sample to storage service. Upstream status: {r.status_code}: {r.text}"
                )

        with opentracing.tracer.start_active_span('sqlalchemy.commit'):
            db.session.commit()

        # Announce only after the data is successfully committed
        with opentracing.tracer.start_active_span('publishMessage'):
            try:
                magic_amqp.publish({'tag': generated_tag})
            except Exception as e:
                current_app.logger.exception(e)
                return abort(500, f"AMQP Publish error: {str(e)}")

        # metrics
        if current_app.config['ENABLE_INFLUXDB']:
            with opentracing.tracer.start_active_span('influxdb.write_points'):
                influx_db.write_points(
                    [
                        {
                            'time': datetime.now(tz=tzlocal.get_localzone()),
                            'measurement': 'cloudinput',
                            'tags': {
                                'device': desc['device_id']
                            },
                            'fields': {
                                'bruh': 1.0
                            }
                        }
                    ]
                )

        return {"tag": generated_tag}, 200

    def get(self):
        """
        Get all stored items

        Optional query parameters (missing or unparsable ones are ignored):
        'first' (integer, keeps id >= first), 'after' / 'before' (ISO format
        datetimes, keep timestamp > after / timestamp < before) and 'limit'
        (integer). When 'count' is present in the query string, the number of
        rows of the query is returned instead of the rows themselves.
        :return: the serialized samples, or {"count": n}, with status code 200
        """
        with opentracing.tracer.start_active_span('compileQuery'):
            query = SampleMetadata.query

            ## Compile filters ##

            first = self._parse_arg('first', int)
            after = self._parse_arg('after', datetime.fromisoformat)
            before = self._parse_arg('before', datetime.fromisoformat)
            limit = self._parse_arg('limit', int)

            filters = []
            if first is not None:
                filters.append(SampleMetadata.id >= first)
            if after is not None:
                filters.append(SampleMetadata.timestamp > after)
            if before is not None:
                filters.append(SampleMetadata.timestamp < before)

            if filters:
                query = query.filter(db.and_(*filters))

            if limit is not None:
                query = query.limit(limit)

        ## Run query ##
        count = "count" in request.args
        tags = {
            "first": first,
            "limit": limit,
            "after": after,
            "before": before
        }

        if count:
            with opentracing.tracer.start_active_span('sqlalchemy.count', tags=tags):
                rows = query.count()

            return {"count": rows}, 200

        with opentracing.tracer.start_active_span('sqlalchemy.select', tags=tags):
            samples = query.all()

        return self.samplemetadataschema.dump(list(samples)), 200


class SampleParameterResource(Resource):
    """
    Sample endpoint with parameters
    """

    samplemetadataschema = SampleMetadataSchema(many=False)

    def get(self, tag: str):
        """
        Get a specific item
        :param tag: tag of the sample to look up
        :return: the serialized sample with status code 200; a missing tag
                 is answered with 404 by first_or_404
        """
        with opentracing.tracer.start_active_span('sqlalchemy.select', tags={"tag": tag}):
            sample = SampleMetadata.query.filter_by(tag=tag).first_or_404()

        return self.samplemetadataschema.dump(sample), 200