blob: 1a4db380a6adb9c29342d2d984d7411b2af39863 [file] [log] [blame]
from time import time
from cryptography.hazmat.backends.openssl import backend
# Short aliases for the cffi handle and the compiled OpenSSL bindings exposed
# by the cryptography backend; every helper in this module goes through them.
_ffi = backend.ffi
_lib = backend.lib
# Serialization formats accepted by the load_* / dump_* functions.
FILETYPE_PEM = _lib.SSL_FILETYPE_PEM
FILETYPE_ASN1 = _lib.SSL_FILETYPE_ASN1
# TODO This was an API mistake. OpenSSL has no such constant.
# The value is module-private: it only has to differ from the two OpenSSL
# file type constants above.
FILETYPE_TEXT = 2 ** 16 - 1
# Key algorithm identifiers accepted by PKey.generate_key.
TYPE_RSA = _lib.EVP_PKEY_RSA
TYPE_DSA = _lib.EVP_PKEY_DSA
def _bio_to_string(bio):
    """
    Return a copy of the bytes currently held by an OpenSSL memory BIO.

    :param bio: The BIO to read from.
    :return: A byte string holding the BIO's contents.
    """
    data_pointer = _ffi.new('char**')
    size = _lib.BIO_get_mem_data(bio, data_pointer)
    contents = _ffi.buffer(data_pointer[0], size)
    # Slicing the cffi buffer copies the bytes into a Python byte string, so
    # the result does not depend on the BIO staying alive.
    return contents[:]
def _new_mem_buf(buffer=None):
    """
    Allocate a new OpenSSL memory BIO.

    :param buffer: None to create an empty memory BIO, or a byte string
        whose contents the BIO should be created over (via
        ``BIO_new_mem_buf``).
    :return: The BIO, wrapped so that ``BIO_free`` is called on it when it is
        garbage collected.
    :raise Error: If OpenSSL could not allocate the BIO.
    """
    if buffer is None:
        bio = _lib.BIO_new(_lib.BIO_s_mem())
        free = _lib.BIO_free
    else:
        data = _ffi.new("char[]", buffer)
        bio = _lib.BIO_new_mem_buf(data, len(buffer))

        # Keep the memory alive as long as the bio is alive!  The default
        # argument pins ``data`` to the finalizer, and therefore to the bio.
        def free(bio, ref=data):
            return _lib.BIO_free(bio)

    if bio == _ffi.NULL:
        # Report the allocation failure as an OpenSSL error instead of the
        # ZeroDivisionError the old ``1/0`` placeholder produced.
        _raise_current_error()

    bio = _ffi.gc(bio, free)
    return bio
def _set_asn1_time(boundary, when):
if not isinstance(when, bytes):
raise TypeError("when must be a byte string")
set_result = _lib.ASN1_GENERALIZEDTIME_set_string(
_ffi.cast('ASN1_GENERALIZEDTIME*', boundary), when)
if set_result == 0:
dummy = _ffi.gc(_lib.ASN1_STRING_new(), _lib.ASN1_STRING_free)
_lib.ASN1_STRING_set(dummy, when, len(when))
check_result = _lib.ASN1_GENERALIZEDTIME_check(
_ffi.cast('ASN1_GENERALIZEDTIME*', dummy))
if not check_result:
raise ValueError("Invalid string")
else:
# TODO No tests for this case
raise RuntimeError("Unknown ASN1_GENERALIZEDTIME_set_string failure")
def _get_asn1_time(timestamp):
    """
    Read an ASN.1 time structure as a GENERALIZEDTIME byte string.

    :param timestamp: The ASN.1 time structure to read.
    :return: The timestamp as a byte string, or None if the structure holds
        an empty string.
    :raise Error: If OpenSSL cannot convert the value to GENERALIZEDTIME.
    """
    string_timestamp = _ffi.cast('ASN1_STRING*', timestamp)
    if _lib.ASN1_STRING_length(string_timestamp) == 0:
        return None

    if _lib.ASN1_STRING_type(string_timestamp) == _lib.V_ASN1_GENERALIZEDTIME:
        # Already in the desired representation; no conversion needed.
        return _ffi.string(_lib.ASN1_STRING_data(string_timestamp))

    generalized_timestamp = _ffi.new("ASN1_GENERALIZEDTIME**")
    _lib.ASN1_TIME_to_generalizedtime(timestamp, generalized_timestamp)
    if generalized_timestamp[0] == _ffi.NULL:
        # Conversion failed: surface it as an OpenSSL error rather than the
        # ZeroDivisionError the old ``1/0`` placeholder produced.
        _raise_current_error()

    try:
        converted = _ffi.cast("ASN1_STRING*", generalized_timestamp[0])
        return _ffi.string(_lib.ASN1_STRING_data(converted))
    finally:
        # The converted structure was allocated for us; release it even if
        # copying the string out fails.
        _lib.ASN1_GENERALIZEDTIME_free(generalized_timestamp[0])
class Error(Exception):
    """
    Exception raised by this module when an OpenSSL operation fails.

    It is the default exception type used by ``_raise_current_error``, which
    passes the collected OpenSSL error queue entries as the single argument.
    """
def _raise_current_error(exceptionType=Error):
    """
    Drain the OpenSSL error queue and raise its contents as an exception.

    :param exceptionType: The exception class to raise; defaults to
        :py:class:`Error`.
    :raise exceptionType: Always.  The single argument is a list with one
        ``(library, function, reason)`` tuple of byte strings per queued
        error (oldest first); the list is empty if nothing was queued.
    """
    def _text(charp):
        # OpenSSL returns NULL when it has no text registered for an error
        # code.  ``_ffi.string(NULL)`` would raise its own exception and hide
        # the error being reported, so substitute an empty byte string.
        if charp == _ffi.NULL:
            return b""
        return _ffi.string(charp)

    errors = []
    while True:
        error = _lib.ERR_get_error()
        if error == 0:
            # A zero code means the queue is empty.
            break
        errors.append((
            _text(_lib.ERR_lib_error_string(error)),
            _text(_lib.ERR_func_error_string(error)),
            _text(_lib.ERR_reason_error_string(error))))

    raise exceptionType(errors)


# Alternate name for the same function.
_exception_from_error_queue = _raise_current_error
class PKey(object):
    """
    Wrapper around an OpenSSL ``EVP_PKEY`` key structure.

    The underlying pointer is stored in ``_pkey`` and released with
    ``EVP_PKEY_free`` when the wrapper is garbage collected.
    """

    # Set to True on instances that only carry the public half of a key
    # (for example keys extracted from a certificate).
    _only_public = False
    # Class-level default is True so that instances built with ``__new__``
    # around an existing key count as initialized; ``__init__`` resets it
    # until ``generate_key`` succeeds.
    _initialized = True

    def __init__(self):
        pkey = _lib.EVP_PKEY_new()
        self._pkey = _ffi.gc(pkey, _lib.EVP_PKEY_free)
        self._initialized = False

    def generate_key(self, type, bits):
        """
        Generate a key of a given type, with a given number of bits.

        :param type: The key type (TYPE_RSA or TYPE_DSA)
        :param bits: The number of bits
        :return: None
        :raise TypeError: If ``type`` or ``bits`` is not an integer.
        :raise ValueError: If ``bits`` is not positive for an RSA key.
        :raise Error: If the key type is unknown or OpenSSL fails to
            generate or store the key.
        """
        if not isinstance(type, int):
            raise TypeError("type must be an integer")

        if not isinstance(bits, int):
            raise TypeError("bits must be an integer")

        if type == TYPE_RSA:
            if bits <= 0:
                raise ValueError("Invalid number of bits")

            # The public exponent is only needed for RSA keys.
            exponent = _lib.BN_new()
            if exponent == _ffi.NULL:
                _raise_current_error()
            exponent = _ffi.gc(exponent, _lib.BN_free)
            if not _lib.BN_set_word(exponent, _lib.RSA_F4):
                _raise_current_error()

            rsa = _lib.RSA_new()
            if rsa == _ffi.NULL:
                _raise_current_error()

            # RSA_generate_key_ex reports success as 1; anything else
            # (including 0) is a failure.
            result = _lib.RSA_generate_key_ex(rsa, bits, exponent, _ffi.NULL)
            if result != 1:
                # The RSA structure is still ours until it has been assigned
                # to the EVP_PKEY, so release it before reporting the error.
                _lib.RSA_free(rsa)
                _raise_current_error()

            result = _lib.EVP_PKEY_assign_RSA(self._pkey, rsa)
            if not result:
                _lib.RSA_free(rsa)
                _raise_current_error()

        elif type == TYPE_DSA:
            # NOTE(review): ``dsa`` is not released if one of the two steps
            # after its allocation fails.
            dsa = _lib.DSA_generate_parameters(
                bits, _ffi.NULL, 0, _ffi.NULL, _ffi.NULL, _ffi.NULL,
                _ffi.NULL)
            if dsa == _ffi.NULL:
                _raise_current_error()
            if not _lib.DSA_generate_key(dsa):
                _raise_current_error()
            if not _lib.EVP_PKEY_assign_DSA(self._pkey, dsa):
                _raise_current_error()
        else:
            raise Error("No such key type")

        self._initialized = True

    def check(self):
        """
        Check the consistency of an RSA private key.

        :return: True if key is consistent.
        :raise Error: if the key is inconsistent.
        :raise TypeError: if the key is of a type which cannot be checked.
            Only RSA keys can currently be checked.
        """
        if self._only_public:
            raise TypeError("public key only")

        if _lib.EVP_PKEY_type(self._pkey.type) != _lib.EVP_PKEY_RSA:
            raise TypeError("key type unsupported")

        rsa = _lib.EVP_PKEY_get1_RSA(self._pkey)
        rsa = _ffi.gc(rsa, _lib.RSA_free)
        # Only 1 means "valid".  RSA_check_key uses 0 for an invalid key and
        # -1 for an error during the check, so a plain truthiness test would
        # report the error case as a consistent key.
        if _lib.RSA_check_key(rsa) == 1:
            return True
        _raise_current_error()

    def type(self):
        """
        Returns the type of the key

        :return: The type of the key.
        """
        return self._pkey.type

    def bits(self):
        """
        Returns the number of bits of the key

        :return: The number of bits of the key.
        """
        return _lib.EVP_PKEY_bits(self._pkey)


PKeyType = PKey
class X509Name(object):
    """
    Wrapper around an OpenSSL ``X509_NAME`` (a distinguished name).

    Name components are read and written as attributes (for example
    ``name.CN``); attribute names starting with an underscore are ordinary
    Python attributes of the wrapper itself.
    """

    def __init__(self, name):
        """
        Create a new X509Name, copying the given X509Name instance.

        :param name: An X509Name object to copy
        """
        name = _lib.X509_NAME_dup(name._name)
        self._name = _ffi.gc(name, _lib.X509_NAME_free)

    def __setattr__(self, name, value):
        """
        Set a name component, replacing any existing entry of the same kind.

        :param name: The component name as understood by ``OBJ_txt2nid``.
        :param value: The new value; unicode values are encoded as UTF-8.
        :raise TypeError: If ``name`` is not exactly a ``str``.
        :raise AttributeError: If OpenSSL does not know the component name.
        :raise Error: If OpenSSL fails to add the entry.
        """
        if name.startswith('_'):
            return super(X509Name, self).__setattr__(name, value)

        # Note: we really do not want str subclasses here, so we do not use
        # isinstance.
        if type(name) is not str:
            # Report the type of the offending *name*, not of the value.
            raise TypeError("attribute name must be string, not '%.200s'" % (
                type(name).__name__,))

        nid = _lib.OBJ_txt2nid(name)
        if nid == _lib.NID_undef:
            # The failed lookup may leave entries on the OpenSSL error
            # queue; drain and discard them so they do not leak into a later
            # unrelated error report.
            try:
                _raise_current_error()
            except Error:
                pass
            raise AttributeError("No such attribute")

        # If there's an old entry for this NID, remove it
        for i in range(_lib.X509_NAME_entry_count(self._name)):
            ent = _lib.X509_NAME_get_entry(self._name, i)
            ent_obj = _lib.X509_NAME_ENTRY_get_object(ent)
            ent_nid = _lib.OBJ_obj2nid(ent_obj)
            if nid == ent_nid:
                ent = _lib.X509_NAME_delete_entry(self._name, i)
                _lib.X509_NAME_ENTRY_free(ent)
                break

        if isinstance(value, unicode):
            value = value.encode('utf-8')

        add_result = _lib.X509_NAME_add_entry_by_NID(
            self._name, nid, _lib.MBSTRING_UTF8, value, -1, -1, 0)
        if not add_result:
            # TODO Untested
            _raise_current_error()

    def __getattr__(self, name):
        """
        Find attribute. An X509Name object has the following attributes:
        countryName (alias C), stateOrProvince (alias ST), locality (alias L),
        organization (alias O), organizationalUnit (alias OU), commonName
        (alias CN) and more...

        :return: The component value decoded from UTF-8, or None if the name
            has no entry of that kind.
        :raise AttributeError: If OpenSSL does not know the component name.
        :raise Error: If OpenSSL cannot convert the value to UTF-8.
        """
        nid = _lib.OBJ_txt2nid(name)
        if nid == _lib.NID_undef:
            # This is a bit weird.  OBJ_txt2nid indicated failure, but it seems
            # a lower level function, a2d_ASN1_OBJECT, also feels the need to
            # push something onto the error queue.  If we don't clean that up
            # now, someone else will bump into it later and be quite confused.
            # See lp#314814.
            try:
                _raise_current_error()
            except Error:
                pass
            # ``object`` defines no ``__getattr__`` to delegate to, so raise
            # the AttributeError directly with a message naming the
            # attribute that was actually requested.
            raise AttributeError(
                "'X509Name' object has no attribute %r" % (name,))

        entry_index = _lib.X509_NAME_get_index_by_NID(self._name, nid, -1)
        if entry_index == -1:
            return None

        entry = _lib.X509_NAME_get_entry(self._name, entry_index)
        data = _lib.X509_NAME_ENTRY_get_data(entry)

        result_buffer = _ffi.new("unsigned char**")
        data_length = _lib.ASN1_STRING_to_UTF8(result_buffer, data)
        if data_length < 0:
            _raise_current_error()

        try:
            result = _ffi.buffer(
                result_buffer[0], data_length)[:].decode('utf-8')
        finally:
            # XXX untested
            _lib.OPENSSL_free(result_buffer[0])
        return result

    def __cmp__(self, other):
        """
        Order two names using ``X509_NAME_cmp``.

        :return: The integer result of ``X509_NAME_cmp``, or NotImplemented
            if ``other`` is not an X509Name.
        """
        if not isinstance(other, X509Name):
            return NotImplemented

        result = _lib.X509_NAME_cmp(self._name, other._name)
        # TODO result == -2 is an error case that maybe should be checked for
        return result

    def __repr__(self):
        """
        String representation of an X509Name
        """
        result_buffer = _ffi.new("char[]", 512)
        format_result = _lib.X509_NAME_oneline(
            self._name, result_buffer, len(result_buffer))

        if format_result == _ffi.NULL:
            _raise_current_error()

        return "<X509Name object '%s'>" % (_ffi.string(result_buffer),)

    def hash(self):
        """
        Return the hash value of this name

        :return: The value computed by ``X509_NAME_hash`` for this name.
        """
        return _lib.X509_NAME_hash(self._name)

    def der(self):
        """
        Return the DER encoding of this name

        :return: A :py:class:`bytes` instance giving the DER encoded form of
            this name.
        :raise Error: If OpenSSL fails to encode the name.
        """
        result_buffer = _ffi.new('unsigned char**')
        encode_result = _lib.i2d_X509_NAME(self._name, result_buffer)
        if encode_result < 0:
            _raise_current_error()

        string_result = _ffi.buffer(result_buffer[0], encode_result)[:]
        _lib.OPENSSL_free(result_buffer[0])
        return string_result

    def get_components(self):
        """
        Returns the split-up components of this name.

        :return: List of tuples (name, value).
        """
        result = []
        for i in range(_lib.X509_NAME_entry_count(self._name)):
            ent = _lib.X509_NAME_get_entry(self._name, i)

            fname = _lib.X509_NAME_ENTRY_get_object(ent)
            fval = _lib.X509_NAME_ENTRY_get_data(ent)

            nid = _lib.OBJ_obj2nid(fname)
            name = _lib.OBJ_nid2sn(nid)

            # Copy exactly ``length`` bytes.  ``_ffi.string`` stops at the
            # first NUL byte, which would silently truncate a value with an
            # embedded NUL and misreport what the certificate contains.
            value = _ffi.buffer(
                _lib.ASN1_STRING_data(fval),
                _lib.ASN1_STRING_length(fval))[:]

            result.append((_ffi.string(name), value))

        return result


X509NameType = X509Name
class X509Extension(object):
    """
    Wrapper around an OpenSSL ``X509_EXTENSION``.

    The underlying pointer is stored in ``_extension`` and released with
    ``X509_EXTENSION_free`` when the wrapper is garbage collected.
    """

    def __init__(self, type_name, critical, value, subject=None, issuer=None):
        """
        :param type_name: The name of the extension to create.
        :type type_name: :py:data:`str`

        :param critical: A flag indicating whether this is a critical extension.

        :param value: The value of the extension.
        :type value: :py:data:`str`

        :param subject: Optional X509 cert to use as subject.
        :type subject: :py:class:`X509`

        :param issuer: Optional X509 cert to use as issuer.
        :type issuer: :py:class:`X509`

        :return: The X509Extension object
        :raise TypeError: If ``subject`` or ``issuer`` is given but is not an
            X509 instance.
        :raise Error: If OpenSSL cannot build the extension from the given
            name and value.
        """
        ctx = _ffi.new("X509V3_CTX*")

        # A context is necessary for any extension which uses the r2i conversion
        # method.  That is, X509V3_EXT_nconf may segfault if passed a NULL ctx.
        # Start off by initializing most of the fields to NULL.
        _lib.X509V3_set_ctx(ctx, _ffi.NULL, _ffi.NULL, _ffi.NULL, _ffi.NULL, 0)

        # We have no configuration database - but perhaps we should (some
        # extensions may require it).
        _lib.X509V3_set_ctx_nodb(ctx)

        # Initialize the subject and issuer, if appropriate.  ctx is a local,
        # and as far as I can tell none of the X509V3_* APIs invoked here steal
        # any references, so no need to mess with reference counts or duplicates.
        if issuer is not None:
            if not isinstance(issuer, X509):
                raise TypeError("issuer must be an X509 instance")
            ctx.issuer_cert = issuer._x509
        if subject is not None:
            if not isinstance(subject, X509):
                raise TypeError("subject must be an X509 instance")
            ctx.subject_cert = subject._x509

        if critical:
            # There are other OpenSSL APIs which would let us pass in critical
            # separately, but they're harder to use, and since value is already
            # a pile of crappy junk smuggling a ton of utterly important
            # structured data, what's the point of trying to avoid nasty stuff
            # with strings? (However, X509V3_EXT_i2d in particular seems like it
            # would be a better API to invoke.  I do not know where to get the
            # ext_struc it desires for its last parameter, though.)
            value = "critical," + value

        extension = _lib.X509V3_EXT_nconf(_ffi.NULL, ctx, type_name, value)
        if extension == _ffi.NULL:
            _raise_current_error()
        self._extension = _ffi.gc(extension, _lib.X509_EXTENSION_free)

    @property
    def _nid(self):
        # Numeric identifier of this extension's type.
        return _lib.OBJ_obj2nid(self._extension.object)

    # Labels used when rendering the general name types whose payload is read
    # directly as an IA5 string in ``_subjectAltNameString``.
    _prefixes = {
        _lib.GEN_EMAIL: b"email",
        _lib.GEN_DNS: b"DNS",
        _lib.GEN_URI: b"URI",
    }

    def _subjectAltNameString(self):
        """
        Render a subjectAltName extension as ``b"label:value, ..."``.

        Entries whose type has a label in ``_prefixes`` are copied out by
        explicit length; any other entry is rendered by
        ``GENERAL_NAME_print``.

        :raise Error: If OpenSSL has no decoding method for the extension.
        """
        method = _lib.X509V3_EXT_get(self._extension)
        if method == _ffi.NULL:
            _raise_current_error()
        payload = self._extension.value.data
        length = self._extension.value.length

        payloadptr = _ffi.new("unsigned char**")
        payloadptr[0] = payload

        # NOTE(review): the decoded GENERAL_NAMES structure is never freed
        # here; confirm whether the bindings expose a suitable free function.
        if method.it != _ffi.NULL:
            ptr = _lib.ASN1_ITEM_ptr(method.it)
            data = _lib.ASN1_item_d2i(_ffi.NULL, payloadptr, length, ptr)
            names = _ffi.cast("GENERAL_NAMES*", data)
        else:
            names = _ffi.cast(
                "GENERAL_NAMES*",
                method.d2i(_ffi.NULL, payloadptr, length))

        parts = []
        for i in range(_lib.sk_GENERAL_NAME_num(names)):
            name = _lib.sk_GENERAL_NAME_value(names, i)
            try:
                label = self._prefixes[name.type]
            except KeyError:
                bio = _new_mem_buf()
                _lib.GENERAL_NAME_print(bio, name)
                parts.append(_bio_to_string(bio))
            else:
                # Copy by explicit length so embedded NUL bytes are kept.
                value = _ffi.buffer(name.d.ia5.data, name.d.ia5.length)[:]
                parts.append(label + b":" + value)
        return b", ".join(parts)

    def __str__(self):
        """
        :return: a nice text representation of the extension
        :raise Error: If OpenSSL fails to print the extension.
        """
        if _lib.NID_subject_alt_name == self._nid:
            return self._subjectAltNameString()

        bio = _new_mem_buf()
        print_result = _lib.X509V3_EXT_print(bio, self._extension, 0, 0)
        if not print_result:
            _raise_current_error()

        return _bio_to_string(bio)

    def get_critical(self):
        """
        Returns the critical field of the X509Extension

        :return: The critical field.
        """
        return _lib.X509_EXTENSION_get_critical(self._extension)

    def get_short_name(self):
        """
        Returns the short version of the type name of the X509Extension

        :return: The short type name.
        """
        obj = _lib.X509_EXTENSION_get_object(self._extension)
        nid = _lib.OBJ_obj2nid(obj)
        return _ffi.string(_lib.OBJ_nid2sn(nid))

    def get_data(self):
        """
        Returns the data of the X509Extension

        :return: A :py:data:`str` giving the X509Extension's ASN.1 encoded data.
        """
        octet_result = _lib.X509_EXTENSION_get_data(self._extension)
        string_result = _ffi.cast('ASN1_STRING*', octet_result)
        char_result = _lib.ASN1_STRING_data(string_result)
        result_length = _lib.ASN1_STRING_length(string_result)
        return _ffi.buffer(char_result, result_length)[:]


X509ExtensionType = X509Extension
class X509Req(object):
    """
    Wrapper around an OpenSSL ``X509_REQ`` (a certificate signing request).

    The underlying pointer is stored in ``_req`` and released with
    ``X509_REQ_free`` when the wrapper is garbage collected.
    """

    def __init__(self):
        req = _lib.X509_REQ_new()
        self._req = _ffi.gc(req, _lib.X509_REQ_free)

    def set_pubkey(self, pkey):
        """
        Set the public key of the certificate request

        :param pkey: The public key to use
        :return: None
        :raise Error: If OpenSSL fails to store the key.
        """
        set_result = _lib.X509_REQ_set_pubkey(self._req, pkey._pkey)
        if not set_result:
            _raise_current_error()

    def get_pubkey(self):
        """
        Get the public key from the certificate request

        :return: The public key
        :raise Error: If OpenSSL cannot extract the key.
        """
        # Bypass PKey.__init__, which would allocate a fresh EVP_PKEY.
        pkey = PKey.__new__(PKey)
        pkey._pkey = _lib.X509_REQ_get_pubkey(self._req)
        if pkey._pkey == _ffi.NULL:
            _raise_current_error()
        pkey._pkey = _ffi.gc(pkey._pkey, _lib.EVP_PKEY_free)
        pkey._only_public = True
        return pkey

    def set_version(self, version):
        """
        Set the version subfield (RFC 2459, section 4.1.2.1) of the certificate
        request.

        :param version: The version number
        :return: None
        :raise Error: If OpenSSL fails to store the version.
        """
        set_result = _lib.X509_REQ_set_version(self._req, version)
        if not set_result:
            _raise_current_error()

    def get_version(self):
        """
        Get the version subfield (RFC 2459, section 4.1.2.1) of the certificate
        request.

        :return: an integer giving the value of the version subfield
        """
        return _lib.X509_REQ_get_version(self._req)

    def get_subject(self):
        """
        Create an X509Name object for the subject of the certificate request

        :return: An X509Name object
        :raise Error: If the request has no subject name structure.
        """
        # Bypass X509Name.__init__, which would copy another X509Name.
        name = X509Name.__new__(X509Name)
        name._name = _lib.X509_REQ_get_subject_name(self._req)
        if name._name == _ffi.NULL:
            _raise_current_error()

        # The name is owned by the X509Req structure.  As long as the X509Name
        # Python object is alive, keep the X509Req Python object alive.
        name._owner = self

        return name

    def add_extensions(self, extensions):
        """
        Add extensions to the request.

        :param extensions: a sequence of X509Extension objects
        :return: None
        :raise ValueError: If an element is not an X509Extension.
        :raise Error: If OpenSSL fails to build the stack or add it.
        """
        stack = _lib.sk_X509_EXTENSION_new_null()
        if stack == _ffi.NULL:
            _raise_current_error()

        stack = _ffi.gc(stack, _lib.sk_X509_EXTENSION_free)

        for ext in extensions:
            if not isinstance(ext, X509Extension):
                raise ValueError("One of the elements is not an X509Extension")

            # TODO push can fail (here and elsewhere)
            _lib.sk_X509_EXTENSION_push(stack, ext._extension)

        add_result = _lib.X509_REQ_add_extensions(self._req, stack)
        if not add_result:
            _raise_current_error()

    def sign(self, pkey, digest):
        """
        Sign the certificate request using the supplied key and digest

        :param pkey: The key to sign with
        :param digest: The message digest to use
        :return: None
        :raise ValueError: If the key has only a public part, is
            uninitialized, or the digest name is unknown.
        :raise Error: If OpenSSL fails to sign the request.
        """
        if pkey._only_public:
            raise ValueError("Key has only public part")

        if not pkey._initialized:
            raise ValueError("Key is uninitialized")

        digest_obj = _lib.EVP_get_digestbyname(digest)
        if digest_obj == _ffi.NULL:
            raise ValueError("No such digest method")

        sign_result = _lib.X509_REQ_sign(self._req, pkey._pkey, digest_obj)
        if not sign_result:
            _raise_current_error()

    def verify(self, pkey):
        """
        Verifies a certificate request using the supplied public key

        :param pkey: a public key
        :return: The positive integer result of ``X509_REQ_verify`` (truthy)
            if the signature is correct.
        :raise OpenSSL.crypto.Error: If the signature is invalid or there is a
            problem verifying the signature.
        :raise TypeError: If ``pkey`` is not a PKey instance.
        """
        if not isinstance(pkey, PKey):
            raise TypeError("pkey must be a PKey instance")

        result = _lib.X509_REQ_verify(self._req, pkey._pkey)
        if result <= 0:
            _raise_current_error(Error)

        return result


X509ReqType = X509Req
class X509(object):
    """
    Wrapper around an OpenSSL ``X509`` certificate structure.

    The underlying pointer is stored in ``_x509`` and released with
    ``X509_free`` when the wrapper is garbage collected.
    """

    def __init__(self):
        # TODO Allocation failure?  And why not __new__ instead of __init__?
        x509 = _lib.X509_new()
        self._x509 = _ffi.gc(x509, _lib.X509_free)

    def set_version(self, version):
        """
        Set version number of the certificate

        :param version: The version number
        :type version: :py:class:`int`

        :return: None
        :raise TypeError: If ``version`` is not an integer.
        :raise Error: If OpenSSL fails to store the version.
        """
        if not isinstance(version, int):
            raise TypeError("version must be an integer")

        # Check the result, as X509Req.set_version does, instead of silently
        # ignoring a failure.
        set_result = _lib.X509_set_version(self._x509, version)
        if not set_result:
            _raise_current_error()

    def get_version(self):
        """
        Return version number of the certificate

        :return: Version number as a Python integer
        """
        return _lib.X509_get_version(self._x509)

    def get_pubkey(self):
        """
        Get the public key of the certificate

        :return: The public key
        :raise Error: If OpenSSL cannot extract the key.
        """
        # Bypass PKey.__init__, which would allocate a fresh EVP_PKEY.
        pkey = PKey.__new__(PKey)
        pkey._pkey = _lib.X509_get_pubkey(self._x509)
        if pkey._pkey == _ffi.NULL:
            _raise_current_error()
        pkey._pkey = _ffi.gc(pkey._pkey, _lib.EVP_PKEY_free)
        pkey._only_public = True
        return pkey

    def set_pubkey(self, pkey):
        """
        Set the public key of the certificate

        :param pkey: The public key

        :return: None
        :raise TypeError: If ``pkey`` is not a PKey instance.
        :raise Error: If OpenSSL fails to store the key.
        """
        if not isinstance(pkey, PKey):
            raise TypeError("pkey must be a PKey instance")

        set_result = _lib.X509_set_pubkey(self._x509, pkey._pkey)
        if not set_result:
            _raise_current_error()

    def sign(self, pkey, digest):
        """
        Sign the certificate using the supplied key and digest

        :param pkey: The key to sign with
        :param digest: The message digest to use
        :return: None
        :raise TypeError: If ``pkey`` is not a PKey instance.
        :raise ValueError: If the key has only a public part, is
            uninitialized, or the digest name is unknown.
        :raise Error: If OpenSSL fails to sign the certificate.
        """
        if not isinstance(pkey, PKey):
            raise TypeError("pkey must be a PKey instance")

        if pkey._only_public:
            raise ValueError("Key only has public part")

        if not pkey._initialized:
            raise ValueError("Key is uninitialized")

        evp_md = _lib.EVP_get_digestbyname(digest)
        if evp_md == _ffi.NULL:
            raise ValueError("No such digest method")

        sign_result = _lib.X509_sign(self._x509, pkey._pkey, evp_md)
        if not sign_result:
            _raise_current_error()

    def get_signature_algorithm(self):
        """
        Retrieve the signature algorithm used in the certificate

        :return: A byte string giving the name of the signature algorithm used in
                 the certificate.
        :raise ValueError: If the signature algorithm is undefined.
        """
        alg = self._x509.cert_info.signature.algorithm
        nid = _lib.OBJ_obj2nid(alg)
        if nid == _lib.NID_undef:
            raise ValueError("Undefined signature algorithm")
        return _ffi.string(_lib.OBJ_nid2ln(nid))

    def digest(self, digest_name):
        """
        Return the digest of the X509 object.

        :param digest_name: The name of the digest algorithm to use.
        :type digest_name: :py:class:`bytes`

        :return: The digest of the object, as upper-case hexadecimal byte
            pairs separated by colons.
        :raise ValueError: If the digest name is unknown.
        :raise Error: If OpenSSL fails to compute the digest.
        """
        digest = _lib.EVP_get_digestbyname(digest_name)
        if digest == _ffi.NULL:
            raise ValueError("No such digest method")

        result_buffer = _ffi.new("char[]", _lib.EVP_MAX_MD_SIZE)
        result_length = _ffi.new("unsigned int[]", 1)
        result_length[0] = len(result_buffer)

        digest_result = _lib.X509_digest(
            self._x509, digest, result_buffer, result_length)

        if not digest_result:
            _raise_current_error()

        return ':'.join([
            ch.encode('hex').upper() for ch
            in _ffi.buffer(result_buffer, result_length[0])])

    def subject_name_hash(self):
        """
        Return the hash of the X509 subject.

        :return: The hash of the subject.
        """
        return _lib.X509_subject_name_hash(self._x509)

    def set_serial_number(self, serial):
        """
        Set serial number of the certificate

        :param serial: The serial number
        :type serial: :py:class:`int`

        :return: None
        :raise TypeError: If ``serial`` is not an integer.
        :raise Error: If OpenSSL fails to convert or store the serial.
        """
        if not isinstance(serial, (int, long)):
            raise TypeError("serial must be an integer")

        # Format with "%x" rather than slicing ``hex()``: the sign comes out
        # as a leading "-" that BN_hex2bn understands.  Slicing "-0x5" left
        # "x5", which parsed as zero digits and stored a serial of 0.
        hex_serial = "%x" % (serial,)
        if not isinstance(hex_serial, bytes):
            hex_serial = hex_serial.encode('ascii')

        bignum_serial = _ffi.new("BIGNUM**")

        # BN_hex2bn allocates a new BIGNUM into the (initially NULL) slot;
        # the slot still being NULL afterwards means the conversion failed.
        _lib.BN_hex2bn(bignum_serial, hex_serial)
        if bignum_serial[0] == _ffi.NULL:
            _raise_current_error()

        asn1_serial = _lib.BN_to_ASN1_INTEGER(bignum_serial[0], _ffi.NULL)
        _lib.BN_free(bignum_serial[0])
        if asn1_serial == _ffi.NULL:
            # TODO Not tested
            _raise_current_error()
        asn1_serial = _ffi.gc(asn1_serial, _lib.ASN1_INTEGER_free)
        set_result = _lib.X509_set_serialNumber(self._x509, asn1_serial)
        if not set_result:
            # TODO Not tested
            _raise_current_error()

    def get_serial_number(self):
        """
        Return serial number of the certificate

        :return: Serial number as a Python integer
        :raise Error: If OpenSSL cannot convert the serial number.
        """
        asn1_serial = _lib.X509_get_serialNumber(self._x509)
        bignum_serial = _lib.ASN1_INTEGER_to_BN(asn1_serial, _ffi.NULL)
        if bignum_serial == _ffi.NULL:
            # Do not hand a NULL bignum to BN_bn2hex.
            _raise_current_error()
        try:
            hex_serial = _lib.BN_bn2hex(bignum_serial)
            try:
                hexstring_serial = _ffi.string(hex_serial)
                serial = int(hexstring_serial, 16)
                return serial
            finally:
                _lib.OPENSSL_free(hex_serial)
        finally:
            _lib.BN_free(bignum_serial)

    def gmtime_adj_notAfter(self, amount):
        """
        Adjust the time stamp for when the certificate stops being valid

        :param amount: The number of seconds by which to adjust the ending
                       validity time.
        :type amount: :py:class:`int`

        :return: None
        :raise TypeError: If ``amount`` is not an integer.
        """
        if not isinstance(amount, int):
            raise TypeError("amount must be an integer")

        notAfter = _lib.X509_get_notAfter(self._x509)
        _lib.X509_gmtime_adj(notAfter, amount)

    def gmtime_adj_notBefore(self, amount):
        """
        Change the timestamp for when the certificate starts being valid to the current
        time plus an offset.

        :param amount: The number of seconds by which to adjust the starting validity
                       time.
        :return: None
        :raise TypeError: If ``amount`` is not an integer.
        """
        if not isinstance(amount, int):
            raise TypeError("amount must be an integer")

        notBefore = _lib.X509_get_notBefore(self._x509)
        _lib.X509_gmtime_adj(notBefore, amount)

    def has_expired(self):
        """
        Check whether the certificate has expired.

        :return: True if the certificate has expired, false otherwise
        """
        now = int(time())
        notAfter = _lib.X509_get_notAfter(self._x509)
        # A negative comparison result is taken to mean notAfter is earlier
        # than the current time.
        return _lib.ASN1_UTCTIME_cmp_time_t(
            _ffi.cast('ASN1_UTCTIME*', notAfter), now) < 0

    def _get_boundary_time(self, which):
        # ``which`` is the OpenSSL accessor returning the notBefore/notAfter
        # field of the certificate.
        return _get_asn1_time(which(self._x509))

    def get_notBefore(self):
        """
        Retrieve the time stamp for when the certificate starts being valid

        :return: A string giving the timestamp, in the format::

                         YYYYMMDDhhmmssZ
                         YYYYMMDDhhmmss+hhmm
                         YYYYMMDDhhmmss-hhmm

                 or None if there is no value set.
        """
        return self._get_boundary_time(_lib.X509_get_notBefore)

    def _set_boundary_time(self, which, when):
        # ``which`` is the OpenSSL accessor returning the notBefore/notAfter
        # field of the certificate.
        return _set_asn1_time(which(self._x509), when)

    def set_notBefore(self, when):
        """
        Set the time stamp for when the certificate starts being valid

        :param when: A string giving the timestamp, in the format:

                         YYYYMMDDhhmmssZ
                         YYYYMMDDhhmmss+hhmm
                         YYYYMMDDhhmmss-hhmm
        :type when: :py:class:`bytes`

        :return: None
        """
        return self._set_boundary_time(_lib.X509_get_notBefore, when)

    def get_notAfter(self):
        """
        Retrieve the time stamp for when the certificate stops being valid

        :return: A string giving the timestamp, in the format::

                         YYYYMMDDhhmmssZ
                         YYYYMMDDhhmmss+hhmm
                         YYYYMMDDhhmmss-hhmm

                 or None if there is no value set.
        """
        return self._get_boundary_time(_lib.X509_get_notAfter)

    def set_notAfter(self, when):
        """
        Set the time stamp for when the certificate stops being valid

        :param when: A string giving the timestamp, in the format:

                         YYYYMMDDhhmmssZ
                         YYYYMMDDhhmmss+hhmm
                         YYYYMMDDhhmmss-hhmm
        :type when: :py:class:`bytes`

        :return: None
        """
        return self._set_boundary_time(_lib.X509_get_notAfter, when)

    def _get_name(self, which):
        # ``which`` is the OpenSSL accessor for the issuer or subject name.
        # X509Name.__init__ is bypassed because it would copy another name.
        name = X509Name.__new__(X509Name)
        name._name = which(self._x509)
        if name._name == _ffi.NULL:
            _raise_current_error()

        # The name is owned by the X509 structure.  As long as the X509Name
        # Python object is alive, keep the X509 Python object alive.
        name._owner = self

        return name

    def _set_name(self, which, name):
        # ``which`` is the OpenSSL setter for the issuer or subject name.
        if not isinstance(name, X509Name):
            raise TypeError("name must be an X509Name")
        set_result = which(self._x509, name._name)
        if not set_result:
            _raise_current_error()

    def get_issuer(self):
        """
        Create an X509Name object for the issuer of the certificate

        :return: An X509Name object
        """
        return self._get_name(_lib.X509_get_issuer_name)

    def set_issuer(self, issuer):
        """
        Set the issuer of the certificate

        :param issuer: The issuer name
        :type issuer: :py:class:`X509Name`

        :return: None
        """
        return self._set_name(_lib.X509_set_issuer_name, issuer)

    def get_subject(self):
        """
        Create an X509Name object for the subject of the certificate

        :return: An X509Name object
        """
        return self._get_name(_lib.X509_get_subject_name)

    def set_subject(self, subject):
        """
        Set the subject of the certificate

        :param subject: The subject name
        :type subject: :py:class:`X509Name`
        :return: None
        """
        return self._set_name(_lib.X509_set_subject_name, subject)

    def get_extension_count(self):
        """
        Get the number of extensions on the certificate.

        :return: The number of extensions as an integer.
        """
        return _lib.X509_get_ext_count(self._x509)

    def add_extensions(self, extensions):
        """
        Add extensions to the certificate.

        :param extensions: a sequence of X509Extension objects
        :return: None
        :raise ValueError: If an element is not an X509Extension.
        :raise Error: If OpenSSL fails to add an extension.
        """
        for ext in extensions:
            if not isinstance(ext, X509Extension):
                raise ValueError("One of the elements is not an X509Extension")

            add_result = _lib.X509_add_ext(self._x509, ext._extension, -1)
            if not add_result:
                _raise_current_error()

    def get_extension(self, index):
        """
        Get a specific extension of the certificate by index.

        :param index: The index of the extension to retrieve.
        :return: The X509Extension object at the specified index.
        :raise IndexError: If there is no extension at ``index``.
        """
        # Bypass X509Extension.__init__, which builds a new extension.
        ext = X509Extension.__new__(X509Extension)
        ext._extension = _lib.X509_get_ext(self._x509, index)
        if ext._extension == _ffi.NULL:
            raise IndexError("extension index out of bounds")

        # Hand out a private copy so the wrapper can free it independently
        # of the certificate.
        extension = _lib.X509_EXTENSION_dup(ext._extension)
        ext._extension = _ffi.gc(extension, _lib.X509_EXTENSION_free)
        return ext


X509Type = X509
class X509Store(object):
    """
    Wrapper around an OpenSSL ``X509_STORE``.

    The underlying pointer is stored in ``_store`` and released with
    ``X509_STORE_free`` when the wrapper is garbage collected.
    """

    def __init__(self):
        self._store = _ffi.gc(_lib.X509_STORE_new(), _lib.X509_STORE_free)

    def add_cert(self, cert):
        """
        Add a certificate to the store.

        :param cert: The certificate to add
        :type cert: :py:class:`X509`
        :return: None
        :raise TypeError: If ``cert`` is not an X509 instance.
        :raise Error: If OpenSSL refuses to add the certificate.
        """
        if not isinstance(cert, X509):
            raise TypeError()

        if _lib.X509_STORE_add_cert(self._store, cert._x509):
            return
        _raise_current_error(Error)


X509StoreType = X509Store
def load_certificate(type, buffer):
    """
    Parse a certificate out of an in-memory buffer.

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)

    :param buffer: The buffer the certificate is stored in
    :type buffer: :py:class:`bytes`

    :return: The X509 object
    :raise ValueError: If ``type`` is not one of the supported file types.
    :raise Error: If OpenSSL cannot parse a certificate from the buffer.
    """
    source = _new_mem_buf(buffer)

    if type == FILETYPE_PEM:
        parsed = _lib.PEM_read_bio_X509(
            source, _ffi.NULL, _ffi.NULL, _ffi.NULL)
    elif type == FILETYPE_ASN1:
        parsed = _lib.d2i_X509_bio(source, _ffi.NULL)
    else:
        raise ValueError(
            "type argument must be FILETYPE_PEM or FILETYPE_ASN1")

    if parsed == _ffi.NULL:
        _raise_current_error()

    # Wrap the parsed structure without running X509.__init__, which would
    # allocate a second, empty certificate.
    certificate = X509.__new__(X509)
    certificate._x509 = _ffi.gc(parsed, _lib.X509_free)
    return certificate
def dump_certificate(type, cert):
    """
    Dump a certificate to a buffer

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1, or
        FILETYPE_TEXT)
    :param cert: The certificate to dump
    :return: The buffer with the dumped certificate in
    :raise ValueError: If ``type`` is not one of the supported file types.
    :raise Error: If OpenSSL fails to serialize the certificate.
    """
    bio = _new_mem_buf()

    if type == FILETYPE_PEM:
        result_code = _lib.PEM_write_bio_X509(bio, cert._x509)
    elif type == FILETYPE_ASN1:
        result_code = _lib.i2d_X509_bio(bio, cert._x509)
    elif type == FILETYPE_TEXT:
        result_code = _lib.X509_print_ex(bio, cert._x509, 0, 0)
    else:
        raise ValueError(
            "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
            "FILETYPE_TEXT")

    # A zero result means the write failed; report it (as dump_privatekey
    # does) rather than returning whatever partial output is in the BIO.
    if result_code == 0:
        _raise_current_error()

    return _bio_to_string(bio)
def dump_privatekey(type, pkey, cipher=None, passphrase=None):
    """
    Dump a private key to a buffer

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1, or
        FILETYPE_TEXT)
    :param pkey: The PKey to dump
    :param cipher: (optional) if encrypted PEM format, the cipher to
        use
    :param passphrase: (optional) if encrypted PEM format, this can be either
        the passphrase to use, or a callback for providing the
        passphrase.
    :return: The buffer with the dumped key in
    :rtype: :py:data:`str`
    :raise ValueError: If the cipher name is unknown or ``type`` is not one
        of the supported file types.
    :raise Error: If OpenSSL fails to serialize the key.
    """
    bio = _new_mem_buf()

    if cipher is not None:
        cipher_obj = _lib.EVP_get_cipherbyname(cipher)
        if cipher_obj == _ffi.NULL:
            raise ValueError("Invalid cipher name")
    else:
        cipher_obj = _ffi.NULL

    helper = _PassphraseHelper(type, passphrase)
    if type == FILETYPE_PEM:
        result_code = _lib.PEM_write_bio_PrivateKey(
            bio, pkey._pkey, cipher_obj, _ffi.NULL, 0,
            helper.callback, helper.callback_args)
        helper.raise_if_problem()
    elif type == FILETYPE_ASN1:
        result_code = _lib.i2d_PrivateKey_bio(bio, pkey._pkey)
    elif type == FILETYPE_TEXT:
        rsa = _lib.EVP_PKEY_get1_RSA(pkey._pkey)
        if rsa == _ffi.NULL:
            # No RSA key could be obtained; do not hand NULL to RSA_print.
            _raise_current_error()
        # Pair the lookup with RSA_free, as PKey.check does, so the RSA
        # reference is released when ``rsa`` goes away.
        rsa = _ffi.gc(rsa, _lib.RSA_free)
        result_code = _lib.RSA_print(bio, rsa, 0)
    else:
        raise ValueError(
            "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
            "FILETYPE_TEXT")

    if result_code == 0:
        _raise_current_error()

    return _bio_to_string(bio)
def _X509_REVOKED_dup(original):
    """
    Build a field-by-field copy of an ``X509_REVOKED`` structure.

    The serial number, revocation date and each extension are duplicated;
    the ``sequence`` field is copied by value.

    :param original: The ``X509_REVOKED`` structure to copy.
    :return: A new ``X509_REVOKED`` structure.  It is not registered for
        garbage collection here; the caller is responsible for freeing it.
    :raise Error: If OpenSSL cannot allocate the new structure.
    """
    copy = _lib.X509_REVOKED_new()
    if copy == _ffi.NULL:
        # Report the allocation failure as an OpenSSL error instead of the
        # ZeroDivisionError the old ``1/0`` placeholder produced.
        _raise_current_error()

    if original.serialNumber != _ffi.NULL:
        copy.serialNumber = _lib.ASN1_INTEGER_dup(original.serialNumber)

    if original.revocationDate != _ffi.NULL:
        copy.revocationDate = _lib.M_ASN1_TIME_dup(original.revocationDate)

    if original.extensions != _ffi.NULL:
        extension_stack = _lib.sk_X509_EXTENSION_new_null()
        for i in range(_lib.sk_X509_EXTENSION_num(original.extensions)):
            original_ext = _lib.sk_X509_EXTENSION_value(
                original.extensions, i)
            copy_ext = _lib.X509_EXTENSION_dup(original_ext)
            _lib.sk_X509_EXTENSION_push(extension_stack, copy_ext)
        copy.extensions = extension_stack

    copy.sequence = original.sequence
    return copy
class Revoked(object):
# http://www.openssl.org/docs/apps/x509v3_config.html#CRL_distribution_points_
# which differs from crl_reasons of crypto/x509v3/v3_enum.c that matches
# OCSP_crl_reason_str. We use the latter, just like the command line
# program.
_crl_reasons = [
"unspecified",
"keyCompromise",
"CACompromise",
"affiliationChanged",
"superseded",
"cessationOfOperation",
"certificateHold",
# "removeFromCRL",
]
def __init__(self):
revoked = _lib.X509_REVOKED_new()
self._revoked = _ffi.gc(revoked, _lib.X509_REVOKED_free)
def set_serial(self, hex_str):
"""
Set the serial number of a revoked Revoked structure
:param hex_str: The new serial number.
:type hex_str: :py:data:`str`
:return: None
"""
bignum_serial = _ffi.gc(_lib.BN_new(), _lib.BN_free)
bignum_ptr = _ffi.new("BIGNUM**")
bignum_ptr[0] = bignum_serial
bn_result = _lib.BN_hex2bn(bignum_ptr, hex_str)
if not bn_result:
raise ValueError("bad hex string")
asn1_serial = _ffi.gc(
_lib.BN_to_ASN1_INTEGER(bignum_serial, _ffi.NULL),
_lib.ASN1_INTEGER_free)
_lib.X509_REVOKED_set_serialNumber(self._revoked, asn1_serial)
def get_serial(self):
"""
Return the serial number of a Revoked structure
:return: The serial number as a string
"""
bio = _new_mem_buf()
result = _lib.i2a_ASN1_INTEGER(bio, self._revoked.serialNumber)
if result < 0:
1/0
return _bio_to_string(bio)
def _delete_reason(self):
stack = self._revoked.extensions
for i in range(_lib.sk_X509_EXTENSION_num(stack)):
ext = _lib.sk_X509_EXTENSION_value(stack, i)
if _lib.OBJ_obj2nid(ext.object) == _lib.NID_crl_reason:
_lib.X509_EXTENSION_free(ext)
_lib.sk_X509_EXTENSION_delete(stack, i)
break
def set_reason(self, reason):
"""
Set the reason of a Revoked object.
If :py:data:`reason` is :py:data:`None`, delete the reason instead.
:param reason: The reason string.
:type reason: :py:class:`str` or :py:class:`NoneType`
:return: None
"""
if reason is None:
self._delete_reason()
elif not isinstance(reason, bytes):
raise TypeError("reason must be None or a byte string")
else:
reason = reason.lower().replace(' ', '')
reason_code = [r.lower() for r in self._crl_reasons].index(reason)
new_reason_ext = _lib.ASN1_ENUMERATED_new()
if new_reason_ext == _ffi.NULL:
1/0
new_reason_ext = _ffi.gc(new_reason_ext, _lib.ASN1_ENUMERATED_free)
set_result = _lib.ASN1_ENUMERATED_set(new_reason_ext, reason_code)
if set_result == _ffi.NULL:
1/0
self._delete_reason()
add_result = _lib.X509_REVOKED_add1_ext_i2d(
self._revoked, _lib.NID_crl_reason, new_reason_ext, 0, 0)
if not add_result:
1/0
def get_reason(self):
    """
    Return the revocation reason of this Revoked object.

    :return: The reason as a byte string, or :py:data:`None` if the entry
        carries no CRL-reason extension.
    :raise Error: If OpenSSL cannot print the extension.
    """
    extensions = self._revoked.extensions
    for i in range(_lib.sk_X509_EXTENSION_num(extensions)):
        ext = _lib.sk_X509_EXTENSION_value(extensions, i)
        if _lib.OBJ_obj2nid(ext.object) != _lib.NID_crl_reason:
            continue

        bio = _new_mem_buf()
        print_result = _lib.X509V3_EXT_print(bio, ext, 0, 0)
        if not print_result:
            # Fall back to printing the raw extension value.
            print_result = _lib.M_ASN1_OCTET_STRING_print(bio, ext.value)
            if print_result == 0:
                # Report the failure via the module's error helper rather
                # than crashing with ZeroDivisionError.
                _raise_current_error()
        return _bio_to_string(bio)
    return None
def all_reasons(self):
    """
    List every reason string supported by this class.

    :return: A copy of the supported reason strings.
    """
    supported = self._crl_reasons
    return supported[:]
def set_rev_date(self, when):
    """
    Set the revocation timestamp of this entry.

    :param when: A byte string giving the timestamp, in one of the formats:
        YYYYMMDDhhmmssZ
        YYYYMMDDhhmmss+hhmm
        YYYYMMDDhhmmss-hhmm
    :return: None
    """
    revocation_date = self._revoked.revocationDate
    return _set_asn1_time(revocation_date, when)
def get_rev_date(self):
    """
    Retrieve the revocation timestamp of this entry.

    :return: A string giving the timestamp, in one of the formats:
        YYYYMMDDhhmmssZ
        YYYYMMDDhhmmss+hhmm
        YYYYMMDDhhmmss-hhmm
    """
    revocation_date = self._revoked.revocationDate
    return _get_asn1_time(revocation_date)
class CRL(object):
    """
    A certificate revocation list wrapping an OpenSSL ``X509_CRL``.
    """

    def __init__(self):
        """
        Create a new empty CRL object.
        """
        crl = _lib.X509_CRL_new()
        self._crl = _ffi.gc(crl, _lib.X509_CRL_free)

    def get_revoked(self):
        """
        Return the revoked portion of the CRL structure (by value, not
        reference).

        :return: A tuple of Revoked objects, or :py:data:`None` if the CRL
            has no revoked entries.
        """
        results = []
        revoked_stack = self._crl.crl.revoked
        for i in range(_lib.sk_X509_REVOKED_num(revoked_stack)):
            revoked = _lib.sk_X509_REVOKED_value(revoked_stack, i)
            revoked_copy = _X509_REVOKED_dup(revoked)
            if revoked_copy == _ffi.NULL:
                _raise_current_error()
            pyrev = Revoked.__new__(Revoked)
            pyrev._revoked = _ffi.gc(revoked_copy, _lib.X509_REVOKED_free)
            results.append(pyrev)
        if results:
            return tuple(results)
        return None

    def add_revoked(self, revoked):
        """
        Add a revoked entry (by value, not reference) to the CRL structure.

        :param revoked: The new revoked entry.
        :type revoked: :class:`Revoked`
        :return: None
        :raise Error: If the entry cannot be copied or added.
        """
        copy = _X509_REVOKED_dup(revoked._revoked)
        if copy == _ffi.NULL:
            _raise_current_error()

        add_result = _lib.X509_CRL_add0_revoked(self._crl, copy)
        if add_result == 0:
            # The CRL did not take ownership of the copy, so release it
            # here before reporting the failure.
            _lib.X509_REVOKED_free(copy)
            _raise_current_error()

    def export(self, cert, key, type=FILETYPE_PEM, days=100, digest=None):
        """
        Sign the CRL and export it as a string.

        Calling this updates the CRL's lastUpdate, nextUpdate and issuer
        fields before signing.

        :param cert: Used to sign CRL.
        :type cert: :class:`X509`
        :param key: Used to sign CRL.
        :type key: :class:`PKey`
        :param type: The export format, either :py:data:`FILETYPE_PEM`,
            :py:data:`FILETYPE_ASN1`, or :py:data:`FILETYPE_TEXT`.
        :param days: The number of days until the next update of this CRL.
        :type days: :py:data:`int`
        :param digest: Name of the message digest to sign with.  When
            omitted, MD5 is used as before; callers should pass a stronger
            digest name.
        :type digest: :py:data:`bytes` or :py:data:`None`
        :return: :py:data:`str`
        :raise TypeError: If an argument has the wrong type.
        :raise ValueError: If ``type`` or ``digest`` is not recognised.
        :raise Error: If OpenSSL fails to sign or serialize the CRL.
        """
        if not isinstance(cert, X509):
            raise TypeError("cert must be an X509 instance")
        if not isinstance(key, PKey):
            raise TypeError("key must be a PKey instance")
        if not isinstance(type, int):
            raise TypeError("type must be an integer")
        # Reject an unknown format before the CRL is modified and signed.
        if type not in (FILETYPE_PEM, FILETYPE_ASN1, FILETYPE_TEXT):
            raise ValueError(
                "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or FILETYPE_TEXT")

        if digest is None:
            digest_obj = _lib.EVP_md5()
        else:
            digest_obj = _lib.EVP_get_digestbyname(digest)
            if digest_obj == _ffi.NULL:
                raise ValueError("No such digest method")

        # _new_mem_buf ties the BIO's lifetime to the Python object, so it
        # is released on every exit path.
        bio = _new_mem_buf()

        # A scratch time object to give different values to different CRL
        # fields.  It is garbage-collected so it does not leak.
        sometime = _lib.ASN1_TIME_new()
        if sometime == _ffi.NULL:
            _raise_current_error()
        sometime = _ffi.gc(sometime, _lib.ASN1_TIME_free)

        _lib.X509_gmtime_adj(sometime, 0)
        _lib.X509_CRL_set_lastUpdate(self._crl, sometime)

        _lib.X509_gmtime_adj(sometime, days * 24 * 60 * 60)
        _lib.X509_CRL_set_nextUpdate(self._crl, sometime)

        _lib.X509_CRL_set_issuer_name(
            self._crl, _lib.X509_get_subject_name(cert._x509))

        sign_result = _lib.X509_CRL_sign(self._crl, key._pkey, digest_obj)
        if not sign_result:
            _raise_current_error()

        if type == FILETYPE_PEM:
            ret = _lib.PEM_write_bio_X509_CRL(bio, self._crl)
        elif type == FILETYPE_ASN1:
            ret = _lib.i2d_X509_CRL_bio(bio, self._crl)
        else:
            # Only FILETYPE_TEXT remains after the validation above.
            ret = _lib.X509_CRL_print(bio, self._crl)

        if not ret:
            _raise_current_error()

        return _bio_to_string(bio)


CRLType = CRL
class PKCS7(object):
    """
    Read-only view of an OpenSSL ``PKCS7`` structure held in ``_pkcs7``.
    """

    def type_is_signed(self):
        """
        Check whether this is an NID_pkcs7_signed object.

        :return: True if the PKCS7 is of type signed
        """
        return bool(_lib.PKCS7_type_is_signed(self._pkcs7))

    def type_is_enveloped(self):
        """
        Check whether this is an NID_pkcs7_enveloped object.

        :returns: True if the PKCS7 is of type enveloped
        """
        return bool(_lib.PKCS7_type_is_enveloped(self._pkcs7))

    def type_is_signedAndEnveloped(self):
        """
        Check whether this is an NID_pkcs7_signedAndEnveloped object.

        :returns: True if the PKCS7 is of type signedAndEnveloped
        """
        return bool(_lib.PKCS7_type_is_signedAndEnveloped(self._pkcs7))

    def type_is_data(self):
        """
        Check whether this is an NID_pkcs7_data object.

        :return: True if the PKCS7 is of type data
        """
        return bool(_lib.PKCS7_type_is_data(self._pkcs7))

    def get_type_name(self):
        """
        Return the type name of the PKCS7 structure.

        :return: A string with the typename
        """
        type_nid = _lib.OBJ_obj2nid(self._pkcs7.type)
        return _ffi.string(_lib.OBJ_nid2sn(type_nid))


PKCS7Type = PKCS7
class PKCS12(object):
    """
    Container for the pieces of a PKCS12 structure: a private key, a
    certificate, optional CA certificates and an optional friendly name.
    """

    def __init__(self):
        self._pkey = None
        self._cert = None
        self._cacerts = None
        self._friendlyname = None

    def get_certificate(self):
        """
        Return certificate portion of the PKCS12 structure

        :return: X509 object containing the certificate, or
            :py:data:`None` if none has been set.
        """
        return self._cert

    def set_certificate(self, cert):
        """
        Replace the certificate portion of the PKCS12 structure

        :param cert: The new certificate.
        :type cert: :py:class:`X509`
        :return: None
        :raise TypeError: If :py:data:`cert` is not an X509 instance.
        """
        if not isinstance(cert, X509):
            raise TypeError("cert must be an X509 instance")
        self._cert = cert

    def get_privatekey(self):
        """
        Return private key portion of the PKCS12 structure

        :returns: PKey object containing the private key, or
            :py:data:`None` if none has been set.
        """
        return self._pkey

    def set_privatekey(self, pkey):
        """
        Replace or set the private key portion of the PKCS12 structure

        :param pkey: The new private key.
        :type pkey: :py:class:`PKey`
        :return: None
        :raise TypeError: If :py:data:`pkey` is not a PKey instance.
        """
        if not isinstance(pkey, PKey):
            raise TypeError("pkey must be a PKey instance")
        self._pkey = pkey

    def get_ca_certificates(self):
        """
        Return CA certificates within the PKCS12 object

        :return: A newly created tuple containing the CA certificates in
            the chain, if any are present, or None if no CA certificates
            are present.
        """
        if self._cacerts is not None:
            return tuple(self._cacerts)
        return None

    def set_ca_certificates(self, cacerts):
        """
        Replace or set the CA certificates within the PKCS12 object.

        :param cacerts: The new CA certificates.
        :type cacerts: :py:data:`None` or an iterable of :py:class:`X509`
        :return: None
        :raise TypeError: If the iterable holds anything but X509 objects.
        """
        if cacerts is None:
            self._cacerts = None
            return

        # Materialize the iterable once so it can be validated and stored.
        cacerts = list(cacerts)
        for cert in cacerts:
            if not isinstance(cert, X509):
                raise TypeError("iterable must only contain X509 instances")
        self._cacerts = cacerts

    def set_friendlyname(self, name):
        """
        Replace or set the friendly name of the PKCS12 structure

        :param name: The new friendly name, or :py:data:`None` to clear it.
        :type name: :py:class:`bytes` or :py:data:`None`
        :return: None
        :raise TypeError: If :py:data:`name` is neither bytes nor None.
        """
        if name is not None and not isinstance(name, bytes):
            raise TypeError("name must be a byte string or None (not %r)" % (name,))
        self._friendlyname = name

    def get_friendlyname(self):
        """
        Return friendly name portion of the PKCS12 structure

        :returns: String containing the friendlyname, or :py:data:`None`
            if none has been set.
        """
        return self._friendlyname

    def export(self, passphrase=None, iter=2048, maciter=1):
        """
        Dump a PKCS12 object as a string.  See also "man PKCS12_create".

        :param passphrase: used to encrypt the PKCS12
        :type passphrase: :py:data:`bytes`
        :param iter: How many times to repeat the encryption
        :type iter: :py:data:`int`
        :param maciter: How many times to repeat the MAC
        :type maciter: :py:data:`int`
        :return: The string containing the PKCS12
        :raise Error: If OpenSSL fails to create or serialize the structure.
        """
        if self._cacerts is None:
            cacerts = _ffi.NULL
        else:
            cacerts = _lib.sk_X509_new_null()
            if cacerts == _ffi.NULL:
                _raise_current_error()
            # Only the stack container is released here; the certificates
            # stay owned by their X509 wrappers.
            cacerts = _ffi.gc(cacerts, _lib.sk_X509_free)
            for cert in self._cacerts:
                _lib.sk_X509_push(cacerts, cert._x509)

        if passphrase is None:
            passphrase = _ffi.NULL

        friendlyname = self._friendlyname
        if friendlyname is None:
            friendlyname = _ffi.NULL

        pkey = _ffi.NULL if self._pkey is None else self._pkey._pkey
        cert = _ffi.NULL if self._cert is None else self._cert._x509

        pkcs12 = _lib.PKCS12_create(
            passphrase, friendlyname, pkey, cert, cacerts,
            _lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC,
            _lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC,
            iter, maciter, 0)
        if pkcs12 == _ffi.NULL:
            _raise_current_error()
        pkcs12 = _ffi.gc(pkcs12, _lib.PKCS12_free)

        bio = _new_mem_buf()
        # Do not hand back a partial or empty buffer when serialization
        # fails.
        if not _lib.i2d_PKCS12_bio(bio, pkcs12):
            _raise_current_error()
        return _bio_to_string(bio)


PKCS12Type = PKCS12
class NetscapeSPKI(object):
    """
    A Netscape SPKI (signed public key and challenge) object wrapping an
    OpenSSL ``NETSCAPE_SPKI``.
    """

    def __init__(self):
        spki = _lib.NETSCAPE_SPKI_new()
        self._spki = _ffi.gc(spki, _lib.NETSCAPE_SPKI_free)

    def sign(self, pkey, digest):
        """
        Sign the certificate request using the supplied key and digest

        :param pkey: The key to sign with
        :param digest: The message digest to use
        :return: None
        :raise ValueError: If the key has only a public part, is
            uninitialized, or the digest name is unknown.
        :raise Error: If OpenSSL fails to produce the signature.
        """
        if pkey._only_public:
            raise ValueError("Key has only public part")

        if not pkey._initialized:
            raise ValueError("Key is uninitialized")

        digest_obj = _lib.EVP_get_digestbyname(digest)
        if digest_obj == _ffi.NULL:
            raise ValueError("No such digest method")

        sign_result = _lib.NETSCAPE_SPKI_sign(self._spki, pkey._pkey, digest_obj)
        if not sign_result:
            _raise_current_error()

    def verify(self, key):
        """
        Verifies a certificate request using the supplied public key

        :param key: a public key
        :return: True if the signature is correct.
        :raise OpenSSL.crypto.Error: If the signature is invalid or there is a
            problem verifying the signature.
        """
        answer = _lib.NETSCAPE_SPKI_verify(self._spki, key._pkey)
        if answer <= 0:
            _raise_current_error()
        return True

    def b64_encode(self):
        """
        Generate a base64 encoded string from an SPKI

        :return: The base64 encoded string
        :raise Error: If OpenSSL fails to encode the structure.
        """
        encoded = _lib.NETSCAPE_SPKI_b64_encode(self._spki)
        if encoded == _ffi.NULL:
            # Nothing to copy or free; report the failure instead of
            # reading through a NULL pointer.
            _raise_current_error()
        try:
            result = _ffi.string(encoded)
        finally:
            _lib.CRYPTO_free(encoded)
        return result

    def get_pubkey(self):
        """
        Get the public key of the certificate

        :return: The public key
        :raise Error: If OpenSSL cannot extract the public key.
        """
        pkey = PKey.__new__(PKey)
        pkey._pkey = _lib.NETSCAPE_SPKI_get_pubkey(self._spki)
        if pkey._pkey == _ffi.NULL:
            _raise_current_error()
        pkey._pkey = _ffi.gc(pkey._pkey, _lib.EVP_PKEY_free)
        pkey._only_public = True
        return pkey

    def set_pubkey(self, pkey):
        """
        Set the public key of the certificate

        :param pkey: The public key
        :return: None
        :raise Error: If OpenSSL rejects the key.
        """
        set_result = _lib.NETSCAPE_SPKI_set_pubkey(self._spki, pkey._pkey)
        if not set_result:
            _raise_current_error()


NetscapeSPKIType = NetscapeSPKI
class _PassphraseHelper(object):
    """
    Adapt a user-supplied passphrase (None, a byte string, or a callable)
    to the callback / userdata argument pair taken by OpenSSL's PEM
    routines, recording any exception raised inside the callback.
    """

    def __init__(self, type, passphrase, more_args=False, truncate=False):
        """
        :param type: The file type the passphrase will be used with.
        :param passphrase: None, a byte string, or a callable returning a
            byte string.
        :param more_args: If true, the callable is invoked with
            ``(size, rwflag, userdata)`` instead of just ``(rwflag)``.
        :param truncate: If true, an over-long passphrase is cut to the
            buffer size instead of being treated as an error.
        :raise ValueError: If a passphrase is given for a non-PEM type.
        """
        if type != FILETYPE_PEM and passphrase is not None:
            raise ValueError("only FILETYPE_PEM key format supports encryption")
        self._passphrase = passphrase
        self._more_args = more_args
        self._truncate = truncate
        # Exceptions raised inside the callback, in the order they occurred.
        self._problems = []

    @property
    def callback(self):
        """
        The ``pem_password_cb`` to pass to OpenSSL: a cffi callback when the
        passphrase is callable, NULL otherwise.
        """
        if self._passphrase is None:
            return _ffi.NULL
        elif isinstance(self._passphrase, bytes):
            return _ffi.NULL
        elif callable(self._passphrase):
            return _ffi.callback("pem_password_cb", self._read_passphrase)
        else:
            raise TypeError("Last argument must be string or callable")

    @property
    def callback_args(self):
        """
        The userdata argument to pass to OpenSSL: the passphrase itself when
        it is a byte string, NULL otherwise.
        """
        if self._passphrase is None:
            return _ffi.NULL
        elif isinstance(self._passphrase, bytes):
            return self._passphrase
        elif callable(self._passphrase):
            return _ffi.NULL
        else:
            raise TypeError("Last argument must be string or callable")

    def raise_if_problem(self, exceptionType=Error):
        """
        Drain the OpenSSL error queue and re-raise any callback failure.

        :param exceptionType: The exception class used for queued errors.
        :return: The exception built from the error queue, or
            :py:data:`None` if the error helper did not raise.
        :raise Exception: The first exception recorded by the callback.
        """
        # The name bound by ``except ... as`` is unbound again when the
        # handler exits on Python 3, so keep the exception under another
        # name to be able to return it.
        from_queue = None
        try:
            _raise_current_error(exceptionType)
        except exceptionType as e:
            from_queue = e
        if self._problems:
            raise self._problems[0]
        return from_queue

    def _read_passphrase(self, buf, size, rwflag, userdata):
        """
        ``pem_password_cb`` implementation: call the user's callable and
        copy its result into ``buf``.

        :return: The number of bytes written, or 0 if anything failed (the
            exception is stored in ``self._problems``).
        """
        try:
            if self._more_args:
                result = self._passphrase(size, rwflag, userdata)
            else:
                result = self._passphrase(rwflag)
            if not isinstance(result, bytes):
                raise ValueError("String expected")
            if len(result) > size:
                if self._truncate:
                    result = result[:size]
                else:
                    raise ValueError("passphrase returned by callback is too long")
            for i in range(len(result)):
                # Slice rather than index: indexing bytes yields an int on
                # Python 3, which cannot be stored into a char buffer.
                buf[i] = result[i:i + 1]
            return len(result)
        except Exception as e:
            # Exceptions must not propagate out of the callback; stash them
            # for raise_if_problem.
            self._problems.append(e)
            return 0
def load_privatekey(type, buffer, passphrase=None):
    """
    Load a private key from a buffer

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
    :param buffer: The buffer the key is stored in
    :param passphrase: (optional) if encrypted PEM format, this can be
                       either the passphrase to use, or a callback for
                       providing the passphrase.
    :return: The PKey object
    :raise ValueError: If ``type`` is not a supported file type, or a
        passphrase is given for a non-PEM type.
    :raise Error: If OpenSSL cannot load the key.
    """
    bio = _new_mem_buf(buffer)

    helper = _PassphraseHelper(type, passphrase)
    queued_error = None
    if type == FILETYPE_PEM:
        evp_pkey = _lib.PEM_read_bio_PrivateKey(
            bio, _ffi.NULL, helper.callback, helper.callback_args)
        # raise_if_problem drains the OpenSSL error queue and returns what
        # it found.  Keep that exception: it carries the details of a
        # failed load, which would otherwise be lost.
        queued_error = helper.raise_if_problem()
    elif type == FILETYPE_ASN1:
        evp_pkey = _lib.d2i_PrivateKey_bio(bio, _ffi.NULL)
    else:
        raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")

    if evp_pkey == _ffi.NULL:
        if queued_error is not None:
            raise queued_error
        _raise_current_error()

    pkey = PKey.__new__(PKey)
    pkey._pkey = _ffi.gc(evp_pkey, _lib.EVP_PKEY_free)
    return pkey
def dump_certificate_request(type, req):
    """
    Dump a certificate request to a buffer

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1,
        FILETYPE_TEXT)
    :param req: The certificate request to dump
    :return: The buffer with the dumped certificate request in
    :raise ValueError: If ``type`` is not a supported file type.
    :raise Error: If OpenSSL fails to serialize the request.
    """
    # Validate the format before allocating the output BIO.
    if type not in (FILETYPE_PEM, FILETYPE_ASN1, FILETYPE_TEXT):
        raise ValueError("type argument must be FILETYPE_PEM, FILETYPE_ASN1, or FILETYPE_TEXT")

    bio = _new_mem_buf()

    if type == FILETYPE_PEM:
        result_code = _lib.PEM_write_bio_X509_REQ(bio, req._req)
    elif type == FILETYPE_ASN1:
        result_code = _lib.i2d_X509_REQ_bio(bio, req._req)
    else:
        result_code = _lib.X509_REQ_print_ex(bio, req._req, 0, 0)

    if result_code == 0:
        # Report the OpenSSL failure through the module's error helper
        # instead of crashing with ZeroDivisionError.
        _raise_current_error()

    return _bio_to_string(bio)
def load_certificate_request(type, buffer):
    """
    Load a certificate request from a buffer

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
    :param buffer: The buffer the certificate request is stored in
    :return: The X509Req object
    :raise ValueError: If ``type`` is not a supported file type.
    :raise Error: If OpenSSL cannot parse the request.
    """
    bio = _new_mem_buf(buffer)

    if type == FILETYPE_PEM:
        req = _lib.PEM_read_bio_X509_REQ(bio, _ffi.NULL, _ffi.NULL, _ffi.NULL)
    elif type == FILETYPE_ASN1:
        req = _lib.d2i_X509_REQ_bio(bio, _ffi.NULL)
    else:
        # Same error as the other loaders raise for an unknown file type.
        raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")

    if req == _ffi.NULL:
        _raise_current_error()

    x509req = X509Req.__new__(X509Req)
    x509req._req = _ffi.gc(req, _lib.X509_REQ_free)
    return x509req
def sign(pkey, data, digest):
    """
    Sign data with a digest

    :param pkey: Pkey to sign with
    :param data: data to be signed
    :param digest: message digest to use
    :return: signature
    :raise ValueError: If the digest name is unknown.
    :raise Error: If OpenSSL fails to produce the signature.
    """
    digest_obj = _lib.EVP_get_digestbyname(digest)
    if digest_obj == _ffi.NULL:
        raise ValueError("No such digest method")

    md_ctx = _ffi.new("EVP_MD_CTX*")
    md_ctx = _ffi.gc(md_ctx, _lib.EVP_MD_CTX_cleanup)

    _lib.EVP_SignInit(md_ctx, digest_obj)
    _lib.EVP_SignUpdate(md_ctx, data, len(data))

    # Size the output from the key itself.  A fixed-size buffer would be
    # overrun by EVP_SignFinal for sufficiently large keys.
    max_length = _lib.EVP_PKEY_size(pkey._pkey)
    if max_length <= 0:
        _raise_current_error()
    signature_buffer = _ffi.new("unsigned char[]", max_length)
    signature_length = _ffi.new("unsigned int*")
    signature_length[0] = max_length

    final_result = _lib.EVP_SignFinal(
        md_ctx, signature_buffer, signature_length, pkey._pkey)
    if final_result != 1:
        _raise_current_error()

    # EVP_SignFinal stores the number of bytes actually written.
    return _ffi.buffer(signature_buffer, signature_length[0])[:]
def verify(cert, signature, data, digest):
    """
    Verify a signature

    :param cert: signing certificate (X509 object)
    :param signature: signature returned by sign function
    :param data: data to be verified
    :param digest: message digest to use
    :return: None if the signature is correct, raise exception otherwise
    :raise ValueError: If the digest name is unknown.
    :raise Error: If the certificate has no usable public key or the
        signature does not verify.
    """
    digest_obj = _lib.EVP_get_digestbyname(digest)
    if digest_obj == _ffi.NULL:
        raise ValueError("No such digest method")

    pkey = _lib.X509_get_pubkey(cert._x509)
    if pkey == _ffi.NULL:
        # Report the OpenSSL failure through the module's error helper
        # instead of crashing with ZeroDivisionError.
        _raise_current_error()
    pkey = _ffi.gc(pkey, _lib.EVP_PKEY_free)

    md_ctx = _ffi.new("EVP_MD_CTX*")
    md_ctx = _ffi.gc(md_ctx, _lib.EVP_MD_CTX_cleanup)

    _lib.EVP_VerifyInit(md_ctx, digest_obj)
    _lib.EVP_VerifyUpdate(md_ctx, data, len(data))
    verify_result = _lib.EVP_VerifyFinal(md_ctx, signature, len(signature), pkey)

    # Anything other than 1 (a mismatch or an internal error) is a failure.
    if verify_result != 1:
        _raise_current_error()
def load_crl(type, buffer):
    """
    Load a certificate revocation list from a buffer

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
    :param buffer: The buffer the CRL is stored in
    :return: The CRL object
    :raise ValueError: If ``type`` is not a supported file type.
    :raise Error: If OpenSSL cannot parse the CRL.
    """
    bio = _new_mem_buf(buffer)

    if type == FILETYPE_PEM:
        crl = _lib.PEM_read_bio_X509_CRL(bio, _ffi.NULL, _ffi.NULL, _ffi.NULL)
    elif type == FILETYPE_ASN1:
        crl = _lib.d2i_X509_CRL_bio(bio, _ffi.NULL)
    else:
        raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")

    if crl == _ffi.NULL:
        _raise_current_error()

    result = CRL.__new__(CRL)
    # Tie the X509_CRL's lifetime to the wrapper so it is released with
    # it; without the finalizer the loaded structure is never freed.
    result._crl = _ffi.gc(crl, _lib.X509_CRL_free)
    return result
def load_pkcs7_data(type, buffer):
    """
    Load pkcs7 data from a buffer

    :param type: The file type (one of FILETYPE_PEM or FILETYPE_ASN1)
    :param buffer: The buffer with the pkcs7 data.
    :return: The PKCS7 object
    :raise ValueError: If ``type`` is not a supported file type.
    :raise Error: If OpenSSL cannot parse the data.
    """
    bio = _new_mem_buf(buffer)

    if type == FILETYPE_PEM:
        pkcs7 = _lib.PEM_read_bio_PKCS7(bio, _ffi.NULL, _ffi.NULL, _ffi.NULL)
    elif type == FILETYPE_ASN1:
        # Parse the DER encoding; leaving this branch empty would leave
        # ``pkcs7`` unbound for every ASN1 input.
        pkcs7 = _lib.d2i_PKCS7_bio(bio, _ffi.NULL)
    else:
        raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")

    if pkcs7 == _ffi.NULL:
        _raise_current_error()

    pypkcs7 = PKCS7.__new__(PKCS7)
    pypkcs7._pkcs7 = _ffi.gc(pkcs7, _lib.PKCS7_free)
    return pypkcs7
def load_pkcs12(buffer, passphrase=None):
    """
    Load a PKCS12 object from a buffer

    :param buffer: The buffer the certificate is stored in
    :param passphrase: (Optional) The password to decrypt the PKCS12 lump.
        When omitted, a NULL passphrase is handed to OpenSSL.
    :returns: The PKCS12 object
    :raise Error: If OpenSSL cannot decode or parse the structure.
    """
    if passphrase is None:
        passphrase = _ffi.NULL

    bio = _new_mem_buf(buffer)

    p12 = _lib.d2i_PKCS12_bio(bio, _ffi.NULL)
    if p12 == _ffi.NULL:
        _raise_current_error()
    p12 = _ffi.gc(p12, _lib.PKCS12_free)

    pkey = _ffi.new("EVP_PKEY**")
    cert = _ffi.new("X509**")
    cacerts = _ffi.new("Cryptography_STACK_OF_X509**")

    parse_result = _lib.PKCS12_parse(p12, passphrase, pkey, cert, cacerts)
    if not parse_result:
        _raise_current_error()

    # Frees only the stack container; each certificate in it gets its own
    # finalizer below.
    cacerts = _ffi.gc(cacerts[0], _lib.sk_X509_free)

    # openssl 1.0.0 sometimes leaves an X509_check_private_key error in the
    # queue for no particular reason.  This error isn't interesting to anyone
    # outside this function.  It's not even interesting to us.  Get rid of it.
    try:
        _raise_current_error()
    except Error:
        pass

    if pkey[0] == _ffi.NULL:
        pykey = None
    else:
        pykey = PKey.__new__(PKey)
        pykey._pkey = _ffi.gc(pkey[0], _lib.EVP_PKEY_free)

    if cert[0] == _ffi.NULL:
        pycert = None
        friendlyname = None
    else:
        pycert = X509.__new__(X509)
        pycert._x509 = _ffi.gc(cert[0], _lib.X509_free)

        friendlyname_length = _ffi.new("int*")
        friendlyname_buffer = _lib.X509_alias_get0(cert[0], friendlyname_length)
        # Check for a missing alias before touching the buffer.
        if friendlyname_buffer == _ffi.NULL:
            friendlyname = None
        else:
            friendlyname = _ffi.buffer(
                friendlyname_buffer, friendlyname_length[0])[:]

    pycacerts = []
    for i in range(_lib.sk_X509_num(cacerts)):
        pycacert = X509.__new__(X509)
        # The stack's finalizer does not free its elements, so each
        # certificate is released together with its own wrapper.
        pycacert._x509 = _ffi.gc(_lib.sk_X509_value(cacerts, i), _lib.X509_free)
        pycacerts.append(pycacert)
    if not pycacerts:
        pycacerts = None

    pkcs12 = PKCS12.__new__(PKCS12)
    pkcs12._pkey = pykey
    pkcs12._cert = pycert
    pkcs12._cacerts = pycacerts
    pkcs12._friendlyname = friendlyname
    return pkcs12