input-service/src/resources.py

115 lines
3.9 KiB
Python
Raw Permalink Normal View History

2020-03-25 01:19:22 +01:00
#!/usr/bin/env python3
2021-08-17 17:53:16 +02:00
import io
2021-06-13 20:01:23 +02:00
from datetime import datetime
import tzlocal
2020-03-31 16:51:31 +02:00
from xeger import Xeger
from flask_restful import Resource
2021-07-26 12:32:36 +02:00
from flask import request, current_app, abort
from magic_amqp import magic_amqp
2021-06-13 20:01:23 +02:00
from influxus import influx_db
2021-08-17 17:16:01 +02:00
from schemas import SampleSchema
2021-08-17 17:53:16 +02:00
from redis_client import redis_client
2021-08-10 14:40:57 +02:00
import opentracing
2020-03-25 01:19:22 +01:00
"""
Flask Restful endpoints
"""
__author__ = '@tormakris'
2020-03-25 02:36:05 +01:00
__copyright__ = "Copyright 2020, Birbnetes Team"
2020-03-25 01:19:22 +01:00
__module_name__ = "endpoints"
__version__text__ = "1"
2020-03-31 15:10:58 +02:00
2020-03-25 01:19:22 +01:00
class SampleResource(Resource):
"""
Sample endpoint
See: https://swagger.kmlabz.com/?urls.primaryName=Input%20Service
"""
2020-03-25 02:36:05 +01:00
2020-04-29 22:24:32 +02:00
sampleschema = SampleSchema(many=False)
2020-03-25 01:19:22 +01:00
def post(self):
2020-03-25 02:36:05 +01:00
"""
Post request send to the endpoint
:return:
"""
2021-08-10 14:40:57 +02:00
with opentracing.tracer.start_active_span('parseAndValidate'):
if 'file' not in request.files:
return abort(400, "no file found")
else:
soundfile = request.files['file']
if 'description' not in request.form:
return abort(400, "no description found")
else:
2021-08-17 17:53:16 +02:00
description_raw = request.form.get("description")
2021-08-10 14:40:57 +02:00
if soundfile.content_type != 'audio/wave':
current_app.logger.info(f"Input file was not WAV.")
return abort(415, 'Input file not a wave file.')
try:
2021-08-17 17:53:16 +02:00
desc = self.sampleschema.loads(description_raw)
2021-08-10 14:40:57 +02:00
except Exception as e:
current_app.logger.exception(e)
return abort(417, 'Input JSON schema invalid')
with opentracing.tracer.start_active_span('generateTag'):
xeger = Xeger(limit=30)
while True:
generated_tag = xeger.xeger(r'^[a-zA-Z]+[0-9a-zA-Z_]*$')[:32]
if len(generated_tag) > 2: # Ensure minimum length
break
2020-03-25 03:57:07 +01:00
2021-08-17 17:53:16 +02:00
with opentracing.tracer.start_active_span('publishMetaMessage'):
2021-08-17 18:05:32 +02:00
try:
magic_amqp.publish_meta(
2021-08-17 17:53:16 +02:00
{
'tag': generated_tag,
'timestamp': datetime.now().isoformat(),
'device_id': desc['device_id'],
'device_date': desc['date'].isoformat()
}
2021-08-10 14:40:57 +02:00
)
2021-08-17 17:53:16 +02:00
except Exception as e:
current_app.logger.exception(e)
return abort(500, f"AMQP Publish error: {str(e)}")
2021-08-10 14:40:57 +02:00
2021-08-17 17:53:16 +02:00
with opentracing.tracer.start_active_span('readSampleToMemory'):
buf = io.BytesIO()
soundfile.save(buf)
2021-08-10 14:40:57 +02:00
2021-08-17 17:53:16 +02:00
with opentracing.tracer.start_active_span('putToCache'):
redis_client.set(generated_tag, buf.getbuffer()) # getbuffer is quick as it does not copy like getvalue
2021-08-10 14:40:57 +02:00
# Announce only after the data is successfully committed
2021-08-17 17:53:16 +02:00
with opentracing.tracer.start_active_span('publishInCacheMessage'):
2021-08-17 18:05:32 +02:00
try:
magic_amqp.publish_cache({
'tag': generated_tag,
'mime_type': soundfile.mimetype
})
2021-08-10 14:40:57 +02:00
except Exception as e:
current_app.logger.exception(e)
return abort(500, f"AMQP Publish error: {str(e)}")
2020-03-25 03:57:07 +01:00
# metrics
2021-07-26 12:51:52 +02:00
if current_app.config['ENABLE_INFLUXDB']:
2021-08-10 14:40:57 +02:00
with opentracing.tracer.start_active_span('influxdb.write_points'):
influx_db.write_points(
[
{
'time': datetime.now(tz=tzlocal.get_localzone()),
'measurement': 'cloudinput',
'tags': {
'device': desc['device_id']
},
'fields': {
'bruh': 1.0
}
2021-07-26 12:51:52 +02:00
}
2021-08-10 14:40:57 +02:00
]
)
2020-03-25 03:57:07 +01:00
2021-08-17 17:53:16 +02:00
return {"tag": generated_tag}, 200