#!/usr/bin/env python3
"""
Flask Restful endpoints
"""

import uuid

from flask_restful import Resource
from flask import request, current_app, abort
import musicbrainzngs

from fred import flaskred
from config import ENCODED_SECRET_KEY
from schemas import UserSchema
from aes_encrypt import EncryptedUserRedis

__author__ = '@tormakris'
__copyright__ = "Copyright 2020, onSpot Team"
__module_name__ = "resources"
__version__text__ = "1"

INVALID_JSON_SCHEMA_MSG = "invalid json schema"

# Collection entity types this backend knows how to list; every other
# MusicBrainz collection type is skipped by ListsApi.
_SUPPORTED_ENTITY_TYPES = ('release', 'artist', 'work', 'recording')

# NOTE(review): credentials are handed to musicbrainzngs through the
# module-level ``musicbrainzngs.auth`` call and reset with ``auth(None, None)``.
# This looks like process-wide state, so concurrent requests may interleave
# credentials - confirm the deployment model.


class LoginApi(Resource):
    """
    Session handling: POST logs a user in, DELETE logs the user out.
    """
    userschema = UserSchema(many=False)
    encryptor = EncryptedUserRedis(ENCODED_SECRET_KEY)

    def post(self):
        """
        See: https://swagger.kmlabz.com/?urls.primaryName=onSpot%20Backend#/backend/logon

        Validates the JSON body against ``UserSchema``, checks the credentials
        with an authenticated MusicBrainz request, stores the credentials via
        the encryptor and maps a freshly generated token to the user name in
        Redis.

        :return: ``({'token': <token>}, 200)`` on success
        :raises: HTTP 417 when the body does not match the schema,
                 HTTP 401 when the MusicBrainz request fails
        """
        body = request.get_json()

        try:
            userobj = self.userschema.load(body)
        except Exception as e:
            current_app.logger.warning(e)
            abort(417, INVALID_JSON_SCHEMA_MSG)

        try:
            musicbrainzngs.auth(userobj['name'], userobj['password'])
            try:
                # The response is irrelevant; the authenticated request only
                # has to succeed to prove the credentials are valid.
                musicbrainzngs.get_collections()
            finally:
                # Never leave the credentials set, even when the check fails.
                musicbrainzngs.auth(None, None)
        except Exception as e:
            current_app.logger.warning(e)
            abort(401, "login denied to musicbrainz")

        # presumably stored under the user name, since the other endpoints
        # load the credentials by user name - verify against aes_encrypt
        self.encryptor.store(body)

        token = str(uuid.uuid4())
        flaskred.set(token, userobj['name'].encode('UTF-8'))

        return {
                   'token': token
               }, 200

    def delete(self):
        """
        See: https://swagger.kmlabz.com/?urls.primaryName=onSpot%20Backend#/backend/logoff

        Removes both the Redis entry keyed by the user name and the
        token -> user name mapping of the token sent in ``Authorization``.

        :return: ``("", 204)`` on success
        :raises: HTTP 401 when the token is missing or unknown
        """
        try:
            token = request.headers.get('Authorization')
            # An unknown token yields None here; the failing .decode() is what
            # turns it into the 401 below.
            username = flaskred.get(token).decode('UTF-8')
            flaskred.delete(username)
            flaskred.delete(token)
        except Exception as e:
            current_app.logger.warning(e)
            abort(401, "unauthorized")
        return "", 204


class MeApi(Resource):
    """
    See: https://swagger.kmlabz.com/?urls.primaryName=onSpot%20Backend#/backend/currentUser
    """

    def get(self):
        """
        Return the user name that belongs to the token in ``Authorization``.

        :return: ``({"name": <user name>}, 200)``
        :raises: HTTP 401 when the token is missing or unknown
        """
        try:
            currusername = flaskred.get(request.headers.get('Authorization')).decode('UTF-8')
        except Exception as e:
            current_app.logger.warning(e)
            abort(401, "unauthorized")

        return {"name": currusername}, 200


class ListsApi(Resource):
    """
    See: https://swagger.kmlabz.com/?urls.primaryName=onSpot%20Backend#/backend/getAllLists
    """
    encryptor = EncryptedUserRedis(ENCODED_SECRET_KEY)

    def get(self):
        """
        List the MusicBrainz collections of the logged-in user.

        Every supported collection id is remembered in Redis together with its
        entity type so that SingleListApi can look the type up later.

        :return: ``({"count": ..., "ids": [...]}, 200)``
        :raises: HTTP 401 when the token is missing/unknown or the stored
                 credentials cannot be loaded
        """
        try:
            currcreds = self.encryptor.load(flaskred.get(request.headers.get('Authorization')).decode('UTF-8'))
        except Exception as e:
            current_app.logger.warning(e)
            abort(401, "unauthorized")

        musicbrainzngs.auth(currcreds['name'], currcreds['password'])
        try:
            collections = musicbrainzngs.get_collections()
        finally:
            # Clear the credentials even if the request to MusicBrainz fails.
            musicbrainzngs.auth(None, None)

        elementlist = []
        for collection in collections['collection-list']:
            entity_type = collection['entity-type']
            if entity_type not in _SUPPORTED_ENTITY_TYPES:
                continue
            # The count lives under '<entity-type>-count', e.g. 'release-count'.
            count = collection[entity_type + '-count']
            flaskred.set(collection['id'], entity_type.encode('UTF-8'))
            elementlist.append({"id": collection['id'],
                                "name": collection['name'],
                                "element_count": count,
                                "type": entity_type})

        # 'count' is the total reported by MusicBrainz and also covers
        # collections of unsupported types that are not part of 'ids'.
        returndict = {"count": collections['collection-count'], "ids": elementlist}

        return returndict, 200


class SingleListApi(Resource):
    """
    See: https://swagger.kmlabz.com/?urls.primaryName=onSpot%20Backend#/backend/getList
    """
    encryptor = EncryptedUserRedis(ENCODED_SECRET_KEY)

    def get(self, listid: str):
        """
        Return one page of the items of a collection.

        The collection type is read from Redis (written by ListsApi). Every
        returned item id is stored in Redis with its entity type so that
        ItemApi can resolve it later.

        Query parameters: ``limit`` (default 10) and ``offset`` (default 0);
        values that are not integers fall back to the default.

        :param listid: id of the MusicBrainz collection
        :return: ``(<collection data>, 200)``
        :raises: HTTP 401 when not logged in, HTTP 404 when the list id is
                 unknown, HTTP 417 when the stored type is not supported
        """
        try:
            currcreds = self.encryptor.load(flaskred.get(request.headers.get('Authorization')).decode('UTF-8'))
        except Exception as e:
            current_app.logger.warning(e)
            abort(401, "unauthorized")

        try:
            list_type = flaskred.get(listid).decode('UTF-8')
        except Exception as e:
            current_app.logger.warning(e)
            abort(404, "unknown list")

        # type=int makes a malformed value fall back to the default instead of
        # raising ValueError (which surfaced as HTTP 500).
        limit = request.args.get('limit', 10, type=int)
        offset = request.args.get('offset', 0, type=int)

        musicbrainzngs.auth(currcreds['name'], currcreds['password'])
        try:
            if list_type == 'release':
                currdata = musicbrainzngs.get_releases_in_collection(listid, limit, offset)['collection']
                releaselist = []
                for release in currdata['release-list']:
                    releasedata = {"id": release['id'], "album": release['title']}
                    # The collection listing is fetched without artist
                    # includes, so each release is looked up individually.
                    reldetail = musicbrainzngs.get_release_by_id(release['id'], includes=['artists'])['release']
                    if 'artist-credit' in reldetail and reldetail['artist-credit']:
                        releasedata['artist'] = reldetail['artist-credit'][0]['artist']['name']
                    releaselist.append(releasedata)
                    flaskred.set(release['id'], 'release'.encode('UTF-8'))
                retdata = {"id": currdata['id'],
                           "element_count": currdata['release-count'],
                           "itemlist": releaselist}
            elif list_type == 'artist':
                currdata = musicbrainzngs.get_artists_in_collection(listid, limit, offset)['collection']
                artistlist = []
                for artist in currdata['artist-list']:
                    artistlist.append({"id": artist['id'], "artist": artist['name']})
                    flaskred.set(artist['id'], 'artist'.encode('UTF-8'))
                retdata = {"id": currdata['id'],
                           "element_count": currdata['artist-count'],
                           "itemlist": artistlist}
            elif list_type == 'work':
                currdata = musicbrainzngs.get_works_in_collection(listid, limit, offset)['collection']
                worklist = []
                for work in currdata['work-list']:
                    worklist.append({"id": work['id'], "title": work['title']})
                    # Must be 'work': ItemApi dispatches on this value, and
                    # 'recording' would send a work id to get_recording_by_id.
                    flaskred.set(work['id'], 'work'.encode('UTF-8'))
                retdata = {"id": currdata['id'],
                           "element_count": currdata['work-count'],
                           "itemlist": worklist}
            elif list_type == 'recording':
                currdata = musicbrainzngs.get_recordings_in_collection(listid, limit, offset)['collection']
                recordinglist = []
                for recording in currdata['recording-list']:
                    currrec = {"id": recording['id'], "title": recording['title']}
                    recdetail = musicbrainzngs.get_recording_by_id(
                        recording['id'], includes=['artists', 'releases'])['recording']
                    if 'artist-credit' in recdetail and recdetail['artist-credit']:
                        currrec['artist'] = recdetail['artist-credit'][0]['artist']['name']
                    if 'release-list' in recdetail and recdetail['release-list']:
                        currrec['album'] = recdetail['release-list'][0]['title']
                    recordinglist.append(currrec)
                    flaskred.set(recording['id'], 'recording'.encode('UTF-8'))
                retdata = {"id": currdata['id'],
                           "element_count": currdata['recording-count'],
                           "itemlist": recordinglist}
            else:
                abort(417, "wrong type of collection")
        finally:
            # Runs on success, on abort() and on MusicBrainz errors alike, so
            # the credentials never stay set after this request.
            musicbrainzngs.auth(None, None)

        retdata['type'] = list_type
        retdata['name'] = currdata['name']

        return retdata, 200


class ItemApi(Resource):
    """
    See: https://swagger.kmlabz.com/?urls.primaryName=onSpot%20Backend#/backend/getItem
    """

    def get(self, itemid: str):
        """
        Return the details of a single item.

        The entity type of the item is read from Redis, where SingleListApi
        stored it; the lookups themselves are made without credentials.

        :param itemid: MusicBrainz id of the item
        :return: ``(<item data>, 200)``
        :raises: HTTP 404 when the item id is unknown, HTTP 417 when the
                 stored type is not supported
        """
        try:
            item_type = flaskred.get(itemid).decode('UTF-8')
        except Exception as e:
            current_app.logger.warning(e)
            abort(404, "unknown item")

        if item_type == 'release':
            currrelease = musicbrainzngs.get_release_by_id(itemid, includes=['artists'])['release']
            retdata = {"id": itemid, "album": currrelease['title']}
            if 'artist-credit' in currrelease and currrelease['artist-credit']:
                retdata['artist'] = currrelease['artist-credit'][0]['artist']['name']
        elif item_type == 'artist':
            currartist = musicbrainzngs.get_artist_by_id(itemid)['artist']
            retdata = {"id": itemid, "artist": currartist['name']}
        elif item_type == 'work':
            currwork = musicbrainzngs.get_work_by_id(itemid)['work']
            retdata = {"id": itemid, "title": currwork['title']}
        elif item_type == 'recording':
            currrecording = musicbrainzngs.get_recording_by_id(itemid, includes=['artists', 'releases'])['recording']
            retdata = {"id": itemid, "title": currrecording['title']}
            if 'artist-credit' in currrecording and currrecording['artist-credit']:
                retdata['artist'] = currrecording['artist-credit'][0]['artist']['name']
            if 'release-list' in currrecording and currrecording['release-list']:
                retdata['album'] = currrecording['release-list'][0]['title']
        else:
            abort(417, "wrong type of item")

        retdata['type'] = item_type

        return retdata, 200