# blob: 6b8d78a7abeb05da72fc1b487855393a30a81f7b [file] [log] [blame]
from functools import wraps
from OpenSSL.xSSL import *
from tls.c import api as _api
from OpenSSL.crypto import (
FILETYPE_PEM, _PassphraseHelper, PKey, X509, _raise_current_error)
# Sentinel meaning "argument not supplied"; lets the methods below tell an
# omitted optional argument apart from any real value.
_unspecified = object()

# Version/build details of the linked OpenSSL library. The SSLEAY_* values
# are the selectors accepted by SSLeay_version().
OPENSSL_VERSION_NUMBER = _api.OPENSSL_VERSION_NUMBER
SSLEAY_VERSION = _api.SSLEAY_VERSION
SSLEAY_CFLAGS = _api.SSLEAY_CFLAGS
SSLEAY_BUILT_ON = _api.SSLEAY_BUILT_ON
SSLEAY_PLATFORM = _api.SSLEAY_PLATFORM
SSLEAY_DIR = _api.SSLEAY_DIR

# Module-local protocol identifiers. These are plain integers used as keys
# into Context._methods, not values taken from the binding.
SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, TLSv1_METHOD = 1, 2, 3, 4

# Shutdown state flags.
SENT_SHUTDOWN = _api.SSL_SENT_SHUTDOWN
RECEIVED_SHUTDOWN = _api.SSL_RECEIVED_SHUTDOWN

# Option bits for Context.set_options().
OP_NO_SSLv2 = _api.SSL_OP_NO_SSLv2
OP_NO_SSLv3 = _api.SSL_OP_NO_SSLv3
OP_SINGLE_DH_USE = _api.SSL_OP_SINGLE_DH_USE

# Verification mode flags for Context.set_verify().
VERIFY_NONE = _api.SSL_VERIFY_NONE
VERIFY_PEER = _api.SSL_VERIFY_PEER
VERIFY_FAIL_IF_NO_PEER_CERT = _api.SSL_VERIFY_FAIL_IF_NO_PEER_CERT
VERIFY_CLIENT_ONCE = _api.SSL_VERIFY_CLIENT_ONCE

# Session cache mode flags for Context.set_session_cache_mode().
SESS_CACHE_OFF = _api.SSL_SESS_CACHE_OFF
SESS_CACHE_CLIENT = _api.SSL_SESS_CACHE_CLIENT
SESS_CACHE_SERVER = _api.SSL_SESS_CACHE_SERVER
SESS_CACHE_BOTH = _api.SSL_SESS_CACHE_BOTH
SESS_CACHE_NO_AUTO_CLEAR = _api.SSL_SESS_CACHE_NO_AUTO_CLEAR
SESS_CACHE_NO_INTERNAL_LOOKUP = _api.SSL_SESS_CACHE_NO_INTERNAL_LOOKUP
SESS_CACHE_NO_INTERNAL_STORE = _api.SSL_SESS_CACHE_NO_INTERNAL_STORE
SESS_CACHE_NO_INTERNAL = _api.SSL_SESS_CACHE_NO_INTERNAL
class Error(Exception):
    """
    Exception type this module raises when an OpenSSL call reports failure.
    """
def SSLeay_version(type):
    """
    Look up a descriptive string about the version of OpenSSL in use.

    :param type: One of the SSLEAY_ constants defined in this module,
        selecting which piece of information is wanted.
    :return: The result of converting the library's answer with
        ``_api.string``.
    """
    # ``type`` shadows the builtin, but it is part of the public signature
    # and therefore has to keep its name.
    description = _api.SSLeay_version(type)
    return _api.string(description)
class Context(object):
    """
    :py:obj:`OpenSSL.SSL.Context` instances define the parameters for setting up
    new SSL connections.

    Several methods on this class are not implemented yet; each of those says
    so in its docstring and currently does nothing when called.
    """
    # Maps each supported *_METHOD constant to the binding function that
    # produces the corresponding method object for SSL_CTX_new.
    _methods = {
        # TODO: SSLv2 is not wired up, so SSLv2_METHOD is rejected with
        # ValueError("No such protocol").
        # SSLv2_METHOD: _api.SSLv2_method,
        SSLv3_METHOD: _api.SSLv3_method,
        TLSv1_METHOD: _api.TLSv1_method,
        SSLv23_METHOD: _api.SSLv23_method,
    }

    def __init__(self, method):
        """
        :param method: One of SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, or
            TLSv1_METHOD.
        :raise TypeError: If ``method`` is not an integer.
        :raise ValueError: If ``method`` has no entry in ``_methods``.
        :raise Error: If the underlying context cannot be created.
        """
        if not isinstance(method, int):
            raise TypeError("method must be an integer")

        try:
            method_func = self._methods[method]
        except KeyError:
            raise ValueError("No such protocol")

        method_obj = method_func()
        context = _api.SSL_CTX_new(method_obj)
        if context == _api.NULL:
            # Report the failure as this module's Error, like every other
            # failing OpenSSL call in this class.
            _raise_current_error(Error)
        # NOTE(review): nothing releases the SSL_CTX again. If the binding
        # exposes SSL_CTX_free, consider wrapping ``context`` with
        # ``_api.ffi.gc`` the way load_tmp_dh does for its BIO.

        self._context = context
        # Set by set_passwd_cb(); None means no passphrase callback has been
        # registered, which the private key loaders must cope with.
        self._passphrase_helper = None
        self._passphrase_callback = None
        self._passphrase_userdata = None
        self._verify_callback = None
        self._info_callback = None
        self._tlsext_servername_callback = None
        self._app_data = None

        # TODO: port the remaining setup from the C implementation:
        # SSL_CTX_set_app_data(self->ctx, self);
        # SSL_CTX_set_mode(self->ctx, SSL_MODE_ENABLE_PARTIAL_WRITE |
        #                             SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER |
        #                             SSL_MODE_AUTO_RETRY);

    def load_verify_locations(self, cafile, capath=None):
        """
        Let SSL know where we can find trusted certificates for the certificate
        chain

        :param cafile: In which file we can find the certificates
        :param capath: In which directory we can find the certificates
        :return: None
        :raise TypeError: If an argument is neither None nor a byte string.
        :raise Error: If OpenSSL reports that loading failed.
        """
        if cafile is None:
            cafile = _api.NULL
        elif not isinstance(cafile, bytes):
            raise TypeError("cafile must be None or a byte string")

        if capath is None:
            capath = _api.NULL
        elif not isinstance(capath, bytes):
            raise TypeError("capath must be None or a byte string")

        load_result = _api.SSL_CTX_load_verify_locations(
            self._context, cafile, capath)
        if not load_result:
            _raise_current_error(Error)

    def _wrap_callback(self, callback):
        """
        Wrap a user passphrase callback in a ``_PassphraseHelper``.

        The wrapper ignores the userdata it is called with and passes
        ``self._passphrase_userdata`` to ``callback`` instead.
        """
        @wraps(callback)
        def wrapped(size, verify, userdata):
            return callback(size, verify, self._passphrase_userdata)
        return _PassphraseHelper(
            FILETYPE_PEM, wrapped, more_args=True, truncate=True)

    def _raise_passphrase_exception(self):
        """
        Raise the error that explains a failed private key operation.

        Without a registered passphrase callback there is no helper to ask,
        so the pending OpenSSL error is raised directly.
        """
        helper = self._passphrase_helper
        if helper is None:
            _raise_current_error(Error)
            return
        exception = helper.raise_if_problem(Error)
        if exception is not None:
            raise exception

    def set_passwd_cb(self, callback, userdata=None):
        """
        Set the passphrase callback

        :param callback: The Python callback to use
        :param userdata: (optional) A Python object which will be given as
            argument to the callback
        :return: None
        :raise TypeError: If ``callback`` is not callable.
        """
        if not callable(callback):
            raise TypeError("callback must be callable")

        # Both the helper and its callback are kept on the instance so they
        # stay alive after this method returns.
        self._passphrase_helper = self._wrap_callback(callback)
        self._passphrase_callback = self._passphrase_helper.callback
        _api.SSL_CTX_set_default_passwd_cb(
            self._context, self._passphrase_callback)
        self._passphrase_userdata = userdata

    def set_default_verify_paths(self):
        """
        Use the platform-specific CA certificate locations

        Not implemented yet: calling this currently does nothing.

        :return: None
        """

    def use_certificate_chain_file(self, certfile):
        """
        Load a certificate chain from a file

        Not implemented yet: calling this currently does nothing.

        :param certfile: The name of the certificate chain file
        :return: None
        """

    def use_certificate_file(self, certfile, filetype=_unspecified):
        """
        Load a certificate from a file

        Not implemented yet: calling this currently does nothing.

        :param certfile: The name of the certificate file
        :param filetype: (optional) The encoding of the file, default is PEM
        :return: None
        """

    def use_certificate(self, cert):
        """
        Load a certificate from a X509 object

        Not implemented yet: calling this currently does nothing.

        :param cert: The X509 object
        :return: None
        """

    def add_extra_chain_cert(self, certobj):
        """
        Add certificate to chain

        :param certobj: The X509 certificate object to add to the chain
        :return: None
        :raise TypeError: If ``certobj`` is not an X509 instance.
        :raise Error: If OpenSSL refuses the certificate.
        """
        if not isinstance(certobj, X509):
            raise TypeError("certobj must be an X509 instance")

        # Hand OpenSSL a duplicate rather than the pointer held by
        # ``certobj``, so the context never shares a pointer with the
        # Python wrapper.
        copy = _api.X509_dup(certobj._x509)
        add_result = _api.SSL_CTX_add_extra_chain_cert(self._context, copy)
        if not add_result:
            # The context did not accept the duplicate, so release it here.
            _api.X509_free(copy)
            _raise_current_error(Error)

    def use_privatekey_file(self, keyfile, filetype=_unspecified):
        """
        Load a private key from a file

        :param keyfile: The name of the key file
        :param filetype: (optional) The encoding of the file, default is PEM
        :return: None
        :raise TypeError: If ``keyfile`` is not a byte string or ``filetype``
            is not an integer.
        """
        if not isinstance(keyfile, bytes):
            raise TypeError("keyfile must be a byte string")

        if filetype is _unspecified:
            filetype = FILETYPE_PEM
        elif not isinstance(filetype, int):
            raise TypeError("filetype must be an integer")

        use_result = _api.SSL_CTX_use_PrivateKey_file(
            self._context, keyfile, filetype)
        if not use_result:
            self._raise_passphrase_exception()

    def use_privatekey(self, pkey):
        """
        Load a private key from a PKey object

        :param pkey: The PKey object
        :return: None
        :raise TypeError: If ``pkey`` is not a PKey instance.
        """
        if not isinstance(pkey, PKey):
            raise TypeError("pkey must be a PKey instance")

        use_result = _api.SSL_CTX_use_PrivateKey(self._context, pkey._pkey)
        if not use_result:
            self._raise_passphrase_exception()

    def check_privatekey(self):
        """
        Check that the private key and certificate match up

        Not implemented yet: calling this currently does nothing, so it
        never reports a mismatch.

        :return: None (raises an exception if something's wrong)
        """

    def load_client_ca(self, cafile):
        """
        Load the trusted certificates that will be sent to the client (basically
        telling the client "These are the guys I trust").  Does not actually
        imply any of the certificates are trusted; that must be configured
        separately.

        Not implemented yet: calling this currently does nothing.

        :param cafile: The name of the certificates file
        :return: None
        """

    def set_session_id(self, buf):
        """
        Set the session identifier.  This is needed if you want to do session
        resumption.

        Not implemented yet: calling this currently does nothing.

        :param buf: A Python object that can be safely converted to a string
        :returns: None
        """

    def set_session_cache_mode(self, mode):
        """
        Enable/disable session caching and specify the mode used.

        :param mode: One or more of the SESS_CACHE_* flags (combine using
            bitwise or)
        :returns: The previously set caching mode.
        :raise TypeError: If ``mode`` is not an integer.
        """
        if not isinstance(mode, int):
            raise TypeError("mode must be an integer")

        return _api.SSL_CTX_set_session_cache_mode(self._context, mode)

    def get_session_cache_mode(self):
        """
        :returns: The currently used cache mode.
        """
        return _api.SSL_CTX_get_session_cache_mode(self._context)

    def set_verify(self, mode, callback):
        """
        Set the verify mode and verify callback

        :param mode: The verify mode, this is either VERIFY_NONE or
            VERIFY_PEER combined with possible other flags
        :param callback: The Python callback to use
        :return: None
        :raise TypeError: If ``mode`` is not an integer or ``callback`` is
            not callable.

        See SSL_CTX_set_verify(3SSL) for further details.
        """
        if not isinstance(mode, int):
            raise TypeError("mode must be an integer")

        if not callable(callback):
            raise TypeError("callback must be callable")

        # Keep the cffi callback object on the instance: OpenSSL only holds
        # the raw function pointer, which stops being valid once the Python
        # object is garbage collected.
        self._verify_callback = _api.ffi.callback(
            "int(*)(int, X509_STORE_CTX*)", callback)
        _api.SSL_CTX_set_verify(self._context, mode, self._verify_callback)

    def set_verify_depth(self, depth):
        """
        Set the verify depth

        :param depth: An integer specifying the verify depth
        :return: None
        :raise TypeError: If ``depth`` is not an integer.
        """
        if not isinstance(depth, int):
            raise TypeError("depth must be an integer")

        _api.SSL_CTX_set_verify_depth(self._context, depth)

    def get_verify_mode(self):
        """
        Get the verify mode

        :return: The verify mode
        """
        return _api.SSL_CTX_get_verify_mode(self._context)

    def get_verify_depth(self):
        """
        Get the verify depth

        :return: The verify depth
        """
        return _api.SSL_CTX_get_verify_depth(self._context)

    def load_tmp_dh(self, dhfile):
        """
        Load parameters for Ephemeral Diffie-Hellman

        :param dhfile: The file to load EDH parameters from
        :return: None
        :raise TypeError: If ``dhfile`` is not a byte string.
        :raise Error: If the file cannot be opened or no DH parameters can
            be read from it.
        """
        if not isinstance(dhfile, bytes):
            raise TypeError("dhfile must be a byte string")

        # The mode is a byte string, matching the byte-string path.
        bio = _api.BIO_new_file(dhfile, b"r")
        if bio == _api.NULL:
            _raise_current_error(Error)
        bio = _api.ffi.gc(bio, _api.BIO_free)

        dh = _api.PEM_read_bio_DHparams(bio, _api.NULL, _api.NULL, _api.NULL)
        if dh == _api.NULL:
            # Nothing usable was read; do not pass NULL on to OpenSSL.
            _raise_current_error(Error)
        dh = _api.ffi.gc(dh, _api.DH_free)
        _api.SSL_CTX_set_tmp_dh(self._context, dh)

    def set_cipher_list(self, cipher_list):
        """
        Change the cipher list

        Not implemented yet: calling this currently does nothing.

        :param cipher_list: A cipher list, see ciphers(1)
        :return: None
        """

    def set_client_ca_list(self, certificate_authorities):
        """
        Set the list of preferred client certificate signers for this server context.

        This list of certificate authorities will be sent to the client when the
        server requests a client certificate.

        Not implemented yet: calling this currently does nothing.

        :param certificate_authorities: a sequence of X509Names.
        :return: None
        """

    def add_client_ca(self, certificate_authority):
        """
        Add the CA certificate to the list of preferred signers for this context.

        The list of certificate authorities will be sent to the client when the
        server requests a client certificate.

        Not implemented yet: calling this currently does nothing.

        :param certificate_authority: certificate authority's X509 certificate.
        :return: None
        """

    def set_timeout(self, timeout):
        """
        Set session timeout

        :param timeout: The timeout in seconds
        :return: The previous session timeout
        :raise TypeError: If ``timeout`` is not an integer.
        """
        if not isinstance(timeout, int):
            raise TypeError("timeout must be an integer")

        return _api.SSL_CTX_set_timeout(self._context, timeout)

    def get_timeout(self):
        """
        Get the session timeout

        :return: The session timeout
        """
        return _api.SSL_CTX_get_timeout(self._context)

    def set_info_callback(self, callback):
        """
        Set the info callback

        Not implemented yet: calling this currently does nothing.

        :param callback: The Python callback to use
        :return: None
        """

    def get_app_data(self):
        """
        Get the application data (supplied via set_app_data())

        :return: The application data
        """
        return self._app_data

    def set_app_data(self, data):
        """
        Set the application data (will be returned from get_app_data())

        :param data: Any Python object
        :return: None
        """
        self._app_data = data

    def get_cert_store(self):
        """
        Get the certificate store for the context

        Not implemented yet: calling this currently returns None.

        :return: A X509Store object
        """

    def set_options(self, options):
        """
        Add options. Options set before are not cleared!

        :param options: The options to add.
        :return: The new option bitmask.
        :raise TypeError: If ``options`` is not an integer.
        """
        if not isinstance(options, int):
            raise TypeError("options must be an integer")

        return _api.SSL_CTX_set_options(self._context, options)

    def set_mode(self, mode):
        """
        Add modes via bitmask. Modes set before are not cleared!

        :param mode: The mode to add.
        :return: The new mode bitmask.
        :raise TypeError: If ``mode`` is not an integer.
        """
        if not isinstance(mode, int):
            raise TypeError("mode must be an integer")

        return _api.SSL_CTX_set_mode(self._context, mode)

    def set_tlsext_servername_callback(self, callback):
        """
        Specify a callback function to be called when clients specify a server name.

        Not implemented yet: calling this currently does nothing.

        :param callback: The callback function.  It will be invoked with one
            argument, the Connection instance.
        """
# Second name bound to the Context class.
# NOTE(review): presumably kept so callers using the ``ContextType`` name keep
# working — confirm.
ContextType = Context