Base library code
This commit is contained in:
@@ -1 +1,60 @@
|
|||||||
__version__ = '0.0.0'
|
__version__ = '0.0.0'
|
||||||
|
|
||||||
|
|
||||||
|
from .cryptbase import AES256CTREncryptor
|
||||||
|
from .cryptbase import CryptoContainer
|
||||||
|
|
||||||
|
try:
|
||||||
|
from django.db import models
|
||||||
|
|
||||||
|
class EncryptedTextField(models.TextField):
|
||||||
|
description = 'Encrypted data'
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
if 'key' in kwargs:
|
||||||
|
self.__key = kwargs['key']
|
||||||
|
del kwargs['key']
|
||||||
|
self.encryptor = AES256CTREncryptor(bytes.fromhex(self.__key))
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
|
||||||
|
def get_db_prep_value(self, value, *args, **kwargs):
|
||||||
|
if value is None:
|
||||||
|
return None
|
||||||
|
return str(CryptoContainer(plaintext=value, encryptor=self.encryptor))
|
||||||
|
|
||||||
|
def from_db_value(self, value, expression, connection):
|
||||||
|
if value is None:
|
||||||
|
return value
|
||||||
|
return CryptoContainer.from_encrypted(value, self.encryptor).plaintext
|
||||||
|
|
||||||
|
def to_python(self, value):
|
||||||
|
if isinstance(value, str):
|
||||||
|
return CryptoContainer.from_encrypted(value, self.encryptor).plaintext
|
||||||
|
return None
|
||||||
|
|
||||||
|
def deconstruct(self):
|
||||||
|
name, path, args, kwargs = super().deconstruct()
|
||||||
|
return name, path, args, kwargs
|
||||||
|
except ImportError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
try:
|
||||||
|
from sqlalchemy import types
|
||||||
|
|
||||||
|
class EncryptedText(types.TypeDecorator):
|
||||||
|
impl = types.TEXT
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
self.__key = kwargs['key']
|
||||||
|
del kwargs['key']
|
||||||
|
self.encryptor = AES256CTREncryptor(bytes.fromhex(self.__key))
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
|
||||||
|
def process_bind_param(self, value, dialect):
|
||||||
|
return str(CryptoContainer(plaintext=value, encryptor=self.encryptor))
|
||||||
|
|
||||||
|
def process_result_value(self, value, dialect):
|
||||||
|
return CryptoContainer.from_encrypted(value, self.encryptor).plaintext
|
||||||
|
except ImportError:
|
||||||
|
pass
|
||||||
|
|||||||
63
src/cryptbase/cryptbase.py
Normal file
63
src/cryptbase/cryptbase.py
Normal file
@@ -0,0 +1,63 @@
|
|||||||
|
import base64
|
||||||
|
import secrets
|
||||||
|
|
||||||
|
from cryptography.hazmat.primitives.ciphers import Cipher
|
||||||
|
from cryptography.hazmat.primitives.ciphers import algorithms
|
||||||
|
from cryptography.hazmat.primitives.ciphers import modes
|
||||||
|
|
||||||
|
|
||||||
|
class AES256CTREncryptor:
|
||||||
|
|
||||||
|
def __init__(self, key):
|
||||||
|
self.__key = key
|
||||||
|
|
||||||
|
def encrypt(self, plaintext):
|
||||||
|
iv = secrets.token_bytes(16)
|
||||||
|
cipher = Cipher(algorithms.AES(self.__key), modes.CTR(iv))
|
||||||
|
encryptor = cipher.encryptor()
|
||||||
|
ciphertext = encryptor.update(plaintext) + encryptor.finalize()
|
||||||
|
return ciphertext, iv
|
||||||
|
|
||||||
|
def decrypt(self, ciphertext, iv):
|
||||||
|
cipher = Cipher(algorithms.AES(self.__key), modes.CTR(iv))
|
||||||
|
decryptor = cipher.decryptor()
|
||||||
|
return decryptor.update(ciphertext) + decryptor.finalize()
|
||||||
|
|
||||||
|
|
||||||
|
class CryptoContainer:
|
||||||
|
|
||||||
|
def __init__(self, **kwargs):
|
||||||
|
if 'encryptor' not in kwargs:
|
||||||
|
raise ValueError()
|
||||||
|
|
||||||
|
if 'plaintext' in kwargs:
|
||||||
|
self.__plaintext = kwargs['plaintext']
|
||||||
|
self.__ciphertext, self.__iv = kwargs['encryptor'].encrypt(self.__plaintext.encode('utf-8'))
|
||||||
|
elif 'ciphertext' in kwargs and 'iv' in kwargs:
|
||||||
|
self.__ciphertext = kwargs['ciphertext']
|
||||||
|
self.__iv = kwargs['iv']
|
||||||
|
self.__plaintext = kwargs['encryptor'].decrypt(self.__ciphertext, self.__iv)
|
||||||
|
else:
|
||||||
|
raise ValueError()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def plaintext(self):
|
||||||
|
return self.__plaintext.decode('utf-8')
|
||||||
|
|
||||||
|
@property
|
||||||
|
def ciphertext(self):
|
||||||
|
return base64.b64encode(self.__ciphertext).decode('ascii')
|
||||||
|
|
||||||
|
@property
|
||||||
|
def iv(self):
|
||||||
|
return base64.b64encode(self.__iv).decode('ascii')
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return '@'.join([self.ciphertext, self.iv])
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def from_encrypted(container_str, encryptor):
|
||||||
|
ciphertext, iv = container_str.split('@')
|
||||||
|
return CryptoContainer(ciphertext=base64.b64decode(ciphertext),
|
||||||
|
iv=base64.b64decode(iv),
|
||||||
|
encryptor=encryptor)
|
||||||
Reference in New Issue
Block a user