diff --git a/src/cryptbase/__init__.py b/src/cryptbase/__init__.py index c57bfd5..5833b19 100644 --- a/src/cryptbase/__init__.py +++ b/src/cryptbase/__init__.py @@ -1 +1,60 @@ __version__ = '0.0.0' + + +from .cryptbase import AES256CTREncryptor +from .cryptbase import CryptoContainer + +try: + from django.db import models + + class EncryptedTextField(models.TextField): + description = 'Encrypted data' + + def __init__(self, *args, **kwargs): + if 'key' in kwargs: + self.__key = kwargs['key'] + del kwargs['key'] + self.encryptor = AES256CTREncryptor(bytes.fromhex(self.__key)) + super().__init__(*args, **kwargs) + + def get_db_prep_value(self, value, *args, **kwargs): + if value is None: + return None + return str(CryptoContainer(plaintext=value, encryptor=self.encryptor)) + + def from_db_value(self, value, expression, connection): + if value is None: + return value + return CryptoContainer.from_encrypted(value, self.encryptor).plaintext + + def to_python(self, value): + if isinstance(value, str): + return CryptoContainer.from_encrypted(value, self.encryptor).plaintext + return None + + def deconstruct(self): + name, path, args, kwargs = super().deconstruct() + return name, path, args, kwargs +except ImportError: + pass + + +try: + from sqlalchemy import types + + class EncryptedText(types.TypeDecorator): + impl = types.TEXT + + def __init__(self, *args, **kwargs): + self.__key = kwargs['key'] + del kwargs['key'] + self.encryptor = AES256CTREncryptor(bytes.fromhex(self.__key)) + super().__init__(*args, **kwargs) + + def process_bind_param(self, value, dialect): + return str(CryptoContainer(plaintext=value, encryptor=self.encryptor)) + + def process_result_value(self, value, dialect): + return CryptoContainer.from_encrypted(value, self.encryptor).plaintext +except ImportError: + pass diff --git a/src/cryptbase/cryptbase.py b/src/cryptbase/cryptbase.py new file mode 100644 index 0000000..5973143 --- /dev/null +++ b/src/cryptbase/cryptbase.py @@ -0,0 +1,63 @@ +import base64 +import secrets + +from cryptography.hazmat.primitives.ciphers import Cipher +from cryptography.hazmat.primitives.ciphers import algorithms +from cryptography.hazmat.primitives.ciphers import modes + + +class AES256CTREncryptor: + + def __init__(self, key): + self.__key = key + + def encrypt(self, plaintext): + iv = secrets.token_bytes(16) + cipher = Cipher(algorithms.AES(self.__key), modes.CTR(iv)) + encryptor = cipher.encryptor() + ciphertext = encryptor.update(plaintext) + encryptor.finalize() + return ciphertext, iv + + def decrypt(self, ciphertext, iv): + cipher = Cipher(algorithms.AES(self.__key), modes.CTR(iv)) + decryptor = cipher.decryptor() + return decryptor.update(ciphertext) + decryptor.finalize() + + +class CryptoContainer: + + def __init__(self, **kwargs): + if 'encryptor' not in kwargs: + raise ValueError() + + if 'plaintext' in kwargs: + self.__plaintext = kwargs['plaintext'] + self.__ciphertext, self.__iv = kwargs['encryptor'].encrypt(self.__plaintext.encode('utf-8')) + elif 'ciphertext' in kwargs and 'iv' in kwargs: + self.__ciphertext = kwargs['ciphertext'] + self.__iv = kwargs['iv'] + self.__plaintext = kwargs['encryptor'].decrypt(self.__ciphertext, self.__iv) + else: + raise ValueError() + + @property + def plaintext(self): + return self.__plaintext.decode('utf-8') + + @property + def ciphertext(self): + return base64.b64encode(self.__ciphertext).decode('ascii') + + @property + def iv(self): + return base64.b64encode(self.__iv).decode('ascii') + + def __str__(self): + return '@'.join([self.ciphertext, self.iv]) + + @staticmethod + def from_encrypted(container_str, encryptor): + ciphertext, iv = container_str.split('@') + return CryptoContainer(ciphertext=base64.b64decode(ciphertext), + iv=base64.b64decode(iv), + encryptor=encryptor)