From 862c1f17af3290bcfd17f63b1831d8d40a8813c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torma=20Krist=C3=B3f?= Date: Mon, 26 Apr 2021 16:58:16 +0200 Subject: [PATCH] use authenticated chacha20 --- client/netwrapper.py | 46 ++++++++++++++++++++++++++------------------ 1 file changed, 27 insertions(+), 19 deletions(-) diff --git a/client/netwrapper.py b/client/netwrapper.py index 4d8222d..73ad464 100644 --- a/client/netwrapper.py +++ b/client/netwrapper.py @@ -5,7 +5,7 @@ import json from base64 import b64encode, b64decode import pyDH from Crypto.Hash import SHA512 -from Crypto.Cipher import ChaCha20, PKCS1_OAEP +from Crypto.Cipher import ChaCha20_Poly1305, PKCS1_OAEP from Crypto.PublicKey.RSA import RsaKey from Crypto.Random import get_random_bytes @@ -66,11 +66,14 @@ class NetWrapper: def authenticate(self, password: str): message = f"LIN {self.username} {password}".encode('UTF-8') - cipher = ChaCha20.new(key=self.cipherkey, nonce=get_random_bytes(12)) - ciphertext = cipher.encrypt(message) - nonce = b64encode(cipher.nonce).decode('ASCII') - ct = b64encode(ciphertext).decode('ASCII') - sendjson = json.dumps({'type': 'AUT', 'source': self.network.own_addr, 'nonce': nonce, 'message': ct}).encode( + cipher = ChaCha20_Poly1305.new(key=self.cipherkey) + header = json.dumps({'type': 'AUT', 'source': self.network.own_addr}).encode('UTF-8') + cipher.update(header) + ciphertext, tag = cipher.encrypt_and_digest(message) + nonce = b64encode(cipher.nonce).decode('UTF-8') + ct = b64encode(ciphertext).decode('UTF-8') + b64tag = b64encode(tag).decode('UTF-8') + sendjson = json.dumps({'header': b64encode(header).decode('UTF-8'), 'nonce': nonce, 'message': ct, 'tag': b64tag}).encode( 'UTF-8') self.network.send_msg(self.serverAddr, sendjson) b64 = {'source': '', 'type': ''} @@ -82,9 +85,11 @@ class NetWrapper: try: retnonce = b64decode(b64['nonce']) retciphertext = b64decode(b64['message']) - retcipher = ChaCha20.new(key=self.cipherkey, nonce=retnonce) - plaintext = retcipher.decrypt(retciphertext).decode('UTF-8') - if plaintext != "OK": + retcipher = ChaCha20_Poly1305.new(key=self.cipherkey, nonce=retnonce) + retcipher.update(b64decode(b64['header'])) + retheader = json.loads(b64decode(b64['header']).decode('UTF-8')) + plaintext = retcipher.decrypt_and_verify(retciphertext,b64decode(b64['tag'])).decode('UTF-8') + if plaintext != "OK" or not (retheader['source'] == self.serverAddr and retheader['type'] == 'AUT'): raise Exception('Authentication error') except Exception as e: print(e) @@ -99,26 +104,29 @@ class NetWrapper: self.authenticate(pw) def sendMessage(self, message: bytes): - cipher = ChaCha20.new(key=self.cipherkey, nonce=get_random_bytes(12)) - ciphertext = cipher.encrypt(message) - nonce = b64encode(cipher.nonce).decode('ASCII') - ct = b64encode(ciphertext).decode('ASCII') - sendjson = json.dumps({'type': 'CMD', 'source': self.network.own_addr, 'nonce': nonce, 'message': ct}).encode( + cipher = ChaCha20_Poly1305.new(key=self.cipherkey) + header = json.dumps({'type': 'CMD', 'source': self.network.own_addr}).encode('UTF-8') + cipher.update(header) + ciphertext, tag = cipher.encrypt_and_digest(message) + nonce = b64encode(cipher.nonce).decode('UTF-8') + ct = b64encode(ciphertext).decode('UTF-8') + sendjson = json.dumps({'header': b64encode(header).decode('UTF-8'), 'nonce': nonce, 'message': ct, 'tag': b64encode(tag).decode('UTF-8')}).encode( 'UTF-8') self.network.send_msg(self.serverAddr, sendjson) def recieveMessage(self) -> bytes: - b64 = {'source': '', 'type': ''} - while not (b64['source'] == self.serverAddr and b64['type'] == 'AUT'): + try: status, msg = self.network.receive_msg(blocking=True) if not status: raise Exception('Network error during connection.') b64 = json.loads(msg.decode('UTF-8')) - try: retnonce = b64decode(b64['nonce']) retciphertext = b64decode(b64['message']) - retcipher = ChaCha20.new(key=self.cipherkey, nonce=retnonce) - plaintext = retcipher.decrypt(retciphertext) + retcipher = ChaCha20_Poly1305.new(key=self.cipherkey, nonce=retnonce) + plaintext = retcipher.decrypt_and_verify(retciphertext,b64decode(b64['tag'])) + retheader = json.loads(b64decode(b64['header']).decode('UTF-8')) + if not (retheader['source'] == self.serverAddr and retheader['type'] == 'CMD'): + return "ERROR".encode('UTF-8') return plaintext except Exception: print("Incorrect decryption")