Merge branch 'master' into feature/OP_SINGLE_ECDH_USE
diff --git a/OpenSSL/SSL.py b/OpenSSL/SSL.py
index 56f0eca..ae03169 100644
--- a/OpenSSL/SSL.py
+++ b/OpenSSL/SSL.py
@@ -136,13 +136,13 @@
SSL_CB_HANDSHAKE_START = _lib.SSL_CB_HANDSHAKE_START
SSL_CB_HANDSHAKE_DONE = _lib.SSL_CB_HANDSHAKE_DONE
+
class Error(Exception):
"""
An error occurred in an `OpenSSL.SSL` API.
"""
-
_raise_current_error = partial(_exception_from_error_queue, Error)
@@ -150,22 +150,18 @@
pass
-
class WantWriteError(Error):
pass
-
class WantX509LookupError(Error):
pass
-
class ZeroReturnError(Error):
pass
-
class SysCallError(Error):
pass
@@ -184,7 +180,6 @@
def __init__(self):
self._problems = []
-
def raise_if_problem(self):
"""
Raise an exception from the OpenSSL error queue or that was previously
@@ -390,7 +385,6 @@
return fd
-
def SSLeay_version(type):
"""
Return a string describing the version of OpenSSL in use.
@@ -415,7 +409,6 @@
return wrapper
-
def _requires_alpn(func):
"""
Wraps any function that requires ALPN support in OpenSSL, ensuring that
@@ -431,12 +424,10 @@
return wrapper
-
class Session(object):
pass
-
class Context(object):
"""
:py:obj:`OpenSSL.SSL.Context` instances define the parameters for setting up
@@ -455,7 +446,6 @@
for (identifier, name) in _methods.items()
if getattr(_lib, name, None) is not None)
-
def __init__(self, method):
"""
:param method: One of SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, or
@@ -502,7 +492,6 @@
# SSL_MODE_AUTO_RETRY);
self.set_mode(_lib.SSL_MODE_ENABLE_PARTIAL_WRITE)
-
def load_verify_locations(self, cafile, capath=None):
"""
Let SSL know where we can find trusted certificates for the certificate
@@ -529,7 +518,6 @@
if not load_result:
_raise_current_error()
-
def _wrap_callback(self, callback):
@wraps(callback)
def wrapper(size, verify, userdata):
@@ -537,7 +525,6 @@
return _PassphraseHelper(
FILETYPE_PEM, wrapper, more_args=True, truncate=True)
-
def set_passwd_cb(self, callback, userdata=None):
"""
Set the passphrase callback
@@ -556,7 +543,6 @@
self._context, self._passphrase_callback)
self._passphrase_userdata = userdata
-
def set_default_verify_paths(self):
"""
Use the platform-specific CA certificate locations
@@ -568,7 +554,6 @@
# TODO: This is untested.
_raise_current_error()
-
def use_certificate_chain_file(self, certfile):
"""
Load a certificate chain from a file
@@ -584,7 +569,6 @@
if not result:
_raise_current_error()
-
def use_certificate_file(self, certfile, filetype=FILETYPE_PEM):
"""
Load a certificate from a file
@@ -603,7 +587,6 @@
if not use_result:
_raise_current_error()
-
def use_certificate(self, cert):
"""
Load a certificate from a X509 object
@@ -618,7 +601,6 @@
if not use_result:
_raise_current_error()
-
def add_extra_chain_cert(self, certobj):
"""
Add certificate to chain
@@ -636,7 +618,6 @@
_lib.X509_free(copy)
_raise_current_error()
-
def _raise_passphrase_exception(self):
if self._passphrase_helper is None:
_raise_current_error()
@@ -644,7 +625,6 @@
if exception is not None:
raise exception
-
def use_privatekey_file(self, keyfile, filetype=_UNSPECIFIED):
"""
Load a private key from a file
@@ -666,7 +646,6 @@
if not use_result:
self._raise_passphrase_exception()
-
def use_privatekey(self, pkey):
"""
Load a private key from a PKey object
@@ -681,7 +660,6 @@
if not use_result:
self._raise_passphrase_exception()
-
def check_privatekey(self):
"""
Check that the private key and certificate match up
@@ -691,7 +669,6 @@
if not _lib.SSL_CTX_check_private_key(self._context):
_raise_current_error()
-
def load_client_ca(self, cafile):
"""
Load the trusted certificates that will be sent to the client (basically
@@ -725,14 +702,12 @@
return _lib.SSL_CTX_set_session_cache_mode(self._context, mode)
-
def get_session_cache_mode(self):
"""
:returns: The currently used cache mode.
"""
return _lib.SSL_CTX_get_session_cache_mode(self._context)
-
def set_verify(self, mode, callback):
"""
Set the verify mode and verify callback
@@ -754,7 +729,6 @@
self._verify_callback = self._verify_helper.callback
_lib.SSL_CTX_set_verify(self._context, mode, self._verify_callback)
-
def set_verify_depth(self, depth):
"""
Set the verify depth
@@ -767,7 +741,6 @@
_lib.SSL_CTX_set_verify_depth(self._context, depth)
-
def get_verify_mode(self):
"""
Get the verify mode
@@ -776,7 +749,6 @@
"""
return _lib.SSL_CTX_get_verify_mode(self._context)
-
def get_verify_depth(self):
"""
Get the verify depth
@@ -785,7 +757,6 @@
"""
return _lib.SSL_CTX_get_verify_depth(self._context)
-
def load_tmp_dh(self, dhfile):
"""
Load parameters for Ephemeral Diffie-Hellman
@@ -806,7 +777,6 @@
dh = _ffi.gc(dh, _lib.DH_free)
_lib.SSL_CTX_set_tmp_dh(self._context, dh)
-
def set_tmp_ecdh(self, curve):
"""
Select a curve to use for ECDHE key exchange.
@@ -819,7 +789,6 @@
"""
_lib.SSL_CTX_set_tmp_ecdh(self._context, curve._to_EC_KEY())
-
def set_cipher_list(self, cipher_list):
"""
Change the cipher list
@@ -837,7 +806,6 @@
if not result:
_raise_current_error()
-
def set_client_ca_list(self, certificate_authorities):
"""
Set the list of preferred client certificate signers for this server context.
@@ -873,7 +841,6 @@
_lib.SSL_CTX_set_client_CA_list(self._context, name_stack)
-
def add_client_ca(self, certificate_authority):
"""
Add the CA certificate to the list of preferred signers for this context.
@@ -893,7 +860,6 @@
# TODO: This is untested.
_raise_current_error()
-
def set_timeout(self, timeout):
"""
Set session timeout
@@ -906,7 +872,6 @@
return _lib.SSL_CTX_set_timeout(self._context, timeout)
-
def get_timeout(self):
"""
Get the session timeout
@@ -915,7 +880,6 @@
"""
return _lib.SSL_CTX_get_timeout(self._context)
-
def set_info_callback(self, callback):
"""
Set the info callback
@@ -930,7 +894,6 @@
"void (*)(const SSL *, int, int)", wrapper)
_lib.SSL_CTX_set_info_callback(self._context, self._info_callback)
-
def get_app_data(self):
"""
Get the application data (supplied via set_app_data())
@@ -939,7 +902,6 @@
"""
return self._app_data
-
def set_app_data(self, data):
"""
Set the application data (will be returned from get_app_data())
@@ -949,7 +911,6 @@
"""
self._app_data = data
-
def get_cert_store(self):
"""
Get the certificate store for the context.
@@ -965,7 +926,6 @@
pystore._store = store
return pystore
-
def set_options(self, options):
"""
Add options. Options set before are not cleared!
@@ -978,7 +938,6 @@
return _lib.SSL_CTX_set_options(self._context, options)
-
def set_mode(self, mode):
"""
Add modes via bitmask. Modes set before are not cleared!
@@ -991,7 +950,6 @@
return _lib.SSL_CTX_set_mode(self._context, mode)
-
def set_tlsext_servername_callback(self, callback):
"""
Specify a callback function to be called when clients specify a server name.
@@ -1009,7 +967,6 @@
_lib.SSL_CTX_set_tlsext_servername_callback(
self._context, self._tlsext_servername_callback)
-
@_requires_npn
def set_npn_advertise_callback(self, callback):
"""
@@ -1027,7 +984,6 @@
_lib.SSL_CTX_set_next_protos_advertised_cb(
self._context, self._npn_advertise_callback, _ffi.NULL)
-
@_requires_npn
def set_npn_select_callback(self, callback):
"""
@@ -1085,7 +1041,6 @@
ContextType = Context
-
class Connection(object):
"""
"""
@@ -1141,7 +1096,6 @@
# TODO: This is untested.
_raise_current_error()
-
def __getattr__(self, name):
"""
Look up attributes on the wrapped socket object if they are not found on
@@ -1152,7 +1106,6 @@
else:
return getattr(self._socket, name)
-
def _raise_ssl_error(self, ssl, result):
if self._context._verify_helper is not None:
self._context._verify_helper.raise_if_problem()
@@ -1191,14 +1144,12 @@
else:
_raise_current_error()
-
def get_context(self):
"""
Get session context
"""
return self._context
-
def set_context(self, context):
"""
Switch this connection to a new session context
@@ -1212,7 +1163,6 @@
_lib.SSL_set_SSL_CTX(self._ssl, context._context)
self._context = context
-
def get_servername(self):
"""
Retrieve the servername extension value if provided in the client hello
@@ -1226,7 +1176,6 @@
return _ffi.string(name)
-
def set_tlsext_host_name(self, name):
"""
Set the value of the servername extension to send in the client hello.
@@ -1241,7 +1190,6 @@
# XXX I guess this can fail sometimes?
_lib.SSL_set_tlsext_host_name(self._ssl, name)
-
def pending(self):
"""
Get the number of bytes that can be safely read from the connection
@@ -1250,7 +1198,6 @@
"""
return _lib.SSL_pending(self._ssl)
-
def send(self, buf, flags=0):
"""
Send data on the connection. NOTE: If you get one of the WantRead,
@@ -1277,7 +1224,6 @@
return result
write = send
-
def sendall(self, buf, flags=0):
"""
Send "all" data on the connection. This calls send() repeatedly until
@@ -1308,7 +1254,6 @@
total_sent += result
left_to_send -= result
-
def recv(self, bufsiz, flags=None):
"""
Receive data on the connection. NOTE: If you get one of the WantRead,
@@ -1329,7 +1274,6 @@
return _ffi.buffer(buf, result)[:]
read = recv
-
def recv_into(self, buffer, nbytes=None, flags=None):
"""
Receive data on the connection and store the data into a buffer rather
@@ -1371,7 +1315,6 @@
return result
-
def _handle_bio_errors(self, bio, result):
if _lib.BIO_should_retry(bio):
if _lib.BIO_should_read(bio):
@@ -1390,7 +1333,6 @@
# TODO: This is untested.
_raise_current_error()
-
def bio_read(self, bufsiz):
"""
When using non-socket connections this function reads the "dirty" data
@@ -1412,7 +1354,6 @@
return _ffi.buffer(buf, result)[:]
-
def bio_write(self, buf):
"""
When using non-socket connections this function sends "dirty" data that
@@ -1431,7 +1372,6 @@
self._handle_bio_errors(self._into_ssl, result)
return result
-
def renegotiate(self):
"""
Renegotiate the session
@@ -1449,7 +1389,6 @@
result = _lib.SSL_do_handshake(self._ssl)
self._raise_ssl_error(self._ssl, result)
-
def renegotiate_pending(self):
"""
Check if there's a renegotiation in progress, it will return false once
@@ -1466,7 +1405,6 @@
"""
return _lib.SSL_total_renegotiations(self._ssl)
-
def connect(self, addr):
"""
Connect to remote host and set up client-side SSL
@@ -1477,7 +1415,6 @@
_lib.SSL_set_connect_state(self._ssl)
return self._socket.connect(addr)
-
def connect_ex(self, addr):
"""
Connect to remote host and set up client-side SSL. Note that if the socket's
@@ -1490,7 +1427,6 @@
self.set_connect_state()
return connect_ex(addr)
-
def accept(self):
"""
Accept incoming connection and set up SSL on it
@@ -1503,7 +1439,6 @@
conn.set_accept_state()
return (conn, addr)
-
def bio_shutdown(self):
"""
When using non-socket connections this function signals end of
@@ -1516,7 +1451,6 @@
_lib.BIO_set_mem_eof_return(self._into_ssl, 0)
-
def shutdown(self):
"""
Send closure alert
@@ -1533,7 +1467,6 @@
else:
return False
-
def get_cipher_list(self):
"""
Get the session cipher list
@@ -1548,7 +1481,6 @@
ciphers.append(_native(_ffi.string(result)))
return ciphers
-
def get_client_ca_list(self):
"""
Get CAs whose certificates are suggested for client authentication.
@@ -1577,7 +1509,6 @@
result.append(pyname)
return result
-
def makefile(self):
"""
The makefile() method is not implemented, since there is no dup semantics
@@ -1585,8 +1516,8 @@
:raise: NotImplementedError
"""
- raise NotImplementedError("Cannot make file object of OpenSSL.SSL.Connection")
-
+ raise NotImplementedError(
+ "Cannot make file object of OpenSSL.SSL.Connection")
def get_app_data(self):
"""
@@ -1596,7 +1527,6 @@
"""
return self._app_data
-
def set_app_data(self, data):
"""
Set application data
@@ -1606,7 +1536,6 @@
"""
self._app_data = data
-
def get_shutdown(self):
"""
Get shutdown state
@@ -1615,7 +1544,6 @@
"""
return _lib.SSL_get_shutdown(self._ssl)
-
def set_shutdown(self, state):
"""
Set shutdown state
@@ -1628,7 +1556,6 @@
_lib.SSL_set_shutdown(self._ssl, state)
-
def state_string(self):
"""
Get a verbose state description
@@ -1648,7 +1575,6 @@
self._ssl.s3.server_random,
_lib.SSL3_RANDOM_SIZE)[:]
-
def client_random(self):
"""
Get a copy of the client hello nonce.
@@ -1661,7 +1587,6 @@
self._ssl.s3.client_random,
_lib.SSL3_RANDOM_SIZE)[:]
-
def master_key(self):
"""
Get a copy of the master key.
@@ -1674,7 +1599,6 @@
self._ssl.session.master_key,
self._ssl.session.master_key_length)[:]
-
def sock_shutdown(self, *args, **kwargs):
"""
See shutdown(2)
@@ -1683,7 +1607,6 @@
"""
return self._socket.shutdown(*args, **kwargs)
-
def get_peer_certificate(self):
"""
Retrieve the other side's certificate (if any)
@@ -1697,7 +1620,6 @@
return pycert
return None
-
def get_peer_cert_chain(self):
"""
Retrieve the other side's certificate (if any)
@@ -1718,7 +1640,6 @@
result.append(pycert)
return result
-
def want_read(self):
"""
Checks if more data has to be read from the transport layer to complete an
@@ -1728,7 +1649,6 @@
"""
return _lib.SSL_want_read(self._ssl)
-
def want_write(self):
"""
Checks if there is data to write to the transport layer to complete an
@@ -1738,7 +1658,6 @@
"""
return _lib.SSL_want_write(self._ssl)
-
def set_accept_state(self):
"""
Set the connection to work in server mode. The handshake will be handled
@@ -1748,7 +1667,6 @@
"""
_lib.SSL_set_accept_state(self._ssl)
-
def set_connect_state(self):
"""
Set the connection to work in client mode. The handshake will be handled
@@ -1758,7 +1676,6 @@
"""
_lib.SSL_set_connect_state(self._ssl)
-
def get_session(self):
"""
Returns the Session currently used.
@@ -1774,7 +1691,6 @@
pysession._session = _ffi.gc(session, _lib.SSL_SESSION_free)
return pysession
-
def set_session(self, session):
"""
Set the session to be used when the TLS/SSL connection is established.
@@ -1789,7 +1705,6 @@
if not result:
_raise_current_error()
-
def _get_finished_message(self, function):
"""
Helper to implement :py:meth:`get_finished` and
@@ -1824,7 +1739,6 @@
function(self._ssl, buf, size)
return _ffi.buffer(buf, size)[:]
-
def get_finished(self):
"""
Obtain the latest `handshake finished` message sent to the peer.
@@ -1835,7 +1749,6 @@
"""
return self._get_finished_message(_lib.SSL_get_finished)
-
def get_peer_finished(self):
"""
Obtain the latest `handshake finished` message received from the peer.
@@ -1846,7 +1759,6 @@
"""
return self._get_finished_message(_lib.SSL_get_peer_finished)
-
def get_cipher_name(self):
"""
Obtain the name of the currently used cipher.
@@ -1862,7 +1774,6 @@
name = _ffi.string(_lib.SSL_CIPHER_get_name(cipher))
return name.decode("utf-8")
-
def get_cipher_bits(self):
"""
Obtain the number of secret bits of the currently used cipher.
@@ -1877,7 +1788,6 @@
else:
return _lib.SSL_CIPHER_get_bits(cipher, _ffi.NULL)
-
def get_cipher_version(self):
"""
Obtain the protocol version of the currently used cipher.
@@ -1893,7 +1803,6 @@
version = _ffi.string(_lib.SSL_CIPHER_get_version(cipher))
return version.decode("utf-8")
-
def get_protocol_version_name(self):
"""
Obtain the protocol version of the current connection.
@@ -1906,7 +1815,6 @@
version = _ffi.string(_lib.SSL_get_version(self._ssl))
return version.decode("utf-8")
-
def get_protocol_version(self):
"""
Obtain the protocol version of the current connection.
@@ -1918,7 +1826,6 @@
version = _lib.SSL_version(self._ssl)
return version
-
@_requires_npn
def get_next_proto_negotiated(self):
"""
@@ -1931,7 +1838,6 @@
return _ffi.buffer(data[0], data_len[0])[:]
-
@_requires_alpn
def set_alpn_protos(self, protos):
"""
@@ -1955,7 +1861,6 @@
input_str_len = _ffi.cast("unsigned", len(protostr))
_lib.SSL_set_alpn_protos(self._ssl, input_str, input_str_len)
-
@_requires_alpn
def get_alpn_proto_negotiated(self):
"""
@@ -1972,7 +1877,6 @@
return _ffi.buffer(data[0], data_len[0])[:]
-
ConnectionType = Connection
# This is similar to the initialization calls at the end of OpenSSL/crypto.py
diff --git a/OpenSSL/_util.py b/OpenSSL/_util.py
index a4b29e3..074ef3d 100644
--- a/OpenSSL/_util.py
+++ b/OpenSSL/_util.py
@@ -5,6 +5,7 @@
from cryptography.hazmat.bindings.openssl.binding import Binding
binding = Binding()
+binding.init_static_locks()
ffi = binding.ffi
lib = binding.lib
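Review bot: The added `binding.init_static_locks()` runs at import time and changes process-wide OpenSSL state, but nothing in the module says why it is there. It presumably installs OpenSSL's locking callbacks so that library objects shared between threads are not corrupted; that is worth stating so the call is not later removed as a no-op. I would add a comment above it such as `# Install OpenSSL's locking callbacks so the bindings are safe to use from multiple threads.` It is also worth confirming how this behaves when another module in the same process has already registered its own locking callbacks.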
diff --git a/OpenSSL/crypto.py b/OpenSSL/crypto.py
index 446f7f8..845c0c1 100644
--- a/OpenSSL/crypto.py
+++ b/OpenSSL/crypto.py
@@ -29,7 +29,6 @@
TYPE_DSA = _lib.EVP_PKEY_DSA
-
class Error(Exception):
"""
An error occurred in an `OpenSSL.crypto` API.
@@ -39,7 +38,6 @@
_raise_current_error = partial(_exception_from_error_queue, Error)
-
def _untested_error(where):
"""
An OpenSSL API failed somehow. Additionally, the failure which was
@@ -49,7 +47,6 @@
raise RuntimeError("Unknown %s failure" % (where,))
-
def _new_mem_buf(buffer=None):
"""
Allocate a new OpenSSL memory BIO.
@@ -77,7 +74,6 @@
return bio
-
def _bio_to_string(bio):
"""
Copy the contents of an OpenSSL BIO object into a Python byte string.
@@ -87,7 +83,6 @@
return _ffi.buffer(result_buffer[0], buffer_length)[:]
-
def _set_asn1_time(boundary, when):
"""
The the time value of an ASN1 time object.
@@ -118,7 +113,6 @@
_untested_error()
-
def _get_asn1_time(timestamp):
"""
Retrieve the time value of an ASN1 time object.
@@ -157,7 +151,6 @@
return string_result
-
class PKey(object):
"""
A class representing an DSA or RSA public key or key pair.
@@ -170,7 +163,6 @@
self._pkey = _ffi.gc(pkey, _lib.EVP_PKEY_free)
self._initialized = False
-
def generate_key(self, type, bits):
"""
Generate a key pair of the given type, with the given number of bits.
@@ -237,7 +229,6 @@
self._initialized = True
-
def check(self):
"""
Check the consistency of an RSA private key.
@@ -262,7 +253,6 @@
return True
_raise_current_error()
-
def type(self):
"""
Returns the type of the key
@@ -271,7 +261,6 @@
"""
return self._pkey.type
-
def bits(self):
"""
Returns the number of bits of the key
@@ -282,7 +271,6 @@
PKeyType = PKey
-
class _EllipticCurve(object):
"""
A representation of a supported elliptic curve.
@@ -307,7 +295,6 @@
return super(_EllipticCurve, self).__ne__(other)
return NotImplemented
-
@classmethod
def _load_elliptic_curves(cls, lib):
"""
@@ -330,7 +317,6 @@
for c in builtin_curves)
return set()
-
@classmethod
def _get_elliptic_curves(cls, lib):
"""
@@ -345,7 +331,6 @@
cls._curves = cls._load_elliptic_curves(lib)
return cls._curves
-
@classmethod
def from_nid(cls, lib, nid):
"""
@@ -363,7 +348,6 @@
"""
return cls(lib, nid, _ffi.string(lib.OBJ_nid2sn(nid)).decode("ascii"))
-
def __init__(self, lib, nid, name):
"""
:param _lib: The :py:mod:`cryptography` binding instance used to
@@ -381,11 +365,9 @@
self._nid = nid
self.name = name
-
def __repr__(self):
return "<Curve %r>" % (self.name,)
-
def _to_EC_KEY(self):
"""
Create a new OpenSSL EC_KEY structure initialized to use this curve.
@@ -397,7 +379,6 @@
return _ffi.gc(key, _lib.EC_KEY_free)
-
def get_elliptic_curves():
"""
Return a set of objects representing the elliptic curves supported in the
@@ -413,7 +394,6 @@
return _EllipticCurve._get_elliptic_curves(_lib)
-
def get_elliptic_curve(name):
"""
Return a single curve object selected by name.
@@ -432,7 +412,6 @@
raise ValueError("unknown curve name", name)
-
class X509Name(object):
"""
An X.509 Distinguished Name.
@@ -467,7 +446,6 @@
name = _lib.X509_NAME_dup(name._name)
self._name = _ffi.gc(name, _lib.X509_NAME_free)
-
def __setattr__(self, name, value):
if name.startswith('_'):
return super(X509Name, self).__setattr__(name, value)
@@ -476,7 +454,7 @@
# isinstance.
if type(name) is not str:
raise TypeError("attribute name must be string, not '%.200s'" % (
- type(value).__name__,))
+ type(value).__name__,))
nid = _lib.OBJ_txt2nid(_byte_string(name))
if nid == _lib.NID_undef:
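Review bot: The `TypeError` raised when `type(name) is not str` formats `type(value).__name__`, so the message reports the type of the value being assigned rather than the type of the offending attribute name. Since this hunk already touches the line, it should read `type(name).__name__,))`. `__setattr__` begins before this hunk and continues after it, so the rest of the method is not visible here.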
@@ -504,7 +482,6 @@
if not add_result:
_raise_current_error()
-
def __getattr__(self, name):
"""
Find attribute. An X509Name object has the following attributes:
@@ -545,7 +522,6 @@
_lib.OPENSSL_free(result_buffer[0])
return result
-
def _cmp(op):
def f(self, other):
if not isinstance(other, X509Name):
@@ -578,7 +554,6 @@
return "<X509Name object '%s'>" % (
_native(_ffi.string(result_buffer)),)
-
def hash(self):
"""
Return an integer representation of the first four bytes of the
@@ -591,7 +566,6 @@
"""
return _lib.X509_NAME_hash(self._name)
-
def der(self):
"""
Return the DER encoding of this name.
@@ -609,7 +583,6 @@
_lib.OPENSSL_free(result_buffer[0])
return string_result
-
def get_components(self):
"""
Returns the components of this name, as a sequence of 2-tuples.
@@ -628,19 +601,17 @@
name = _lib.OBJ_nid2sn(nid)
result.append((
- _ffi.string(name),
- _ffi.string(
- _lib.ASN1_STRING_data(fval),
- _lib.ASN1_STRING_length(fval))))
+ _ffi.string(name),
+ _ffi.string(
+ _lib.ASN1_STRING_data(fval),
+ _lib.ASN1_STRING_length(fval))))
return result
-
X509NameType = X509Name
-
class X509Extension(object):
"""
An X.509 v3 certificate extension.
@@ -677,7 +648,8 @@
# Initialize the subject and issuer, if appropriate. ctx is a local,
# and as far as I can tell none of the X509V3_* APIs invoked here steal
- # any references, so no need to mess with reference counts or duplicates.
+ # any references, so no need to mess with reference counts or
+ # duplicates.
if issuer is not None:
if not isinstance(issuer, X509):
raise TypeError("issuer must be an X509 instance")
@@ -702,7 +674,6 @@
_raise_current_error()
self._extension = _ffi.gc(extension, _lib.X509_EXTENSION_free)
-
@property
def _nid(self):
return _lib.OBJ_obj2nid(self._extension.object)
@@ -711,7 +682,7 @@
_lib.GEN_EMAIL: "email",
_lib.GEN_DNS: "DNS",
_lib.GEN_URI: "URI",
- }
+ }
def _subjectAltNameString(self):
method = _lib.X509V3_EXT_get(self._extension)
@@ -749,7 +720,6 @@
parts.append(label + ":" + value)
return ", ".join(parts)
-
def __str__(self):
"""
:return: a nice text representation of the extension
@@ -765,7 +735,6 @@
return _native(_bio_to_string(bio))
-
def get_critical(self):
"""
Returns the critical field of this X.509 extension.
@@ -774,7 +743,6 @@
"""
return _lib.X509_EXTENSION_get_critical(self._extension)
-
def get_short_name(self):
"""
Returns the short type name of this X.509 extension.
@@ -790,7 +758,6 @@
nid = _lib.OBJ_obj2nid(obj)
return _ffi.string(_lib.OBJ_nid2sn(nid))
-
def get_data(self):
"""
Returns the data of the X509 extension, encoded as ASN.1.
@@ -807,20 +774,18 @@
return _ffi.buffer(char_result, result_length)[:]
-
X509ExtensionType = X509Extension
-
class X509Req(object):
"""
An X.509 certificate signing requests.
"""
+
def __init__(self):
req = _lib.X509_REQ_new()
self._req = _ffi.gc(req, _lib.X509_REQ_free)
-
def set_pubkey(self, pkey):
"""
Set the public key of the certificate signing request.
@@ -835,7 +800,6 @@
# TODO: This is untested.
_raise_current_error()
-
def get_pubkey(self):
"""
Get the public key of the certificate signing request.
@@ -852,7 +816,6 @@
pkey._only_public = True
return pkey
-
def set_version(self, version):
"""
Set the version subfield (RFC 2459, section 4.1.2.1) of the certificate
@@ -865,7 +828,6 @@
if not set_result:
_raise_current_error()
-
def get_version(self):
"""
Get the version subfield (RFC 2459, section 4.1.2.1) of the certificate
@@ -876,7 +838,6 @@
"""
return _lib.X509_REQ_get_version(self._req)
-
def get_subject(self):
"""
Return the subject of this certificate signing request.
@@ -899,7 +860,6 @@
return name
-
def add_extensions(self, extensions):
"""
Add extensions to the certificate signing request.
@@ -927,7 +887,6 @@
# TODO: This is untested.
_raise_current_error()
-
def get_extensions(self):
"""
Get X.509 extensions in the certificate signing request.
@@ -945,7 +904,6 @@
exts.append(ext)
return exts
-
def sign(self, pkey, digest):
"""
Sign the certificate signing request with this key and digest type.
@@ -972,7 +930,6 @@
# TODO: This is untested.
_raise_current_error()
-
def verify(self, pkey):
"""
Verifies the signature on this certificate signing request.
@@ -994,21 +951,19 @@
return result
-
X509ReqType = X509Req
-
class X509(object):
"""
An X.509 certificate.
"""
+
def __init__(self):
# TODO Allocation failure? And why not __new__ instead of __init__?
x509 = _lib.X509_new()
self._x509 = _ffi.gc(x509, _lib.X509_free)
-
def set_version(self, version):
"""
Set the version number of the certificate.
@@ -1023,7 +978,6 @@
_lib.X509_set_version(self._x509, version)
-
def get_version(self):
"""
Return the version number of the certificate.
@@ -1033,7 +987,6 @@
"""
return _lib.X509_get_version(self._x509)
-
def get_pubkey(self):
"""
Get the public key of the certificate.
@@ -1049,7 +1002,6 @@
pkey._only_public = True
return pkey
-
def set_pubkey(self, pkey):
"""
Set the public key of the certificate.
@@ -1066,7 +1018,6 @@
if not set_result:
_raise_current_error()
-
def sign(self, pkey, digest):
"""
Sign the certificate with this key and digest type.
@@ -1096,7 +1047,6 @@
if not sign_result:
_raise_current_error()
-
def get_signature_algorithm(self):
"""
Return the signature algorithm used in the certificate.
@@ -1114,7 +1064,6 @@
raise ValueError("Undefined signature algorithm")
return _ffi.string(_lib.OBJ_nid2ln(nid))
-
def digest(self, digest_name):
"""
Return the digest of the X509 object.
@@ -1142,9 +1091,8 @@
_raise_current_error()
return b":".join([
- b16encode(ch).upper() for ch
- in _ffi.buffer(result_buffer, result_length[0])])
-
+ b16encode(ch).upper() for ch
+ in _ffi.buffer(result_buffer, result_length[0])])
def subject_name_hash(self):
"""
@@ -1155,7 +1103,6 @@
"""
return _lib.X509_subject_name_hash(self._x509)
-
def set_serial_number(self, serial):
"""
Set the serial number of the certificate.
@@ -1197,7 +1144,6 @@
# TODO Not tested
_raise_current_error()
-
def get_serial_number(self):
"""
Return the serial number of this certificate.
@@ -1218,7 +1164,6 @@
finally:
_lib.BN_free(bignum_serial)
-
def gmtime_adj_notAfter(self, amount):
"""
Adjust the time stamp on which the certificate stops being valid.
@@ -1234,7 +1179,6 @@
notAfter = _lib.X509_get_notAfter(self._x509)
_lib.X509_gmtime_adj(notAfter, amount)
-
def gmtime_adj_notBefore(self, amount):
"""
Adjust the timestamp on which the certificate starts being valid.
@@ -1248,7 +1192,6 @@
notBefore = _lib.X509_get_notBefore(self._x509)
_lib.X509_gmtime_adj(notBefore, amount)
-
def has_expired(self):
"""
Check whether the certificate has expired.
@@ -1262,11 +1205,9 @@
return _lib.ASN1_UTCTIME_cmp_time_t(
_ffi.cast('ASN1_UTCTIME*', notAfter), now) < 0
-
def _get_boundary_time(self, which):
return _get_asn1_time(which(self._x509))
-
def get_notBefore(self):
"""
Get the timestamp at which the certificate starts being valid.
@@ -1282,11 +1223,9 @@
"""
return self._get_boundary_time(_lib.X509_get_notBefore)
-
def _set_boundary_time(self, which, when):
return _set_asn1_time(which(self._x509), when)
-
def set_notBefore(self, when):
"""
Set the timestamp at which the certificate starts being valid.
@@ -1304,7 +1243,6 @@
"""
return self._set_boundary_time(_lib.X509_get_notBefore, when)
-
def get_notAfter(self):
"""
Get the timestamp at which the certificate stops being valid.
@@ -1320,7 +1258,6 @@
"""
return self._get_boundary_time(_lib.X509_get_notAfter)
-
def set_notAfter(self, when):
"""
Set the timestamp at which the certificate stops being valid.
@@ -1338,7 +1275,6 @@
"""
return self._set_boundary_time(_lib.X509_get_notAfter, when)
-
def _get_name(self, which):
name = X509Name.__new__(X509Name)
name._name = which(self._x509)
@@ -1352,7 +1288,6 @@
return name
-
def _set_name(self, which, name):
if not isinstance(name, X509Name):
raise TypeError("name must be an X509Name")
@@ -1361,7 +1296,6 @@
# TODO: This is untested.
_raise_current_error()
-
def get_issuer(self):
"""
Return the issuer of this certificate.
@@ -1374,7 +1308,6 @@
"""
return self._get_name(_lib.X509_get_issuer_name)
-
def set_issuer(self, issuer):
"""
Set the issuer of this certificate.
@@ -1386,7 +1319,6 @@
"""
return self._set_name(_lib.X509_set_issuer_name, issuer)
-
def get_subject(self):
"""
Return the subject of this certificate.
@@ -1399,7 +1331,6 @@
"""
return self._get_name(_lib.X509_get_subject_name)
-
def set_subject(self, subject):
"""
Set the subject of this certificate.
@@ -1411,7 +1342,6 @@
"""
return self._set_name(_lib.X509_set_subject_name, subject)
-
def get_extension_count(self):
"""
Get the number of extensions on this certificate.
@@ -1423,7 +1353,6 @@
"""
return _lib.X509_get_ext_count(self._x509)
-
def add_extensions(self, extensions):
"""
Add extensions to the certificate.
@@ -1440,7 +1369,6 @@
if not add_result:
_raise_current_error()
-
def get_extension(self, index):
"""
Get a specific extension of the certificate by index.
@@ -1465,20 +1393,18 @@
return ext
-
X509Type = X509
-
class X509Store(object):
"""
An X509 certificate store.
"""
+
def __init__(self):
store = _lib.X509_STORE_new()
self._store = _ffi.gc(store, _lib.X509_STORE_free)
-
def add_cert(self, cert):
"""
Adds the certificate :py:data:`cert` to this store.
@@ -1509,6 +1435,7 @@
:ivar certificate: The certificate which caused verificate failure.
:type certificate: :class:`X509`
"""
+
def __init__(self, message, certificate):
super(X509StoreContextError, self).__init__(message)
self.certificate = certificate
@@ -1553,7 +1480,6 @@
# :py:meth:`_init` have no adverse affect.
self._init()
-
def _init(self):
"""
Set up the store context for a subsequent verification operation.
@@ -1562,7 +1488,6 @@
if ret <= 0:
_raise_current_error()
-
def _cleanup(self):
"""
Internally cleans up the store context.
@@ -1572,7 +1497,6 @@
"""
_lib.X509_STORE_CTX_cleanup(self._store_ctx)
-
def _exception_from_context(self):
"""
Convert an OpenSSL native context error failure into a Python
@@ -1595,7 +1519,6 @@
pycert._x509 = _ffi.gc(_cert, _lib.X509_free)
return X509StoreContextError(errors, pycert)
-
def set_store(self, store):
"""
Set the context's trust store.
@@ -1607,7 +1530,6 @@
"""
self._store = store
-
def verify_certificate(self):
"""
Verify a certificate in a context.
@@ -1629,7 +1551,6 @@
raise self._exception_from_context()
-
def load_certificate(type, buffer):
"""
Load a certificate from a buffer
@@ -1739,7 +1660,6 @@
return _bio_to_string(bio)
-
def _X509_REVOKED_dup(original):
copy = _lib.X509_REVOKED_new()
if copy == _ffi.NULL:
@@ -1766,7 +1686,6 @@
return copy
-
class Revoked(object):
"""
A certificate revocation.
@@ -1790,7 +1709,6 @@
revoked = _lib.X509_REVOKED_new()
self._revoked = _ffi.gc(revoked, _lib.X509_REVOKED_free)
-
def set_serial(self, hex_str):
"""
Set the serial number.
@@ -1815,7 +1733,6 @@
_lib.ASN1_INTEGER_free)
_lib.X509_REVOKED_set_serialNumber(self._revoked, asn1_serial)
-
def get_serial(self):
"""
Get the serial number.
@@ -1835,7 +1752,6 @@
return _bio_to_string(bio)
-
def _delete_reason(self):
stack = self._revoked.extensions
for i in range(_lib.sk_X509_EXTENSION_num(stack)):
@@ -1845,7 +1761,6 @@
_lib.sk_X509_EXTENSION_delete(stack, i)
break
-
def set_reason(self, reason):
"""
Set the reason of this revocation.
@@ -1889,7 +1804,6 @@
# TODO: This is untested.
_raise_current_error()
-
def get_reason(self):
"""
Set the reason of this revocation.
@@ -1917,7 +1831,6 @@
return _bio_to_string(bio)
-
def all_reasons(self):
"""
Return a list of all the supported reason strings.
@@ -1930,7 +1843,6 @@
"""
return self._crl_reasons[:]
-
def set_rev_date(self, when):
"""
Set the revocation timestamp.
@@ -1941,7 +1853,6 @@
"""
return _set_asn1_time(self._revoked.revocationDate, when)
-
def get_rev_date(self):
"""
Get the revocation timestamp.
@@ -1952,11 +1863,11 @@
return _get_asn1_time(self._revoked.revocationDate)
-
class CRL(object):
"""
A certificate revocation list.
"""
+
def __init__(self):
"""
Create a new empty certificate revocation list.
@@ -1964,7 +1875,6 @@
crl = _lib.X509_CRL_new()
self._crl = _ffi.gc(crl, _lib.X509_CRL_free)
-
def get_revoked(self):
"""
Return the revocations in this certificate revocation list.
@@ -1986,7 +1896,6 @@
if results:
return tuple(results)
-
def add_revoked(self, revoked):
"""
Add a revoked (by value not reference) to the CRL structure
@@ -2010,7 +1919,6 @@
# TODO: This is untested.
_raise_current_error()
-
def export(self, cert, key, type=FILETYPE_PEM, days=100,
digest=_UNSPECIFIED):
"""
@@ -2057,7 +1965,8 @@
# TODO: This is untested.
_raise_current_error()
- # A scratch time object to give different values to different CRL fields
+ # A scratch time object to give different values to different CRL
+ # fields
sometime = _lib.ASN1_TIME_new()
if sometime == _ffi.NULL:
# TODO: This is untested.
@@ -2093,7 +2002,6 @@
CRLType = CRL
-
class PKCS7(object):
def type_is_signed(self):
"""
@@ -2105,7 +2013,6 @@
return True
return False
-
def type_is_enveloped(self):
"""
Check if this NID_pkcs7_enveloped object
@@ -2116,7 +2023,6 @@
return True
return False
-
def type_is_signedAndEnveloped(self):
"""
Check if this NID_pkcs7_signedAndEnveloped object
@@ -2127,7 +2033,6 @@
return True
return False
-
def type_is_data(self):
"""
Check if this NID_pkcs7_data object
@@ -2138,7 +2043,6 @@
return True
return False
-
def get_type_name(self):
"""
Returns the type name of the PKCS7 structure
@@ -2152,18 +2056,17 @@
PKCS7Type = PKCS7
-
class PKCS12(object):
"""
A PKCS #12 archive.
"""
+
def __init__(self):
self._pkey = None
self._cert = None
self._cacerts = None
self._friendlyname = None
-
def get_certificate(self):
"""
Get the certificate in the PKCS #12 structure.
@@ -2173,7 +2076,6 @@
"""
return self._cert
-
def set_certificate(self, cert):
"""
Set the certificate in the PKCS #12 structure.
@@ -2187,7 +2089,6 @@
raise TypeError("cert must be an X509 instance")
self._cert = cert
-
def get_privatekey(self):
"""
Get the private key in the PKCS #12 structure.
@@ -2197,7 +2098,6 @@
"""
return self._pkey
-
def set_privatekey(self, pkey):
"""
Set the certificate portion of the PKCS #12 structure.
@@ -2211,7 +2111,6 @@
raise TypeError("pkey must be a PKey instance")
self._pkey = pkey
-
def get_ca_certificates(self):
"""
Get the CA certificates in the PKCS #12 structure.
@@ -2223,7 +2122,6 @@
if self._cacerts is not None:
return tuple(self._cacerts)
-
def set_ca_certificates(self, cacerts):
"""
Replace or set the CA certificates within the PKCS12 object.
@@ -2243,7 +2141,6 @@
raise TypeError("iterable must only contain X509 instances")
self._cacerts = cacerts
-
def set_friendlyname(self, name):
"""
Set the friendly name in the PKCS #12 structure.
@@ -2259,7 +2156,6 @@
raise TypeError("name must be a byte string or None (not %r)" % (name,))
self._friendlyname = name
-
def get_friendlyname(self):
"""
Get the friendly name in the PKCS# 12 structure.
@@ -2269,7 +2165,6 @@
"""
return self._friendlyname
-
def export(self, passphrase=None, iter=2048, maciter=1):
"""
Dump a PKCS12 object as a string.
@@ -2331,20 +2226,18 @@
return _bio_to_string(bio)
-
PKCS12Type = PKCS12
-
class NetscapeSPKI(object):
"""
A Netscape SPKI object.
"""
+
def __init__(self):
spki = _lib.NETSCAPE_SPKI_new()
self._spki = _ffi.gc(spki, _lib.NETSCAPE_SPKI_free)
-
def sign(self, pkey, digest):
"""
Sign the certificate request with this key and digest type.
@@ -2372,7 +2265,6 @@
# TODO: This is untested.
_raise_current_error()
-
def verify(self, key):
"""
Verifies a signature on a certificate request.
@@ -2391,7 +2283,6 @@
_raise_current_error()
return True
-
def b64_encode(self):
"""
Generate a base64 encoded representation of this SPKI object.
@@ -2404,7 +2295,6 @@
_lib.CRYPTO_free(encoded)
return result
-
def get_pubkey(self):
"""
Get the public key of this certificate.
@@ -2421,7 +2311,6 @@
pkey._only_public = True
return pkey
-
def set_pubkey(self, pkey):
"""
Set the public key of the certificate
@@ -2435,11 +2324,9 @@
_raise_current_error()
-
NetscapeSPKIType = NetscapeSPKI
-
class _PassphraseHelper(object):
def __init__(self, type, passphrase, more_args=False, truncate=False):
if type != FILETYPE_PEM and passphrase is not None:
@@ -2449,7 +2336,6 @@
self._truncate = truncate
self._problems = []
-
@property
def callback(self):
if self._passphrase is None:
@@ -2461,7 +2347,6 @@
else:
raise TypeError("Last argument must be string or callable")
-
@property
def callback_args(self):
if self._passphrase is None:
@@ -2473,7 +2358,6 @@
else:
raise TypeError("Last argument must be string or callable")
-
def raise_if_problem(self, exceptionType=Error):
try:
_exception_from_error_queue(exceptionType)
@@ -2483,7 +2367,6 @@
raise self._problems[0]
return from_queue
-
def _read_passphrase(self, buf, size, rwflag, userdata):
try:
if self._more_args:
@@ -2505,7 +2388,6 @@
return 0
-
def load_privatekey(type, buffer, passphrase=None):
"""
Load a private key from a buffer
@@ -2541,7 +2423,6 @@
return pkey
-
def dump_certificate_request(type, req):
"""
Dump a certificate request to a buffer
@@ -2568,7 +2449,6 @@
return _bio_to_string(bio)
-
def load_certificate_request(type, buffer):
"""
Load a certificate request from a buffer
@@ -2598,7 +2478,6 @@
return x509req
-
def sign(pkey, data, digest):
"""
Sign data with a digest
@@ -2633,7 +2512,6 @@
return _ffi.buffer(signature_buffer, signature_length[0])[:]
-
def verify(cert, signature, data, digest):
"""
Verify a signature.
@@ -2696,7 +2574,6 @@
return result
-
def load_pkcs7_data(type, buffer):
"""
Load pkcs7 data from a buffer
@@ -2727,7 +2604,6 @@
return pypkcs7
-
def load_pkcs12(buffer, passphrase=None):
"""
Load a PKCS12 object from a buffer
@@ -2808,35 +2684,6 @@
return pkcs12
-def _initialize_openssl_threads(get_ident, Lock):
- import _ssl
- return
-
- locks = list(Lock() for n in range(_lib.CRYPTO_num_locks()))
-
- def locking_function(mode, index, filename, line):
- if mode & _lib.CRYPTO_LOCK:
- locks[index].acquire()
- else:
- locks[index].release()
-
- _lib.CRYPTO_set_id_callback(
- _ffi.callback("unsigned long (*)(void)", get_ident))
-
- _lib.CRYPTO_set_locking_callback(
- _ffi.callback(
- "void (*)(int, int, const char*, int)", locking_function))
-
-
-try:
- from thread import get_ident
- from threading import Lock
-except ImportError:
- pass
-else:
- _initialize_openssl_threads(get_ident, Lock)
- del get_ident, Lock
-
# There are no direct unit tests for this initialization. It is tested
# indirectly since it is necessary for functions like dump_privatekey when
# using encryption.
@@ -2853,7 +2700,6 @@
_lib.SSL_load_error_strings()
-
# Set the default string mask to match OpenSSL upstream (since 2005) and
# RFC5280 recommendations.
_lib.ASN1_STRING_set_default_mask_asc(b'utf8only')
diff --git a/OpenSSL/test/test_crypto.py b/OpenSSL/test/test_crypto.py
index 5b8ab77..409410a 100644
--- a/OpenSSL/test/test_crypto.py
+++ b/OpenSSL/test/test_crypto.py
@@ -36,6 +36,7 @@
)
from OpenSSL._util import native, lib
+
def normalize_certificate_pem(pem):
return dump_certificate(FILETYPE_PEM, load_certificate(FILETYPE_PEM, pem))
@@ -430,7 +431,6 @@
self.x509.set_notBefore(now)
self.x509.set_notAfter(expire)
-
def tearDown(self):
"""
Forget all of the pyOpenSSL objects so they can be garbage collected,
@@ -439,7 +439,6 @@
self.pkey = self.req = self.x509 = self.subject = None
super(X509ExtTests, self).tearDown()
-
def test_str(self):
"""
The string representation of :py:class:`X509Extension` instances as returned by
@@ -451,7 +450,6 @@
str(X509Extension(b('basicConstraints'), True, b('CA:false'))),
'CA:FALSE')
-
def test_type(self):
"""
:py:class:`X509Extension` and :py:class:`X509ExtensionType` refer to the same type object
@@ -462,7 +460,6 @@
X509Extension,
'X509Extension', b('basicConstraints'), True, b('CA:true'))
-
def test_construction(self):
"""
:py:class:`X509Extension` accepts an extension type name, a critical flag,
@@ -481,7 +478,6 @@
"%r is of type %r, should be %r" % (
comment, type(comment), X509ExtensionType))
-
def test_invalid_extension(self):
"""
:py:class:`X509Extension` raises something if it is passed a bad extension
@@ -500,7 +496,6 @@
Error, X509Extension, b('proxyCertInfo'), True,
b('language:id-ppl-anyLanguage,pathlen:1,policy:text:AB'))
-
def test_get_critical(self):
"""
:py:meth:`X509ExtensionType.get_critical` returns the value of the
@@ -511,7 +506,6 @@
ext = X509Extension(b('basicConstraints'), False, b('CA:true'))
self.assertFalse(ext.get_critical())
-
def test_get_short_name(self):
"""
:py:meth:`X509ExtensionType.get_short_name` returns a string giving the short
@@ -522,7 +516,6 @@
ext = X509Extension(b('nsComment'), True, b('foo bar'))
self.assertEqual(ext.get_short_name(), b('nsComment'))
-
def test_get_data(self):
"""
:py:meth:`X509Extension.get_data` returns a string giving the data of the
@@ -532,7 +525,6 @@
# Expect to get back the DER encoded form of CA:true.
self.assertEqual(ext.get_data(), b('0\x03\x01\x01\xff'))
-
def test_get_data_wrong_args(self):
"""
:py:meth:`X509Extension.get_data` raises :py:exc:`TypeError` if passed any arguments.
@@ -542,7 +534,6 @@
self.assertRaises(TypeError, ext.get_data, "foo")
self.assertRaises(TypeError, ext.get_data, 7)
-
def test_unused_subject(self):
"""
The :py:data:`subject` parameter to :py:class:`X509Extension` may be provided for an
@@ -557,7 +548,6 @@
self.assertTrue(b('X509v3 Basic Constraints:') in text)
self.assertTrue(b('CA:TRUE') in text)
-
def test_subject(self):
"""
If an extension requires a subject, the :py:data:`subject` parameter to
@@ -570,7 +560,6 @@
text = dump_certificate(FILETYPE_TEXT, self.x509)
self.assertTrue(b('X509v3 Subject Key Identifier:') in text)
-
def test_missing_subject(self):
"""
If an extension requires a subject and the :py:data:`subject` parameter is
@@ -579,7 +568,6 @@
self.assertRaises(
Error, X509Extension, b('subjectKeyIdentifier'), False, b('hash'))
-
def test_invalid_subject(self):
"""
If the :py:data:`subject` parameter is given a value which is not an
@@ -591,7 +579,6 @@
X509Extension,
'basicConstraints', False, 'CA:TRUE', subject=badObj)
-
def test_unused_issuer(self):
"""
The :py:data:`issuer` parameter to :py:class:`X509Extension` may be provided for an
@@ -605,7 +592,6 @@
self.assertTrue(b('X509v3 Basic Constraints:') in text)
self.assertTrue(b('CA:TRUE') in text)
-
def test_issuer(self):
"""
If an extension requires an issuer, the :py:data:`issuer` parameter to
@@ -620,7 +606,6 @@
self.assertTrue(b('X509v3 Authority Key Identifier:') in text)
self.assertTrue(b('DirName:/CN=Yoda root CA') in text)
-
def test_missing_issuer(self):
"""
If an extension requires an issue and the :py:data:`issuer` parameter is given
@@ -632,7 +617,6 @@
b('authorityKeyIdentifier'), False,
b('keyid:always,issuer:always'))
-
def test_invalid_issuer(self):
"""
If the :py:data:`issuer` parameter is given a value which is not an
@@ -646,11 +630,11 @@
issuer=badObj)
-
class PKeyTests(TestCase):
"""
Unit tests for :py:class:`OpenSSL.crypto.PKey`.
"""
+
def test_type(self):
"""
:py:class:`PKey` and :py:class:`PKeyType` refer to the same type object
@@ -659,7 +643,6 @@
self.assertIdentical(PKey, PKeyType)
self.assertConsistentType(PKey, 'PKey')
-
def test_construction(self):
"""
:py:class:`PKey` takes no arguments and returns a new :py:class:`PKey` instance.
@@ -670,7 +653,6 @@
isinstance(key, PKeyType),
"%r is of type %r, should be %r" % (key, type(key), PKeyType))
-
def test_pregeneration(self):
"""
:py:attr:`PKeyType.bits` and :py:attr:`PKeyType.type` return :py:data:`0` before the key is
@@ -682,7 +664,6 @@
self.assertEqual(key.bits(), 0)
self.assertRaises(TypeError, key.check)
-
def test_failedGeneration(self):
"""
:py:meth:`PKeyType.generate_key` takes two arguments, the first giving the key
@@ -717,7 +698,6 @@
# self.assertRaises(Error, key.generate_key, TYPE_DSA, -7)
-
def test_rsaGeneration(self):
"""
:py:meth:`PKeyType.generate_key` generates an RSA key when passed
@@ -730,7 +710,6 @@
self.assertEqual(key.bits(), bits)
self.assertTrue(key.check())
-
def test_dsaGeneration(self):
"""
:py:meth:`PKeyType.generate_key` generates a DSA key when passed
@@ -746,7 +725,6 @@
# self.assertEqual(key.bits(), bits)
# self.assertRaises(TypeError, key.check)
-
def test_regeneration(self):
"""
:py:meth:`PKeyType.generate_key` can be called multiple times on the same
@@ -758,7 +736,6 @@
self.assertEqual(key.type(), type)
self.assertEqual(key.bits(), bits)
-
def test_inconsistentKey(self):
"""
:py:`PKeyType.check` returns :py:exc:`Error` if the key is not consistent.
@@ -766,7 +743,6 @@
key = load_privatekey(FILETYPE_PEM, inconsistentPrivateKeyPEM)
self.assertRaises(Error, key.check)
-
def test_check_wrong_args(self):
"""
:py:meth:`PKeyType.check` raises :py:exc:`TypeError` if called with any arguments.
@@ -775,7 +751,6 @@
self.assertRaises(TypeError, PKey().check, object())
self.assertRaises(TypeError, PKey().check, 1)
-
def test_check_public_key(self):
"""
:py:meth:`PKeyType.check` raises :py:exc:`TypeError` if only the public
@@ -790,11 +765,11 @@
self.assertRaises(TypeError, pub.check)
-
class X509NameTests(TestCase):
"""
Unit tests for :py:class:`OpenSSL.crypto.X509Name`.
"""
+
def _x509name(self, **attrs):
# XXX There's no other way to get a new X509Name yet.
name = X509().get_subject()
@@ -807,7 +782,6 @@
setattr(name, k, v)
return name
-
def test_type(self):
"""
The type of X509Name objects is :py:class:`X509NameType`.
@@ -822,7 +796,6 @@
"%r is of type %r, should be %r" % (
name, type(name), X509NameType))
-
def test_onlyStringAttributes(self):
"""
Attempting to set a non-:py:data:`str` attribute name on an :py:class:`X509NameType`
@@ -842,7 +815,6 @@
self.assertRaises(TypeError, setattr, name, None, "hello")
self.assertRaises(TypeError, setattr, name, 30, "hello")
-
def test_setInvalidAttribute(self):
"""
Attempting to set any attribute name on an :py:class:`X509NameType` instance for
@@ -852,7 +824,6 @@
name = self._x509name()
self.assertRaises(AttributeError, setattr, name, "no such thing", None)
-
def test_attributes(self):
"""
:py:class:`X509NameType` instances have attributes for each standard (?)
@@ -872,7 +843,6 @@
self.assertEqual(name.commonName, "quux")
self.assertEqual(name.CN, "quux")
-
def test_copy(self):
"""
:py:class:`X509Name` creates a new :py:class:`X509NameType` instance with all the same
@@ -893,7 +863,6 @@
name.emailAddress = "quux@example.com"
self.assertEqual(copy.emailAddress, "bar@example.com")
-
def test_repr(self):
"""
:py:func:`repr` passed an :py:class:`X509NameType` instance should return a string
@@ -905,7 +874,6 @@
repr(name),
"<X509Name object '/emailAddress=bar/CN=foo'>")
-
def test_comparison(self):
"""
:py:class:`X509NameType` instances should compare based on their NIDs.
@@ -980,7 +948,6 @@
assertGreaterThan(self._x509name(CN="def"),
self._x509name(CN="abc"))
-
def test_hash(self):
"""
:py:meth:`X509Name.hash` returns an integer hash based on the value of the
@@ -992,7 +959,6 @@
a.CN = "bar"
self.assertNotEqual(a.hash(), b.hash())
-
def test_der(self):
"""
:py:meth:`X509Name.der` returns the DER encoded form of the name.
@@ -1003,7 +969,6 @@
b('0\x1b1\x0b0\t\x06\x03U\x04\x06\x13\x02US'
'1\x0c0\n\x06\x03U\x04\x03\x0c\x03foo'))
-
def test_get_components(self):
"""
:py:meth:`X509Name.get_components` returns a :py:data:`list` of
@@ -1019,7 +984,6 @@
a.get_components(),
[(b("CN"), b("foo")), (b("OU"), b("bar"))])
-
def test_load_nul_byte_attribute(self):
"""
An :py:class:`OpenSSL.crypto.X509Name` from an
@@ -1031,7 +995,6 @@
self.assertEqual(
"null.python.org\x00example.org", subject.commonName)
-
def test_setAttributeFailure(self):
"""
If the value of an attribute cannot be set for some reason then
@@ -1042,11 +1005,11 @@
self.assertRaises(Error, setattr, name, "O", b"x" * 512)
-
class _PKeyInteractionTestsMixin:
"""
Tests which involve another thing and a PKey.
"""
+
def signable(self):
"""
Return something with a :py:meth:`set_pubkey`, :py:meth:`set_pubkey`,
@@ -1054,7 +1017,6 @@
"""
raise NotImplementedError()
-
def test_signWithUngenerated(self):
"""
:py:meth:`X509Req.sign` raises :py:exc:`ValueError` when pass a
@@ -1064,7 +1026,6 @@
key = PKey()
self.assertRaises(ValueError, request.sign, key, GOOD_DIGEST)
-
def test_signWithPublicKey(self):
"""
:py:meth:`X509Req.sign` raises :py:exc:`ValueError` when pass a
@@ -1077,7 +1038,6 @@
pub = request.get_pubkey()
self.assertRaises(ValueError, request.sign, pub, GOOD_DIGEST)
-
def test_signWithUnknownDigest(self):
"""
:py:meth:`X509Req.sign` raises :py:exc:`ValueError` when passed a digest name which is
@@ -1088,7 +1048,6 @@
key.generate_key(TYPE_RSA, 512)
self.assertRaises(ValueError, request.sign, key, BAD_DIGEST)
-
def test_sign(self):
"""
:py:meth:`X509Req.sign` succeeds when passed a private key object and a valid
@@ -1109,19 +1068,17 @@
self.assertRaises(Error, request.verify, key)
-
-
class X509ReqTests(TestCase, _PKeyInteractionTestsMixin):
"""
Tests for :py:class:`OpenSSL.crypto.X509Req`.
"""
+
def signable(self):
"""
Create and return a new :py:class:`X509Req`.
"""
return X509Req()
-
def test_type(self):
"""
:py:obj:`X509Req` and :py:obj:`X509ReqType` refer to the same type object and can be
@@ -1130,7 +1087,6 @@
self.assertIdentical(X509Req, X509ReqType)
self.assertConsistentType(X509Req, 'X509Req')
-
def test_construction(self):
"""
:py:obj:`X509Req` takes no arguments and returns an :py:obj:`X509ReqType` instance.
@@ -1140,7 +1096,6 @@
isinstance(request, X509ReqType),
"%r is of type %r, should be %r" % (request, type(request), X509ReqType))
-
def test_version(self):
"""
:py:obj:`X509ReqType.set_version` sets the X.509 version of the certificate
@@ -1154,7 +1109,6 @@
request.set_version(3)
self.assertEqual(request.get_version(), 3)
-
def test_version_wrong_args(self):
"""
:py:obj:`X509ReqType.set_version` raises :py:obj:`TypeError` if called with the wrong
@@ -1168,7 +1122,6 @@
self.assertRaises(TypeError, request.set_version, 1, 2)
self.assertRaises(TypeError, request.get_version, None)
-
def test_get_subject(self):
"""
:py:obj:`X509ReqType.get_subject` returns an :py:obj:`X509Name` for the subject of
@@ -1186,7 +1139,6 @@
subject.commonName = "bar"
self.assertEqual(subject.commonName, "bar")
-
def test_get_subject_wrong_args(self):
"""
:py:obj:`X509ReqType.get_subject` raises :py:obj:`TypeError` if called with any
@@ -1195,7 +1147,6 @@
request = X509Req()
self.assertRaises(TypeError, request.get_subject, None)
-
def test_add_extensions(self):
"""
:py:obj:`X509Req.add_extensions` accepts a :py:obj:`list` of :py:obj:`X509Extension`
@@ -1203,14 +1154,13 @@
"""
request = X509Req()
request.add_extensions([
- X509Extension(b('basicConstraints'), True, b('CA:false'))])
+ X509Extension(b('basicConstraints'), True, b('CA:false'))])
exts = request.get_extensions()
self.assertEqual(len(exts), 1)
self.assertEqual(exts[0].get_short_name(), b('basicConstraints'))
self.assertEqual(exts[0].get_critical(), 1)
self.assertEqual(exts[0].get_data(), b('0\x00'))
-
def test_get_extensions(self):
"""
:py:obj:`X509Req.get_extensions` returns a :py:obj:`list` of
@@ -1220,8 +1170,8 @@
exts = request.get_extensions()
self.assertEqual(exts, [])
request.add_extensions([
- X509Extension(b('basicConstraints'), True, b('CA:true')),
- X509Extension(b('keyUsage'), False, b('digitalSignature'))])
+ X509Extension(b('basicConstraints'), True, b('CA:true')),
+ X509Extension(b('keyUsage'), False, b('digitalSignature'))])
exts = request.get_extensions()
self.assertEqual(len(exts), 2)
self.assertEqual(exts[0].get_short_name(), b('basicConstraints'))
@@ -1231,7 +1181,6 @@
self.assertEqual(exts[1].get_critical(), 0)
self.assertEqual(exts[1].get_data(), b('\x03\x02\x07\x80'))
-
def test_add_extensions_wrong_args(self):
"""
:py:obj:`X509Req.add_extensions` raises :py:obj:`TypeError` if called with the wrong
@@ -1245,7 +1194,6 @@
self.assertRaises(ValueError, request.add_extensions, [object()])
self.assertRaises(TypeError, request.add_extensions, [], None)
-
def test_verify_wrong_args(self):
"""
:py:obj:`X509Req.verify` raises :py:obj:`TypeError` if called with zero
@@ -1257,7 +1205,6 @@
self.assertRaises(TypeError, request.verify, object())
self.assertRaises(TypeError, request.verify, PKey(), object())
-
def test_verify_uninitialized_key(self):
"""
:py:obj:`X509Req.verify` raises :py:obj:`OpenSSL.crypto.Error` if called
@@ -1267,7 +1214,6 @@
pkey = PKey()
self.assertRaises(Error, request.verify, pkey)
-
def test_verify_wrong_key(self):
"""
:py:obj:`X509Req.verify` raises :py:obj:`OpenSSL.crypto.Error` if called
@@ -1280,7 +1226,6 @@
another_pkey = load_privatekey(FILETYPE_PEM, client_key_pem)
self.assertRaises(Error, request.verify, another_pkey)
-
def test_verify_success(self):
"""
:py:obj:`X509Req.verify` returns :py:obj:`True` if called with a
@@ -1293,7 +1238,6 @@
self.assertEqual(True, request.verify(pkey))
-
class X509Tests(TestCase, _PKeyInteractionTestsMixin):
"""
Tests for :py:obj:`OpenSSL.crypto.X509`.
@@ -1320,13 +1264,13 @@
WpOdIpB8KksUTCzV591Nr1wd
-----END CERTIFICATE-----
"""
+
def signable(self):
"""
Create and return a new :py:obj:`X509`.
"""
return X509()
-
def test_type(self):
"""
:py:obj:`X509` and :py:obj:`X509Type` refer to the same type object and can be used
@@ -1335,7 +1279,6 @@
self.assertIdentical(X509, X509Type)
self.assertConsistentType(X509, 'X509')
-
def test_construction(self):
"""
:py:obj:`X509` takes no arguments and returns an instance of :py:obj:`X509Type`.
@@ -1351,7 +1294,6 @@
self.assertEqual(type(certificate), X509Type)
self.assertEqual(type(certificate), X509)
-
def test_get_version_wrong_args(self):
"""
:py:obj:`X509.get_version` raises :py:obj:`TypeError` if invoked with any arguments.
@@ -1359,7 +1301,6 @@
cert = X509()
self.assertRaises(TypeError, cert.get_version, None)
-
def test_set_version_wrong_args(self):
"""
:py:obj:`X509.set_version` raises :py:obj:`TypeError` if invoked with the wrong number
@@ -1370,7 +1311,6 @@
self.assertRaises(TypeError, cert.set_version, None)
self.assertRaises(TypeError, cert.set_version, 1, None)
-
def test_version(self):
"""
:py:obj:`X509.set_version` sets the certificate version number.
@@ -1380,7 +1320,6 @@
cert.set_version(1234)
self.assertEquals(cert.get_version(), 1234)
-
def test_get_serial_number_wrong_args(self):
"""
:py:obj:`X509.get_serial_number` raises :py:obj:`TypeError` if invoked with any
@@ -1389,7 +1328,6 @@
cert = X509()
self.assertRaises(TypeError, cert.get_serial_number, None)
-
def test_serial_number(self):
"""
The serial number of an :py:obj:`X509Type` can be retrieved and modified with
@@ -1410,7 +1348,6 @@
certificate.set_serial_number(2 ** 128 + 1)
self.assertEqual(certificate.get_serial_number(), 2 ** 128 + 1)
-
def _setBoundTest(self, which):
"""
:py:obj:`X509Type.set_notBefore` takes a string in the format of an ASN1
@@ -1447,7 +1384,6 @@
self.assertRaises(TypeError, set, b("20040203040506Z"), b("20040203040506Z"))
self.assertRaises(TypeError, get, b("foo bar"))
-
# XXX ASN1_TIME (not GENERALIZEDTIME)
def test_set_notBefore(self):
@@ -1458,7 +1394,6 @@
"""
self._setBoundTest("Before")
-
def test_set_notAfter(self):
"""
:py:obj:`X509Type.set_notAfter` takes a string in the format of an ASN1
@@ -1467,7 +1402,6 @@
"""
self._setBoundTest("After")
-
def test_get_notBefore(self):
"""
:py:obj:`X509Type.get_notBefore` returns a string in the format of an ASN1
@@ -1477,7 +1411,6 @@
cert = load_certificate(FILETYPE_PEM, self.pemData)
self.assertEqual(cert.get_notBefore(), b("20090325123658Z"))
-
def test_get_notAfter(self):
"""
:py:obj:`X509Type.get_notAfter` returns a string in the format of an ASN1
@@ -1487,7 +1420,6 @@
cert = load_certificate(FILETYPE_PEM, self.pemData)
self.assertEqual(cert.get_notAfter(), b("20170611123658Z"))
-
def test_gmtime_adj_notBefore_wrong_args(self):
"""
:py:obj:`X509Type.gmtime_adj_notBefore` raises :py:obj:`TypeError` if called with the
@@ -1498,7 +1430,6 @@
self.assertRaises(TypeError, cert.gmtime_adj_notBefore, None)
self.assertRaises(TypeError, cert.gmtime_adj_notBefore, 123, None)
-
def test_gmtime_adj_notBefore(self):
"""
:py:obj:`X509Type.gmtime_adj_notBefore` changes the not-before timestamp to be
@@ -1511,7 +1442,6 @@
not_before_max = datetime.utcnow() + timedelta(seconds=100)
self.assertTrue(not_before_min <= not_before <= not_before_max)
-
def test_gmtime_adj_notAfter_wrong_args(self):
"""
:py:obj:`X509Type.gmtime_adj_notAfter` raises :py:obj:`TypeError` if called with the
@@ -1522,7 +1452,6 @@
self.assertRaises(TypeError, cert.gmtime_adj_notAfter, None)
self.assertRaises(TypeError, cert.gmtime_adj_notAfter, 123, None)
-
def test_gmtime_adj_notAfter(self):
"""
:py:obj:`X509Type.gmtime_adj_notAfter` changes the not-after timestamp to be
@@ -1535,7 +1464,6 @@
not_after_max = datetime.utcnow() + timedelta(seconds=100)
self.assertTrue(not_after_min <= not_after <= not_after_max)
-
def test_has_expired_wrong_args(self):
"""
:py:obj:`X509Type.has_expired` raises :py:obj:`TypeError` if called with any
@@ -1544,7 +1472,6 @@
cert = X509()
self.assertRaises(TypeError, cert.has_expired, None)
-
def test_has_expired(self):
"""
:py:obj:`X509Type.has_expired` returns :py:obj:`True` if the certificate's not-after
@@ -1554,7 +1481,6 @@
cert.gmtime_adj_notAfter(-1)
self.assertTrue(cert.has_expired())
-
def test_has_not_expired(self):
"""
:py:obj:`X509Type.has_expired` returns :py:obj:`False` if the certificate's not-after
@@ -1572,7 +1498,6 @@
cert = load_certificate(FILETYPE_PEM, root_cert_pem)
self.assertFalse(cert.has_expired())
-
def test_digest(self):
"""
:py:obj:`X509.digest` returns a string giving ":"-separated hex-encoded words
@@ -1588,7 +1513,6 @@
cert.digest("MD5"),
b("19:B3:05:26:2B:F8:F2:FF:0B:8F:21:07:A8:28:B8:75"))
-
def _extcert(self, pkey, extensions):
cert = X509()
cert.set_pubkey(pkey)
@@ -1603,7 +1527,6 @@
return load_certificate(
FILETYPE_PEM, dump_certificate(FILETYPE_PEM, cert))
-
def test_extension_count(self):
"""
:py:obj:`X509.get_extension_count` returns the number of extensions that are
@@ -1627,7 +1550,6 @@
c = self._extcert(pkey, [ca, key, subjectAltName])
self.assertEqual(c.get_extension_count(), 3)
-
def test_get_extension(self):
"""
:py:obj:`X509.get_extension` takes an integer and returns an :py:obj:`X509Extension`
@@ -1660,7 +1582,6 @@
self.assertRaises(IndexError, cert.get_extension, 4)
self.assertRaises(TypeError, cert.get_extension, "hello")
-
def test_nullbyte_subjectAltName(self):
"""
The fields of a `subjectAltName` extension on an X509 may contain NUL
@@ -1678,7 +1599,6 @@
"IP Address:192.0.2.1, IP Address:2001:DB8:0:0:0:0:0:1\n"),
b(str(ext)))
-
def test_invalid_digest_algorithm(self):
"""
:py:obj:`X509.digest` raises :py:obj:`ValueError` if called with an unrecognized hash
@@ -1687,7 +1607,6 @@
cert = X509()
self.assertRaises(ValueError, cert.digest, BAD_DIGEST)
-
def test_get_subject_wrong_args(self):
"""
:py:obj:`X509.get_subject` raises :py:obj:`TypeError` if called with any arguments.
@@ -1695,7 +1614,6 @@
cert = X509()
self.assertRaises(TypeError, cert.get_subject, None)
-
def test_get_subject(self):
"""
:py:obj:`X509.get_subject` returns an :py:obj:`X509Name` instance.
@@ -1708,7 +1626,6 @@
[(b('C'), b('US')), (b('ST'), b('IL')), (b('L'), b('Chicago')),
(b('O'), b('Testing')), (b('CN'), b('Testing Root CA'))])
-
def test_set_subject_wrong_args(self):
"""
:py:obj:`X509.set_subject` raises a :py:obj:`TypeError` if called with the wrong
@@ -1734,7 +1651,6 @@
cert.get_subject().get_components(),
[(b('C'), b('AU')), (b('O'), b('Unit Tests'))])
-
def test_get_issuer_wrong_args(self):
"""
:py:obj:`X509.get_issuer` raises :py:obj:`TypeError` if called with any arguments.
@@ -1742,7 +1658,6 @@
cert = X509()
self.assertRaises(TypeError, cert.get_issuer, None)
-
def test_get_issuer(self):
"""
:py:obj:`X509.get_issuer` returns an :py:obj:`X509Name` instance.
@@ -1756,7 +1671,6 @@
[(b('C'), b('US')), (b('ST'), b('IL')), (b('L'), b('Chicago')),
(b('O'), b('Testing')), (b('CN'), b('Testing Root CA'))])
-
def test_set_issuer_wrong_args(self):
"""
:py:obj:`X509.set_issuer` raises a :py:obj:`TypeError` if called with the wrong
@@ -1767,7 +1681,6 @@
self.assertRaises(TypeError, cert.set_issuer, None)
self.assertRaises(TypeError, cert.set_issuer, cert.get_issuer(), None)
-
def test_set_issuer(self):
"""
:py:obj:`X509.set_issuer` changes the issuer of the certificate to the one
@@ -1782,7 +1695,6 @@
cert.get_issuer().get_components(),
[(b('C'), b('AU')), (b('O'), b('Unit Tests'))])
-
def test_get_pubkey_uninitialized(self):
"""
When called on a certificate with no public key, :py:obj:`X509.get_pubkey`
@@ -1791,7 +1703,6 @@
cert = X509()
self.assertRaises(Error, cert.get_pubkey)
-
def test_subject_name_hash_wrong_args(self):
"""
:py:obj:`X509.subject_name_hash` raises :py:obj:`TypeError` if called with any
@@ -1800,7 +1711,6 @@
cert = X509()
self.assertRaises(TypeError, cert.subject_name_hash, None)
-
def test_subject_name_hash(self):
"""
:py:obj:`X509.subject_name_hash` returns the hash of the certificate's subject
@@ -1809,11 +1719,10 @@
cert = load_certificate(FILETYPE_PEM, self.pemData)
self.assertIn(
cert.subject_name_hash(),
- [3350047874, # OpenSSL 0.9.8, MD5
- 3278919224, # OpenSSL 1.0.0, SHA1
+ [3350047874, # OpenSSL 0.9.8, MD5
+ 3278919224, # OpenSSL 1.0.0, SHA1
])
-
def test_get_signature_algorithm(self):
"""
:py:obj:`X509Type.get_signature_algorithm` returns a string which means
@@ -1823,7 +1732,6 @@
self.assertEqual(
b("sha1WithRSAEncryption"), cert.get_signature_algorithm())
-
def test_get_undefined_signature_algorithm(self):
"""
:py:obj:`X509Type.get_signature_algorithm` raises :py:obj:`ValueError` if the
@@ -1856,11 +1764,11 @@
self.assertRaises(ValueError, cert.get_signature_algorithm)
-
class X509StoreTests(TestCase):
"""
Test for :py:obj:`OpenSSL.crypto.X509Store`.
"""
+
def test_type(self):
"""
:py:obj:`X509StoreType` is a type object.
@@ -1868,14 +1776,12 @@
self.assertIdentical(X509Store, X509StoreType)
self.assertConsistentType(X509Store, 'X509Store')
-
def test_add_cert_wrong_args(self):
store = X509Store()
self.assertRaises(TypeError, store.add_cert)
self.assertRaises(TypeError, store.add_cert, object())
self.assertRaises(TypeError, store.add_cert, X509(), object())
-
def test_add_cert(self):
"""
:py:obj:`X509Store.add_cert` adds a :py:obj:`X509` instance to the
@@ -1885,7 +1791,6 @@
store = X509Store()
store.add_cert(cert)
-
def test_add_cert_rejects_duplicate(self):
"""
:py:obj:`X509Store.add_cert` raises :py:obj:`OpenSSL.crypto.Error` if an
@@ -1897,7 +1802,6 @@
self.assertRaises(Error, store.add_cert, cert)
-
class PKCS12Tests(TestCase):
"""
Test for :py:obj:`OpenSSL.crypto.PKCS12` and :py:obj:`OpenSSL.crypto.load_pkcs12`.
@@ -1911,7 +1815,6 @@
self.assertIdentical(PKCS12, PKCS12Type)
self.assertConsistentType(PKCS12, 'PKCS12')
-
def test_empty_construction(self):
"""
:py:obj:`PKCS12` returns a new instance of :py:obj:`PKCS12` with no certificate,
@@ -1923,7 +1826,6 @@
self.assertEqual(None, p12.get_ca_certificates())
self.assertEqual(None, p12.get_friendlyname())
-
def test_type_errors(self):
"""
The :py:obj:`PKCS12` setter functions (:py:obj:`set_certificate`, :py:obj:`set_privatekey`,
@@ -1944,7 +1846,6 @@
self.assertRaises(TypeError, p12.set_friendlyname, 6)
self.assertRaises(TypeError, p12.set_friendlyname, ('foo', 'bar'))
-
def test_key_only(self):
"""
A :py:obj:`PKCS12` with only a private key can be exported using
@@ -1971,7 +1872,6 @@
# future this will be improved.
self.assertTrue(isinstance(p12.get_privatekey(), (PKey, type(None))))
-
def test_cert_only(self):
"""
A :py:obj:`PKCS12` with only a certificate can be exported using
@@ -2006,7 +1906,6 @@
cleartextCertificatePEM,
dump_certificate(FILETYPE_PEM, p12.get_ca_certificates()[0]))
-
def gen_pkcs12(self, cert_pem=None, key_pem=None, ca_pem=None, friendly_name=None):
"""
Generate a PKCS12 object with components from PEM. Verify that the set
@@ -2027,7 +1926,6 @@
self.assertEqual(ret, None)
return p12
-
def check_recovery(self, p12_str, key=None, cert=None, ca=None, passwd=b"",
extra=()):
"""
@@ -2050,7 +1948,6 @@
b"pass:" + passwd, b"-nokeys", *extra)
self.assertEqual(recovered_cert[-len(ca):], ca)
-
def verify_pkcs12_container(self, p12):
"""
Verify that the PKCS#12 container contains the correct client
@@ -2065,7 +1962,6 @@
(client_cert_pem, client_key_pem, None),
(cert_pem, key_pem, p12.get_ca_certificates()))
-
def test_load_pkcs12(self):
"""
A PKCS12 string generated using the openssl command line can be loaded
@@ -2078,7 +1974,6 @@
p12 = load_pkcs12(p12_str, passphrase=passwd)
self.verify_pkcs12_container(p12)
-
def test_load_pkcs12_text_passphrase(self):
"""
A PKCS12 string generated using the openssl command line can be loaded
@@ -2103,7 +1998,6 @@
self.verify_pkcs12_container(p12)
-
def test_load_pkcs12_no_passphrase(self):
"""
A PKCS12 string generated using openssl command line can be loaded with
@@ -2116,7 +2010,6 @@
p12 = load_pkcs12(p12_str)
self.verify_pkcs12_container(p12)
-
def _dump_and_load(self, dump_passphrase, load_passphrase):
"""
A helper method to dump and load a PKCS12 object.
@@ -2125,7 +2018,6 @@
dumped_p12 = p12.export(passphrase=dump_passphrase, iter=2, maciter=3)
return load_pkcs12(dumped_p12, passphrase=load_passphrase)
-
def test_load_pkcs12_null_passphrase_load_empty(self):
"""
A PKCS12 string can be dumped with a null passphrase, loaded with an
@@ -2135,7 +2027,6 @@
self.verify_pkcs12_container(
self._dump_and_load(dump_passphrase=None, load_passphrase=b''))
-
def test_load_pkcs12_null_passphrase_load_null(self):
"""
A PKCS12 string can be dumped with a null passphrase, loaded with a
@@ -2145,7 +2036,6 @@
self.verify_pkcs12_container(
self._dump_and_load(dump_passphrase=None, load_passphrase=None))
-
def test_load_pkcs12_empty_passphrase_load_empty(self):
"""
A PKCS12 string can be dumped with an empty passphrase, loaded with an
@@ -2155,7 +2045,6 @@
self.verify_pkcs12_container(
self._dump_and_load(dump_passphrase=b'', load_passphrase=b''))
-
def test_load_pkcs12_empty_passphrase_load_null(self):
"""
A PKCS12 string can be dumped with an empty passphrase, loaded with a
@@ -2165,7 +2054,6 @@
self.verify_pkcs12_container(
self._dump_and_load(dump_passphrase=b'', load_passphrase=None))
-
def test_load_pkcs12_garbage(self):
"""
:py:obj:`load_pkcs12` raises :py:obj:`OpenSSL.crypto.Error` when passed a string
@@ -2176,7 +2064,6 @@
self.assertEqual(e.args[0][0][0], 'asn1 encoding routines')
self.assertEqual(len(e.args[0][0]), 3)
-
def test_replace(self):
"""
:py:obj:`PKCS12.set_certificate` replaces the certificate in a PKCS12 cluster.
@@ -2188,7 +2075,7 @@
p12.set_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem))
root_cert = load_certificate(FILETYPE_PEM, root_cert_pem)
client_cert = load_certificate(FILETYPE_PEM, client_cert_pem)
- p12.set_ca_certificates([root_cert]) # not a tuple
+ p12.set_ca_certificates([root_cert]) # not a tuple
self.assertEqual(1, len(p12.get_ca_certificates()))
self.assertEqual(root_cert, p12.get_ca_certificates()[0])
p12.set_ca_certificates([client_cert, root_cert])
@@ -2196,7 +2083,6 @@
self.assertEqual(client_cert, p12.get_ca_certificates()[0])
self.assertEqual(root_cert, p12.get_ca_certificates()[1])
-
def test_friendly_name(self):
"""
The *friendlyName* of a PKCS12 can be set and retrieved via
@@ -2220,7 +2106,6 @@
dumped_p12, key=server_key_pem, cert=server_cert_pem,
ca=root_cert_pem, passwd=passwd)
-
def test_various_empty_passphrases(self):
"""
Test that missing, None, and '' passphrases are identical for PKCS12
@@ -2236,7 +2121,6 @@
dumped_p12, key=client_key_pem, cert=client_cert_pem,
ca=root_cert_pem, passwd=passwd)
-
def test_removing_ca_cert(self):
"""
Passing :py:obj:`None` to :py:obj:`PKCS12.set_ca_certificates` removes all CA
@@ -2246,7 +2130,6 @@
p12.set_ca_certificates(None)
self.assertEqual(None, p12.get_ca_certificates())
-
def test_export_without_mac(self):
"""
Exporting a PKCS12 with a :py:obj:`maciter` of ``-1`` excludes the MAC
@@ -2259,7 +2142,6 @@
dumped_p12, key=server_key_pem, cert=server_cert_pem,
passwd=passwd, extra=(b"-nomacver",))
-
def test_load_without_mac(self):
"""
Loading a PKCS12 without a MAC does something other than crash.
@@ -2280,7 +2162,6 @@
# versions do.
pass
-
def test_zero_len_list_for_ca(self):
"""
A PKCS12 with an empty CA certificates list can be exported.
@@ -2294,7 +2175,6 @@
# dumped_p12, key=server_key_pem, cert=server_cert_pem,
# passwd=passwd)
-
def test_export_without_args(self):
"""
All the arguments to :py:obj:`PKCS12.export` are optional.
@@ -2304,7 +2184,6 @@
self.check_recovery(
dumped_p12, key=server_key_pem, cert=server_cert_pem, passwd=b"")
-
def test_export_without_bytes(self):
"""
Test :py:obj:`PKCS12.export` with text not bytes as passphrase
@@ -2324,7 +2203,6 @@
self.check_recovery(
dumped_p12, key=server_key_pem, cert=server_cert_pem, passwd=b"randomtext")
-
def test_key_cert_mismatch(self):
"""
:py:obj:`PKCS12.export` raises an exception when a key and certificate
@@ -2334,10 +2212,11 @@
self.assertRaises(Error, p12.export)
-
# These quoting functions taken directly from Twisted's twisted.python.win32.
_cmdLineQuoteRe = re.compile(br'(\\*)"')
_cmdLineQuoteRe2 = re.compile(br'(\\+)\Z')
+
+
def cmdLineQuote(s):
"""
Internal method for quoting a single command-line argument.
@@ -2355,7 +2234,6 @@
return b'"' + s + b'"'
-
def quoteArguments(arguments):
"""
Quote an iterable of command-line arguments for passing to CreateProcess or
@@ -2371,7 +2249,6 @@
return b' '.join(map(cmdLineQuote, arguments))
-
def _runopenssl(pem, *args):
"""
Run the command line openssl tool with the given arguments and write
@@ -2392,7 +2269,6 @@
return output
-
class FunctionTests(TestCase):
"""
Tests for free-functions in the :py:obj:`OpenSSL.crypto` module.
@@ -2404,7 +2280,6 @@
"""
self.assertRaises(ValueError, load_privatekey, 100, root_key_pem)
-
def test_load_privatekey_invalid_passphrase_type(self):
"""
:py:obj:`load_privatekey` raises :py:obj:`TypeError` if passed a passphrase that is
@@ -2415,7 +2290,6 @@
load_privatekey,
FILETYPE_PEM, encryptedPrivateKeyPEMPassphrase, object())
-
def test_load_privatekey_wrong_args(self):
"""
:py:obj:`load_privatekey` raises :py:obj:`TypeError` if called with the wrong number
@@ -2423,7 +2297,6 @@
"""
self.assertRaises(TypeError, load_privatekey)
-
def test_load_privatekey_wrongPassphrase(self):
"""
:py:obj:`load_privatekey` raises :py:obj:`OpenSSL.crypto.Error` when it is passed an
@@ -2433,7 +2306,6 @@
Error,
load_privatekey, FILETYPE_PEM, encryptedPrivateKeyPEM, b("quack"))
-
def test_load_privatekey_passphraseWrongType(self):
"""
:py:obj:`load_privatekey` raises :py:obj:`ValueError` when it is passed a passphrase
@@ -2456,7 +2328,6 @@
encryptedPrivateKeyPEMPassphrase)
self.assertTrue(isinstance(key, PKeyType))
-
def test_load_privatekey_passphrase_exception(self):
"""
If the passphrase callback raises an exception, that exception is raised
@@ -2476,6 +2347,7 @@
incorrect passphrase.
"""
called = []
+
def cb(*a):
called.append(None)
return b("quack")
@@ -2484,7 +2356,6 @@
load_privatekey, FILETYPE_PEM, encryptedPrivateKeyPEM, cb)
self.assertTrue(called)
-
def test_load_privatekey_passphraseCallback(self):
"""
:py:obj:`load_privatekey` can create a :py:obj:`PKey` object from an encrypted PEM
@@ -2492,6 +2363,7 @@
password.
"""
called = []
+
def cb(writing):
called.append(writing)
return encryptedPrivateKeyPEMPassphrase
@@ -2499,7 +2371,6 @@
self.assertTrue(isinstance(key, PKeyType))
self.assertEqual(called, [False])
-
def test_load_privatekey_passphrase_wrong_return_type(self):
"""
:py:obj:`load_privatekey` raises :py:obj:`ValueError` if the passphrase
@@ -2510,7 +2381,6 @@
load_privatekey,
FILETYPE_PEM, encryptedPrivateKeyPEM, lambda *args: 3)
-
def test_dump_privatekey_wrong_args(self):
"""
:py:obj:`dump_privatekey` raises :py:obj:`TypeError` if called with the wrong number
@@ -2521,7 +2391,6 @@
self.assertRaises(
TypeError, dump_privatekey, FILETYPE_PEM, PKey(), GOOD_CIPHER)
-
def test_dump_privatekey_unknown_cipher(self):
"""
:py:obj:`dump_privatekey` raises :py:obj:`ValueError` if called with an unrecognized
@@ -2533,7 +2402,6 @@
ValueError, dump_privatekey,
FILETYPE_PEM, key, BAD_CIPHER, "passphrase")
-
def test_dump_privatekey_invalid_passphrase_type(self):
"""
:py:obj:`dump_privatekey` raises :py:obj:`TypeError` if called with a passphrase which
@@ -2545,7 +2413,6 @@
TypeError,
dump_privatekey, FILETYPE_PEM, key, GOOD_CIPHER, object())
-
def test_dump_privatekey_invalid_filetype(self):
"""
:py:obj:`dump_privatekey` raises :py:obj:`ValueError` if called with an unrecognized
@@ -2555,7 +2422,6 @@
key.generate_key(TYPE_RSA, 512)
self.assertRaises(ValueError, dump_privatekey, 100, key)
-
def test_load_privatekey_passphraseCallbackLength(self):
"""
:py:obj:`crypto.load_privatekey` should raise an error when the passphrase
@@ -2581,7 +2447,6 @@
self.assertEqual(loadedKey.type(), key.type())
self.assertEqual(loadedKey.bits(), key.bits())
-
def test_dump_privatekey_passphraseWrongType(self):
"""
:py:obj:`dump_privatekey` raises :py:obj:`ValueError` when it is passed a passphrase
@@ -2611,7 +2476,6 @@
good_text = _runopenssl(dumped_pem, b"x509", b"-noout", b"-text")
self.assertEqual(dumped_text, good_text)
-
def test_dump_privatekey_pem(self):
"""
:py:obj:`dump_privatekey` writes a PEM
@@ -2621,7 +2485,6 @@
dumped_pem = dump_privatekey(FILETYPE_PEM, key)
self.assertEqual(dumped_pem, cleartextPrivateKeyPEM)
-
def test_dump_privatekey_asn1(self):
"""
:py:obj:`dump_privatekey` writes a DER
@@ -2637,7 +2500,6 @@
dumped_pem2 = dump_privatekey(FILETYPE_PEM, key2)
self.assertEqual(dumped_pem2, cleartextPrivateKeyPEM)
-
def test_dump_privatekey_text(self):
"""
:py:obj:`dump_privatekey` writes a text
@@ -2668,7 +2530,6 @@
self.assertEqual(dumped_text, good_text)
self.assertRaises(ValueError, dump_certificate_request, 100, req)
-
def test_dump_privatekey_passphraseCallback(self):
"""
:py:obj:`dump_privatekey` writes an encrypted PEM when given a callback which
@@ -2676,6 +2537,7 @@
"""
passphrase = b("foo")
called = []
+
def cb(writing):
called.append(writing)
return passphrase
@@ -2688,7 +2550,6 @@
self.assertEqual(loadedKey.type(), key.type())
self.assertEqual(loadedKey.bits(), key.bits())
-
def test_dump_privatekey_passphrase_exception(self):
"""
:py:obj:`dump_privatekey` should not overwrite the exception raised
@@ -2723,7 +2584,6 @@
pkcs7 = load_pkcs7_data(FILETYPE_PEM, pkcs7Data)
self.assertTrue(isinstance(pkcs7, PKCS7Type))
-
def test_load_pkcs7_data_asn1(self):
"""
:py:obj:`load_pkcs7_data` accepts a bytes containing ASN1 data
@@ -2732,7 +2592,6 @@
pkcs7 = load_pkcs7_data(FILETYPE_ASN1, pkcs7DataASN1)
self.assertTrue(isinstance(pkcs7, PKCS7Type))
-
def test_load_pkcs7_data_invalid(self):
"""
If the data passed to :py:obj:`load_pkcs7_data` is invalid,
@@ -2741,11 +2600,11 @@
self.assertRaises(Error, load_pkcs7_data, FILETYPE_PEM, b"foo")
-
class LoadCertificateTests(TestCase):
"""
Tests for :py:obj:`load_certificate_request`.
"""
+
def test_badFileType(self):
"""
If the file type passed to :py:obj:`load_certificate_request` is
@@ -2755,11 +2614,11 @@
self.assertRaises(ValueError, load_certificate_request, object(), b"")
-
class PKCS7Tests(TestCase):
"""
Tests for :py:obj:`PKCS7Type`.
"""
+
def test_type(self):
"""
:py:obj:`PKCS7Type` is a type object.
@@ -2770,7 +2629,6 @@
# XXX This doesn't currently work.
# self.assertIdentical(PKCS7, PKCS7Type)
-
# XXX Opposite results for all these following methods
def test_type_is_signed_wrong_args(self):
@@ -2781,7 +2639,6 @@
pkcs7 = load_pkcs7_data(FILETYPE_PEM, pkcs7Data)
self.assertRaises(TypeError, pkcs7.type_is_signed, None)
-
def test_type_is_signed(self):
"""
:py:obj:`PKCS7Type.type_is_signed` returns :py:obj:`True` if the PKCS7 object is of
@@ -2790,7 +2647,6 @@
pkcs7 = load_pkcs7_data(FILETYPE_PEM, pkcs7Data)
self.assertTrue(pkcs7.type_is_signed())
-
def test_type_is_enveloped_wrong_args(self):
"""
:py:obj:`PKCS7Type.type_is_enveloped` raises :py:obj:`TypeError` if called with any
@@ -2799,7 +2655,6 @@
pkcs7 = load_pkcs7_data(FILETYPE_PEM, pkcs7Data)
self.assertRaises(TypeError, pkcs7.type_is_enveloped, None)
-
def test_type_is_enveloped(self):
"""
:py:obj:`PKCS7Type.type_is_enveloped` returns :py:obj:`False` if the PKCS7 object is
@@ -2808,7 +2663,6 @@
pkcs7 = load_pkcs7_data(FILETYPE_PEM, pkcs7Data)
self.assertFalse(pkcs7.type_is_enveloped())
-
def test_type_is_signedAndEnveloped_wrong_args(self):
"""
:py:obj:`PKCS7Type.type_is_signedAndEnveloped` raises :py:obj:`TypeError` if called
@@ -2817,7 +2671,6 @@
pkcs7 = load_pkcs7_data(FILETYPE_PEM, pkcs7Data)
self.assertRaises(TypeError, pkcs7.type_is_signedAndEnveloped, None)
-
def test_type_is_signedAndEnveloped(self):
"""
:py:obj:`PKCS7Type.type_is_signedAndEnveloped` returns :py:obj:`False` if the PKCS7
@@ -2826,7 +2679,6 @@
pkcs7 = load_pkcs7_data(FILETYPE_PEM, pkcs7Data)
self.assertFalse(pkcs7.type_is_signedAndEnveloped())
-
def test_type_is_data(self):
"""
:py:obj:`PKCS7Type.type_is_data` returns :py:obj:`False` if the PKCS7 object is not of
@@ -2835,7 +2687,6 @@
pkcs7 = load_pkcs7_data(FILETYPE_PEM, pkcs7Data)
self.assertFalse(pkcs7.type_is_data())
-
def test_type_is_data_wrong_args(self):
"""
:py:obj:`PKCS7Type.type_is_data` raises :py:obj:`TypeError` if called with any
@@ -2844,7 +2695,6 @@
pkcs7 = load_pkcs7_data(FILETYPE_PEM, pkcs7Data)
self.assertRaises(TypeError, pkcs7.type_is_data, None)
-
def test_get_type_name_wrong_args(self):
"""
:py:obj:`PKCS7Type.get_type_name` raises :py:obj:`TypeError` if called with any
@@ -2853,7 +2703,6 @@
pkcs7 = load_pkcs7_data(FILETYPE_PEM, pkcs7Data)
self.assertRaises(TypeError, pkcs7.get_type_name, None)
-
def test_get_type_name(self):
"""
:py:obj:`PKCS7Type.get_type_name` returns a :py:obj:`str` giving the type name.
@@ -2861,7 +2710,6 @@
pkcs7 = load_pkcs7_data(FILETYPE_PEM, pkcs7Data)
self.assertEquals(pkcs7.get_type_name(), b('pkcs7-signedData'))
-
def test_attribute(self):
"""
If an attribute other than one of the methods tested here is accessed on
@@ -2871,18 +2719,17 @@
self.assertRaises(AttributeError, getattr, pkcs7, "foo")
-
class NetscapeSPKITests(TestCase, _PKeyInteractionTestsMixin):
"""
Tests for :py:obj:`OpenSSL.crypto.NetscapeSPKI`.
"""
+
def signable(self):
"""
Return a new :py:obj:`NetscapeSPKI` for use with signing tests.
"""
return NetscapeSPKI()
-
def test_type(self):
"""
:py:obj:`NetscapeSPKI` and :py:obj:`NetscapeSPKIType` refer to the same type object
@@ -2891,7 +2738,6 @@
self.assertIdentical(NetscapeSPKI, NetscapeSPKIType)
self.assertConsistentType(NetscapeSPKI, 'NetscapeSPKI')
-
def test_construction(self):
"""
:py:obj:`NetscapeSPKI` returns an instance of :py:obj:`NetscapeSPKIType`.
@@ -2899,7 +2745,6 @@
nspki = NetscapeSPKI()
self.assertTrue(isinstance(nspki, NetscapeSPKIType))
-
def test_invalid_attribute(self):
"""
Accessing a non-existent attribute of a :py:obj:`NetscapeSPKI` instance causes
@@ -2908,7 +2753,6 @@
nspki = NetscapeSPKI()
self.assertRaises(AttributeError, lambda: nspki.foo)
-
def test_b64_encode(self):
"""
:py:obj:`NetscapeSPKI.b64_encode` encodes the certificate to a base64 blob.
@@ -2918,11 +2762,11 @@
self.assertTrue(isinstance(blob, binary_type))
-
class RevokedTests(TestCase):
"""
Tests for :py:obj:`OpenSSL.crypto.Revoked`
"""
+
def test_construction(self):
"""
Confirm we can create :py:obj:`OpenSSL.crypto.Revoked`. Check
@@ -2935,7 +2779,6 @@
self.assertEquals(revoked.get_rev_date(), None)
self.assertEquals(revoked.get_reason(), None)
-
def test_construction_wrong_args(self):
"""
Calling :py:obj:`OpenSSL.crypto.Revoked` with any arguments results
@@ -2945,7 +2788,6 @@
self.assertRaises(TypeError, Revoked, 1)
self.assertRaises(TypeError, Revoked, "foo")
-
def test_serial(self):
"""
Confirm we can set and get serial numbers from
@@ -2968,7 +2810,6 @@
self.assertRaises(TypeError, revoked.get_serial, None)
self.assertRaises(TypeError, revoked.get_serial, "")
-
def test_date(self):
"""
Confirm we can set and get revocation dates from
@@ -2985,7 +2826,6 @@
date = revoked.get_rev_date()
self.assertEqual(date, now)
-
def test_reason(self):
"""
Confirm we can set and get revocation reasons from
@@ -3001,12 +2841,11 @@
self.assertEquals(
reason.lower().replace(b(' '), b('')),
r.lower().replace(b(' '), b('')))
- r = reason # again with the resp of get
+ r = reason # again with the resp of get
revoked.set_reason(None)
self.assertEqual(revoked.get_reason(), None)
-
def test_set_reason_wrong_arguments(self):
"""
Calling :py:obj:`OpenSSL.crypto.Revoked.set_reason` with other than
@@ -3017,7 +2856,6 @@
self.assertRaises(TypeError, revoked.set_reason, 100)
self.assertRaises(ValueError, revoked.set_reason, b('blue'))
-
def test_get_reason_wrong_arguments(self):
"""
Calling :py:obj:`OpenSSL.crypto.Revoked.get_reason` with any
@@ -3029,7 +2867,6 @@
self.assertRaises(TypeError, revoked.get_reason, "foo")
-
class CRLTests(TestCase):
"""
Tests for :py:obj:`OpenSSL.crypto.CRL`
@@ -3046,7 +2883,6 @@
self.assertTrue(isinstance(crl, CRL))
self.assertEqual(crl.get_revoked(), None)
-
def test_construction_wrong_args(self):
"""
Calling :py:obj:`OpenSSL.crypto.CRL` with any number of arguments
@@ -3056,7 +2892,6 @@
self.assertRaises(TypeError, CRL, "")
self.assertRaises(TypeError, CRL, None)
-
def _get_crl(self):
"""
Get a new ``CRL`` with a revocation.
@@ -3070,7 +2905,6 @@
crl.add_revoked(revoked)
return crl
-
def test_export_pem(self):
"""
If not passed a format, ``CRL.export`` returns a "PEM" format string
@@ -3090,7 +2924,6 @@
b('Issuer: /C=US/ST=IL/L=Chicago/O=Testing/CN=Testing Root CA')
)
-
def test_export_der(self):
"""
If passed ``FILETYPE_ASN1`` for the format, ``CRL.export`` returns a
@@ -3110,7 +2943,6 @@
b('Issuer: /C=US/ST=IL/L=Chicago/O=Testing/CN=Testing Root CA')
)
-
def test_export_text(self):
"""
If passed ``FILETYPE_TEXT`` for the format, ``CRL.export`` returns a
@@ -3128,7 +2960,6 @@
dumped_text = crl.export(self.cert, self.pkey, type=FILETYPE_TEXT)
self.assertEqual(text, dumped_text)
-
def test_export_custom_digest(self):
"""
If passed the name of a digest function, ``CRL.export`` uses a
@@ -3139,7 +2970,6 @@
text = _runopenssl(dumped_crl, b"crl", b"-noout", b"-text")
text.index(b('Signature Algorithm: sha1'))
-
def test_export_md5_digest(self):
"""
If passed md5 as the digest function, ``CRL.export`` uses md5 and does
@@ -3153,7 +2983,6 @@
text = _runopenssl(dumped_crl, b"crl", b"-noout", b"-text")
text.index(b('Signature Algorithm: md5'))
-
def test_export_default_digest(self):
"""
If not passed the name of a digest function, ``CRL.export`` uses a
@@ -3171,7 +3000,6 @@
text = _runopenssl(dumped_crl, b"crl", b"-noout", b"-text")
text.index(b('Signature Algorithm: md5'))
-
def test_export_invalid(self):
"""
If :py:obj:`CRL.export` is used with an uninitialized :py:obj:`X509`
@@ -3180,7 +3008,6 @@
crl = CRL()
self.assertRaises(Error, crl.export, X509(), PKey())
-
def test_add_revoked_keyword(self):
"""
:py:obj:`OpenSSL.CRL.add_revoked` accepts its single argument as the
@@ -3191,7 +3018,6 @@
crl.add_revoked(revoked=revoked)
self.assertTrue(isinstance(crl.get_revoked()[0], Revoked))
-
def test_export_wrong_args(self):
"""
Calling :py:obj:`OpenSSL.CRL.export` with fewer than two or more than
@@ -3209,7 +3035,6 @@
self.assertRaises(TypeError, crl.export, self.cert, self.pkey, None, 10)
self.assertRaises(TypeError, crl.export, self.cert, FILETYPE_PEM, None)
-
def test_export_unknown_filetype(self):
"""
Calling :py:obj:`OpenSSL.CRL.export` with a file type other than
@@ -3232,7 +3057,6 @@
self.cert, self.pkey, FILETYPE_PEM, 10, b"strange-digest"
)
-
def test_get_revoked(self):
"""
Use python to create a simple CRL with two revocations.
@@ -3259,7 +3083,6 @@
self.assertEqual(revs[0].get_rev_date(), now)
self.assertEqual(revs[1].get_rev_date(), now)
-
def test_get_revoked_wrong_args(self):
"""
Calling :py:obj:`OpenSSL.CRL.get_revoked` with any arguments results
@@ -3271,7 +3094,6 @@
self.assertRaises(TypeError, crl.get_revoked, "")
self.assertRaises(TypeError, crl.get_revoked, "", 1, None)
-
def test_add_revoked_wrong_args(self):
"""
Calling :py:obj:`OpenSSL.CRL.add_revoked` with other than one
@@ -3282,7 +3104,6 @@
self.assertRaises(TypeError, crl.add_revoked, 1, 2)
self.assertRaises(TypeError, crl.add_revoked, "foo", "bar")
-
def test_load_crl(self):
"""
Load a known CRL and inspect its revocations. Both
@@ -3305,7 +3126,6 @@
self.assertEqual(revs[1].get_serial(), b('0100'))
self.assertEqual(revs[1].get_reason(), b('Superseded'))
-
def test_load_crl_wrong_args(self):
"""
Calling :py:obj:`OpenSSL.crypto.load_crl` with other than two
@@ -3315,7 +3135,6 @@
self.assertRaises(TypeError, load_crl, FILETYPE_PEM)
self.assertRaises(TypeError, load_crl, FILETYPE_PEM, crlData, None)
-
def test_load_crl_bad_filetype(self):
"""
Calling :py:obj:`OpenSSL.crypto.load_crl` with an unknown file type
@@ -3323,7 +3142,6 @@
"""
self.assertRaises(ValueError, load_crl, 100, crlData)
-
def test_load_crl_bad_data(self):
"""
Calling :py:obj:`OpenSSL.crypto.load_crl` with file data which can't
@@ -3352,7 +3170,6 @@
store_ctx = X509StoreContext(store, self.intermediate_server_cert)
self.assertEqual(store_ctx.verify_certificate(), None)
-
def test_reuse(self):
"""
:py:obj:`verify_certificate` can be called multiple times with the same
@@ -3365,7 +3182,6 @@
self.assertEqual(store_ctx.verify_certificate(), None)
self.assertEqual(store_ctx.verify_certificate(), None)
-
def test_trusted_self_signed(self):
"""
:py:obj:`verify_certificate` returns ``None`` when called with a self-signed
@@ -3376,7 +3192,6 @@
store_ctx = X509StoreContext(store, self.root_cert)
self.assertEqual(store_ctx.verify_certificate(), None)
-
def test_untrusted_self_signed(self):
"""
:py:obj:`verify_certificate` raises error when a self-signed certificate is
@@ -3388,7 +3203,6 @@
self.assertEqual(e.args[0][2], 'self signed certificate')
self.assertEqual(e.certificate.get_subject().CN, 'Testing Root CA')
-
def test_invalid_chain_no_root(self):
"""
:py:obj:`verify_certificate` raises error when a root certificate is missing
@@ -3401,7 +3215,6 @@
self.assertEqual(e.args[0][2], 'unable to get issuer certificate')
self.assertEqual(e.certificate.get_subject().CN, 'intermediate')
-
def test_invalid_chain_no_intermediate(self):
"""
:py:obj:`verify_certificate` raises error when an intermediate certificate is
@@ -3433,11 +3246,11 @@
self.assertEqual(store_ctx.verify_certificate(), None)
-
class SignVerifyTests(TestCase):
"""
Tests for :py:obj:`OpenSSL.crypto.sign` and :py:obj:`OpenSSL.crypto.verify`.
"""
+
def test_sign_verify(self):
"""
:py:obj:`sign` generates a cryptographic signature which :py:obj:`verify` can check.
@@ -3459,7 +3272,8 @@
for digest in ['md5', 'sha1']:
sig = sign(priv_key, content, digest)
- # Verify the signature of content, will throw an exception if error.
+ # Verify the signature of content, will throw an exception if
+ # error.
verify(good_cert, sig, content, digest)
# This should fail because the certificate doesn't match the
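Review bot: The loop `for digest in ['md5', 'sha1']:` at the top of this hunk exercises sign/verify only with two digests that are no longer considered collision-resistant, so a regression that affects a currently recommended digest would not be caught here. Consider `for digest in ['md5', 'sha1', 'sha256']:`, assuming the OpenSSL build under test provides sha256. The re-wrapped comment could also be tightened while it is being touched, e.g. `# verify() raises if the signature does not check out.` test_sign_verify starts before this hunk and continues after it.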
@@ -3478,7 +3292,6 @@
self.assertRaises(
ValueError, verify, good_cert, sig, content, "strange-digest")
-
def test_sign_verify_with_text(self):
"""
:py:obj:`sign` generates a cryptographic signature which :py:obj:`verify` can check.
@@ -3519,7 +3332,6 @@
)
self.assertIs(w[-1].category, DeprecationWarning)
-
def test_sign_nulls(self):
"""
:py:obj:`sign` produces a signature for a string with embedded nulls.
@@ -3531,19 +3343,18 @@
verify(good_cert, sig, content, "sha1")
-
class EllipticCurveTests(TestCase):
"""
Tests for :py:class:`_EllipticCurve`, :py:obj:`get_elliptic_curve`, and
:py:obj:`get_elliptic_curves`.
"""
+
def test_set(self):
"""
:py:obj:`get_elliptic_curves` returns a :py:obj:`set`.
"""
self.assertIsInstance(get_elliptic_curves(), set)
-
def test_some_curves(self):
"""
If :py:mod:`cryptography` has elliptic curve support then the set
@@ -3559,7 +3370,6 @@
else:
self.assertFalse(curves)
-
def test_a_curve(self):
"""
:py:obj:`get_elliptic_curve` can be used to retrieve a particular
@@ -3572,7 +3382,6 @@
else:
self.assertRaises(ValueError, get_elliptic_curve, u("prime256v1"))
-
def test_not_a_curve(self):
"""
:py:obj:`get_elliptic_curve` raises :py:class:`ValueError` if called
@@ -3581,7 +3390,6 @@
self.assertRaises(
ValueError, get_elliptic_curve, u("this curve was just invented"))
-
def test_repr(self):
"""
The string representation of a curve object includes simply states the
@@ -3592,7 +3400,6 @@
curve = next(iter(curves))
self.assertEqual("<Curve %r>" % (curve.name,), repr(curve))
-
def test_to_EC_KEY(self):
"""
The curve object can export a version of itself as an EC_KEY* via the
@@ -3607,11 +3414,11 @@
curve._to_EC_KEY()
-
class EllipticCurveFactory(object):
"""
A helper to get the names of two curves.
"""
+
def __init__(self):
curves = iter(get_elliptic_curves())
try:
@@ -3621,7 +3428,6 @@
self.curve_name = self.another_curve_name = None
-
class EllipticCurveEqualityTests(TestCase, EqualityTestsMixin):
"""
Tests :py:type:`_EllipticCurve`\ 's implementation of ``==`` and ``!=``.
@@ -3631,14 +3437,12 @@
if curve_factory.curve_name is None:
skip = "There are no curves available there can be no curve objects."
-
def anInstance(self):
"""
Get the curve object for an arbitrary curve supported by the system.
"""
return get_elliptic_curve(self.curve_factory.curve_name)
-
def anotherInstance(self):
"""
Get the curve object for an arbitrary curve supported by the system -
@@ -3647,7 +3451,6 @@
return get_elliptic_curve(self.curve_factory.another_curve_name)
-
class EllipticCurveHashTests(TestCase):
"""
Tests for :py:type:`_EllipticCurve`\ 's implementation of hashing (thus use
@@ -3658,7 +3461,6 @@
if curve_factory.curve_name is None:
skip = "There are no curves available there can be no curve objects."
-
def test_contains(self):
"""
The ``in`` operator reports that a :py:type:`set` containing a curve
@@ -3668,7 +3470,6 @@
curves = set([curve])
self.assertIn(curve, curves)
-
def test_does_not_contain(self):
"""
The ``in`` operator reports that a :py:type:`set` not containing a
@@ -3679,6 +3480,5 @@
self.assertNotIn(curve, curves)
-
if __name__ == '__main__':
main()
diff --git a/OpenSSL/test/test_rand.py b/OpenSSL/test/test_rand.py
index 3053d1a..d5d75cb 100644
--- a/OpenSSL/test/test_rand.py
+++ b/OpenSSL/test/test_rand.py
@@ -17,8 +17,8 @@
class RandTests(TestCase):
def test_bytes_wrong_args(self):
"""
- :py:obj:`OpenSSL.rand.bytes` raises :py:obj:`TypeError` if called with the wrong
- number of arguments or with a non-:py:obj:`int` argument.
+ :py:obj:`OpenSSL.rand.bytes` raises :py:obj:`TypeError` if called with
+ the wrong number of arguments or with a non-:py:obj:`int` argument.
"""
self.assertRaises(TypeError, rand.bytes)
self.assertRaises(TypeError, rand.bytes, None)
@@ -40,7 +40,7 @@
b1 = rand.bytes(50)
self.assertEqual(len(b1), 50)
b2 = rand.bytes(num_bytes=50) # parameter by name
- self.assertNotEqual(b1, b2) # Hip, Hip, Horay! FIPS complaince
+ self.assertNotEqual(b1, b2) # Hip, Hip, Horay! FIPS complaince
b3 = rand.bytes(num_bytes=0)
self.assertEqual(len(b3), 0)
exc = self.assertRaises(ValueError, rand.bytes, -1)
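Review bot: The comment on the re-spaced `self.assertNotEqual(b1, b2)` line still reads `Hip, Hip, Horay! FIPS complaince`, which is misspelled and overstates what the assertion shows. Two 50-byte outputs being different says nothing about FIPS compliance or about the quality of the generator; it only catches a generator that returns the same buffer twice. Since the line is being edited anyway, `self.assertNotEqual(b1, b2)  # two independent draws should differ` would describe the check accurately. The enclosing test method starts and ends outside the hunk.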
@@ -48,8 +48,9 @@
def test_add_wrong_args(self):
"""
- When called with the wrong number of arguments, or with arguments not of
- type :py:obj:`str` and :py:obj:`int`, :py:obj:`OpenSSL.rand.add` raises :py:obj:`TypeError`.
+ When called with the wrong number of arguments, or with arguments not
+ of type :py:obj:`str` and :py:obj:`int`, :py:obj:`OpenSSL.rand.add`
+ raises :py:obj:`TypeError`.
"""
self.assertRaises(TypeError, rand.add)
self.assertRaises(TypeError, rand.add, b("foo"), None)
@@ -64,8 +65,9 @@
def test_seed_wrong_args(self):
"""
- When called with the wrong number of arguments, or with a non-:py:obj:`str`
- argument, :py:obj:`OpenSSL.rand.seed` raises :py:obj:`TypeError`.
+ When called with the wrong number of arguments, or with
+ a non-:py:obj:`str` argument, :py:obj:`OpenSSL.rand.seed` raises
+ :py:obj:`TypeError`.
"""
self.assertRaises(TypeError, rand.seed)
self.assertRaises(TypeError, rand.seed, None)
@@ -79,15 +81,15 @@
def test_status_wrong_args(self):
"""
- :py:obj:`OpenSSL.rand.status` raises :py:obj:`TypeError` when called with any
- arguments.
+ :py:obj:`OpenSSL.rand.status` raises :py:obj:`TypeError` when called
+ with any arguments.
"""
self.assertRaises(TypeError, rand.status, None)
def test_status(self):
"""
- :py:obj:`OpenSSL.rand.status` returns :py:obj:`True` if the PRNG has sufficient
- entropy, :py:obj:`False` otherwise.
+ :py:obj:`OpenSSL.rand.status` returns :py:obj:`True` if the PRNG has
+ sufficient entropy, :py:obj:`False` otherwise.
"""
# It's hard to know what it is actually going to return. Different
# OpenSSL random engines decide differently whether they have enough
@@ -96,8 +98,9 @@
def test_egd_wrong_args(self):
"""
- :py:obj:`OpenSSL.rand.egd` raises :py:obj:`TypeError` when called with the wrong
- number of arguments or with arguments not of type :py:obj:`str` and :py:obj:`int`.
+ :py:obj:`OpenSSL.rand.egd` raises :py:obj:`TypeError` when called with
+ the wrong number of arguments or with arguments not of type
+ :py:obj:`str` and :py:obj:`int`.
"""
self.assertRaises(TypeError, rand.egd)
self.assertRaises(TypeError, rand.egd, None)
@@ -130,22 +133,23 @@
def test_cleanup_wrong_args(self):
"""
- :py:obj:`OpenSSL.rand.cleanup` raises :py:obj:`TypeError` when called with any
- arguments.
+ :py:obj:`OpenSSL.rand.cleanup` raises :py:obj:`TypeError` when called
+ with any arguments.
"""
self.assertRaises(TypeError, rand.cleanup, None)
def test_cleanup(self):
"""
- :py:obj:`OpenSSL.rand.cleanup` releases the memory used by the PRNG and returns
- :py:obj:`None`.
+ :py:obj:`OpenSSL.rand.cleanup` releases the memory used by the PRNG and
+ returns :py:obj:`None`.
"""
self.assertIdentical(rand.cleanup(), None)
def test_load_file_wrong_args(self):
"""
- :py:obj:`OpenSSL.rand.load_file` raises :py:obj:`TypeError` when called the wrong
- number of arguments or arguments not of type :py:obj:`str` and :py:obj:`int`.
+ :py:obj:`OpenSSL.rand.load_file` raises :py:obj:`TypeError` when called
+ the wrong number of arguments or arguments not of type :py:obj:`str`
+ and :py:obj:`int`.
"""
self.assertRaises(TypeError, rand.load_file)
self.assertRaises(TypeError, rand.load_file, "foo", None)
@@ -154,8 +158,9 @@
def test_write_file_wrong_args(self):
"""
- :py:obj:`OpenSSL.rand.write_file` raises :py:obj:`TypeError` when called with the
- wrong number of arguments or a non-:py:obj:`str` argument.
+ :py:obj:`OpenSSL.rand.write_file` raises :py:obj:`TypeError` when
+ called with the wrong number of arguments or a non-:py:obj:`str`
+ argument.
"""
self.assertRaises(TypeError, rand.write_file)
self.assertRaises(TypeError, rand.write_file, None)
diff --git a/OpenSSL/tsafe.py b/OpenSSL/tsafe.py
index 28df8cb..ba17d73 100644
--- a/OpenSSL/tsafe.py
+++ b/OpenSSL/tsafe.py
@@ -6,6 +6,7 @@
_RLock = threading.RLock
del threading
+
class Connection:
def __init__(self, *args):
self._ssl_conn = _ssl.Connection(*args)
@@ -16,9 +17,9 @@
'setblocking', 'fileno', 'shutdown', 'close', 'get_cipher_list',
'getpeername', 'getsockname', 'getsockopt', 'setsockopt',
'makefile', 'get_app_data', 'set_app_data', 'state_string',
- 'sock_shutdown', 'get_peer_certificate', 'get_peer_cert_chain', 'want_read',
- 'want_write', 'set_connect_state', 'set_accept_state',
- 'connect_ex', 'sendall'):
+ 'sock_shutdown', 'get_peer_certificate', 'get_peer_cert_chain',
+ 'want_read', 'want_write', 'set_connect_state',
+ 'set_accept_state', 'connect_ex', 'sendall'):
exec("""def %s(self, *args):
self._lock.acquire()
try:
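Review bot: The re-wrapped tuple of method names is interpolated into the `exec("""def %s(self, *args):` template right below it, and nothing in the file says that this tuple must stay a list of fixed literals. A short comment above the `exec` would make that constraint explicit for future edits, for example `# Each name is compiled into source by exec(); keep this tuple to literal method names, never derive it from input.` The visible template also takes only `*args`, so keyword arguments presumably cannot be forwarded to the wrapped call, but the rest of the template is outside the hunk and that should be confirmed there. The `for` statement begins above the hunk and the generated method body continues below it.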