#!/usr/bin/env python3
"""
Flask Restful endpoints
"""

import io
from datetime import datetime

import opentracing
import tzlocal
from flask import request, current_app, abort
from flask_restful import Resource
from xeger import Xeger

from magic_amqp import magic_amqp
from influxus import influx_db
from schemas import SampleSchema
from redis_client import redis_client

__author__ = '@tormakris'
__copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "endpoints"
__version__text__ = "1"


class SampleResource(Resource):
    """
    Sample endpoint
    See: https://swagger.kmlabz.com/?urls.primaryName=Input%20Service
    """

    sampleschema = SampleSchema(many=False)

    # Shape of the generated tags: starts with a letter, followed by
    # alphanumerics or underscores.
    _TAG_PATTERN = r'^[a-zA-Z]+[0-9a-zA-Z_]*$'
    # Generated tags are truncated to this many characters.
    _TAG_MAX_LENGTH = 32
    # Candidates shorter than this are discarded and regenerated.
    _TAG_MIN_LENGTH = 3

    @classmethod
    def _generate_tag(cls):
        """
        Generate a random tag matching ``_TAG_PATTERN``.

        Candidates are truncated to ``_TAG_MAX_LENGTH`` characters and
        regenerated until one has at least ``_TAG_MIN_LENGTH`` characters.

        :return: the generated tag string
        """
        generator = Xeger(limit=30)
        while True:
            candidate = generator.xeger(cls._TAG_PATTERN)[:cls._TAG_MAX_LENGTH]
            if len(candidate) >= cls._TAG_MIN_LENGTH:  # Ensure minimum length
                return candidate

    def post(self):
        """
        Accept an uploaded sound sample together with its description.

        The request must carry a ``file`` upload whose content type is
        ``audio/wave`` and a ``description`` form field that is loaded with
        ``SampleSchema``. On success a tag is generated, a meta message is
        published, the file content is stored in Redis under the tag, a
        cache message is published and, when ``ENABLE_INFLUXDB`` is set in
        the application config, a metrics point is written.

        Aborts with:

        * 400 when the ``file`` upload or ``description`` field is missing
        * 415 when the upload's content type is not ``audio/wave``
        * 417 when loading the description with the schema raises
        * 500 when publishing either AMQP message raises

        :return: tuple of ``{"tag": <generated tag>}`` and status code 200
        """
        with opentracing.tracer.start_active_span('parseAndValidate'):
            if 'file' not in request.files:
                return abort(400, "no file found")
            soundfile = request.files['file']

            if 'description' not in request.form:
                return abort(400, "no description found")
            description_raw = request.form.get("description")

            if soundfile.content_type != 'audio/wave':
                current_app.logger.info("Input file was not WAV.")
                return abort(415, 'Input file not a wave file.')

            try:
                desc = self.sampleschema.loads(description_raw)
            except Exception as e:
                current_app.logger.exception(e)
                return abort(417, 'Input JSON schema invalid')

        with opentracing.tracer.start_active_span('generateTag'):
            generated_tag = self._generate_tag()

        with opentracing.tracer.start_active_span('publishMetaMessage'):
            try:
                magic_amqp.publish_meta(
                    {
                        'tag': generated_tag,
                        # NOTE(review): this timestamp is naive (no timezone),
                        # unlike the InfluxDB point below which uses the local
                        # zone - confirm consumers expect that before changing.
                        'timestamp': datetime.now().isoformat(),
                        'device_id': desc['device_id'],
                        'device_date': desc['date'].isoformat()
                    }
                )
            except Exception as e:
                current_app.logger.exception(e)
                return abort(500, f"AMQP Publish error: {e!s}")

        with opentracing.tracer.start_active_span('readSampleToMemory'):
            buf = io.BytesIO()
            soundfile.save(buf)

        with opentracing.tracer.start_active_span('putToCache'):
            # getbuffer is quick as it does not copy like getvalue
            redis_client.set(generated_tag, buf.getbuffer())

        # Announce only after the data is successfully committed
        with opentracing.tracer.start_active_span('publishInCacheMessage'):
            try:
                magic_amqp.publish_cache({
                    'tag': generated_tag,
                    'mime_type': soundfile.mimetype
                })
            except Exception as e:
                current_app.logger.exception(e)
                return abort(500, f"AMQP Publish error: {e!s}")

        # metrics
        if current_app.config['ENABLE_INFLUXDB']:
            with opentracing.tracer.start_active_span('influxdb.write_points'):
                influx_db.write_points(
                    [
                        {
                            'time': datetime.now(tz=tzlocal.get_localzone()),
                            'measurement': 'cloudinput',
                            'tags': {
                                'device': desc['device_id']
                            },
                            'fields': {
                                'bruh': 1.0
                            }
                        }
                    ]
                )

        return {"tag": generated_tag}, 200