#!/usr/bin/env python3
"""
Flask Restful endpoints
"""

from uuid import UUID, uuid4
import datetime

from flask_jwt_extended import create_access_token, jwt_required, get_jwt_identity
from flask_restful import Resource
from flask import request, current_app, abort

from db import db
from models import VideonUser, StreamResource, StreamResourceTypeEnum, OutputUrls
from schemas import (
    UserSchema,
    UserMetadataSchema,
    StreamResourceSchema,
    IngestInputSchema,
    EncodeInputSchema,
    RestreamInputSchema,
    CoordInputSchema,
)
from config import REGISTER_DISABLED
import listdiffer
from kuberclient import Kubectl

__author__ = '@tormakris'
__copyright__ = "Copyright 2020, videON Team"
__module_name__ = "resources"
__version__text__ = "1"

INVALID_JSON_SCHEMA_MSG = "invalid json schema"
DB_ERROR_MSG = "db session error"

# URL stored on every newly created stream resource.
_ORIGIN_URL = "rtmp://zelenka.tormakristof.eu/origin"
# Validity period of the access tokens issued by signup and login.
_TOKEN_LIFETIME = datetime.timedelta(days=7)


def _load_or_417(schema, body):
    """Run ``schema.load(body)``; log and abort with 417 if it raises."""
    try:
        return schema.load(body)
    except Exception as e:
        current_app.logger.warning(e)
        abort(417, INVALID_JSON_SCHEMA_MSG)


def _current_user_or_428():
    """Return the VideonUser named by the JWT identity, aborting with 428 if the lookup fails.

    The 404 raised by ``first_or_404`` is deliberately caught here and
    reported as 428, matching what every stream endpoint did inline.
    """
    username = get_jwt_identity()
    try:
        return VideonUser.query.filter_by(name=username).first_or_404()
    except Exception as e:
        current_app.logger.warning(e)
        abort(428, "could not find user")


def _owned_resource_or_422(resourceid, user, parse_uuid=False):
    """Return the StreamResource with this id owned by ``user``, aborting with 422 on any failure.

    :param resourceid: resource id taken from the URL
    :param user: owner the resource must belong to
    :param parse_uuid: convert ``resourceid`` with ``UUID()`` before querying;
        a malformed id then also ends in the 422
    """
    try:
        key = UUID(resourceid) if parse_uuid else resourceid
        return StreamResource.query.filter_by(id=key, owner_id=user.id).first_or_404()
    except Exception as e:
        current_app.logger.warning(e)
        abort(422, "invalid arguments")


def _commit_or_412():
    """Commit the database session; log the exception and abort with 412 on failure."""
    try:
        db.session.commit()
    except Exception as e:
        current_app.logger.exception(e)
        abort(412, DB_ERROR_MSG)


def _attach_children(child_ids, parent_id):
    """Point each listed resource at ``parent_id``; an INGEST child aborts with 400, an unknown id with 404."""
    for child_id in child_ids:
        child = StreamResource.query.filter_by(id=child_id).first_or_404()
        if child.resource_type == StreamResourceTypeEnum.INGEST:
            abort(400, "ingest cannot be a downstream component")
        child.parent_id = parent_id


def _detach_children(child_ids):
    """Clear ``parent_id`` on each listed resource; an unknown id aborts with 404."""
    for child_id in child_ids:
        child = StreamResource.query.filter_by(id=child_id).first_or_404()
        child.parent_id = None


class SignupApi(Resource):
    """
    See: https://swagger.kmlabz.com/?urls.primaryName=videON%20Backend#/backend/createuser
    """
    userschema = UserSchema(many=False)
    usermetadataschema = UserMetadataSchema(many=False)

    def post(self):
        """Create a user from the JSON body and return an access token.

        Aborts with 401 when registration is disabled, 417 when the body
        fails schema loading and 409 when the commit fails.
        """
        if REGISTER_DISABLED:
            abort(401, "register disabled")

        body = request.get_json()
        userobj = _load_or_417(self.userschema, body)

        user = VideonUser(name=userobj['name'], password=userobj['password'])
        user.hash_password()
        db.session.add(user)

        access_token = create_access_token(identity=str(user.name), expires_delta=_TOKEN_LIFETIME)

        try:
            db.session.commit()
        except Exception as e:
            current_app.logger.warning(e)
            abort(409, "user already exists")

        return {'token': access_token}, 200


class LoginApi(Resource):
    """
    See: https://swagger.kmlabz.com/?urls.primaryName=videON%20Backend#/backend/logon
    """
    userschema = UserSchema(many=False)
    usermetadataschema = UserMetadataSchema(many=False)

    def post(self):
        """Check the submitted credentials and return an access token.

        Aborts with 417 on an invalid body and 401 on any authentication
        failure.
        """
        body = request.get_json()
        userobj = _load_or_417(self.userschema, body)

        try:
            user = VideonUser.query.filter_by(name=userobj['name']).first()
            authorized = user is not None and user.check_password(userobj['password'])
        except Exception as e:
            current_app.logger.info(e)
            abort(401, "unauthorized")

        # This check must stay outside the try block: abort() raises an
        # exception that the broad handler above would catch and replace
        # with the generic "unauthorized" message.
        if not authorized:
            abort(401, "username or password incorrect")

        user.last_logon = datetime.datetime.now()
        access_token = create_access_token(identity=str(user.name), expires_delta=_TOKEN_LIFETIME)
        db.session.commit()

        return {'token': access_token}, 200


class MeApi(Resource):
    """
    See: https://swagger.kmlabz.com/?urls.primaryName=videON%20Backend#/backend/logon
    """
    usermetadataschema = UserMetadataSchema(many=False)

    @jwt_required
    def get(self):
        """Return the metadata of the user named by the JWT identity (404 if absent)."""
        username = get_jwt_identity()
        user = VideonUser.query.filter_by(name=username).first_or_404()
        return self.usermetadataschema.dump(user), 200


class UsersApi(Resource):
    """
    See: https://swagger.kmlabz.com/?urls.primaryName=videON%20Backend#/backend/getall
    """
    usermetadataschema = UserMetadataSchema(many=True)

    @jwt_required
    def get(self):
        """Return the metadata of every user."""
        users = VideonUser.query.all()
        return self.usermetadataschema.dump(users), 200


class UserParameterApi(Resource):
    """Read, delete or modify a single user addressed by name in the URL."""
    # NOTE(review): delete/put only require a valid token; nothing here checks
    # that the caller is the addressed user - confirm this is intended.
    userschema = UserSchema(many=False)
    usermetadataschema = UserMetadataSchema(many=False)

    @jwt_required
    def get(self, username: str):
        """
        See: https://swagger.kmlabz.com/?urls.primaryName=videON%20Backend#/backend/getauser
        :param username: Username of user (url parameter)
        :return: user metadata and 200; 404 if the user does not exist
        """
        user = VideonUser.query.filter_by(name=username).first_or_404()
        return self.usermetadataschema.dump(user), 200

    @jwt_required
    def delete(self, username: str):
        """
        See: https://swagger.kmlabz.com/?urls.primaryName=videON%20Backend#/backend/deleteuser
        :param username: Username of user (url parameter)
        :return: metadata of the deleted user and 200; 404 if unknown, 410 if the commit fails
        """
        user = VideonUser.query.filter_by(name=username).first_or_404()
        db.session.delete(user)
        try:
            db.session.commit()
        except Exception as e:
            current_app.logger.warning(e)
            abort(410, DB_ERROR_MSG)
        return self.usermetadataschema.dump(user), 200

    @jwt_required
    def put(self, username: str):
        """
        See: https://swagger.kmlabz.com/?urls.primaryName=videON%20Backend#/backend/modifyUser
        :param username: Username of user (url parameter)
        :return: user metadata and 200; 428 if the user lookup fails, 417 on an
            invalid body, 412 if the commit fails
        """
        body = request.get_json()

        try:
            user = VideonUser.query.filter_by(name=username).first_or_404()
        except Exception as e:
            current_app.logger.warning(e)
            abort(428, "could not find user")

        userobj = _load_or_417(self.userschema, body)

        # Only the password is taken from the body.
        user.password = userobj['password']
        user.hash_password()

        _commit_or_412()
        return self.usermetadataschema.dump(user), 200


class CreateIngestResource(Resource):
    """
    See: https://swagger.kmlabz.com/?urls.primaryName=videON%20Backend#/backend/createIngest
    """
    ingestinputschema = IngestInputSchema(many=False)
    streamresourceschema = StreamResourceSchema(many=False)

    @jwt_required
    def post(self):
        """Create an ingest resource for the calling user and attach the listed output neighbours.

        Aborts with 417 (invalid body), 428 (user lookup), 503 (insert or
        Kubectl creation failed), 400/404 (bad neighbour) or 412 (final
        commit failed).
        """
        body = request.get_json()
        ingestobj = _load_or_417(self.ingestinputschema, body)

        ingest = StreamResource(resource_type=StreamResourceTypeEnum['INGEST'],
                                url=_ORIGIN_URL,
                                x=ingestobj['x'],
                                y=ingestobj['y'])

        user = _current_user_or_428()
        ingest.owner_id = user.id
        ingest.stream_key = str(uuid4())

        # The row is committed before the Kubectl call because the call is
        # given ingest.id as its name.
        try:
            db.session.add(ingest)
            db.session.commit()
            Kubectl(name=ingest.id, resourcetype="ingest", stream_key=ingest.stream_key).create_resource()
        except Exception as e:
            current_app.logger.warning(e)
            abort(503, "object already exists")

        if 'outputNeighbours' in ingestobj:
            _attach_children(ingestobj['outputNeighbours'], ingest.id)

        _commit_or_412()
        return self.streamresourceschema.dump(ingest), 200


class CreateRestreamResource(Resource):
    """
    See: https://swagger.kmlabz.com/?urls.primaryName=videON%20Backend#/backend/createRestream
    """
    restreaminputschema = RestreamInputSchema(many=False)
    streamresourceschema = StreamResourceSchema(many=False)

    @jwt_required
    def post(self):
        """Create a restream resource for the calling user and store its output URLs.

        Aborts with 417 (invalid body), 428 (user lookup), 409 (insert or
        Kubectl creation failed) or 412 (final commit failed).
        """
        body = request.get_json()
        restreamobj = _load_or_417(self.restreaminputschema, body)

        restream = StreamResource(resource_type=StreamResourceTypeEnum['RESTREAM'],
                                  url=_ORIGIN_URL,
                                  x=restreamobj['x'],
                                  y=restreamobj['y'])

        user = _current_user_or_428()
        restream.owner_id = user.id
        restream.stream_key = str(uuid4())

        if 'inputNeighbour' in restreamobj:
            restream.parent_id = restreamobj['inputNeighbour']

        try:
            db.session.add(restream)
            db.session.commit()
            Kubectl(name=restream.id, resourcetype="restream", stream_key=restream.stream_key,
                    push_urls=restreamobj['outputURLs']).create_resource()
        except Exception as e:
            current_app.logger.warning(e)
            abort(409, "object already exists")

        for url in restreamobj['outputURLs']:
            db.session.add(OutputUrls(output_url=url, streamresource_id=restream.id))

        _commit_or_412()
        return self.streamresourceschema.dump(restream), 200


class CreateEncodeResource(Resource):
    """
    See: https://swagger.kmlabz.com/?urls.primaryName=videON%20Backend#/backend/createEncode
    """
    encodeinputschema = EncodeInputSchema(many=False)
    streamresourceschema = StreamResourceSchema(many=False)

    @jwt_required
    def post(self):
        """Create an encoder resource for the calling user and attach the listed output neighbours.

        Aborts with 417 (invalid body), 428 (user lookup), 409 (insert or
        Kubectl creation failed), 400/404 (bad neighbour) or 412 (final
        commit failed).
        """
        body = request.get_json()
        encoderobj = _load_or_417(self.encodeinputschema, body)

        encoder = StreamResource(resource_type=StreamResourceTypeEnum['ENCODE'],
                                 url=_ORIGIN_URL,
                                 x=encoderobj['x'],
                                 y=encoderobj['y'])

        user = _current_user_or_428()
        encoder.owner_id = user.id
        encoder.stream_key = str(uuid4())

        if 'inputNeighbour' in encoderobj:
            encoder.parent_id = encoderobj['inputNeighbour']
        if 'bitrate' in encoderobj:
            encoder.bitrate = encoderobj['bitrate']
        # width and height go to their own attributes, as in
        # ModifyEncodeResource; both used to be written into bitrate.
        if 'width' in encoderobj:
            encoder.width = encoderobj['width']
        if 'height' in encoderobj:
            encoder.height = encoderobj['height']

        try:
            db.session.add(encoder)
            db.session.commit()
            Kubectl(name=encoder.id, resourcetype="encoder", stream_key=encoder.stream_key).create_resource()
        except Exception as e:
            current_app.logger.warning(e)
            abort(409, "object already exists")

        # Guarded like the other handlers so a body without the key does not
        # raise KeyError.
        if 'outputNeighbours' in encoderobj:
            _attach_children(encoderobj['outputNeighbours'], encoder.id)

        _commit_or_412()
        return self.streamresourceschema.dump(encoder), 200


class GetAllStreamResources(Resource):
    """
    See: https://swagger.kmlabz.com/?urls.primaryName=videON%20Backend#/backend/getResource
    """
    streamresourceschema = StreamResourceSchema(many=True)

    @jwt_required
    def get(self):
        """Return every stream resource owned by the calling user (428 / 422 on lookup failure)."""
        user = _current_user_or_428()

        try:
            resources = StreamResource.query.filter_by(owner_id=user.id).all()
        except Exception as e:
            current_app.logger.warning(e)
            abort(422, "invalid arguments")

        return self.streamresourceschema.dump(resources), 200


class ManipulateStreamResource(Resource):
    """Read or delete one stream resource owned by the calling user."""
    streamresourceschema = StreamResourceSchema(many=False)

    @jwt_required
    def get(self, resourceid: str):
        """
        See: https://swagger.kmlabz.com/?urls.primaryName=videON%20Backend#/backend/getAResource
        :param resourceid: id of the resource (url parameter), parsed as a UUID
        :return: the resource and 200; 428 if the user lookup fails, 422 if the
            id is malformed or the resource is not found
        """
        user = _current_user_or_428()
        resource = _owned_resource_or_422(resourceid, user, parse_uuid=True)
        return self.streamresourceschema.dump(resource), 200

    @jwt_required
    def delete(self, resourceid: str):
        """
        See: https://swagger.kmlabz.com/?urls.primaryName=videON%20Backend#/backend/deleteResource
        :param resourceid: id of the resource (url parameter), parsed as a UUID
        :return: the deleted resource and 200; 428 / 422 as for get, 410 if the
            database delete or the Kubectl delete fails
        """
        user = _current_user_or_428()
        resource = _owned_resource_or_422(resourceid, user, parse_uuid=True)

        try:
            db.session.delete(resource)
            db.session.commit()
            Kubectl(name=resource.id, resourcetype=str(resource.resource_type)).delete_resource()
        except Exception as e:
            current_app.logger.exception(e)
            abort(410, DB_ERROR_MSG)

        return self.streamresourceschema.dump(resource), 200


class ModifyIngressResource(Resource):
    """
    See: https://swagger.kmlabz.com/?urls.primaryName=videON%20Backend#/backend/editIngress
    """
    ingestinputschema = IngestInputSchema(many=False)
    streamresourceschema = StreamResourceSchema(many=False)

    @jwt_required
    def put(self, resourceid: str):
        """Update the coordinates and output neighbours of an ingest resource owned by the caller.

        Aborts with 417 (invalid body), 428 (user lookup), 422 (resource
        lookup), 400/404 (bad neighbour) or 412 (commit failed).
        """
        body = request.get_json()
        ingestobj = _load_or_417(self.ingestinputschema, body)

        user = _current_user_or_428()
        ingest = _owned_resource_or_422(resourceid, user)

        ingest.x = ingestobj['x']
        ingest.y = ingestobj['y']

        currentoutputneighbours = ingest.children
        current_ids = [child.id for child in currentoutputneighbours]

        if 'outputNeighbours' in ingestobj:
            wanted_ids = ingestobj['outputNeighbours']
            # Detach children that are no longer listed, then attach new ones.
            _detach_children(listdiffer.elementsinfirstlistbutnotinsecond(current_ids, wanted_ids))
            _attach_children(listdiffer.elementsinsecondlistbutnotinfirst(current_ids, wanted_ids), ingest.id)
        else:
            # NOTE(review): block nesting was reconstructed from a flattened
            # source; the Kubectl update is taken to run only in this branch -
            # confirm against the original layout.
            Kubectl(name=ingest.id, resourcetype="ingest", stream_key=ingest.stream_key).update_resource()
            # No neighbour list supplied: detach every current child.
            if currentoutputneighbours:
                for currentneighbour in currentoutputneighbours:
                    currentneighbour.parent_id = None

        _commit_or_412()
        return self.streamresourceschema.dump(ingest), 200


class ModifyRestreamResource(Resource):
    """
    See: https://swagger.kmlabz.com/?urls.primaryName=videON%20Backend#/backend/editRestream
    """
    restreaminputschema = RestreamInputSchema(many=False)
    streamresourceschema = StreamResourceSchema(many=False)

    @jwt_required
    def put(self, resourceid: str):
        """Update the coordinates, input neighbour and output URLs of a restream resource owned by the caller.

        Aborts with 417 (invalid body), 428 (user lookup), 422 (resource
        lookup), 503 (parent assignment failed) or 412 (commit failed).
        """
        body = request.get_json()
        restreamobj = _load_or_417(self.restreaminputschema, body)

        user = _current_user_or_428()
        restream = _owned_resource_or_422(resourceid, user)

        restream.x = restreamobj['x']
        restream.y = restreamobj['y']

        if 'inputNeighbour' in restreamobj:
            try:
                restream.parent_id = restreamobj['inputNeighbour']
            except Exception as e:
                current_app.logger.warning(e)
                abort(503, "could not set parent id")

        wanted_urls = restreamobj['outputURLs']
        stored = OutputUrls.query.filter_by(streamresource_id=restream.id).all()
        stored_urls = [row.output_url for row in stored]

        for stale_url in listdiffer.elementsinfirstlistbutnotinsecond(stored_urls, wanted_urls):
            stale_row = OutputUrls.query.filter_by(output_url=stale_url,
                                                   streamresource_id=restream.id).first_or_404()
            db.session.delete(stale_row)

        for new_url in listdiffer.elementsinsecondlistbutnotinfirst(stored_urls, wanted_urls):
            db.session.add(OutputUrls(output_url=new_url, streamresource_id=restream.id))

        # List comparison: a pure reordering of the URLs also triggers an update.
        if stored_urls != wanted_urls:
            Kubectl(name=restream.id, resourcetype="restream", stream_key=restream.stream_key,
                    push_urls=wanted_urls).update_resource()

        _commit_or_412()
        return self.streamresourceschema.dump(restream), 200


class ModifyEncodeResource(Resource):
    """
    See: https://swagger.kmlabz.com/?urls.primaryName=videON%20Backend#/backend/editEncode
    """
    encodeinputschema = EncodeInputSchema(many=False)
    streamresourceschema = StreamResourceSchema(many=False)

    @jwt_required
    def put(self, resourceid: str):
        """Update the coordinates, encoding parameters and neighbours of an encoder resource owned by the caller.

        Aborts with 417 (invalid body), 428 (user lookup), 422 (resource
        lookup), 400/404 (bad neighbour) or 412 (commit failed).
        """
        body = request.get_json()
        encodeobj = _load_or_417(self.encodeinputschema, body)

        user = _current_user_or_428()
        encode = _owned_resource_or_422(resourceid, user)

        encode.x = encodeobj['x']
        encode.y = encodeobj['y']

        if 'inputNeighbour' in encodeobj:
            encode.parent_id = encodeobj['inputNeighbour']
        if 'bitrate' in encodeobj:
            encode.bitrate = encodeobj['bitrate']
        if 'width' in encodeobj:
            encode.width = encodeobj['width']
        if 'height' in encodeobj:
            encode.height = encodeobj['height']

        currentoutputneighbours = encode.children
        current_ids = [child.id for child in currentoutputneighbours]

        if 'outputNeighbours' in encodeobj:
            wanted_ids = encodeobj['outputNeighbours']
            # Detach children that are no longer listed, then attach new ones.
            _detach_children(listdiffer.elementsinfirstlistbutnotinsecond(current_ids, wanted_ids))
            _attach_children(listdiffer.elementsinsecondlistbutnotinfirst(current_ids, wanted_ids), encode.id)
        else:
            # No neighbour list supplied: detach every current child.
            if currentoutputneighbours:
                for currentneighbour in currentoutputneighbours:
                    currentneighbour.parent_id = None

        # NOTE(review): block nesting was reconstructed from a flattened
        # source; the push-URL refresh is taken to run on every PUT - confirm.
        childurls = [child.url for child in encode.children]
        Kubectl(name=encode.id, resourcetype="encoder", stream_key=encode.stream_key,
                encode_push_urls=str(childurls)).update_resource()

        _commit_or_412()
        return self.streamresourceschema.dump(encode), 200


class CoordModifyResource(Resource):
    """
    See: https://swagger.kmlabz.com/?urls.primaryName=videON%20Backend#/backend/editCoords
    """
    coordinputschema = CoordInputSchema(many=False)
    streamresourceschema = StreamResourceSchema(many=False)

    @jwt_required
    def put(self, resourceid: str):
        """Update only the x/y coordinates of a stream resource owned by the caller.

        Aborts with 417 (invalid body), 428 (user lookup), 422 (resource
        lookup) or 412 (commit failed).
        """
        body = request.get_json()
        coordobj = _load_or_417(self.coordinputschema, body)

        user = _current_user_or_428()
        resource = _owned_resource_or_422(resourceid, user)

        resource.x = coordobj['x']
        resource.y = coordobj['y']

        _commit_or_412()
        return self.streamresourceschema.dump(resource), 200