#!/usr/bin/env python3
"""
AES Encryption methods
"""

__author__ = '@tormakris'
__copyright__ = "Copyright 2020, onSpot Team"
__module_name__ = "aes_encrypt"
__version__text__ = "1"

import base64
import json
import secrets

from Crypto.Cipher import AES

from fred import flaskred
from schemas import UserSchema

# Sizes, in bytes, of the nonce and authentication tag that are prepended to
# the ciphertext. They are fixed explicitly so that decrypt_message can split
# a token without relying on library defaults.
_NONCE_SIZE = 16
_TAG_SIZE = 16


class AESCrypto:
    """Encrypt and decrypt short text messages with AES in EAX mode.

    An encrypted token is the base64 encoding of
    ``nonce || tag || ciphertext``. The nonce has to travel with the
    ciphertext: a cipher object created without the original nonce cannot
    recover the plaintext.
    """

    def __init__(self, encoded_secret_key: str, padding_character: bytes = b'a'):
        """Store the key material.

        :param encoded_secret_key: base64-encoded AES key.
        :param padding_character: kept for interface compatibility only.
            EAX mode accepts plaintext of any length, so no padding is
            applied. Stripping a padding character on decryption would also
            remove genuine trailing occurrences of it from the message.
        """
        self.padding_character = padding_character
        self.encoded_secret_key = encoded_secret_key

    def _new_cipher(self, nonce: bytes):
        """Return an EAX cipher for the stored key and the given nonce."""
        secret_key = base64.b64decode(self.encoded_secret_key)
        return AES.new(secret_key, AES.MODE_EAX, nonce=nonce, mac_len=_TAG_SIZE)

    def encrypt_message(self, private_msg) -> bytes:
        """Encrypt ``private_msg`` and return a base64-encoded token.

        :param private_msg: the message, as ``str`` (encoded as UTF-8 before
            encryption) or as ``bytes``.
        :return: base64 of ``nonce || tag || ciphertext``.
        """
        if isinstance(private_msg, str):
            # The cipher operates on bytes, not on text.
            private_msg = private_msg.encode('UTF-8')
        # A fresh random nonce for every message; it is not secret, so it is
        # stored in front of the ciphertext.
        nonce = secrets.token_bytes(_NONCE_SIZE)
        cipher = self._new_cipher(nonce)
        encrypted_msg, tag = cipher.encrypt_and_digest(private_msg)
        return base64.b64encode(nonce + tag + encrypted_msg)

    def decrypt_message(self, encoded_encrypted_msg) -> str:
        """Decrypt a token produced by :meth:`encrypt_message`.

        :param encoded_encrypted_msg: the base64 token, as ``bytes`` or as
            ASCII ``str``.
        :return: the decrypted message decoded as UTF-8.
        :raises ValueError: if the token is too short to hold a nonce and a
            tag, or if the authentication tag does not match.
        """
        payload = base64.b64decode(encoded_encrypted_msg)
        header_size = _NONCE_SIZE + _TAG_SIZE
        if len(payload) < header_size:
            raise ValueError('encrypted message is too short')
        nonce = payload[:_NONCE_SIZE]
        tag = payload[_NONCE_SIZE:header_size]
        encrypted_msg = payload[header_size:]
        cipher = self._new_cipher(nonce)
        # decrypt_and_verify raises ValueError when the tag check fails.
        decrypted_msg = cipher.decrypt_and_verify(encrypted_msg, tag)
        return decrypted_msg.decode('UTF-8')


class EncryptedUserRedis:
    """Store users through ``flaskred`` with the password field encrypted."""

    def __init__(self, encoded_secret_key: str):
        """Create the cipher helper and the schema used for serialization.

        :param encoded_secret_key: base64-encoded AES key, passed on to
            :class:`AESCrypto`.
        """
        self.aes = AESCrypto(encoded_secret_key)
        self.userschema = UserSchema(many=False)

    def store(self, user: UserSchema) -> None:
        """Serialize ``user`` to JSON and save it under the user's name.

        The ``password`` entry of the serialized dict is replaced by its
        encrypted token before the dict is written.

        :param user: the user to store. It is subscripted with ``'name'``
            and ``'password'``, so it presumably behaves like a mapping
            despite the annotation -- TODO confirm against callers.
        """
        plaindict = self.userschema.dump(user)
        encrypted_password = self.aes.encrypt_message(user['password'])
        # json.dumps cannot serialize bytes; a base64 token is plain ASCII.
        plaindict['password'] = encrypted_password.decode('ascii')
        flaskred.set(user['name'], json.dumps(plaindict).encode('UTF-8'))

    def load(self, username: str) -> UserSchema:
        """Read the user stored under ``username`` and decrypt the password.

        :param username: key the user was stored under.
        :return: a ``UserSchema`` built from the stored name and the
            decrypted password.
        :raises KeyError: if nothing is stored under ``username``.
        """
        stored = flaskred.get(username)
        if stored is None:
            # Assumes flaskred.get returns None for a missing key -- TODO
            # confirm against the client behind flaskred.
            raise KeyError('no stored user named {!r}'.format(username))
        encrypteddict = json.loads(stored.decode('UTF-8'))
        # NOTE(review): UserSchema is constructed with name/password keyword
        # arguments exactly as before; confirm that its constructor accepts
        # them.
        user = UserSchema(
            name=encrypteddict['name'],
            password=self.aes.decrypt_message(encrypteddict['password']),
        )
        return user