Expose the OpenSSL API for using the system's default certificate store
diff --git a/ChangeLog b/ChangeLog
index 5a68f4a..dbb5894 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,14 @@
2008-12-28 Jean-Paul Calderone <exarkun@twistedmatrix.com>
+ * src/ssl/context.c: Add a capath parameter to
+ Context.load_verify_locations to allow Python code to specify
+ either or both arguments to the underlying
+ SSL_CTX_load_verify_locations API.
+ * src/ssl/context.c: Add Context.set_default_verify_paths, a wrapper
+ around SSL_CTX_set_default_verify_paths.
+
+2008-12-28 Jean-Paul Calderone <exarkun@twistedmatrix.com>
+
* test/test_crypto.py, src/crypto/x509req.c: Added get_version and
set_version_methods to X509ReqType based on patch from Wouter van
Bommel. Resolves LP#274418.
diff --git a/doc/pyOpenSSL.tex b/doc/pyOpenSSL.tex
index a1b303f..094d09f 100644
--- a/doc/pyOpenSSL.tex
+++ b/doc/pyOpenSSL.tex
@@ -764,9 +764,17 @@
when requesting a client certificate.
\end{methoddesc}
-\begin{methoddesc}[Context]{load_verify_locations}{pemfile}
-Specify where CA certificates for verification purposes are located. These are
-trusted certificates. Note that the certificates have to be in PEM format.
+\begin{methoddesc}[Context]{load_verify_locations}{pemfile, capath}
+Specify where CA certificates for verification purposes are located. These
+are trusted certificates. Note that the certificates have to be in PEM
+format. If capath is passed, it must be a directory prepared using the
+\code{c_rehash} tool included with OpenSSL. Either, but not both, of
+\var{pemfile} or \var{capath} may be \code{None}.
+\end{methoddesc}
+
+\begin{methoddesc}[Context]{set_default_verify_paths}{}
+Specify that the platform-provided CA certificates are to be used for
+verification purposes.
\end{methoddesc}
\begin{methoddesc}[Context]{load_tmp_dh}{dhfile}
diff --git a/src/ssl/context.c b/src/ssl/context.c
index ed0eabe..00785d6 100644
--- a/src/ssl/context.c
+++ b/src/ssl/context.c
@@ -238,18 +238,20 @@
\n\
Arguments: self - The Context object\n\
args - The Python argument tuple, should be:\n\
- cafile - Which file we can find the certificates\n\
+ cafile - In which file we can find the certificates\n\
+ capath - In which directory we can find the certificates\n\
Returns: None\n\
";
static PyObject *
-ssl_Context_load_verify_locations(ssl_ContextObj *self, PyObject *args)
-{
- char *cafile;
+ssl_Context_load_verify_locations(ssl_ContextObj *self, PyObject *args) {
+ char *cafile = NULL;
+ char *capath = NULL;
- if (!PyArg_ParseTuple(args, "s:load_verify_locations", &cafile))
+ if (!PyArg_ParseTuple(args, "z|z:load_verify_locations", &cafile, &capath)) {
return NULL;
+ }
- if (!SSL_CTX_load_verify_locations(self->ctx, cafile, NULL))
+ if (!SSL_CTX_load_verify_locations(self->ctx, cafile, capath))
{
exception_from_error_queue();
return NULL;
@@ -261,6 +263,33 @@
}
}
+static char ssl_Context_set_default_verify_paths_doc[] = "\n\
+Use the platform-specific CA certificate locations\n\
+\n\
+Arguments: self - The Context object\n\
+ args - None\n\
+\n\
+Returns: None\n\
+";
+static PyObject *
+ssl_Context_set_default_verify_paths(ssl_ContextObj *self, PyObject *args) {
+    if (!PyArg_ParseTuple(args, ":set_default_verify_paths")) {
+        return NULL;
+    }
+
+    /*
+     * A zero result is reported through exception_from_error_queue().
+     * XXX This error path is untested. -exarkun
+     */
+    if (!SSL_CTX_set_default_verify_paths(self->ctx)) {
+        exception_from_error_queue();
+        return NULL;
+    }
+    Py_INCREF(Py_None);
+    return Py_None;
+}
+
+
static char ssl_Context_set_passwd_cb_doc[] = "\n\
Set the passphrase callback\n\
\n\
@@ -952,6 +981,7 @@
static PyMethodDef ssl_Context_methods[] = {
ADD_METHOD(load_verify_locations),
ADD_METHOD(set_passwd_cb),
+ ADD_METHOD(set_default_verify_paths),
ADD_METHOD(use_certificate_chain_file),
ADD_METHOD(use_certificate_file),
ADD_METHOD(use_certificate),
diff --git a/test/test_ssl.py b/test/test_ssl.py
index cd07cd5..de6c5e1 100644
--- a/test/test_ssl.py
+++ b/test/test_ssl.py
@@ -7,11 +7,13 @@
from unittest import TestCase
from tempfile import mktemp
from socket import socket
+from os import makedirs, symlink
+from os.path import join
from OpenSSL.crypto import TYPE_RSA, FILETYPE_PEM, PKey, dump_privatekey, load_certificate, load_privatekey
-from OpenSSL.SSL import WantReadError, Context, Connection
+from OpenSSL.SSL import WantReadError, Context, Connection, Error
from OpenSSL.SSL import SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, TLSv1_METHOD
-
+from OpenSSL.SSL import VERIFY_PEER
from OpenSSL.test.test_crypto import _Python23TestCaseHelper, cleartextCertificatePEM, cleartextPrivateKeyPEM
@@ -115,3 +117,131 @@
# Kind of lame. Just make sure it got called somehow.
self.assertTrue(called)
+
+
+ def _load_verify_locations_test(self, *args):
+ port = socket()
+ port.bind(('', 0))
+ port.listen(1)
+
+ client = socket()
+ client.setblocking(False)
+ client.connect_ex(port.getsockname())
+
+ clientContext = Context(TLSv1_METHOD)
+ clientContext.load_verify_locations(*args)
+ # Require that the server certificate verify properly or the
+ # connection will fail.
+ clientContext.set_verify(
+ VERIFY_PEER,
+ lambda conn, cert, errno, depth, preverify_ok: preverify_ok)
+
+ clientSSL = Connection(clientContext, client)
+ clientSSL.set_connect_state()
+
+ server, _ = port.accept()
+ server.setblocking(False)
+
+ serverContext = Context(TLSv1_METHOD)
+ serverContext.use_certificate(
+ load_certificate(FILETYPE_PEM, cleartextCertificatePEM))
+ serverContext.use_privatekey(
+ load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM))
+
+ serverSSL = Connection(serverContext, server)
+ serverSSL.set_accept_state()
+
+ for i in range(3):
+ for ssl in clientSSL, serverSSL:
+ try:
+ # Without load_verify_locations above, the handshake
+ # will fail:
+ # Error: [('SSL routines', 'SSL3_GET_SERVER_CERTIFICATE',
+ # 'certificate verify failed')]
+ ssl.do_handshake()
+ except WantReadError:
+ pass
+
+ cert = clientSSL.get_peer_certificate()
+ self.assertEqual(cert.get_subject().CN, 'pyopenssl.sf.net')
+
+ def test_load_verify_file(self):
+ """
+ L{Context.load_verify_locations} accepts a file name and uses the
+ certificates within for verification purposes.
+ """
+ cafile = self.mktemp()
+ fObj = file(cafile, 'w')
+ fObj.write(cleartextCertificatePEM)
+ fObj.close()
+
+ self._load_verify_locations_test(cafile)
+
+
+ def test_load_verify_invalid_file(self):
+ """
+ L{Context.load_verify_locations} raises L{Error} when passed a
+ non-existent cafile.
+ """
+ clientContext = Context(TLSv1_METHOD)
+ self.assertRaises(
+ Error, clientContext.load_verify_locations, self.mktemp())
+
+
+ def test_load_verify_directory(self):
+ """
+ L{Context.load_verify_locations} accepts a directory name and uses
+ the certificates within for verification purposes.
+ """
+ capath = self.mktemp()
+ makedirs(capath)
+ cafile = join(capath, 'cert.pem')
+ fObj = file(cafile, 'w')
+ fObj.write(cleartextCertificatePEM)
+ fObj.close()
+
+ # Hash value computed manually with c_rehash to avoid depending on
+ # c_rehash in the test suite.
+ symlink('cert.pem', join(capath, '07497d9e.0'))
+
+ self._load_verify_locations_test(None, capath)
+
+
+ def test_set_default_verify_paths(self):
+ """
+ L{Context.set_default_verify_paths} causes the platform-specific CA
+ certificate locations to be used for verification purposes.
+ """
+ # Testing this requires a server with a certificate signed by one of
+ # the CAs in the platform CA location. Getting one of those costs
+ # money. Fortunately (or unfortunately, depending on your
+ # perspective), it's easy to think of a public server on the
+ # internet which has such a certificate. Connecting to the network
+ # in a unit test is bad, but it's the only way I can think of to
+ # really test this. -exarkun
+
+ # Arg, verisign.com doesn't speak TLSv1
+ context = Context(SSLv3_METHOD)
+ context.set_default_verify_paths()
+ context.set_verify(
+ VERIFY_PEER,
+ lambda conn, cert, errno, depth, preverify_ok: preverify_ok)
+
+ client = socket()
+ client.connect(('verisign.com', 443))
+ clientSSL = Connection(context, client)
+ clientSSL.set_connect_state()
+ clientSSL.do_handshake()
+ clientSSL.send('GET / HTTP/1.0\r\n\r\n')
+ self.assertTrue(clientSSL.recv(1024))
+
+
+ def test_set_default_verify_paths_signature(self):
+ """
+ L{Context.set_default_verify_paths} takes no arguments and raises
+ L{TypeError} if given any.
+ """
+ context = Context(TLSv1_METHOD)
+ self.assertRaises(TypeError, context.set_default_verify_paths, None)
+ self.assertRaises(TypeError, context.set_default_verify_paths, 1)
+ self.assertRaises(TypeError, context.set_default_verify_paths, "")