blob: 8bcaa40ad1db6f9b007a6a0daac452a2c5a39fad [file] [log] [blame]
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import binascii
import os
import struct
import time
import six
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import padding, hashes, constant_time
from cryptography.hazmat.primitives.hmac import HMAC
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
class InvalidToken(Exception):
pass
_MAX_CLOCK_SKEW = 60
class Fernet(object):
def __init__(self, key):
key = base64.urlsafe_b64decode(key)
assert len(key) == 32
self.signing_key = key[:16]
self.encryption_key = key[16:]
self.backend = default_backend()
@classmethod
def generate_key(cls):
return base64.urlsafe_b64encode(os.urandom(32))
def encrypt(self, data):
current_time = int(time.time())
iv = os.urandom(16)
return self._encrypt_from_parts(data, current_time, iv)
def _encrypt_from_parts(self, data, current_time, iv):
if isinstance(data, six.text_type):
raise TypeError(
"Unicode-objects must be encoded before encryption"
)
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self.encryption_key), modes.CBC(iv), self.backend
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
basic_parts = (
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
)
h = HMAC(self.signing_key, hashes.SHA256(), self.backend)
h.update(basic_parts)
hmac = h.finalize()
return base64.urlsafe_b64encode(basic_parts + hmac)
def decrypt(self, data, ttl=None):
if isinstance(data, six.text_type):
raise TypeError(
"Unicode-objects must be encoded before decryption"
)
current_time = int(time.time())
try:
data = base64.urlsafe_b64decode(data)
except (TypeError, binascii.Error):
raise InvalidToken
assert six.indexbytes(data, 0) == 0x80
timestamp = struct.unpack(">Q", data[1:9])[0]
iv = data[9:25]
ciphertext = data[25:-32]
if ttl is not None:
if timestamp + ttl < current_time:
raise InvalidToken
if current_time + _MAX_CLOCK_SKEW < timestamp:
raise InvalidToken
h = HMAC(self.signing_key, hashes.SHA256(), self.backend)
h.update(data[:-32])
hmac = h.finalize()
if not constant_time.bytes_eq(hmac, data[-32:]):
raise InvalidToken
decryptor = Cipher(
algorithms.AES(self.encryption_key), modes.CBC(iv), self.backend
).decryptor()
plaintext_padded = decryptor.update(ciphertext)
try:
plaintext_padded += decryptor.finalize()
except ValueError:
raise InvalidToken
unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder()
unpadded = unpadder.update(plaintext_padded)
try:
unpadded += unpadder.finalize()
except ValueError:
raise InvalidToken
return unpadded