blob: ae669d0bbbb436f3f23bb71679e1e6d54e1774c1 [file] [log] [blame]
/*
* context.c
*
* Copyright (C) AB Strakt 2001, All rights reserved
* Copyright (C) Jean-Paul Calderone 2008, All rights reserved
*
* SSL Context objects and their methods.
* See the file RATIONALE for a short explanation of why this module was written.
*
* Reviewed 2001-07-23
*/
#include <Python.h>
#if PY_VERSION_HEX >= 0x02050000
# define PYARG_PARSETUPLE_FORMAT const char
#else
# define PYARG_PARSETUPLE_FORMAT char
#endif
#ifndef MS_WINDOWS
# include <sys/socket.h>
# include <netinet/in.h>
# if !(defined(__BEOS__) || defined(__CYGWIN__))
# include <netinet/tcp.h>
# endif
#else
# include <winsock.h>
# include <wincrypt.h>
#endif
#define SSL_MODULE
#include "ssl.h"
/*
* CALLBACKS
*
* Callbacks work like this: We provide a "global" callback in C which
* transforms the arguments into a Python argument tuple and calls the
* corresponding Python callback, and then parsing the return value back into
* things the C function can return.
*
* Three caveats:
* + How do we find the Context object where the Python callbacks are stored?
* + What about multithreading and execution frames?
* + What about Python callbacks that raise exceptions?
*
* The solution to the first issue is trivial if the callback provides
* "userdata" functionality. Since the only callbacks that don't provide
* userdata do provide a pointer to an SSL structure, we can associate an SSL
* object and a Connection one-to-one via the SSL_set/get_app_data()
* functions.
*
* The solution to the other issue is to rewrite the Py_BEGIN_ALLOW_THREADS
* macro allowing it (or rather a new macro) to specify where to save the
* thread state (in our case, as a member of the Connection/Context object) so
* we can retrieve it again before calling the Python callback.
*/
/*
* Globally defined passphrase callback. This is called from OpenSSL
* internally. The GIL will not be held when this function is invoked. It
* must not be held when the function returns.
*
* Arguments: buf - Buffer to store the returned passphrase in
* maxlen - Maximum length of the passphrase
* verify - If true, the passphrase callback should ask for a
* password twice and verify they're equal. If false, only
* ask once.
* arg - User data, always a Context object
* Returns: The length of the password if successful, 0 otherwise
*/
static int
global_passphrase_callback(char *buf, int maxlen, int verify, void *arg)
{
/*
* Initialize len here because we're always going to return it, and we
* might jump to the return before it gets initialized in any other way.
*/
int len = 0;
char *str;
PyObject *argv, *ret = NULL;
ssl_ContextObj *ctx = (ssl_ContextObj *)arg;
/*
* GIL isn't held yet. First things first - acquire it, or any Python API
* we invoke might segfault or blow up the sun. The reverse will be done
* before returning.
*/
MY_END_ALLOW_THREADS(ctx->tstate);
/* The Python callback is called with a (maxlen,verify,userdata) tuple */
argv = Py_BuildValue("(iiO)", maxlen, verify, ctx->passphrase_userdata);
/*
* XXX Didn't check argv to see if it was NULL. -exarkun
*/
ret = PyEval_CallObject(ctx->passphrase_callback, argv);
Py_DECREF(argv);
if (ret == NULL) {
/*
* XXX The callback raised an exception. At the very least, it should
* be printed out here. An *actual* solution would be to raise it up
* through OpenSSL. That might be a bit tricky, but it's probably
* possible. -exarkun
*/
goto out;
}
if (!PyObject_IsTrue(ret)) {
/*
* Returned "", or None, or something. Treat it as no passphrase.
*/
Py_DECREF(ret);
goto out;
}
if (!PyString_Check(ret)) {
/*
* XXX Returned something that wasn't a string. This is bogus. We
* should report an error or raise an exception (again, through OpenSSL
* - tricky). -exarkun
*/
Py_DECREF(ret);
goto out;
}
len = PyString_Size(ret);
if (len > maxlen) {
/*
* XXX Returned more than we said they were allowed to return. Report
* an error or raise an exception (tricky blah blah). -exarkun
*/
len = maxlen;
}
str = PyString_AsString(ret);
strncpy(buf, str, len);
Py_XDECREF(ret);
out:
/*
* This function is returning into OpenSSL. Release the GIL again.
*/
MY_BEGIN_ALLOW_THREADS(ctx->tstate);
return len;
}
/*
* Globally defined verify callback
*
* Arguments: ok - True everything is OK "so far", false otherwise
* x509_ctx - Contains the certificate being checked, the current
* error number and depth, and the Connection we're
* dealing with
* Returns: True if everything is okay, false otherwise
*/
static int
global_verify_callback(int ok, X509_STORE_CTX *x509_ctx)
{
PyObject *argv, *ret;
SSL *ssl;
ssl_ConnectionObj *conn;
crypto_X509Obj *cert;
int errnum, errdepth, c_ret;
// Get Connection object to check thread state
ssl = (SSL *)X509_STORE_CTX_get_app_data(x509_ctx);
conn = (ssl_ConnectionObj *)SSL_get_app_data(ssl);
MY_END_ALLOW_THREADS(conn->tstate);
cert = crypto_X509_New(X509_STORE_CTX_get_current_cert(x509_ctx), 0);
errnum = X509_STORE_CTX_get_error(x509_ctx);
errdepth = X509_STORE_CTX_get_error_depth(x509_ctx);
argv = Py_BuildValue("(OOiii)", (PyObject *)conn, (PyObject *)cert,
errnum, errdepth, ok);
Py_DECREF(cert);
ret = PyEval_CallObject(conn->context->verify_callback, argv);
Py_DECREF(argv);
if (ret != NULL && PyObject_IsTrue(ret)) {
X509_STORE_CTX_set_error(x509_ctx, X509_V_OK);
Py_DECREF(ret);
c_ret = 1;
} else {
c_ret = 0;
}
MY_BEGIN_ALLOW_THREADS(conn->tstate);
return c_ret;
}
/*
* Globally defined info callback. This is called from OpenSSL internally.
* The GIL will not be held when this function is invoked. It must not be held
* when the function returns.
*
* Arguments: ssl - The Connection
* where - The part of the SSL code that called us
* _ret - The return code of the SSL function that called us
* Returns: None
*/
static void
global_info_callback(const SSL *ssl, int where, int _ret)
{
ssl_ConnectionObj *conn = (ssl_ConnectionObj *)SSL_get_app_data(ssl);
PyObject *argv, *ret;
/*
* GIL isn't held yet. First things first - acquire it, or any Python API
* we invoke might segfault or blow up the sun. The reverse will be done
* before returning.
*/
MY_END_ALLOW_THREADS(conn->tstate);
argv = Py_BuildValue("(Oii)", (PyObject *)conn, where, _ret);
ret = PyEval_CallObject(conn->context->info_callback, argv);
Py_DECREF(argv);
if (ret == NULL) {
/*
* XXX - This should be reported somehow. -exarkun
*/
PyErr_Clear();
} else {
Py_DECREF(ret);
}
/*
* This function is returning into OpenSSL. Release the GIL again.
*/
MY_BEGIN_ALLOW_THREADS(conn->tstate);
return;
}
static char ssl_Context_doc[] = "\n\
Context(method) -> Context instance\n\
\n\
OpenSSL.SSL.Context instances define the parameters for setting up new SSL\n\
connections.\n\
\n\
@param method: One of SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, or\n\
TLSv1_METHOD.\n\
";
static char ssl_Context_load_verify_locations_doc[] = "\n\
Let SSL know where we can find trusted certificates for the certificate\n\
chain\n\
\n\
@param cafile: In which file we can find the certificates\n\
@param capath: In which directory we can find the certificates\n\
@return: None\n\
";
static PyObject *
ssl_Context_load_verify_locations(ssl_ContextObj *self, PyObject *args) {
char *cafile = NULL;
char *capath = NULL;
if (!PyArg_ParseTuple(args, "z|z:load_verify_locations", &cafile, &capath)) {
return NULL;
}
if (!SSL_CTX_load_verify_locations(self->ctx, cafile, capath))
{
exception_from_error_queue();
return NULL;
}
else
{
Py_INCREF(Py_None);
return Py_None;
}
}
static char ssl_Context_set_default_verify_paths_doc[] = "\n\
Use the platform-specific CA certificate locations\n\
\n\
@return: None\n\
";
static PyObject *
ssl_Context_set_default_verify_paths(ssl_ContextObj *self, PyObject *args) {
if (!PyArg_ParseTuple(args, ":set_default_verify_paths")) {
return NULL;
}
/*
* XXX Error handling for SSL_CTX_set_default_verify_paths is untested.
* -exarkun
*/
if (!SSL_CTX_set_default_verify_paths(self->ctx)) {
exception_from_error_queue();
return NULL;
}
Py_INCREF(Py_None);
return Py_None;
};
static char ssl_Context_set_passwd_cb_doc[] = "\n\
Set the passphrase callback\n\
\n\
@param callback: The Python callback to use\n\
@param userdata: (optional) A Python object which will be given as\n\
argument to the callback\n\
@return: None\n\
";
static PyObject *
ssl_Context_set_passwd_cb(ssl_ContextObj *self, PyObject *args)
{
PyObject *callback = NULL, *userdata = Py_None;
if (!PyArg_ParseTuple(args, "O|O:set_passwd_cb", &callback, &userdata))
return NULL;
if (!PyCallable_Check(callback))
{
PyErr_SetString(PyExc_TypeError, "expected PyCallable");
return NULL;
}
Py_DECREF(self->passphrase_callback);
Py_INCREF(callback);
self->passphrase_callback = callback;
SSL_CTX_set_default_passwd_cb(self->ctx, global_passphrase_callback);
Py_DECREF(self->passphrase_userdata);
Py_INCREF(userdata);
self->passphrase_userdata = userdata;
SSL_CTX_set_default_passwd_cb_userdata(self->ctx, (void *)self);
Py_INCREF(Py_None);
return Py_None;
}
static crypto_X509Obj *
parse_certificate_argument(const char* format1, const char* format2, PyObject* args)
{
static PyTypeObject *crypto_X509_type = NULL;
crypto_X509Obj *cert;
/* We need to check that cert really is an X509 object before
we deal with it. The problem is we can't just quickly verify
the type (since that comes from another module). This should
do the trick (reasonably well at least): Once we have one
verified object, we use it's type object for future
comparisons. */
if (!crypto_X509_type)
{
if (!PyArg_ParseTuple(args, (PYARG_PARSETUPLE_FORMAT *)format1, &cert))
return NULL;
if (strcmp(cert->ob_type->tp_name, "X509") != 0 ||
cert->ob_type->tp_basicsize != sizeof(crypto_X509Obj))
{
PyErr_SetString(PyExc_TypeError, "Expected an X509 object");
return NULL;
}
crypto_X509_type = cert->ob_type;
}
else
if (!PyArg_ParseTuple(args, (PYARG_PARSETUPLE_FORMAT *)format2, crypto_X509_type,
&cert))
return NULL;
return cert;
}
static char ssl_Context_add_extra_chain_cert_doc[] = "\n\
Add certificate to chain\n\
\n\
@param certobj: The X509 certificate object to add to the chain\n\
@return: None\n\
";
static PyObject *
ssl_Context_add_extra_chain_cert(ssl_ContextObj *self, PyObject *args)
{
X509* cert_original;
crypto_X509Obj *cert = parse_certificate_argument(
"O:add_extra_chain_cert", "O!:add_extra_chain_cert", args);
if (cert == NULL)
{
return NULL;
}
if (!(cert_original = X509_dup(cert->x509)))
{
/* exception_from_error_queue(); */
PyErr_SetString(PyExc_RuntimeError, "X509_dup failed");
return NULL;
}
if (!SSL_CTX_add_extra_chain_cert(self->ctx, cert_original))
{
X509_free(cert_original);
exception_from_error_queue();
return NULL;
}
else
{
Py_INCREF(Py_None);
return Py_None;
}
}
static char ssl_Context_use_certificate_chain_file_doc[] = "\n\
Load a certificate chain from a file\n\
\n\
@param certfile: The name of the certificate chain file\n\
@return: None\n\
";
static PyObject *
ssl_Context_use_certificate_chain_file(ssl_ContextObj *self, PyObject *args)
{
char *certfile;
if (!PyArg_ParseTuple(args, "s:use_certificate_chain_file", &certfile))
return NULL;
if (!SSL_CTX_use_certificate_chain_file(self->ctx, certfile))
{
exception_from_error_queue();
return NULL;
}
else
{
Py_INCREF(Py_None);
return Py_None;
}
}
static char ssl_Context_use_certificate_file_doc[] = "\n\
Load a certificate from a file\n\
\n\
@param certfile: The name of the certificate file\n\
@param filetype: (optional) The encoding of the file, default is PEM\n\
@return: None\n\
";
static PyObject *
ssl_Context_use_certificate_file(ssl_ContextObj *self, PyObject *args)
{
char *certfile;
int filetype = SSL_FILETYPE_PEM;
if (!PyArg_ParseTuple(args, "s|i:use_certificate_file", &certfile, &filetype))
return NULL;
if (!SSL_CTX_use_certificate_file(self->ctx, certfile, filetype))
{
exception_from_error_queue();
return NULL;
}
else
{
Py_INCREF(Py_None);
return Py_None;
}
}
static char ssl_Context_use_certificate_doc[] = "\n\
Load a certificate from a X509 object\n\
\n\
@param cert: The X509 object\n\
@return: None\n\
";
static PyObject *
ssl_Context_use_certificate(ssl_ContextObj *self, PyObject *args)
{
crypto_X509Obj *cert = parse_certificate_argument(
"O:use_certificate", "O!:use_certificate", args);
if (cert == NULL) {
return NULL;
}
if (!SSL_CTX_use_certificate(self->ctx, cert->x509))
{
exception_from_error_queue();
return NULL;
}
else
{
Py_INCREF(Py_None);
return Py_None;
}
}
static char ssl_Context_use_privatekey_file_doc[] = "\n\
Load a private key from a file\n\
\n\
@param keyfile: The name of the key file\n\
@param filetype: (optional) The encoding of the file, default is PEM\n\
@return: None\n\
";
static PyObject *
ssl_Context_use_privatekey_file(ssl_ContextObj *self, PyObject *args)
{
char *keyfile;
int filetype = SSL_FILETYPE_PEM, ret;
if (!PyArg_ParseTuple(args, "s|i:use_privatekey_file", &keyfile, &filetype))
return NULL;
MY_BEGIN_ALLOW_THREADS(self->tstate);
ret = SSL_CTX_use_PrivateKey_file(self->ctx, keyfile, filetype);
MY_END_ALLOW_THREADS(self->tstate);
if (PyErr_Occurred())
{
flush_error_queue();
return NULL;
}
if (!ret)
{
exception_from_error_queue();
return NULL;
}
else
{
Py_INCREF(Py_None);
return Py_None;
}
}
static char ssl_Context_use_privatekey_doc[] = "\n\
Load a private key from a PKey object\n\
\n\
@param pkey: The PKey object\n\
@return: None\n\
";
static PyObject *
ssl_Context_use_privatekey(ssl_ContextObj *self, PyObject *args)
{
static PyTypeObject *crypto_PKey_type = NULL;
crypto_PKeyObj *pkey;
/* We need to check that cert really is a PKey object before
we deal with it. The problem is we can't just quickly verify
the type (since that comes from another module). This should
do the trick (reasonably well at least): Once we have one
verified object, we use it's type object for future
comparisons. */
if (!crypto_PKey_type)
{
if (!PyArg_ParseTuple(args, "O:use_privatekey", &pkey))
return NULL;
if (strcmp(pkey->ob_type->tp_name, "OpenSSL.crypto.PKey") != 0 ||
pkey->ob_type->tp_basicsize != sizeof(crypto_PKeyObj))
{
PyErr_SetString(PyExc_TypeError, "Expected a PKey object");
return NULL;
}
crypto_PKey_type = pkey->ob_type;
}
else
if (!PyArg_ParseTuple(args, "O!:use_privatekey", crypto_PKey_type, &pkey))
return NULL;
if (!SSL_CTX_use_PrivateKey(self->ctx, pkey->pkey))
{
exception_from_error_queue();
return NULL;
}
else
{
Py_INCREF(Py_None);
return Py_None;
}
}
static char ssl_Context_check_privatekey_doc[] = "\n\
Check that the private key and certificate match up\n\
\n\
@return: None (raises an exception if something's wrong)\n\
";
static PyObject *
ssl_Context_check_privatekey(ssl_ContextObj *self, PyObject *args)
{
if (!PyArg_ParseTuple(args, ":check_privatekey"))
return NULL;
if (!SSL_CTX_check_private_key(self->ctx))
{
exception_from_error_queue();
return NULL;
}
else
{
Py_INCREF(Py_None);
return Py_None;
}
}
static char ssl_Context_load_client_ca_doc[] = "\n\
Load the trusted certificates that will be sent to the client (basically\n\
telling the client \"These are the guys I trust\")\n\
\n\
@param cafile: The name of the certificates file\n\
@return: None\n\
";
static PyObject *
ssl_Context_load_client_ca(ssl_ContextObj *self, PyObject *args)
{
char *cafile;
if (!PyArg_ParseTuple(args, "s:load_client_ca", &cafile))
return NULL;
SSL_CTX_set_client_CA_list(self->ctx, SSL_load_client_CA_file(cafile));
Py_INCREF(Py_None);
return Py_None;
}
static char ssl_Context_set_session_id_doc[] = "\n\
Set the session identifier, this is needed if you want to do session\n\
resumption (which, ironically, isn't implemented yet)\n\
\n\
@param buf: A Python object that can be safely converted to a string\n\
@returns: None\n\
";
static PyObject *
ssl_Context_set_session_id(ssl_ContextObj *self, PyObject *args)
{
unsigned char *buf;
unsigned int len;
if (!PyArg_ParseTuple(args, "s#:set_session_id", &buf, &len))
return NULL;
if (!SSL_CTX_set_session_id_context(self->ctx, buf, len))
{
exception_from_error_queue();
return NULL;
}
else
{
Py_INCREF(Py_None);
return Py_None;
}
}
static char ssl_Context_set_verify_doc[] = "\n\
Set the verify mode and verify callback\n\
\n\
@param mode: The verify mode, this is either VERIFY_NONE or\n\
VERIFY_PEER combined with possible other flags\n\
@param callback: The Python callback to use\n\
@return: None\n\
\n\
See SSL_CTX_set_verify(3SSL) for further details.\n\
";
static PyObject *
ssl_Context_set_verify(ssl_ContextObj *self, PyObject *args)
{
int mode;
PyObject *callback = NULL;
if (!PyArg_ParseTuple(args, "iO:set_verify", &mode, &callback))
return NULL;
if (!PyCallable_Check(callback))
{
PyErr_SetString(PyExc_TypeError, "expected PyCallable");
return NULL;
}
Py_DECREF(self->verify_callback);
Py_INCREF(callback);
self->verify_callback = callback;
SSL_CTX_set_verify(self->ctx, mode, global_verify_callback);
Py_INCREF(Py_None);
return Py_None;
}
static char ssl_Context_set_verify_depth_doc[] = "\n\
Set the verify depth\n\
\n\
@param depth: An integer specifying the verify depth\n\
@return: None\n\
";
static PyObject *
ssl_Context_set_verify_depth(ssl_ContextObj *self, PyObject *args)
{
int depth;
if (!PyArg_ParseTuple(args, "i:set_verify_depth", &depth))
return NULL;
SSL_CTX_set_verify_depth(self->ctx, depth);
Py_INCREF(Py_None);
return Py_None;
}
static char ssl_Context_get_verify_mode_doc[] = "\n\
Get the verify mode\n\
\n\
@return: The verify mode\n\
";
static PyObject *
ssl_Context_get_verify_mode(ssl_ContextObj *self, PyObject *args)
{
int mode;
if (!PyArg_ParseTuple(args, ":get_verify_mode"))
return NULL;
mode = SSL_CTX_get_verify_mode(self->ctx);
return PyInt_FromLong((long)mode);
}
static char ssl_Context_get_verify_depth_doc[] = "\n\
Get the verify depth\n\
\n\
@return: The verify depth\n\
";
static PyObject *
ssl_Context_get_verify_depth(ssl_ContextObj *self, PyObject *args)
{
int depth;
if (!PyArg_ParseTuple(args, ":get_verify_depth"))
return NULL;
depth = SSL_CTX_get_verify_depth(self->ctx);
return PyInt_FromLong((long)depth);
}
static char ssl_Context_load_tmp_dh_doc[] = "\n\
Load parameters for Ephemeral Diffie-Hellman\n\
\n\
@param dhfile: The file to load EDH parameters from\n\
@return: None\n\
";
static PyObject *
ssl_Context_load_tmp_dh(ssl_ContextObj *self, PyObject *args)
{
char *dhfile;
BIO *bio;
DH *dh;
if (!PyArg_ParseTuple(args, "s:load_tmp_dh", &dhfile))
return NULL;
bio = BIO_new_file(dhfile, "r");
if (bio == NULL)
return PyErr_NoMemory();
dh = PEM_read_bio_DHparams(bio, NULL, NULL, NULL);
SSL_CTX_set_tmp_dh(self->ctx, dh);
DH_free(dh);
BIO_free(bio);
Py_INCREF(Py_None);
return Py_None;
}
static char ssl_Context_set_cipher_list_doc[] = "\n\
Change the cipher list\n\
\n\
@param cipher_list: A cipher list, see ciphers(1)\n\
@return: None\n\
";
static PyObject *
ssl_Context_set_cipher_list(ssl_ContextObj *self, PyObject *args)
{
char *cipher_list;
if (!PyArg_ParseTuple(args, "s:set_cipher_list", &cipher_list))
return NULL;
if (!SSL_CTX_set_cipher_list(self->ctx, cipher_list))
{
exception_from_error_queue();
return NULL;
}
else
{
Py_INCREF(Py_None);
return Py_None;
}
}
static char ssl_Context_set_timeout_doc[] = "\n\
Set session timeout\n\
\n\
@param timeout: The timeout in seconds\n\
@return: The previous session timeout\n\
";
static PyObject *
ssl_Context_set_timeout(ssl_ContextObj *self, PyObject *args)
{
long t, ret;
if (!PyArg_ParseTuple(args, "l:set_timeout", &t))
return NULL;
ret = SSL_CTX_set_timeout(self->ctx, t);
return PyLong_FromLong(ret);
}
static char ssl_Context_get_timeout_doc[] = "\n\
Get the session timeout\n\
\n\
@return: The session timeout\n\
";
static PyObject *
ssl_Context_get_timeout(ssl_ContextObj *self, PyObject *args)
{
long ret;
if (!PyArg_ParseTuple(args, ":get_timeout"))
return NULL;
ret = SSL_CTX_get_timeout(self->ctx);
return PyLong_FromLong(ret);
}
static char ssl_Context_set_info_callback_doc[] = "\n\
Set the info callback\n\
\n\
@param callback: The Python callback to use\n\
@return: None\n\
";
static PyObject *
ssl_Context_set_info_callback(ssl_ContextObj *self, PyObject *args)
{
PyObject *callback;
if (!PyArg_ParseTuple(args, "O:set_info_callback", &callback))
return NULL;
if (!PyCallable_Check(callback))
{
PyErr_SetString(PyExc_TypeError, "expected PyCallable");
return NULL;
}
Py_DECREF(self->info_callback);
Py_INCREF(callback);
self->info_callback = callback;
SSL_CTX_set_info_callback(self->ctx, global_info_callback);
Py_INCREF(Py_None);
return Py_None;
}
static char ssl_Context_get_app_data_doc[] = "\n\
Get the application data (supplied via set_app_data())\n\
\n\
@return: The application data\n\
";
static PyObject *
ssl_Context_get_app_data(ssl_ContextObj *self, PyObject *args)
{
if (!PyArg_ParseTuple(args, ":get_app_data"))
return NULL;
Py_INCREF(self->app_data);
return self->app_data;
}
static char ssl_Context_set_app_data_doc[] = "\n\
Set the application data (will be returned from get_app_data())\n\
\n\
@param data: Any Python object\n\
@return: None\n\
";
static PyObject *
ssl_Context_set_app_data(ssl_ContextObj *self, PyObject *args)
{
PyObject *data;
if (!PyArg_ParseTuple(args, "O:set_app_data", &data))
return NULL;
Py_DECREF(self->app_data);
Py_INCREF(data);
self->app_data = data;
Py_INCREF(Py_None);
return Py_None;
}
static char ssl_Context_get_cert_store_doc[] = "\n\
Get the certificate store for the context\n\
\n\
@return: A X509Store object\n\
";
static PyObject *
ssl_Context_get_cert_store(ssl_ContextObj *self, PyObject *args)
{
X509_STORE *store;
if (!PyArg_ParseTuple(args, ":get_cert_store"))
return NULL;
if ((store = SSL_CTX_get_cert_store(self->ctx)) == NULL)
{
Py_INCREF(Py_None);
return Py_None;
}
else
{
return (PyObject *)crypto_X509Store_New(store, 0);
}
}
static char ssl_Context_set_options_doc[] = "\n\
Add options. Options set before are not cleared!\n\
\n\
@param options: The options to add.\n\
@return: The new option bitmask.\n\
";
static PyObject *
ssl_Context_set_options(ssl_ContextObj *self, PyObject *args)
{
long options;
if (!PyArg_ParseTuple(args, "l:set_options", &options))
return NULL;
return PyInt_FromLong(SSL_CTX_set_options(self->ctx, options));
}
/*
* Member methods in the Context object
* ADD_METHOD(name) expands to a correct PyMethodDef declaration
* { 'name', (PyCFunction)ssl_Context_name, METH_VARARGS }
* for convenience
* ADD_ALIAS(name,real) creates an "alias" of the ssl_Context_real
* function with the name 'name'
*/
#define ADD_METHOD(name) { #name, (PyCFunction)ssl_Context_##name, METH_VARARGS, ssl_Context_##name##_doc }
static PyMethodDef ssl_Context_methods[] = {
ADD_METHOD(load_verify_locations),
ADD_METHOD(set_passwd_cb),
ADD_METHOD(set_default_verify_paths),
ADD_METHOD(use_certificate_chain_file),
ADD_METHOD(use_certificate_file),
ADD_METHOD(use_certificate),
ADD_METHOD(add_extra_chain_cert),
ADD_METHOD(use_privatekey_file),
ADD_METHOD(use_privatekey),
ADD_METHOD(check_privatekey),
ADD_METHOD(load_client_ca),
ADD_METHOD(set_session_id),
ADD_METHOD(set_verify),
ADD_METHOD(set_verify_depth),
ADD_METHOD(get_verify_mode),
ADD_METHOD(get_verify_depth),
ADD_METHOD(load_tmp_dh),
ADD_METHOD(set_cipher_list),
ADD_METHOD(set_timeout),
ADD_METHOD(get_timeout),
ADD_METHOD(set_info_callback),
ADD_METHOD(get_app_data),
ADD_METHOD(set_app_data),
ADD_METHOD(get_cert_store),
ADD_METHOD(set_options),
{ NULL, NULL }
};
#undef ADD_METHOD
/*
* Despite the name which might suggest otherwise, this is not the tp_init for
* the Context type. It's just the common initialization code shared by the
* two _{Nn}ew functions below.
*/
static ssl_ContextObj*
ssl_Context_init(ssl_ContextObj *self, int i_method) {
SSL_METHOD *method;
switch (i_method) {
case ssl_SSLv2_METHOD:
method = SSLv2_method();
break;
case ssl_SSLv23_METHOD:
method = SSLv23_method();
break;
case ssl_SSLv3_METHOD:
method = SSLv3_method();
break;
case ssl_TLSv1_METHOD:
method = TLSv1_method();
break;
default:
PyErr_SetString(PyExc_ValueError, "No such protocol");
return NULL;
}
self->ctx = SSL_CTX_new(method);
Py_INCREF(Py_None);
self->passphrase_callback = Py_None;
Py_INCREF(Py_None);
self->verify_callback = Py_None;
Py_INCREF(Py_None);
self->info_callback = Py_None;
Py_INCREF(Py_None);
self->passphrase_userdata = Py_None;
Py_INCREF(Py_None);
self->app_data = Py_None;
/* Some initialization that's required to operate smoothly in Python */
SSL_CTX_set_app_data(self->ctx, self);
SSL_CTX_set_mode(self->ctx, SSL_MODE_ENABLE_PARTIAL_WRITE |
SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER |
SSL_MODE_AUTO_RETRY);
self->tstate = NULL;
return self;
}
/*
* This one is exposed in the CObject API. I want to deprecate it.
*/
ssl_ContextObj*
ssl_Context_New(int i_method) {
ssl_ContextObj *self;
self = PyObject_GC_New(ssl_ContextObj, &ssl_Context_Type);
if (self == NULL) {
return (ssl_ContextObj *)PyErr_NoMemory();
}
self = ssl_Context_init(self, i_method);
PyObject_GC_Track((PyObject *)self);
return self;
}
/*
* This one is the tp_new of the Context type. It's great.
*/
static PyObject*
ssl_Context_new(PyTypeObject *subtype, PyObject *args, PyObject *kwargs) {
int i_method;
ssl_ContextObj *self;
static char *kwlist[] = {"method", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "i:Context", kwlist, &i_method)) {
return NULL;
}
self = (ssl_ContextObj *)subtype->tp_alloc(subtype, 1);
if (self == NULL) {
return NULL;
}
return (PyObject *)ssl_Context_init(self, i_method);
}
/*
* Call the visitproc on all contained objects.
*
* Arguments: self - The Context object
* visit - Function to call
* arg - Extra argument to visit
* Returns: 0 if all goes well, otherwise the return code from the first
* call that gave non-zero result.
*/
static int
ssl_Context_traverse(ssl_ContextObj *self, visitproc visit, void *arg)
{
int ret = 0;
if (ret == 0 && self->passphrase_callback != NULL)
ret = visit((PyObject *)self->passphrase_callback, arg);
if (ret == 0 && self->passphrase_userdata != NULL)
ret = visit((PyObject *)self->passphrase_userdata, arg);
if (ret == 0 && self->verify_callback != NULL)
ret = visit((PyObject *)self->verify_callback, arg);
if (ret == 0 && self->info_callback != NULL)
ret = visit((PyObject *)self->info_callback, arg);
if (ret == 0 && self->app_data != NULL)
ret = visit(self->app_data, arg);
return ret;
}
/*
* Decref all contained objects and zero the pointers.
*
* Arguments: self - The Context object
* Returns: Always 0.
*/
static int
ssl_Context_clear(ssl_ContextObj *self)
{
Py_XDECREF(self->passphrase_callback);
self->passphrase_callback = NULL;
Py_XDECREF(self->passphrase_userdata);
self->passphrase_userdata = NULL;
Py_XDECREF(self->verify_callback);
self->verify_callback = NULL;
Py_XDECREF(self->info_callback);
self->info_callback = NULL;
Py_XDECREF(self->app_data);
self->app_data = NULL;
return 0;
}
/*
* Deallocate the memory used by the Context object
*
* Arguments: self - The Context object
* Returns: None
*/
static void
ssl_Context_dealloc(ssl_ContextObj *self)
{
PyObject_GC_UnTrack((PyObject *)self);
SSL_CTX_free(self->ctx);
ssl_Context_clear(self);
PyObject_GC_Del(self);
}
PyTypeObject ssl_Context_Type = {
PyObject_HEAD_INIT(NULL)
0,
"OpenSSL.SSL.Context",
sizeof(ssl_ContextObj),
0,
(destructor)ssl_Context_dealloc, /* tp_dealloc */
NULL, /* print */
NULL, /* tp_getattr */
NULL, /* setattr */
NULL, /* compare */
NULL, /* repr */
NULL, /* as_number */
NULL, /* as_sequence */
NULL, /* as_mapping */
NULL, /* hash */
NULL, /* call */
NULL, /* str */
NULL, /* getattro */
NULL, /* setattro */
NULL, /* as_buffer */
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC | Py_TPFLAGS_BASETYPE, /* tp_flags */
ssl_Context_doc, /* tp_doc */
(traverseproc)ssl_Context_traverse, /* tp_traverse */
(inquiry)ssl_Context_clear, /* tp_clear */
NULL, /* tp_richcompare */
0, /* tp_weaklistoffset */
NULL, /* tp_iter */
NULL, /* tp_iternext */
ssl_Context_methods, /* tp_methods */
NULL, /* tp_members */
NULL, /* tp_getset */
NULL, /* tp_base */
NULL, /* tp_dict */
NULL, /* tp_descr_get */
NULL, /* tp_descr_set */
0, /* tp_dictoffset */
NULL, /* tp_init */
NULL, /* tp_alloc */
ssl_Context_new, /* tp_new */
};
/*
* Initialize the Context part of the SSL sub module
*
* Arguments: dict - The OpenSSL.SSL module
* Returns: 1 for success, 0 otherwise
*/
int
init_ssl_context(PyObject *module) {
if (PyType_Ready(&ssl_Context_Type) < 0) {
return 0;
}
if (PyModule_AddObject(module, "Context", (PyObject *)&ssl_Context_Type) < 0) {
return 0;
}
if (PyModule_AddObject(module, "ContextType", (PyObject *)&ssl_Context_Type) < 0) {
return 0;
}
return 1;
}