input-service/src/resources.py

164 lines
5.4 KiB
Python
Raw Normal View History

2020-03-25 01:19:22 +01:00
#!/usr/bin/env python3
2020-04-29 22:55:26 +02:00
import json
2021-06-13 20:01:23 +02:00
from datetime import datetime
import tzlocal
2020-03-31 16:51:31 +02:00
from xeger import Xeger
from flask_restful import Resource
2021-07-26 12:32:36 +02:00
from flask import request, current_app, abort
2020-03-31 14:46:46 +02:00
import requests
2020-10-19 23:31:40 +02:00
import pika
2020-03-31 15:10:58 +02:00
from db import db
2021-06-13 20:01:23 +02:00
from influxus import influx_db
2020-03-25 02:54:59 +01:00
from models import SampleMetadata
2020-04-29 22:24:32 +02:00
from schemas import SampleSchema, SampleMetadataSchema
2020-03-25 01:19:22 +01:00
"""
Flask Restful endpoints
"""
__author__ = '@tormakris'
2020-03-25 02:36:05 +01:00
__copyright__ = "Copyright 2020, Birbnetes Team"
2020-03-25 01:19:22 +01:00
__module_name__ = "endpoints"
__version__text__ = "1"
2020-03-31 15:10:58 +02:00
2020-03-25 01:19:22 +01:00
class SampleResource(Resource):
"""
Sample endpoint
See: https://swagger.kmlabz.com/?urls.primaryName=Input%20Service
"""
2020-03-25 02:36:05 +01:00
2020-04-29 22:24:32 +02:00
sampleschema = SampleSchema(many=False)
samplemetadataschema = SampleMetadataSchema(many=True)
2020-03-25 01:19:22 +01:00
def post(self):
2020-03-25 02:36:05 +01:00
"""
Post request send to the endpoint
:return:
"""
2020-10-20 00:15:37 +02:00
if 'file' not in request.files:
2021-07-26 12:32:36 +02:00
return abort(400, "no file found")
2020-10-20 00:15:37 +02:00
else:
soundfile = request.files['file']
if 'description' not in request.form:
2021-07-26 12:32:36 +02:00
return abort(400, "no description found")
2020-10-20 00:15:37 +02:00
else:
description = request.form.get("description")
if soundfile.content_type != 'audio/wave':
2021-07-26 12:32:36 +02:00
current_app.logger.info(f"Input file was not WAV.")
return abort(415, 'Input file not a wave file.')
2020-10-20 00:15:37 +02:00
try:
desc = self.sampleschema.loads(description)
except Exception as e:
current_app.logger.exception(e)
2021-07-26 12:32:36 +02:00
return abort(417, 'Input JSON schema invalid')
2020-10-20 00:15:37 +02:00
xeger = Xeger(limit=30)
while True:
generated_tag = xeger.xeger(r'^[a-zA-Z]+[0-9a-zA-Z_]*$')[:32]
if len(generated_tag) > 2: # Ensure minimum length
break
2020-03-25 03:57:07 +01:00
# Handle mega-autismo-cliento
soundfile_content_length = soundfile.content_length
if soundfile_content_length <= 0: # BRUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUH
current_app.logger.debug(
"The uploader did not provide content-length for the sound file... Calculating manually..."
)
# So, this is a seekable stream, so we just seek to the end
old_ptr = soundfile.tell()
soundfile.seek(0, 2)
# Check where is the end (= content length)
soundfile_content_length = soundfile.tell()
# Seek back to where the stream was
soundfile.seek(old_ptr, 0)
# It's insane, that you can not set this field in curl
2020-03-31 14:46:46 +02:00
record = SampleMetadata(
2020-03-31 17:32:04 +02:00
device_id=desc['device_id'],
device_date=desc['date'],
2021-07-26 12:32:36 +02:00
tag=generated_tag
)
db.session.add(record)
files = {
'description': (None, json.dumps({'tag': generated_tag}), 'application/json'),
'soundFile': (
'wave.wav',
soundfile,
soundfile.content_type,
{'Content-Length': soundfile_content_length})}
r = requests.post(
f"http://{current_app.config.get('STORAGE_HOSTNAME')}/object",
files=files)
if r.status_code not in [200, 201]:
return abort(500, f"Failed to upload sample to storage service. Upstream status: {r.status_code}: {r.text}")
2020-03-31 14:46:46 +02:00
try:
2020-10-20 00:15:37 +02:00
credentials = pika.PlainCredentials(current_app.config['FLASK_PIKA_PARAMS']['username'],
current_app.config['FLASK_PIKA_PARAMS']['password'])
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=current_app.config['FLASK_PIKA_PARAMS']['host'],
credentials=credentials,
heartbeat=0,
socket_timeout=5))
channel = connection.channel()
channel.exchange_declare(exchange=current_app.config['EXCHANGE_NAME'],
2020-10-24 00:28:25 +02:00
exchange_type='direct')
2020-10-20 00:15:37 +02:00
channel.basic_publish(exchange=current_app.config['EXCHANGE_NAME'],
routing_key='feature',
body=json.dumps({'tag': generated_tag}).encode('UTF-8'))
connection.close()
2020-03-31 14:46:46 +02:00
except Exception as e:
2020-10-02 04:31:20 +02:00
current_app.logger.exception(e)
2021-07-26 12:32:36 +02:00
return abort(569, "AMPQ Publish error")
2020-03-25 03:57:07 +01:00
2021-07-26 12:45:01 +02:00
influx_db.write_points(
[
{
'time': datetime.now(tz=tzlocal.get_localzone()),
'measurement': 'cloudinput',
'tags': {
'device': desc['device_id']
},
'fields': {
'bruh': 1.0
}
}
]
)
2020-03-25 03:57:07 +01:00
2020-03-31 14:46:46 +02:00
db.session.commit()
2020-04-13 00:47:05 +02:00
return {"tag": generated_tag}, 200
2020-03-31 15:10:58 +02:00
2021-07-26 12:32:36 +02:00
def get(self):
"""
Get all stored items
:return:
"""
samples = SampleMetadata.query.all()
return self.samplemetadataschema.dump(list(samples)), 200
2020-03-31 15:10:58 +02:00
class SampleParameterResource(Resource):
"""
Sample endpoint with parameters
"""
2020-04-29 22:24:32 +02:00
samplemetadataschema = SampleMetadataSchema(many=False)
2020-03-31 15:10:58 +02:00
def get(self, tag: str):
"""
Get a specific item
:param tag:
:return:
"""
sample = SampleMetadata.query.filter_by(tag=tag).first_or_404()
2020-06-20 20:15:15 +02:00
return self.samplemetadataschema.dump(sample), 200