Docs, tests, typing
This commit is contained in:
@@ -1,38 +1,69 @@
|
|||||||
import base64
|
import base64
|
||||||
import secrets
|
import secrets
|
||||||
|
from abc import ABC
|
||||||
|
from abc import abstractmethod
|
||||||
|
from typing import Tuple
|
||||||
|
|
||||||
from cryptography.hazmat.primitives.ciphers import Cipher
|
from cryptography.hazmat.primitives.ciphers import Cipher
|
||||||
from cryptography.hazmat.primitives.ciphers import algorithms
|
from cryptography.hazmat.primitives.ciphers import algorithms
|
||||||
from cryptography.hazmat.primitives.ciphers import modes
|
from cryptography.hazmat.primitives.ciphers import modes
|
||||||
|
|
||||||
|
|
||||||
class AES256CTREncryptor:
|
class Encryptor(ABC):
|
||||||
|
"""Abstract base class defining interface for encryptors."""
|
||||||
|
|
||||||
def __init__(self, key):
|
@abstractmethod
|
||||||
|
def encrypt(self, plaintext: bytes) -> Tuple[bytes, bytes]:
|
||||||
|
"""Transform plaintext into a pair of ciphertext and IV."""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def decrypt(self, ciphertext: bytes, iv: bytes) -> bytes:
|
||||||
|
"""Transform ciphertext and IV into plaintext."""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class AES256CTREncryptor(Encryptor):
|
||||||
|
"""Encryptor using AES with 256-bit long keys using CTR mode."""
|
||||||
|
|
||||||
|
def __init__(self, key: bytes):
|
||||||
if not isinstance(key, bytes):
|
if not isinstance(key, bytes):
|
||||||
raise ValueError()
|
raise ValueError()
|
||||||
if not len(key) == 32:
|
if not len(key) == 32:
|
||||||
raise ValueError()
|
raise ValueError()
|
||||||
self.__key = key
|
self.__key = key
|
||||||
|
|
||||||
def encrypt(self, plaintext):
|
def encrypt(self, plaintext: bytes) -> Tuple[bytes, bytes]:
|
||||||
iv = secrets.token_bytes(16)
|
iv = secrets.token_bytes(16)
|
||||||
cipher = Cipher(algorithms.AES(self.__key), modes.CTR(iv))
|
cipher = Cipher(algorithms.AES(self.__key), modes.CTR(iv))
|
||||||
encryptor = cipher.encryptor()
|
encryptor = cipher.encryptor()
|
||||||
ciphertext = encryptor.update(plaintext) + encryptor.finalize()
|
ciphertext = encryptor.update(plaintext) + encryptor.finalize()
|
||||||
return ciphertext, iv
|
return ciphertext, iv
|
||||||
|
|
||||||
def decrypt(self, ciphertext, iv):
|
def decrypt(self, ciphertext: bytes, iv: bytes) -> bytes:
|
||||||
cipher = Cipher(algorithms.AES(self.__key), modes.CTR(iv))
|
cipher = Cipher(algorithms.AES(self.__key), modes.CTR(iv))
|
||||||
decryptor = cipher.decryptor()
|
decryptor = cipher.decryptor()
|
||||||
return decryptor.update(ciphertext) + decryptor.finalize()
|
return decryptor.update(ciphertext) + decryptor.finalize()
|
||||||
|
|
||||||
|
|
||||||
class CryptoContainer:
|
class CryptoContainer:
|
||||||
|
"""
|
||||||
|
CryptoContainer is providing unified interface to encryptors.
|
||||||
|
Enter the encryptor instance and either plaintext string or
|
||||||
|
ciphertext and iv bytes in constructor. Then access plaintext,
|
||||||
|
ciphertext and IV strings in properties.
|
||||||
|
Ciphertext and IV strings properties yield base64 encoded strings.
|
||||||
|
Converting this container to string yields a concatenation of
|
||||||
|
ciphertext and IV in a single string. Calling from_encrypted
|
||||||
|
static method will construct CryptoContainer from such concatenation,
|
||||||
|
allowing access to plaintext.
|
||||||
|
"""
|
||||||
|
|
||||||
def __init__(self, **kwargs):
|
def __init__(self, **kwargs):
|
||||||
if 'encryptor' not in kwargs:
|
if 'encryptor' not in kwargs:
|
||||||
raise ValueError()
|
raise ValueError()
|
||||||
|
if not isinstance(kwargs['encryptor'], Encryptor):
|
||||||
|
raise ValueError()
|
||||||
|
|
||||||
if 'plaintext' in kwargs:
|
if 'plaintext' in kwargs:
|
||||||
self.__plaintext = kwargs['plaintext']
|
self.__plaintext = kwargs['plaintext']
|
||||||
@@ -45,22 +76,30 @@ class CryptoContainer:
|
|||||||
raise ValueError()
|
raise ValueError()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def plaintext(self):
|
def plaintext(self) -> str:
|
||||||
|
"""Provide plaintext."""
|
||||||
return self.__plaintext
|
return self.__plaintext
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def ciphertext(self):
|
def ciphertext(self) -> str:
|
||||||
|
"""Provide base64 encoded ciphertext as string."""
|
||||||
return base64.b64encode(self.__ciphertext).decode('ascii')
|
return base64.b64encode(self.__ciphertext).decode('ascii')
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def iv(self):
|
def iv(self) -> str:
|
||||||
|
"""Provide base64 encoded IV as string."""
|
||||||
return base64.b64encode(self.__iv).decode('ascii')
|
return base64.b64encode(self.__iv).decode('ascii')
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
|
"""Concatenate ciphertext and IV to store in the database."""
|
||||||
return '@'.join([self.ciphertext, self.iv])
|
return '@'.join([self.ciphertext, self.iv])
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def from_encrypted(container_str, encryptor):
|
def from_encrypted(container_str: str, encryptor: Encryptor):
|
||||||
|
"""
|
||||||
|
Builds a CryptoContainer from a string as provided by __str__,
|
||||||
|
allowing access to plaintext.
|
||||||
|
"""
|
||||||
ciphertext, iv = container_str.split('@')
|
ciphertext, iv = container_str.split('@')
|
||||||
return CryptoContainer(ciphertext=base64.b64decode(ciphertext),
|
return CryptoContainer(ciphertext=base64.b64decode(ciphertext),
|
||||||
iv=base64.b64decode(iv),
|
iv=base64.b64decode(iv),
|
||||||
|
|||||||
@@ -94,6 +94,19 @@ def test_container_init():
|
|||||||
try:
|
try:
|
||||||
cryptbase.cryptbase.CryptoContainer(encryptor=None, plaintext=None)
|
cryptbase.cryptbase.CryptoContainer(encryptor=None, plaintext=None)
|
||||||
assert False
|
assert False
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
class DummyEncryptor(cryptbase.cryptbase.Encryptor):
|
||||||
|
def encrypt(self, plaintext):
|
||||||
|
return None, None
|
||||||
|
|
||||||
|
def decrypt(self, ct, iv):
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
cryptbase.cryptbase.CryptoContainer(encryptor=DummyEncryptor(), plaintext=None)
|
||||||
|
assert False
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@@ -112,5 +125,11 @@ def test_container_init():
|
|||||||
try:
|
try:
|
||||||
cryptbase.cryptbase.CryptoContainer(encryptor=None, ciphertext=None, iv=None)
|
cryptbase.cryptbase.CryptoContainer(encryptor=None, ciphertext=None, iv=None)
|
||||||
assert False
|
assert False
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
try:
|
||||||
|
cryptbase.cryptbase.CryptoContainer(encryptor=DummyEncryptor(), ciphertext=None, iv=None)
|
||||||
|
assert False
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
pass
|
pass
|
||||||
|
|||||||
Reference in New Issue
Block a user