diff --git a/Dockerfile b/Dockerfile index 8db1da3..3aabc85 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,9 +1,10 @@ FROM python:3 -ADD classification_service requirements.txt /classification_service/ +ADD classification_service requirements.txt uwsgi.ini /classification_service/ WORKDIR /classification_service/ -RUN pip3 install -r requirements.txt && pip3 install gunicorn +RUN pip3 install -r requirements.txt +USER 33 EXPOSE 8000 -CMD ["gunicorn", "-b", "0.0.0.0:8000", "app:app"] +CMD ["uwsgi", "--ini", "uwsgi.ini"] diff --git a/classification_service/app.py b/classification_service/app.py index e0506d4..3ce8822 100644 --- a/classification_service/app.py +++ b/classification_service/app.py @@ -11,7 +11,6 @@ from utils import register_all_error_handlers # import views from views import ClassifyView - # Setup sentry SENTRY_DSN = os.environ.get("SENTRY_DSN") if SENTRY_DSN: @@ -36,7 +35,6 @@ register_all_error_handlers(app) for view in [ClassifyView]: view.register(app, trailing_slash=False) - # start debugging if needed if __name__ == "__main__": app.run(debug=True) diff --git a/classification_service/mule.py b/classification_service/mule.py new file mode 100644 index 0000000..cb0c7f7 --- /dev/null +++ b/classification_service/mule.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python3 +import sentry_sdk +import os +import requests +import tempfile +import numpy +import json +import pika +import uwsgi + +from pyAudioAnalysis.audioTrainTest import load_model, load_model_knn, classifier_wrapper + +SENTRY_DSN = os.environ.get("SENTRY_DSN") +if SENTRY_DSN: + sentry_sdk.init( + dsn=SENTRY_DSN, + send_default_pii=True, + release=os.environ.get('RELEASE_ID', 'test'), + environment=os.environ.get('RELEASEMODE', 'dev') + ) + + +def run_classification(task): + _, temp_model_name = tempfile.mkstemp() + temp_means_name = temp_model_name + "MEANS" + + r = requests.get(f"http://model-service/model/{task['model']}/details") + r.raise_for_status() + + model_details = r.json() + + try: + + r = requests.get(f"http://model-service/model/{task['model']}") + r.raise_for_status() + + with open(temp_model_name, 'wb') as f: + f.write(r.content) + + r = requests.get(f"http://model-service/model/{task['model']}?means") + r.raise_for_status() + + with open(temp_means_name, 'wb') as f: + f.write(r.content) + + if model_details['type'] == 'knn': + classifier, mean, std, classes, mid_window, mid_step, short_window, short_step, compute_beat \ + = load_model_knn(temp_model_name) + + else: + classifier, mean, std, classes, mid_window, mid_step, short_window, short_step, compute_beat \ + = load_model(temp_model_name) + + feature_vector = (numpy.array(task['features']) - mean) / std + class_id, probability = classifier_wrapper(classifier, model_details['type'], feature_vector) + + finally: # bruuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuh + try: + os.remove(temp_model_name) + except FileNotFoundError: + pass + + try: + os.remove(temp_means_name) + except FileNotFoundError: + pass + + results = { + "tag": task['tag'], + "model": task['model'], + "class_id": class_id, + "probability": probability + } + + return results + + +def main(): + connection = pika.BlockingConnection(pika.connection.URLParameters(os.environ['PIKA_URL'])) + channel = connection.channel() + channel.exchange_declare(exchange=os.environ['PIKA_EXCHANGE'], exchange_type='fanout') + + while True: + message = uwsgi.mule_get_msg() + task = json.loads(message) + results = run_classification(task) + channel.basic_publish(exchange=os.environ['PIKA_EXCHANGE'], routing_key='classification-result', body=results) + + +if __name__ == '__main__': + main() diff --git a/classification_service/utils/rabbit.py b/classification_service/utils/rabbit.py deleted file mode 100644 index 3370599..0000000 --- a/classification_service/utils/rabbit.py +++ /dev/null @@ -1,26 +0,0 @@ -#!/usr/bin/env python3 - -import pika -from config import * - -""" -Rabbitmq setup -""" - -__author__ = '@tormakris' -__copyright__ = "Copyright 2020, Birbnetes Team" -__module_name__ = "rabbit" -__version__text__ = "1" - -credentials = pika.PlainCredentials(RABBITMQ_USERNAME, RABBITMQ_PASSWORD) -rabbitmq = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST, credentials=credentials)) -rabbitmq_channel = rabbitmq.channel() -rabbitmq_channel.exchange_declare(exchange=RABBITMQ_EXCHANGE, exchange_type='fanout') - -"""Usage: - from rabbit import rabbitmq_channel - rabbitmq_channel.basic_publish( - exchange=RABBITMQ_EXCHANGE, - routing_key='feature', - body=generated_tag) -""" \ No newline at end of file diff --git a/classification_service/utils/require_decorators.py b/classification_service/utils/require_decorators.py index ba5b9ab..95daa39 100644 --- a/classification_service/utils/require_decorators.py +++ b/classification_service/utils/require_decorators.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -from flask import request, current_app, abort +from flask import request, abort from functools import wraps diff --git a/classification_service/views/classify_view.py b/classification_service/views/classify_view.py index 51ae8fe..38789a1 100644 --- a/classification_service/views/classify_view.py +++ b/classification_service/views/classify_view.py @@ -1,70 +1,15 @@ #!/usr/bin/env python3 -import os from flask import request, jsonify from flask_classful import FlaskView from utils import json_required -import requests -import tempfile -import numpy - -from pyAudioAnalysis.audioTrainTest import load_model, load_model_knn, classifier_wrapper +import json +import uwsgi class ClassifyView(FlaskView): @json_required def post(self): - task = request.json # tag, features, model - - _, temp_model_name = tempfile.mkstemp() - temp_means_name = temp_model_name + "MEANS" - - r = requests.get(f"http://model-service/model/{task['model']}/details") - r.raise_for_status() - - model_details = r.json() - - try: - - r = requests.get(f"http://model-service/model/{task['model']}") - r.raise_for_status() - - with open(temp_model_name, 'wb') as f: - f.write(r.content) - - r = requests.get(f"http://model-service/model/{task['model']}?means") - r.raise_for_status() - - with open(temp_means_name, 'wb') as f: - f.write(r.content) - - if model_details['type'] == 'knn': - classifier, mean, std, classes, mid_window, mid_step, short_window, short_step, compute_beat \ - = load_model_knn(temp_model_name) - - else: - classifier, mean, std, classes, mid_window, mid_step, short_window, short_step, compute_beat \ - = load_model(temp_model_name) - - feature_vector = (numpy.array(task['features']) - mean) / std - class_id, probability = classifier_wrapper(classifier, model_details['type'], feature_vector) - - finally: # bruuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuh - try: - os.remove(temp_model_name) - except FileNotFoundError: - pass - - try: - os.remove(temp_means_name) - except FileNotFoundError: - pass - - # TODO: Publish to message queue - results = { - "class_id": class_id, - "probability": probability - } - + uwsgi.mule_msg(json.dumps(task)) return jsonify({"status": "OK", "msg": "enqueued"}) diff --git a/requirements.txt b/requirements.txt index 656ca61..6e928a9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,6 +5,7 @@ marshmallow Flask-Classful sentry-sdk pika +uwsgi pyAudioanalysis numpy diff --git a/uwsgi.ini b/uwsgi.ini new file mode 100644 index 0000000..21b0263 --- /dev/null +++ b/uwsgi.ini @@ -0,0 +1,12 @@ +[uwsgi] +module = wsgi +http-socket = :8000 + +master = true +enable-threads = true + +die-on-term = true + +manage-script-name = true +mount=/=app:app +mule=mule.py \ No newline at end of file