blob: 7367818a383c667ff56b84b889773e968f060c7a [file] [log] [blame]
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import, division, print_function
import itertools
import sys
import cffi
from cryptography.exceptions import UnsupportedAlgorithm
from cryptography.hazmat.primitives import interfaces
from cryptography.hazmat.primitives.block.ciphers import (
AES, Blowfish, Camellia, CAST5, TripleDES,
)
from cryptography.hazmat.primitives.block.modes import CBC, CTR, ECB, OFB, CFB
class Backend(object):
"""
OpenSSL API wrapper.
"""
_modules = [
"asn1",
"bignum",
"bio",
"conf",
"crypto",
"dh",
"dsa",
"engine",
"err",
"evp",
"hmac",
"nid",
"opensslv",
"pem",
"pkcs7",
"pkcs12",
"rand",
"rsa",
"ssl",
"x509",
"x509name",
"x509v3",
]
ffi = None
lib = None
def __init__(self):
self._ensure_ffi_initialized()
self.ciphers = Ciphers(self)
self.hashes = Hashes(self)
self.hmacs = HMACs(self)
@classmethod
def _ensure_ffi_initialized(cls):
if cls.ffi is not None and cls.lib is not None:
return
ffi = cffi.FFI()
includes = []
functions = []
macros = []
for name in cls._modules:
module_name = "cryptography.hazmat.bindings.openssl." + name
__import__(module_name)
module = sys.modules[module_name]
ffi.cdef(module.TYPES)
macros.append(module.MACROS)
functions.append(module.FUNCTIONS)
includes.append(module.INCLUDES)
# loop over the functions & macros after declaring all the types
# so we can set interdependent types in different files and still
# have them all defined before we parse the funcs & macros
for func in functions:
ffi.cdef(func)
for macro in macros:
ffi.cdef(macro)
# We include functions here so that if we got any of their definitions
# wrong, the underlying C compiler will explode. In C you are allowed
# to re-declare a function if it has the same signature. That is:
# int foo(int);
# int foo(int);
# is legal, but the following will fail to compile:
# int foo(int);
# int foo(short);
lib = ffi.verify(
source="\n".join(includes + functions),
libraries=["crypto", "ssl"],
)
cls.ffi = ffi
cls.lib = lib
cls.lib.OpenSSL_add_all_algorithms()
cls.lib.SSL_load_error_strings()
def openssl_version_text(self):
"""
Friendly string name of linked OpenSSL.
Example: OpenSSL 1.0.1e 11 Feb 2013
"""
return self.ffi.string(self.lib.OPENSSL_VERSION_TEXT).decode("ascii")
class GetCipherByName(object):
def __init__(self, fmt):
self._fmt = fmt
def __call__(self, backend, cipher, mode):
cipher_name = self._fmt.format(cipher=cipher, mode=mode).lower()
return backend.lib.EVP_get_cipherbyname(cipher_name.encode("ascii"))
@interfaces.register(interfaces.CipherContext)
class _CipherContext(object):
_ENCRYPT = 1
_DECRYPT = 0
def __init__(self, backend, cipher, mode, operation):
self._backend = backend
ctx = self._backend.lib.EVP_CIPHER_CTX_new()
ctx = self._backend.ffi.gc(ctx, self._backend.lib.EVP_CIPHER_CTX_free)
registry = self._backend.ciphers._cipher_registry
try:
adapter = registry[type(cipher), type(mode)]
except KeyError:
raise UnsupportedAlgorithm
evp_cipher = adapter(self._backend, cipher, mode)
assert evp_cipher != self._backend.ffi.NULL
if isinstance(mode, interfaces.ModeWithInitializationVector):
iv_nonce = mode.initialization_vector
elif isinstance(mode, interfaces.ModeWithNonce):
iv_nonce = mode.nonce
else:
iv_nonce = self._backend.ffi.NULL
# begin init with cipher and operation type
res = self._backend.lib.EVP_CipherInit_ex(ctx, evp_cipher,
self._backend.ffi.NULL,
self._backend.ffi.NULL,
self._backend.ffi.NULL,
operation)
assert res != 0
# set the key length to handle variable key ciphers
res = self._backend.lib.EVP_CIPHER_CTX_set_key_length(
ctx, len(cipher.key)
)
assert res != 0
# pass key/iv
res = self._backend.lib.EVP_CipherInit_ex(ctx, self._backend.ffi.NULL,
self._backend.ffi.NULL,
cipher.key,
iv_nonce,
operation)
assert res != 0
# We purposely disable padding here as it's handled higher up in the
# API.
self._backend.lib.EVP_CIPHER_CTX_set_padding(ctx, 0)
self._ctx = ctx
def update(self, data):
block_size = self._backend.lib.EVP_CIPHER_CTX_block_size(self._ctx)
buf = self._backend.ffi.new("unsigned char[]",
len(data) + block_size - 1)
outlen = self._backend.ffi.new("int *")
res = self._backend.lib.EVP_CipherUpdate(self._ctx, buf, outlen, data,
len(data))
assert res != 0
return self._backend.ffi.buffer(buf)[:outlen[0]]
def finalize(self):
block_size = self._backend.lib.EVP_CIPHER_CTX_block_size(self._ctx)
buf = self._backend.ffi.new("unsigned char[]", block_size)
outlen = self._backend.ffi.new("int *")
res = self._backend.lib.EVP_CipherFinal_ex(self._ctx, buf, outlen)
assert res != 0
res = self._backend.lib.EVP_CIPHER_CTX_cleanup(self._ctx)
assert res == 1
return self._backend.ffi.buffer(buf)[:outlen[0]]
class Ciphers(object):
def __init__(self, backend):
super(Ciphers, self).__init__()
self._backend = backend
self._cipher_registry = {}
self._register_default_ciphers()
def supported(self, cipher, mode):
try:
adapter = self._cipher_registry[type(cipher), type(mode)]
except KeyError:
return False
evp_cipher = adapter(self._backend, cipher, mode)
return self._backend.ffi.NULL != evp_cipher
def register_cipher_adapter(self, cipher_cls, mode_cls, adapter):
if (cipher_cls, mode_cls) in self._cipher_registry:
raise ValueError("Duplicate registration for: {0} {1}".format(
cipher_cls, mode_cls)
)
self._cipher_registry[cipher_cls, mode_cls] = adapter
def _register_default_ciphers(self):
for cipher_cls, mode_cls in itertools.product(
[AES, Camellia],
[CBC, CTR, ECB, OFB, CFB],
):
self.register_cipher_adapter(
cipher_cls,
mode_cls,
GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}")
)
for mode_cls in [CBC, CFB, OFB]:
self.register_cipher_adapter(
TripleDES,
mode_cls,
GetCipherByName("des-ede3-{mode.name}")
)
for mode_cls in [CBC, CFB, OFB, ECB]:
self.register_cipher_adapter(
Blowfish,
mode_cls,
GetCipherByName("bf-{mode.name}")
)
self.register_cipher_adapter(
CAST5,
ECB,
GetCipherByName("cast5-ecb")
)
def create_encrypt_ctx(self, cipher, mode):
return _CipherContext(self._backend, cipher, mode,
_CipherContext._ENCRYPT)
def create_decrypt_ctx(self, cipher, mode):
return _CipherContext(self._backend, cipher, mode,
_CipherContext._DECRYPT)
class Hashes(object):
def __init__(self, backend):
super(Hashes, self).__init__()
self._backend = backend
def supported(self, hash_cls):
return (self._backend.ffi.NULL !=
self._backend.lib.EVP_get_digestbyname(
hash_cls.name.encode("ascii")))
def create_ctx(self, hashobject):
ctx = self._backend.lib.EVP_MD_CTX_create()
ctx = self._backend.ffi.gc(ctx, self._backend.lib.EVP_MD_CTX_destroy)
evp_md = self._backend.lib.EVP_get_digestbyname(
hashobject.name.encode("ascii"))
assert evp_md != self._backend.ffi.NULL
res = self._backend.lib.EVP_DigestInit_ex(ctx, evp_md,
self._backend.ffi.NULL)
assert res != 0
return ctx
def update_ctx(self, ctx, data):
res = self._backend.lib.EVP_DigestUpdate(ctx, data, len(data))
assert res != 0
def finalize_ctx(self, ctx, digest_size):
buf = self._backend.ffi.new("unsigned char[]", digest_size)
res = self._backend.lib.EVP_DigestFinal_ex(ctx, buf,
self._backend.ffi.NULL)
assert res != 0
res = self._backend.lib.EVP_MD_CTX_cleanup(ctx)
assert res == 1
return self._backend.ffi.buffer(buf)[:digest_size]
def copy_ctx(self, ctx):
copied_ctx = self._backend.lib.EVP_MD_CTX_create()
copied_ctx = self._backend.ffi.gc(copied_ctx,
self._backend.lib.EVP_MD_CTX_destroy)
res = self._backend.lib.EVP_MD_CTX_copy_ex(copied_ctx, ctx)
assert res != 0
return copied_ctx
class HMACs(object):
def __init__(self, backend):
super(HMACs, self).__init__()
self._backend = backend
def create_ctx(self, key, hash_cls):
ctx = self._backend.ffi.new("HMAC_CTX *")
self._backend.lib.HMAC_CTX_init(ctx)
ctx = self._backend.ffi.gc(ctx, self._backend.lib.HMAC_CTX_cleanup)
evp_md = self._backend.lib.EVP_get_digestbyname(
hash_cls.name.encode('ascii'))
assert evp_md != self._backend.ffi.NULL
res = self._backend.lib.HMAC_Init_ex(ctx, key, len(key), evp_md,
self._backend.ffi.NULL)
assert res != 0
return ctx
def update_ctx(self, ctx, data):
res = self._backend.lib.HMAC_Update(ctx, data, len(data))
assert res != 0
def finalize_ctx(self, ctx, digest_size):
buf = self._backend.ffi.new("unsigned char[]", digest_size)
buflen = self._backend.ffi.new("unsigned int *", digest_size)
res = self._backend.lib.HMAC_Final(ctx, buf, buflen)
assert res != 0
self._backend.lib.HMAC_CTX_cleanup(ctx)
return self._backend.ffi.buffer(buf)[:digest_size]
def copy_ctx(self, ctx):
copied_ctx = self._backend.ffi.new("HMAC_CTX *")
self._backend.lib.HMAC_CTX_init(copied_ctx)
copied_ctx = self._backend.ffi.gc(copied_ctx,
self._backend.lib.HMAC_CTX_cleanup)
res = self._backend.lib.HMAC_CTX_copy(copied_ctx, ctx)
assert res != 0
return copied_ctx
backend = Backend()