blob: 1292778271b9bfc1ddb1aba45bc18ff06cfd64c4 [file] [log] [blame]
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import, division, print_function
import sys
import cffi
from cryptography.primitives import interfaces
class API(object):
"""
OpenSSL API wrapper.
"""
_modules = [
"bignum",
"crypto",
"dh",
"dsa",
"evp",
"rand",
"rsa",
"opensslv",
]
def __init__(self):
self.ffi = cffi.FFI()
includes = []
functions = []
for name in self._modules:
__import__("cryptography.bindings.openssl." + name)
module = sys.modules["cryptography.bindings.openssl." + name]
self.ffi.cdef(module.TYPES)
self.ffi.cdef(module.FUNCTIONS)
self.ffi.cdef(module.MACROS)
functions.append(module.FUNCTIONS)
includes.append(module.INCLUDES)
# We include functions here so that if we got any of their definitions
# wrong, the underlying C compiler will explode. In C you are allowed
# to re-declare a function if it has the same signature. That is:
# int foo(int);
# int foo(int);
# is legal, but the following will fail to compile:
# int foo(int);
# int foo(short);
self.lib = self.ffi.verify(
source="\n".join(includes + functions),
libraries=["crypto"],
)
self.lib.OpenSSL_add_all_algorithms()
def openssl_version_text(self):
"""
Friendly string name of linked OpenSSL.
Example: OpenSSL 1.0.1e 11 Feb 2013
"""
return self.ffi.string(self.lib.OPENSSL_VERSION_TEXT).decode("ascii")
def create_block_cipher_context(self, cipher, mode):
ctx = self.ffi.new("EVP_CIPHER_CTX *")
res = self.lib.EVP_CIPHER_CTX_init(ctx)
assert res != 0
ctx = self.ffi.gc(ctx, self.lib.EVP_CIPHER_CTX_cleanup)
# TODO: compute name using a better algorithm
ciphername = "{0}-{1}-{2}".format(
cipher.name, cipher.key_size, mode.name
)
evp_cipher = self.lib.EVP_get_cipherbyname(ciphername.encode("ascii"))
assert evp_cipher != self.ffi.NULL
if isinstance(mode, interfaces.ModeWithInitializationVector):
iv_nonce = mode.initialization_vector
else:
iv_nonce = self.ffi.NULL
# TODO: Sometimes this needs to be a DecryptInit, when?
res = self.lib.EVP_EncryptInit_ex(
ctx, evp_cipher, self.ffi.NULL, cipher.key, iv_nonce
)
assert res != 0
# We purposely disable padding here as it's handled higher up in the
# API.
self.lib.EVP_CIPHER_CTX_set_padding(ctx, 0)
return ctx
def update_encrypt_context(self, ctx, plaintext):
buf = self.ffi.new("unsigned char[]", len(plaintext))
outlen = self.ffi.new("int *")
res = self.lib.EVP_EncryptUpdate(
ctx, buf, outlen, plaintext, len(plaintext)
)
assert res != 0
return self.ffi.buffer(buf)[:outlen[0]]
def finalize_encrypt_context(self, ctx):
cipher = self.lib.EVP_CIPHER_CTX_cipher(ctx)
block_size = self.lib.EVP_CIPHER_block_size(cipher)
buf = self.ffi.new("unsigned char[]", block_size)
outlen = self.ffi.new("int *")
res = self.lib.EVP_EncryptFinal_ex(ctx, buf, outlen)
assert res != 0
res = self.lib.EVP_CIPHER_CTX_cleanup(ctx)
assert res != 0
return self.ffi.buffer(buf)[:outlen[0]]
api = API()