blob: 1329f939d6dcf5861ba9187548dd07677de7b74a [file] [log] [blame]
from time import time
from OpenSSL.xcrypto import *
from tls.c import api as _api
FILETYPE_PEM = _api.SSL_FILETYPE_PEM
FILETYPE_ASN1 = _api.SSL_FILETYPE_ASN1
# TODO This was an API mistake. OpenSSL has no such constant.
FILETYPE_TEXT = 2 ** 16 - 1
def _bio_to_string(bio):
"""
Copy the contents of an OpenSSL BIO object into a Python byte string.
"""
result_buffer = _api.new('char**')
buffer_length = _api.BIO_get_mem_data(bio, result_buffer)
return _api.buffer(result_buffer[0], buffer_length)[:]
def _raise_current_error():
errors = []
while True:
error = _api.ERR_get_error()
if error == 0:
break
errors.append((
_api.string(_api.ERR_lib_error_string(error)),
_api.string(_api.ERR_func_error_string(error)),
_api.string(_api.ERR_reason_error_string(error))))
raise Error(errors)
_exception_from_error_queue = _raise_current_error
class Error(Exception):
pass
class PKey(object):
def __init__(self):
pass
def check(self):
"""
Check the consistency of an RSA private key.
:return: True if key is consistent.
:raise Error: if the key is inconsistent.
:raise TypeError: if the key is of a type which cannot be checked.
Only RSA keys can currently be checked.
"""
if _api.EVP_PKEY_type(self._pkey.type) != _api.EVP_PKEY_RSA:
raise TypeError("key type unsupported")
rsa = _api.EVP_PKEY_get1_RSA(self._pkey)
result = _api.RSA_check_key(rsa)
if result:
return True
_raise_current_error()
class X509(object):
def __init__(self):
# TODO Allocation failure? And why not __new__ instead of __init__?
self._x509 = _api.X509_new()
def set_version(self, version):
"""
Set version number of the certificate
:param version: The version number
:type version: :py:class:`int`
:return: None
"""
if not isinstance(version, int):
raise TypeError("version must be an integer")
_api.X509_set_version(self._x509, version)
def get_version(self):
"""
Return version number of the certificate
:return: Version number as a Python integer
"""
return _api.X509_get_version(self._x509)
def subject_name_hash(self):
"""
Return the hash of the X509 subject.
:return: The hash of the subject.
"""
return _api.X509_subject_name_hash(self._x509)
def set_serial_number(self, serial):
"""
Set serial number of the certificate
:param serial: The serial number
:type serial: :py:class:`int`
:return: None
"""
if not isinstance(serial, (int, long)):
raise TypeError("serial must be an integer")
hex_serial = hex(serial)[2:]
if not isinstance(hex_serial, bytes):
hex_serial = hex_serial.encode('ascii')
bignum_serial = _api.new("BIGNUM**")
# BN_hex2bn stores the result in &bignum. Unless it doesn't feel like
# it. If bignum is still NULL after this call, then the return value is
# actually the result. I hope. -exarkun
small_serial = _api.BN_hex2bn(bignum_serial, hex_serial)
if bignum_serial[0] == _api.NULL:
set_result = ASN1_INTEGER_set(
_api.X509_get_serialNumber(self._x509), small_serial)
if set_result:
# TODO Not tested
_raise_current_error()
else:
asn1_serial = _api.BN_to_ASN1_INTEGER(bignum_serial[0], _api.NULL)
_api.BN_free(bignum_serial[0])
if asn1_serial == _api.NULL:
# TODO Not tested
_raise_current_error()
set_result = _api.X509_set_serialNumber(self._x509, asn1_serial)
if not set_result:
# TODO Not tested
_raise_current_error()
def get_serial_number(self):
"""
Return serial number of the certificate
:return: Serial number as a Python integer
"""
asn1_serial = _api.X509_get_serialNumber(self._x509)
bignum_serial = _api.ASN1_INTEGER_to_BN(asn1_serial, _api.NULL)
try:
hex_serial = _api.BN_bn2hex(bignum_serial)
try:
hexstring_serial = _api.string(hex_serial)
serial = int(hexstring_serial, 16)
return serial
finally:
_api.OPENSSL_free(hex_serial)
finally:
_api.BN_free(bignum_serial)
def gmtime_adj_notAfter(self, amount):
"""
Adjust the time stamp for when the certificate stops being valid
:param amount: The number of seconds by which to adjust the ending
validity time.
:type amount: :py:class:`int`
:return: None
"""
if not isinstance(amount, int):
raise TypeError("amount must be an integer")
notAfter = _api.X509_get_notAfter(self._x509)
_api.X509_gmtime_adj(notAfter, amount)
def has_expired(self):
"""
Check whether the certificate has expired.
:return: True if the certificate has expired, false otherwise
"""
now = int(time())
notAfter = _api.X509_get_notAfter(self._x509)
return _api.ASN1_UTCTIME_cmp_time_t(notAfter, now) < 0
def get_notBefore(self):
"""
Retrieve the time stamp for when the certificate starts being valid
:return: A string giving the timestamp, in the format::
YYYYMMDDhhmmssZ\n\
YYYYMMDDhhmmss+hhmm\n\
YYYYMMDDhhmmss-hhmm\n\
or None if there is no value set.
"""
def set_notBefore(self, when):
"""
Set the time stamp for when the certificate starts being valid
:param when: A string giving the timestamp, in the format:
YYYYMMDDhhmmssZ
YYYYMMDDhhmmss+hhmm
YYYYMMDDhhmmss-hhmm
:type when: :py:class:`bytes`
:return: None
"""
notBefore = _api.X509_get_notBefore(self._x509)
_api.ASN1_GENERALIZEDTIME_set_string(notBefore, when)
def set_notAfter(self, when):
"""
Set the time stamp for when the certificate stops being valid
:param when: A string giving the timestamp, in the format:
YYYYMMDDhhmmssZ
YYYYMMDDhhmmss+hhmm
YYYYMMDDhhmmss-hhmm
:type when: :py:class:`bytes`
:return: None
"""
notAfter = _api.X509_get_notAfter(self._x509)
_api.ASN1_GENERALIZEDTIME_set_string(notAfter, when)
X509Type = X509
def load_certificate(type, buffer):
"""
Load a certificate from a buffer
:param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
:param buffer: The buffer the certificate is stored in
:type buffer: :py:class:`bytes`
:return: The X509 object
"""
bio = _api.BIO_new_mem_buf(buffer, len(buffer))
try:
if type == FILETYPE_PEM:
x509 = _api.PEM_read_bio_X509(bio, _api.NULL, _api.NULL, _api.NULL)
elif type == FILETYPE_ASN1:
x509 = _api.d2i_X509_bio(bio, _api.NULL);
else:
raise ValueError(
"type argument must be FILETYPE_PEM or FILETYPE_ASN1")
finally:
_api.BIO_free(bio)
if x509 == _api.NULL:
_raise_current_error()
cert = X509.__new__(X509)
cert._x509 = x509
return cert
def dump_certificate(type, cert):
"""
Dump a certificate to a buffer
:param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
:param cert: The certificate to dump
:return: The buffer with the dumped certificate in
"""
bio = _api.BIO_new(_api.BIO_s_mem())
if type == FILETYPE_PEM:
result_code = _api.PEM_write_bio_X509(bio, cert._x509)
elif type == FILETYPE_ASN1:
result_code = _api.i2d_X509_bio(bio, cert._x509)
elif type == FILETYPE_TEXT:
result_code = _api.X509_print_ex(bio, cert._x509, 0, 0)
else:
raise ValueError(
"type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
"FILETYPE_TEXT")
return _bio_to_string(bio)
def dump_privatekey(type, pkey, cipher=None, passphrase=None):
"""
Dump a private key to a buffer
:param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
:param pkey: The PKey to dump
:param cipher: (optional) if encrypted PEM format, the cipher to
use
:param passphrase: (optional) if encrypted PEM format, this can be either
the passphrase to use, or a callback for providing the
passphrase.
:return: The buffer with the dumped key in
:rtype: :py:data:`str`
"""
# TODO incomplete
bio = _api.BIO_new(_api.BIO_s_mem())
if type == FILETYPE_PEM:
result_code = _api.PEM_write_bio_PrivateKey(
bio, pkey._pkey, _api.NULL, _api.NULL, 0, _api.NULL, _api.NULL)
elif type == FILETYPE_ASN1:
result_code = _api.i2d_PrivateKey_bio(bio, pkey._pkey)
elif type == FILETYPE_TEXT:
rsa = _api.EVP_PKEY_get1_RSA(pkey._pkey)
result_code = _api.RSA_print(bio, rsa, 0)
# TODO RSA_free(rsa)?
else:
raise ValueError(
"type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
"FILETYPE_TEXT")
if result_code == 0:
_raise_current_error()
return _bio_to_string(bio)
def load_privatekey(type, buffer, passphrase=None):
"""
Load a private key from a buffer
:param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
:param buffer: The buffer the key is stored in
:param passphrase: (optional) if encrypted PEM format, this can be
either the passphrase to use, or a callback for
providing the passphrase.
:return: The PKey object
"""
# TODO incomplete
bio = _api.BIO_new_mem_buf(buffer, len(buffer))
if type == FILETYPE_PEM:
evp_pkey = _api.PEM_read_bio_PrivateKey(bio, _api.NULL, _api.NULL, _api.NULL)
elif type == FILETYPE_ASN1:
evp_pkey = _api.d2i_PrivateKey_bio(bio, _api.NULL)
else:
raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")
pkey = PKey.__new__(PKey)
pkey._pkey = evp_pkey
return pkey
def dump_certificate_request(type, req):
"""
Dump a certificate request to a buffer
:param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
:param req: The certificate request to dump
:return: The buffer with the dumped certificate request in
"""
def load_certificate_request(type, buffer):
"""
Load a certificate request from a buffer
:param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
:param buffer: The buffer the certificate request is stored in
:return: The X509Req object
"""