# blob: 070794151e82509b8c6c5c5b332373d5d36084cb [file] [log] [blame]
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import cffi
class OpenSSLError(Exception):
    """
    Exception whose message is the text OpenSSL gives for a queued error.

    The error code is obtained from ``ERR_get_error()`` on the supplied
    API object and rendered with ``ERR_error_string()``.
    """

    def __init__(self, api):
        """
        Build the exception from the error code reported by ``api``.

        :param api: object exposing the ``_lib`` and ``_ffi`` bindings.
        :raises SystemError: if ``ERR_get_error()`` returns 0, i.e. there
            is nothing to report.
        """
        lib = api._lib
        error_code = lib.ERR_get_error()
        if not error_code:
            raise SystemError(
                "Tried to create an OpenSSLError when there was None"
            )

        ffi = api._ffi
        # 120-byte scratch area that ERR_error_string() fills in.
        text_buffer = ffi.new("char[]", 120)
        lib.ERR_error_string(error_code, text_buffer)
        message = ffi.string(text_buffer)
        super(OpenSSLError, self).__init__(message)
class API(object):
    """
    OpenSSL API wrapper.

    Declares a small subset of libcrypto through cffi and offers helpers to
    create, feed and finalize an EVP encryption context.
    """

    def __init__(self):
        """Declare the C API, build the binding and run OpenSSL's setup."""
        ffi = cffi.FFI()
        self._populate_ffi(ffi)
        self._ffi = ffi
        # err.h is included explicitly so the ERR_* functions declared in
        # _populate_ffi are compiled against their real prototypes instead
        # of depending on what evp.h happens to include.
        self._lib = ffi.verify("""
        #include <openssl/evp.h>
        #include <openssl/err.h>
        """)
        self._lib.OpenSSL_add_all_algorithms()
        self._lib.ERR_load_crypto_strings()

    def _populate_ffi(self, ffi):
        """
        Register the C declarations used by this module on ``ffi``.

        :param ffi: the ``cffi.FFI`` instance to declare the types and
            functions on.
        """
        # ERR_error_string is required by OpenSSLError, and
        # EVP_CIPHER_CTX_block_size is used to size the output buffers in
        # update_encrypt_context / finalize_encrypt_context.
        ffi.cdef("""
        typedef struct {
            ...;
        } EVP_CIPHER_CTX;
        typedef ... EVP_CIPHER;
        typedef ... ENGINE;

        void OpenSSL_add_all_algorithms();
        void ERR_load_crypto_strings();

        const EVP_CIPHER *EVP_get_cipherbyname(const char *);
        int EVP_EncryptInit_ex(EVP_CIPHER_CTX *, const EVP_CIPHER *,
                               ENGINE *, unsigned char *, unsigned char *);
        int EVP_CIPHER_CTX_set_padding(EVP_CIPHER_CTX *, int);
        int EVP_CIPHER_CTX_block_size(const EVP_CIPHER_CTX *);
        int EVP_EncryptUpdate(EVP_CIPHER_CTX *, unsigned char *, int *,
                              unsigned char *, int);
        int EVP_EncryptFinal_ex(EVP_CIPHER_CTX *, unsigned char *, int *);
        int EVP_CIPHER_CTX_cleanup(EVP_CIPHER_CTX *);

        unsigned long ERR_get_error();
        char *ERR_error_string(unsigned long, char *);
        """)

    def create_block_cipher_context(self, cipher, mode):
        """
        Create an encryption context for ``cipher`` operating in ``mode``.

        :param cipher: object providing ``name`` and ``key``.
        :param mode: object providing ``name`` and ``initialization_vector``.
        :returns: an ``EVP_CIPHER_CTX *`` whose cleanup function runs when
            the object is garbage collected.
        :raises OpenSSLError: if the cipher lookup or the context
            initialization reports failure.
        """
        ctx = self._ffi.new("EVP_CIPHER_CTX *")
        ctx = self._ffi.gc(ctx, self._lib.EVP_CIPHER_CTX_cleanup)
        # TODO: compute name using a better algorithm
        ciphername = "{0}-{1}-{2}".format(
            cipher.name, len(cipher.key) * 8, mode.name
        )
        evp_cipher = self._lib.EVP_get_cipherbyname(ciphername.encode("ascii"))
        if evp_cipher == self._ffi.NULL:
            # NOTE(review): EVP_get_cipherbyname may not queue an OpenSSL
            # error on a failed lookup, in which case OpenSSLError raises
            # SystemError here -- confirm and pick a dedicated exception.
            raise OpenSSLError(self)
        # TODO: only use the key and initialization_vector as needed. Sometimes
        # this needs to be a DecryptInit, when?
        res = self._lib.EVP_EncryptInit_ex(
            ctx, evp_cipher, self._ffi.NULL, cipher.key,
            mode.initialization_vector
        )
        if res == 0:
            raise OpenSSLError(self)

        # TODO: this should depend on mode.padding
        self._lib.EVP_CIPHER_CTX_set_padding(ctx, 0)
        return ctx

    def update_encrypt_context(self, ctx, plaintext):
        """
        Feed ``plaintext`` into ``ctx`` and return the bytes produced.

        :param ctx: context returned by ``create_block_cipher_context``.
        :param plaintext: byte string to encrypt.
        :returns: the output written by ``EVP_EncryptUpdate``.
        :raises OpenSSLError: if ``EVP_EncryptUpdate`` returns 0.
        """
        # EVP_EncryptUpdate may flush input buffered by earlier calls along
        # with the new data, so it can write up to
        # len(plaintext) + block_size - 1 bytes. A buffer of only
        # len(plaintext) bytes could be overrun.
        block_size = self._lib.EVP_CIPHER_CTX_block_size(ctx)
        buf = self._ffi.new(
            "unsigned char[]", len(plaintext) + block_size - 1
        )
        outlen = self._ffi.new("int *")
        res = self._lib.EVP_EncryptUpdate(
            ctx, buf, outlen, plaintext, len(plaintext)
        )
        if res == 0:
            raise OpenSSLError(self)
        return self._ffi.buffer(buf)[:outlen[0]]

    def finalize_encrypt_context(self, ctx):
        """
        Finish the encryption on ``ctx``, clean it up and return the tail.

        :param ctx: context returned by ``create_block_cipher_context``.
        :returns: the output written by ``EVP_EncryptFinal_ex``.
        :raises OpenSSLError: if finalization or cleanup returns 0.
        """
        # Size the output from the context's block size instead of a
        # hard-coded 16 bytes.
        block_size = self._lib.EVP_CIPHER_CTX_block_size(ctx)
        buf = self._ffi.new("unsigned char[]", block_size)
        outlen = self._ffi.new("int *")
        res = self._lib.EVP_EncryptFinal_ex(ctx, buf, outlen)
        if res == 0:
            raise OpenSSLError(self)
        res = self._lib.EVP_CIPHER_CTX_cleanup(ctx)
        if res == 0:
            raise OpenSSLError(self)
        return self._ffi.buffer(buf)[:outlen[0]]
# Shared module-level instance. Constructing it runs API.__init__, which
# calls ffi.verify() and the OpenSSL setup functions as an import-time
# side effect.
api = API()