diff --git a/model_service/views/cnn_view.py b/model_service/views/cnn_view.py index 341d774..16ddd08 100644 --- a/model_service/views/cnn_view.py +++ b/model_service/views/cnn_view.py @@ -6,6 +6,7 @@ from minio.error import NoSuchKey from schemas import AIModelSchema, DefaultSchema, InfoSchema from marshmallow.exceptions import ValidationError from utils import multipart_required, storage, ensure_buckets +import opentracing class CNNView(FlaskView): @@ -21,88 +22,155 @@ class CNNView(FlaskView): def post(self): # get important data from the request - try: - info = self.info_schema.loads(request.form.get('info')) - except ValidationError as e: - abort(400, str(e)) + with opentracing.tracer.start_active_span('parseAndValidate'): + try: + info = self.info_schema.loads(request.form.get('info')) + except ValidationError as e: + return abort(400, str(e)) - # check for conflict - m = AIModel.query.filter_by(id=info['id']).first() - if m: - abort(409) + # check for conflict + m = AIModel.query.filter_by(id=info['id']).first() + if m: + return abort(409) - # get and validate file - model_file = request.files['modelFile'] + # get and validate file + model_file = request.files['modelFile'] - if model_file.content_length <= 0: - abort(411, f"Content length for modelFile is not a positive integer or missing.") + if model_file.content_length <= 0: + return abort(411, f"Content length for modelFile is not a positive integer or missing.") - weights_file = request.files['weightsFile'] + weights_file = request.files['weightsFile'] - if weights_file.content_length <= 0: - abort(411, f"Content length for weightsFile is not a positive integer or missing.") + if weights_file.content_length <= 0: + return abort(411, f"Content length for weightsFile is not a positive integer or missing.") # create bucket if necessary - ensure_buckets() + with opentracing.tracer.start_active_span('ensureBuckets'): + ensure_buckets() # Create the entry in the db - m = AIModel(id=info['id'], 
type=AIModelType.cnn, target_class_name=info['target_class_name']) + with opentracing.tracer.start_active_span('sqlalchemy.create'): + m = AIModel(id=info['id'], type=AIModelType.cnn, target_class_name=info['target_class_name']) + db.session.add(m) # Put files into MinIO - storage.connection.put_object( - current_app.config['MINIO_CNN_BUCKET_NAME'], - self.MODEL_DIRECTORY + str(m.id), - model_file, - model_file.content_length, - content_type=model_file.content_type - ) + with opentracing.tracer.start_active_span('putObjectsInMinio'): + with opentracing.tracer.start_active_span( + 'minio.putObject', + tags={ + "bucket": current_app.config['MINIO_CNN_BUCKET_NAME'], + "object_name": self.MODEL_DIRECTORY + str(m.id), + "length": model_file.content_length, + "component": "model" + } + ): + storage.connection.put_object( + current_app.config['MINIO_CNN_BUCKET_NAME'], + self.MODEL_DIRECTORY + str(m.id), + model_file, + model_file.content_length, + content_type=model_file.content_type + ) + with opentracing.tracer.start_active_span( + 'minio.putObject', + tags={ + "bucket": current_app.config['MINIO_CNN_BUCKET_NAME'], + "object_name": self.WEIGHTS_DIRECTORY + str(m.id), + "length": weights_file.content_length, + "component": "weights" + } + ): + storage.connection.put_object( + current_app.config['MINIO_CNN_BUCKET_NAME'], + self.WEIGHTS_DIRECTORY + str(m.id), + weights_file, + weights_file.content_length, + content_type=weights_file.content_type + ) - storage.connection.put_object( - current_app.config['MINIO_CNN_BUCKET_NAME'], - self.WEIGHTS_DIRECTORY + str(m.id), - weights_file, - weights_file.content_length, - content_type=weights_file.content_type - ) - - db.session.add(m) - db.session.commit() + with opentracing.tracer.start_active_span('sqlalchemy.commit'): + db.session.commit() return jsonify(self.aimodel_schema.dump(m)), 200 def delete(self, id_: str): - if id_ == "$default": - default = Default.query.filter_by(type=AIModelType.cnn).first_or_404() - m = default.aimodel 
- else: - m = AIModel.query.filter_by(type=AIModelType.cnn, id=id_).first_or_404() + with opentracing.tracer.start_active_span( + 'sqlalchemy.select', + tags={"aimodel_type": AIModelType.cnn, "id": id_} + ): + if id_ == "$default": + default = Default.query.filter_by(type=AIModelType.cnn).first_or_404() + m = default.aimodel + else: + m = AIModel.query.filter_by(type=AIModelType.cnn, id=id_).first_or_404() - storage.connection.remove_object(current_app.config['MINIO_CNN_BUCKET_NAME'], self.WEIGHTS_DIRECTORY + str(m.id)) - storage.connection.remove_object(current_app.config['MINIO_CNN_BUCKET_NAME'], self.MODEL_DIRECTORY + str(m.id)) + with opentracing.tracer.start_active_span('removeFromMinio'): + with opentracing.tracer.start_active_span( + 'minio.removeObject', + tags={ + "bucket": current_app.config['MINIO_CNN_BUCKET_NAME'], + "object_name": self.WEIGHTS_DIRECTORY + str(m.id), + "component": "weights" + } + ): + storage.connection.remove_object(current_app.config['MINIO_CNN_BUCKET_NAME'], + self.WEIGHTS_DIRECTORY + str(m.id)) - db.session.delete(m) - db.session.commit() + with opentracing.tracer.start_active_span( + 'minio.removeObject', + tags={ + "bucket": current_app.config['MINIO_CNN_BUCKET_NAME'], + "object_name": self.MODEL_DIRECTORY + str(m.id), + "component": "model" + } + ): + storage.connection.remove_object(current_app.config['MINIO_CNN_BUCKET_NAME'], + self.MODEL_DIRECTORY + str(m.id)) + + with opentracing.tracer.start_active_span( + 'sqlalchemy.delete', + tags={"aimodel_type": AIModelType.cnn, "id": id_} + ): + db.session.delete(m) + + with opentracing.tracer.start_active_span('sqlalchemy.commit'): + db.session.commit() return '', 204 @route('/file') def get_file(self, id_: str): - if id_ == "$default": - default = Default.query.filter_by(type=AIModelType.cnn).first_or_404() - m = default.aimodel - else: - m = AIModel.query.filter_by(type=AIModelType.cnn, id=id_).first_or_404() + with opentracing.tracer.start_active_span( + 'sqlalchemy.select', + 
tags={"aimodel_type": AIModelType.cnn, "id": id_} + ): + if id_ == "$default": + default = Default.query.filter_by(type=AIModelType.cnn).first_or_404() + m = default.aimodel + else: + m = AIModel.query.filter_by(type=AIModelType.cnn, id=id_).first_or_404() if "weights" in request.args: path = self.WEIGHTS_DIRECTORY + str(m.id) + component = "weights" else: path = self.MODEL_DIRECTORY + str(m.id) + component = "model" - try: - data = storage.connection.get_object(current_app.config['MINIO_CNN_BUCKET_NAME'], path) - except NoSuchKey: - abort(500, "The ID is stored in the database but not int the Object Store") + with opentracing.tracer.start_active_span( + 'minio.getObject', + tags={ + "bucket": current_app.config['MINIO_CNN_BUCKET_NAME'], + "object_name": path, + "component": component + } + ): + # Note: this only initiates the download, the file download itself is streamed + try: + data = storage.connection.get_object(current_app.config['MINIO_CNN_BUCKET_NAME'], path) + except NoSuchKey: + return abort(500, "The ID is stored in the database but not int the Object Store") return Response(data.stream(), mimetype=data.headers['Content-type']) diff --git a/model_service/views/root_view.py b/model_service/views/root_view.py index d5362eb..e8980a1 100644 --- a/model_service/views/root_view.py +++ b/model_service/views/root_view.py @@ -9,6 +9,8 @@ from model import db, AIModel, AIModelType, Default from schemas import AIModelSchema, DefaultSchema +import opentracing + class RootView(FlaskView): route_base = '/' @@ -21,7 +23,9 @@ class RootView(FlaskView): ## Shared stuff goes here def index(self): - models = AIModel.query.all() + with opentracing.tracer.start_active_span('sqlalchemy.select'): + models = AIModel.query.all() + return jsonify(self.aimodels_schema.dump(models)) @route('/') @@ -29,9 +33,11 @@ class RootView(FlaskView): try: aimodel_type = AIModelType[type_] except KeyError: - abort(404, "Unknown type") + return abort(404, "Unknown type") + + with 
opentracing.tracer.start_active_span('sqlalchemy.select', tags={"aimodel_type": aimodel_type}): + models = AIModel.query.filter_by(type=aimodel_type).all() - models = AIModel.query.filter_by(type=aimodel_type).all() return jsonify(self.aimodels_schema.dump(models)), 200 @route('//') @@ -39,32 +45,41 @@ class RootView(FlaskView): try: aimodel_type = AIModelType[type_] except KeyError: - abort(404, "Unknown type") + return abort(404, "Unknown type") - if id_ == "$default": - default = Default.query.filter_by(type=aimodel_type).first_or_404() - m = default.aimodel - else: - m = AIModel.query.filter_by(type=aimodel_type, id=id_).first_or_404() + with opentracing.tracer.start_active_span( + 'sqlalchemy.select', + tags={"aimodel_type": aimodel_type, "id": id_} + ): + if id_ == "$default": + + default = Default.query.filter_by(type=aimodel_type).first_or_404() + m = default.aimodel + else: + m = AIModel.query.filter_by(type=aimodel_type, id=id_).first_or_404() # Append download links - details = self.aimodel_schema.dump(m) + with opentracing.tracer.start_active_span( + 'compileResponseDict', + tags={"id": id_} + ): + details = self.aimodel_schema.dump(m) - # Vagy ez, vagy visszateszem a saját view-jébe és duplikálva lesz az egész - if aimodel_type == AIModelType.cnn: - details.update({ - "files": { - "model": url_for("CNNView:get_file", id_=m.id), - "weights": url_for("CNNView:get_file", id_=m.id, weights=''), - } - }) - elif aimodel_type == AIModelType.svm: - details.update({ - "files": { - "model": url_for("SVMView:get_file", id_=m.id), - "means": url_for("SVMView:get_file", id_=m.id, means=''), - } - }) + # Vagy ez, vagy visszateszem a saját view-jébe és duplikálva lesz az egész + if aimodel_type == AIModelType.cnn: + details.update({ + "files": { + "model": url_for("CNNView:get_file", id_=m.id), + "weights": url_for("CNNView:get_file", id_=m.id, weights=''), + } + }) + elif aimodel_type == AIModelType.svm: + details.update({ + "files": { + "model": 
url_for("SVMView:get_file", id_=m.id), + "means": url_for("SVMView:get_file", id_=m.id, means=''), + } + }) return jsonify(details) @@ -75,18 +90,24 @@ class RootView(FlaskView): try: aimodel_type = AIModelType[type_] except KeyError: - abort(404, "Unknown type") + return abort(404, "Unknown type") try: req = self.default_schema.load(request.json) except ValidationError as e: - abort(400, str(e)) + return abort(400, str(e)) - m = AIModel.query.filter_by(type=aimodel_type, id=req['id']).first_or_404() + with opentracing.tracer.start_active_span('sqlalchemy.select'): + m = AIModel.query.filter_by(type=aimodel_type, id=req['id']).first_or_404() - Default.query.filter_by(type=aimodel_type).delete() - new_default = Default(type=aimodel_type, aimodel=m) - db.session.add(new_default) - db.session.commit() + with opentracing.tracer.start_active_span('sqlalchemy.delete'): + Default.query.filter_by(type=aimodel_type).delete() + + with opentracing.tracer.start_active_span('sqlalchemy.create'): + new_default = Default(type=aimodel_type, aimodel=m) + db.session.add(new_default) + + with opentracing.tracer.start_active_span('sqlalchemy.commit'): + db.session.commit() return '', 204 diff --git a/model_service/views/svm_view.py b/model_service/views/svm_view.py index 06eba90..4fbea0d 100644 --- a/model_service/views/svm_view.py +++ b/model_service/views/svm_view.py @@ -9,6 +9,7 @@ from schemas import AIModelSchema, InfoSchema from marshmallow.exceptions import ValidationError from utils import storage, ensure_buckets, multipart_required from pyAudioAnalysis.audioTrainTest import load_model +import opentracing class SVMView(FlaskView): @@ -24,94 +25,130 @@ class SVMView(FlaskView): def post(self): # get important data from the request - try: - info = self.info_schema.loads(request.form.get('info')) - except ValidationError as e: - abort(400, str(e)) + with opentracing.tracer.start_active_span('parseAndValidate'): + try: + info = self.info_schema.loads(request.form.get('info')) + 
except ValidationError as e: + return abort(400, str(e)) - # check for conflict - m = AIModel.query.filter_by(id=info['id']).first() - if m: - abort(409) + # check for conflict + m = AIModel.query.filter_by(id=info['id']).first() + if m: + return abort(409) - # get and validate file - model_file = request.files['modelFile'] + # get and validate file + model_file = request.files['modelFile'] - if model_file.content_length <= 0: - abort(411, f"Content length for modelFile is not a positive integer or missing.") + if model_file.content_length <= 0: + return abort(411, f"Content length for modelFile is not a positive integer or missing.") - means_file = request.files['meansFile'] + means_file = request.files['meansFile'] - if means_file.content_length <= 0: - abort(411, f"Content length for meansFile is not a positive integer or missing.") + if means_file.content_length <= 0: + return abort(411, f"Content length for meansFile is not a positive integer or missing.") # create bucket if necessary - ensure_buckets() + with opentracing.tracer.start_active_span('ensureBuckets'): + ensure_buckets() # Temporarily save the file, because pyAudioAnalysis can only read files - temp_model_handle, temp_model_filename = tempfile.mkstemp() - temp_means_filename = temp_model_filename + "MEANS" + with opentracing.tracer.start_active_span('tempfile.save'): + temp_model_handle, temp_model_filename = tempfile.mkstemp() + temp_means_filename = temp_model_filename + "MEANS" - os.close(temp_model_handle) # BRUUUUH + os.close(temp_model_handle) # BRUUUUH - model_file.save(temp_model_filename) - means_file.save(temp_means_filename) + model_file.save(temp_model_filename) + means_file.save(temp_means_filename) - try: + with opentracing.tracer.start_active_span('pyAudioAnalysis.readModel'): + try: - _, _, _, classes, mid_window, mid_step, short_window, short_step, compute_beat \ - = load_model(temp_model_filename) + _, _, _, classes, mid_window, mid_step, short_window, short_step, compute_beat \ + 
= load_model(temp_model_filename) - if info['target_class_name'] not in classes: - abort(400, f"This model does not have a class named {info['target_class_name']}") + if info['target_class_name'] not in classes: + return abort(400, f"This model does not have a class named {info['target_class_name']}") - # Because of pyAudiomeme the files already saved, so we just use the file uploader functions - storage.connection.fput_object( - current_app.config['MINIO_SVM_BUCKET_NAME'], - self.MODEL_DIRECTORY + str(info['id']), - temp_model_filename + # Because of pyAudiomeme the files already saved, so we just use the file uploader functions + storage.connection.fput_object( + current_app.config['MINIO_SVM_BUCKET_NAME'], + self.MODEL_DIRECTORY + str(info['id']), + temp_model_filename + ) + + storage.connection.fput_object( + current_app.config['MINIO_SVM_BUCKET_NAME'], + self.MEANS_DIRECTORY + str(info['id']), + temp_means_filename + ) + + finally: + os.remove(temp_model_filename) + os.remove(temp_means_filename) + + with opentracing.tracer.start_active_span('sqlalchemy.create'): + m = AIModel(id=info['id'], type=AIModelType.svm, target_class_name=info['target_class_name']) + db.session.add(m) + + d = SVMDetails( + aimodel=m, + mid_window=mid_window, + mid_step=mid_step, + short_window=short_window, + short_step=short_step, + compute_beat=compute_beat ) + db.session.add(d) - storage.connection.fput_object( - current_app.config['MINIO_SVM_BUCKET_NAME'], - self.MEANS_DIRECTORY + str(info['id']), - temp_means_filename - ) - - finally: - os.remove(temp_model_filename) - os.remove(temp_means_filename) - - m = AIModel(id=info['id'], type=AIModelType.svm, target_class_name=info['target_class_name']) - - d = SVMDetails( - aimodel=m, - mid_window=mid_window, - mid_step=mid_step, - short_window=short_window, - short_step=short_step, - compute_beat=compute_beat - ) - - db.session.add(m) - db.session.add(d) - db.session.commit() + with 
opentracing.tracer.start_active_span('sqlalchemy.commit'): + db.session.commit() return jsonify(self.aimodel_schema.dump(m)), 200 def delete(self, id_: str): - if id_ == "$default": - default = Default.query.filter_by(type=AIModelType.svm).first_or_404() - m = default.aimodel - else: - m = AIModel.query.filter_by(type=AIModelType.svm, id=id_).first_or_404() + with opentracing.tracer.start_active_span( + 'sqlalchemy.select', + tags={"aimodel_type": AIModelType.svm, "id": id_} + ): + if id_ == "$default": + default = Default.query.filter_by(type=AIModelType.svm).first_or_404() + m = default.aimodel + else: + m = AIModel.query.filter_by(type=AIModelType.svm, id=id_).first_or_404() - storage.connection.remove_object(current_app.config['MINIO_SVM_BUCKET_NAME'], self.MEANS_DIRECTORY + str(m.id)) - storage.connection.remove_object(current_app.config['MINIO_SVM_BUCKET_NAME'], self.MODEL_DIRECTORY + str(m.id)) + with opentracing.tracer.start_active_span('removeFromMinio'): + with opentracing.tracer.start_active_span( + 'minio.removeObject', + tags={ + "bucket": current_app.config['MINIO_SVM_BUCKET_NAME'], + "object_name": self.MEANS_DIRECTORY + str(m.id), + "component": "means" + } + ): + storage.connection.remove_object(current_app.config['MINIO_SVM_BUCKET_NAME'], + self.MEANS_DIRECTORY + str(m.id)) - db.session.delete(m) - db.session.commit() + with opentracing.tracer.start_active_span( + 'minio.removeObject', + tags={ + "bucket": current_app.config['MINIO_SVM_BUCKET_NAME'], + "object_name": self.MODEL_DIRECTORY + str(m.id), + "component": "model" + } + ): + storage.connection.remove_object(current_app.config['MINIO_SVM_BUCKET_NAME'], + self.MODEL_DIRECTORY + str(m.id)) + + with opentracing.tracer.start_active_span( + 'sqlalchemy.delete', + tags={"aimodel_type": AIModelType.svm, "id": id_} + ): + db.session.delete(m) + + with opentracing.tracer.start_active_span('sqlalchemy.commit'): + db.session.commit() return '', 204 @@ -119,20 +156,34 @@ class SVMView(FlaskView): 
@route('/file') def get_file(self, id_: str): - if id_ == "$default": - default = Default.query.filter_by(type=AIModelType.svm).first_or_404() - m = default.aimodel - else: - m = AIModel.query.filter_by(type=AIModelType.svm, id=id_).first_or_404() + with opentracing.tracer.start_active_span( + 'sqlalchemy.select', + tags={"aimodel_type": AIModelType.svm, "id": id_} + ): + if id_ == "$default": + default = Default.query.filter_by(type=AIModelType.svm).first_or_404() + m = default.aimodel + else: + m = AIModel.query.filter_by(type=AIModelType.svm, id=id_).first_or_404() if "means" in request.args: path = self.MEANS_DIRECTORY + str(m.id) + component = "means" else: path = self.MODEL_DIRECTORY + str(m.id) + component = "model" - try: - data = storage.connection.get_object(current_app.config['MINIO_SVM_BUCKET_NAME'], path) - except NoSuchKey: - abort(500, "The ID is stored in the database but not int the Object Store") + with opentracing.tracer.start_active_span( + 'minio.getObject', + tags={ + "bucket": current_app.config['MINIO_SVM_BUCKET_NAME'], + "object_name": path, + "component": component + } + ): + try: + data = storage.connection.get_object(current_app.config['MINIO_SVM_BUCKET_NAME'], path) + except NoSuchKey: + return abort(500, "The ID is stored in the database but not int the Object Store") return Response(data.stream(), mimetype=data.headers['Content-type']) diff --git a/requirements.txt b/requirements.txt index 343feff..ba4c029 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,11 +1,11 @@ requests blinker -Flask -marshmallow +Flask~=1.1.2 +marshmallow~=3.7.1 Flask-Classful Flask-SQLAlchemy SQLAlchemy-Utils -SQLAlchemy +SQLAlchemy~=1.3.19 marshmallow-sqlalchemy~=0.26.1 marshmallow-enum psycopg2-binary @@ -13,6 +13,7 @@ minio~=6.0.0 flask_minio sentry-sdk py-healthcheck +Werkzeug~=1.0.1 cython @@ -25,7 +26,7 @@ joblib==0.14.1 kiwisolver==1.2.0 matplotlib==3.2.1 numpy==1.18.2 -pyAudioAnalysis==0.3.0 +pyAudioAnalysis==0.3.6 pydub==0.23.1 
pyparsing==2.4.6 python-dateutil==2.8.1 @@ -35,4 +36,5 @@ simplejson==3.17.0 six==1.14.0 jaeger-client -Flask-Opentracing \ No newline at end of file +Flask-Opentracing +opentracing~=2.4.0 \ No newline at end of file