new branch is changes lp:~rick-fdd/pyopenssl/pkcs12_mod_and_export applied to trunk
diff --git a/src/crypto/crypto.c b/src/crypto/crypto.c
index 21501c1..4a25eae 100644
--- a/src/crypto/crypto.c
+++ b/src/crypto/crypto.c
@@ -12,6 +12,7 @@
#include <Python.h>
#define crypto_MODULE
#include "crypto.h"
+#include "pkcs12.h"
static char crypto_doc[] = "\n\
Main file of crypto sub module.\n\
@@ -493,7 +494,6 @@
static PyObject *
crypto_load_pkcs12(PyObject *spam, PyObject *args)
{
- crypto_PKCS12Obj *crypto_PKCS12_New(PKCS12 *, char *);
int len;
char *buffer, *passphrase = NULL;
BIO *bio;
@@ -515,6 +515,23 @@
}
+static char crypto_PKCS12_doc[] = "\n\
+The factory function inserted in the module dictionary to create PKCS12\n\
+objects\n\
+\n\
+Arguments: spam - Always NULL\n\
+ args - The Python argument tuple, should be empty\n\
+Returns: The PKCS12 object\n\
+";
+
+static crypto_PKCS12Obj *
+crypto_PKCS12(PyObject *spam, PyObject *args)
+{
+ if (!PyArg_ParseTuple(args, ":PKCS12"))
+ return NULL;
+ return crypto_PKCS12_New(NULL, NULL);
+}
+
static char crypto_X509_verify_cert_error_string_doc[] = "\n\
Get X509 verify certificate error string.\n\
\n\
@@ -559,6 +576,7 @@
{ "load_pkcs12", (PyCFunction)crypto_load_pkcs12, METH_VARARGS, crypto_load_pkcs12_doc },
{ "X509_verify_cert_error_string", (PyCFunction)crypto_X509_verify_cert_error_string, METH_VARARGS, crypto_X509_verify_cert_error_string_doc },
{ "_exception_from_error_queue", (PyCFunction)crypto_exception_from_error_queue, METH_NOARGS, crypto_exception_from_error_queue_doc },
+ { "PKCS12", (PyCFunction)crypto_PKCS12, METH_VARARGS|METH_VARARGS, crypto_PKCS12_doc },
{ NULL, NULL }
};
diff --git a/src/crypto/pkcs12.c b/src/crypto/pkcs12.c
index 28ea2fe..de844d9 100644
--- a/src/crypto/pkcs12.c
+++ b/src/crypto/pkcs12.c
@@ -36,19 +36,86 @@
return self->cert;
}
+static char crypto_PKCS12_set_certificate_doc[] = "\n\
+Replace or set the certificate portion of the PKCS12 structure\n\
+\n\
+Arguments: self - The PKCS12 object\n\
+ args - The Python argument tuple \n\
+ cert - The new certificate\n\
+Returns: self\n\
+";
+static crypto_PKCS12Obj *
+crypto_PKCS12_set_certificate(crypto_PKCS12Obj *self, PyObject *args, PyObject *keywds)
+{
+ PyObject *cert = NULL;
+ static char *kwlist[] = {"cert", NULL};
+
+ if (!PyArg_ParseTupleAndKeywords(args, keywds, "O:set_certificate",
+ kwlist, &cert))
+ return NULL;
+
+ if (cert != Py_None && ! PyObject_IsInstance(cert, (PyObject *) &crypto_X509_Type)) {
+ PyErr_SetString(PyExc_TypeError, "cert must be type X509 or None");
+ return NULL;
+ }
+
+ Py_INCREF(cert); /* Make consistent before calling Py_DECREF() */
+ if(self->cert) {
+ Py_DECREF(self->cert);
+ }
+ self->cert = cert;
+
+ Py_INCREF(self);
+ return self;
+}
+
static char crypto_PKCS12_get_privatekey_doc[] = "\n\
Return private key portion of the PKCS12 structure\n\
\n\
@returns: PKey object containing the private key\n\
";
-static PyObject *
+//static PyObject *
+static crypto_PKeyObj *
crypto_PKCS12_get_privatekey(crypto_PKCS12Obj *self, PyObject *args)
{
if (!PyArg_ParseTuple(args, ":get_privatekey"))
return NULL;
Py_INCREF(self->key);
- return self->key;
+ return (PyObject *) self->key;
+}
+
+static char crypto_PKCS12_set_privatekey_doc[] = "\n\
+Replace or set the privatekey portion of the PKCS12 structure\n\
+\n\
+Arguments: self - The PKCS12 object\n\
+ args - The Python argument tuple \n\
+ pkey - The new private key\n\
+Returns: self\n\
+";
+static crypto_PKCS12Obj *
+crypto_PKCS12_set_privatekey(crypto_PKCS12Obj *self, PyObject *args, PyObject *keywds)
+{
+ PyObject *pkey = NULL;
+ static char *kwlist[] = {"pkey", NULL};
+
+ if (!PyArg_ParseTupleAndKeywords(args, keywds, "O:set_privatekey",
+ kwlist, &pkey))
+ return NULL;
+
+ if (pkey != Py_None && ! PyObject_IsInstance(pkey, (PyObject *) &crypto_PKey_Type)) {
+ PyErr_SetString(PyExc_TypeError, "pkey must be type X509 or None");
+ return NULL;
+ }
+
+ Py_INCREF(pkey); /* Make consistent before calling Py_DECREF() */
+ if(self->key) {
+ Py_DECREF(self->key);
+ }
+ self->key = pkey;
+
+ Py_INCREF(self);
+ return self;
}
static char crypto_PKCS12_get_ca_certificates_doc[] = "\n\
@@ -67,6 +134,119 @@
return self->cacerts;
}
+static char crypto_PKCS12_set_ca_certificates_doc[] = "\n\
+Replace or set the ca_certificates portion of the PKCS12 structure\n\
+\n\
+Arguments: self - The PKCS12 object\n\
+ args - The Python argument tuple \n\
+ cacerts - The new ca_certificates\n\
+Returns: self\n\
+";
+static crypto_PKCS12Obj *
+crypto_PKCS12_set_ca_certificates(crypto_PKCS12Obj *self, PyObject *args, PyObject *keywds)
+{
+ PyObject *cacerts;
+ static char *kwlist[] = {"cacerts", NULL};
+ int i;
+
+ if (!PyArg_ParseTupleAndKeywords(args, keywds, "O:set_ca_certificates",
+ kwlist, &cacerts))
+ return NULL;
+ if (cacerts == Py_None) {
+ /* We are good. */
+ } else if (PySequence_Check(cacerts)) { /* is iterable */
+ for(i = 0;i < PySequence_Length(cacerts);i++) { /* For each CA cert */
+ PyObject *obj;
+ obj = PySequence_GetItem(cacerts, i);
+ if (PyObject_Type(obj) != (PyObject *) &crypto_X509_Type) {
+ Py_DECREF(obj);
+ PyErr_SetString(PyExc_TypeError, "cacerts iterable must only contain X509Type");
+ return NULL;
+ }
+ Py_DECREF(obj);
+ }
+ } else {
+ PyErr_SetString(PyExc_TypeError, "cacerts must be an iterable or None");
+ return NULL;
+ }
+
+ Py_INCREF(cacerts); /* Make consistent before calling Py_DECREF() */
+ if(self->cacerts) {
+ Py_DECREF(self->cacerts);
+ }
+ self->cacerts = cacerts;
+
+ Py_INCREF(self);
+ return self;
+}
+
+static char crypto_PKCS12_export_doc[] = "\n\
+Dump a PKCS12 object to a buffer string\n\
+\n\
+Arguments: self - The PKCS12 object\n\
+ args - The Python argument tuple, should be:\n\
+ passphrase - (optional) for encrypting the PKCS12 string\n\
+ using 3DES-CBC\n\
+ friendly_name - stored in the file for display\n\
+ iter - number of iterations to use when encrypting\n\
+ maciter - number of iterations to use when creating the MAC.\n\
+ A special value of -1 means no MAC.\n\
+Returns: The buffer with the dumped pkcs12 in it\n\
+";
+
+static PyObject *
+crypto_PKCS12_export(crypto_PKCS12Obj *self, PyObject *args, PyObject *keywds)
+{
+ int buf_len;
+ PyObject *buffer;
+ char *temp, *passphrase = NULL, *friendly_name = NULL;
+ BIO *bio;
+ PKCS12 *p12;
+ EVP_PKEY *pkey = NULL;
+ STACK_OF(X509) *cacerts = NULL;
+ X509 *x509 = NULL;
+ int iter = PKCS12_DEFAULT_ITER;
+ int maciter = 0;
+ static char *kwlist[] = {"passphrase", "friendly_name", "iter", "maciter", NULL};
+
+ if (!PyArg_ParseTupleAndKeywords(args, keywds, "|zzii:export",
+ kwlist, &passphrase, &friendly_name, &iter, &maciter))
+ return NULL;
+
+ if (self->key && self->key != Py_None) {
+ pkey = ((crypto_PKeyObj*) self->key)->pkey;
+ }
+ if (self->cert && self->cert != Py_None) {
+ x509 = ((crypto_X509Obj*) self->cert)->x509;
+ }
+ cacerts = sk_X509_new_null();
+ if (self->cacerts && self->cacerts != Py_None) {
+ int i;
+ PyObject *obj;
+ for(i = 0;i < PySequence_Length(self->cacerts);i++) { /* For each CA cert */
+ obj = PySequence_GetItem(self->cacerts, i);
+ /* assert(PyObject_IsInstance(obj, (PyObject *) &crypto_X509_Type )); */
+ sk_X509_push(cacerts, (( crypto_X509Obj* ) obj)->x509);
+ Py_DECREF(obj);
+ }
+ }
+
+ p12 = PKCS12_create(passphrase, friendly_name, pkey, x509, cacerts,
+ NID_pbe_WithSHA1And3_Key_TripleDES_CBC,
+ NID_pbe_WithSHA1And3_Key_TripleDES_CBC,
+ iter, maciter, 0);
+ if( p12 == NULL ) {
+ exception_from_error_queue(crypto_Error);
+ return NULL;
+ }
+ bio = BIO_new(BIO_s_mem());
+ i2d_PKCS12_bio(bio, p12);
+ buf_len = BIO_get_mem_data(bio, &temp);
+ buffer = PyString_FromStringAndSize(temp, buf_len);
+ BIO_free(bio);
+ return buffer;
+}
+
/*
* ADD_METHOD(name) expands to a correct PyMethodDef declaration
* { 'name', (PyCFunction)crypto_PKCS12_name, METH_VARARGS, crypto_PKCS12_name_doc }
@@ -74,11 +254,17 @@
*/
#define ADD_METHOD(name) \
{ #name, (PyCFunction)crypto_PKCS12_##name, METH_VARARGS, crypto_PKCS12_##name##_doc }
+#define ADD_KW_METHOD(name) \
+ { #name, (PyCFunction)crypto_PKCS12_##name, METH_VARARGS | METH_KEYWORDS, crypto_PKCS12_##name##_doc }
static PyMethodDef crypto_PKCS12_methods[] =
{
ADD_METHOD(get_certificate),
+ ADD_KW_METHOD(set_certificate),
ADD_METHOD(get_privatekey),
+ ADD_KW_METHOD(set_privatekey),
ADD_METHOD(get_ca_certificates),
+ ADD_KW_METHOD(set_ca_certificates),
+ ADD_KW_METHOD(export),
{ NULL, NULL }
};
#undef ADD_METHOD
@@ -108,7 +294,7 @@
cacerts = sk_X509_new_null();
/* parse the PKCS12 lump */
- if (!(cacerts && PKCS12_parse(p12, passphrase, &pkey, &cert, &cacerts)))
+ if (p12 && !(cacerts && PKCS12_parse(p12, passphrase, &pkey, &cert, &cacerts)))
{
exception_from_error_queue(crypto_Error);
return NULL;
@@ -122,11 +308,20 @@
Py_INCREF(Py_None);
self->cacerts = Py_None;
- if ((self->cert = (PyObject *)crypto_X509_New(cert, 1)) == NULL)
- goto error;
-
- if ((self->key = (PyObject *)crypto_PKey_New(pkey, 1)) == NULL)
- goto error;
+ if (cert == NULL) {
+ Py_INCREF(Py_None);
+ self->cert = Py_None;
+ } else {
+ if ((self->cert = (PyObject *)crypto_X509_New(cert, 1)) == NULL)
+ goto error;
+ }
+ if (pkey == NULL) {
+ Py_INCREF(Py_None);
+ self->key = Py_None;
+ } else {
+ if ((self->key = (PyObject *)crypto_PKey_New(pkey, 1)) == NULL)
+ goto error;
+ }
/* Make a tuple for the CA certs */
cacert_count = sk_X509_num(cacerts);
diff --git a/src/crypto/pkcs12.h b/src/crypto/pkcs12.h
index 32c9ec4..0e724a9 100644
--- a/src/crypto/pkcs12.h
+++ b/src/crypto/pkcs12.h
@@ -27,4 +27,7 @@
PyObject *cacerts;
} crypto_PKCS12Obj;
+crypto_PKCS12Obj *
+crypto_PKCS12_New(PKCS12 *p12, char *passphrase);
+
#endif
diff --git a/test/test_crypto.py b/test/test_crypto.py
index aa22cd0..0a283e6 100644
--- a/test/test_crypto.py
+++ b/test/test_crypto.py
@@ -21,6 +21,8 @@
from OpenSSL.crypto import NetscapeSPKI, NetscapeSPKIType
from OpenSSL.test.util import TestCase
+from OpenSSL.crypto import load_pkcs12, PKCS12
+from subprocess import Popen, PIPE
cleartextCertificatePEM = """-----BEGIN CERTIFICATE-----
MIIC7TCCAlagAwIBAgIIPQzE4MbeufQwDQYJKoZIhvcNAQEFBQAwWDELMAkGA1UE
@@ -709,6 +711,129 @@
cert = load_certificate(FILETYPE_PEM, self.pemData)
self.assertEqual(cert.get_notBefore(), "20090325123658Z")
+class PKCS12Tests(TestCase):
+ """
+ Tests functions in the L{OpenSSL.crypto.PKCS12} module.
+ """
+ pemData = cleartextCertificatePEM + cleartextPrivateKeyPEM
+
+ def test_construction(self):
+ p12 = PKCS12()
+ self.assertEqual(None, p12.get_certificate())
+ self.assertEqual(None, p12.get_privatekey())
+ self.assertEqual(None, p12.get_ca_certificates())
+
+ def test_type_errors(self):
+ p12 = PKCS12()
+ self.assertRaises(TypeError, p12.set_certificate, 3)
+ self.assertRaises(TypeError, p12.set_privatekey, 3)
+ self.assertRaises(TypeError, p12.set_ca_certificates, 3)
+ self.assertRaises(TypeError, p12.set_ca_certificates, X509())
+ self.assertRaises(TypeError, p12.set_ca_certificates, (3, 4))
+
+ def test_key_only(self):
+ """
+ L{OpenSSL.crypto.PKCS12.export} and load a PKCS without a key
+ """
+ passwd = 'blah'
+ p12 = PKCS12()
+ pkey = load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM)
+ p12.set_privatekey( pkey )
+ self.assertEqual(None, p12.get_certificate())
+ self.assertEqual(pkey, p12.get_privatekey())
+ dumped_p12 = p12.export(passphrase=passwd, iter=2, maciter=3)
+ p12 = load_pkcs12(dumped_p12, passwd)
+ self.assertEqual(None, p12.get_ca_certificates())
+ self.assertEqual(None, p12.get_certificate())
+ # It's actually in the pkcs12, but we silently don't find it (a key without a cert)
+ #self.assertEqual(cleartextPrivateKeyPEM, dump_privatekey(FILETYPE_PEM, p12.get_privatekey()))
+
+ def test_cert_only(self):
+ """
+ L{OpenSSL.crypto.PKCS12.export} and load a PKCS without a key.
+ Strangely, OpenSSL converts it to a CA cert.
+ """
+ passwd = 'blah'
+ p12 = PKCS12()
+ cert = load_certificate(FILETYPE_PEM, cleartextCertificatePEM)
+ p12.set_certificate( cert )
+ self.assertEqual(cert, p12.get_certificate())
+ self.assertEqual(None, p12.get_privatekey())
+ dumped_p12 = p12.export(passphrase=passwd, iter=2, maciter=3)
+ p12 = load_pkcs12(dumped_p12, passwd)
+ self.assertEqual(None, p12.get_privatekey())
+ self.assertEqual(None, p12.get_certificate())
+ self.assertEqual(cleartextCertificatePEM, dump_certificate(FILETYPE_PEM, p12.get_ca_certificates()[0]))
+
+
+ def test_export_and_load(self):
+ """
+ L{OpenSSL.crypto.PKCS12.export} and others
+ """
+ # use openssl program to create a p12 then load it
+ from OpenSSL.test.test_ssl import client_cert_pem, client_key_pem, server_cert_pem, server_key_pem, root_cert_pem
+ passwd = 'whatever'
+ pem = client_key_pem + client_cert_pem
+ p12_str = Popen(["openssl", "pkcs12", '-export', '-clcerts', '-passout', 'pass:'+passwd], \
+ stdin=PIPE, stdout=PIPE).communicate(input=str(pem))[0]
+ p12 = load_pkcs12(p12_str, passwd)
+ # verify p12 using pkcs12 get_* functions
+ cert_pem = dump_certificate(FILETYPE_PEM, p12.get_certificate())
+ self.assertEqual(cert_pem, client_cert_pem)
+ key_pem = dump_privatekey(FILETYPE_PEM, p12.get_privatekey())
+ self.assertEqual(key_pem, client_key_pem)
+ self.assertEqual(None, p12.get_ca_certificates())
+ # dump cert and verify it using the openssl program
+ dumped_p12 = p12.export(passphrase=passwd, iter=2, maciter=0, friendly_name='blueberry')
+ recovered_key = Popen(["openssl", "pkcs12", '-nocerts', '-nodes', '-passin', 'pass:'+passwd ], \
+ stdin=PIPE, stdout=PIPE).communicate(input=str(dumped_p12))[0]
+ self.assertEqual(recovered_key[-len(client_key_pem):], client_key_pem)
+ recovered_cert = Popen(["openssl", "pkcs12", '-clcerts', '-nodes', '-passin', 'pass:'+passwd, '-nokeys' ], \
+ stdin=PIPE, stdout=PIPE).communicate(input=str(dumped_p12))[0]
+ self.assertEqual(recovered_cert[-len(client_cert_pem):], client_cert_pem)
+ # change the cert and key
+ p12.set_certificate(load_certificate(FILETYPE_PEM, server_cert_pem))
+ p12.set_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem))
+ root_cert = load_certificate(FILETYPE_PEM, root_cert_pem)
+ p12.set_ca_certificates( [ root_cert ] )
+ p12.set_ca_certificates( ( root_cert, ) )
+ self.assertEqual(1, len(p12.get_ca_certificates()))
+ self.assertEqual(root_cert, p12.get_ca_certificates()[0])
+ # recover changed cert and key using the openssl program
+ dumped_p12 = p12.export(passphrase=passwd, iter=2, maciter=0, friendly_name='Serverlicious')
+ recovered_key = Popen(["openssl", "pkcs12", '-nocerts', '-nodes', '-passin', 'pass:'+passwd ], \
+ stdin=PIPE, stdout=PIPE).communicate(input=str(dumped_p12))[0]
+ self.assertEqual(recovered_key[-len(server_key_pem):], server_key_pem)
+ recovered_cert = Popen(["openssl", "pkcs12", '-clcerts', '-nodes', '-passin', 'pass:'+passwd, '-nokeys' ], \
+ stdin=PIPE, stdout=PIPE).communicate(input=str(dumped_p12))[0]
+ self.assertEqual(recovered_cert[-len(server_cert_pem):], server_cert_pem)
+ recovered_cert = Popen(["openssl", "pkcs12", '-cacerts', '-nodes', '-passin', 'pass:'+passwd, '-nokeys' ], \
+ stdin=PIPE, stdout=PIPE).communicate(input=str(dumped_p12))[0]
+ self.assertEqual(recovered_cert[-len(root_cert_pem):], root_cert_pem)
+ # Test other forms of no password
+ passwd = ''
+ dumped_p12_empty = p12.export(passphrase=passwd, iter=2, maciter=0, friendly_name='Sewer')
+ dumped_p12_none = p12.export(passphrase=None, iter=2, maciter=0, friendly_name='Sewer')
+ dumped_p12_nopw = p12.export( iter=2, maciter=0, friendly_name='Sewer')
+ recovered_empty = Popen(["openssl", "pkcs12", '-nodes', '-passin', 'pass:'+passwd ], \
+ stdin=PIPE, stdout=PIPE).communicate(input=str(dumped_p12_empty))[0]
+ recovered_none = Popen(["openssl", "pkcs12", '-nodes', '-passin', 'pass:'+passwd ], \
+ stdin=PIPE, stdout=PIPE).communicate(input=str(dumped_p12_none))[0]
+ recovered_nopw = Popen(["openssl", "pkcs12", '-nodes', '-passin', 'pass:'+passwd ], \
+ stdin=PIPE, stdout=PIPE).communicate(input=str(dumped_p12_nopw))[0]
+ self.assertEqual(recovered_none, recovered_nopw)
+ self.assertEqual(recovered_none, recovered_empty)
+ # Test removing CA certs
+ p12.set_ca_certificates( None )
+ self.assertEqual(None, p12.get_ca_certificates())
+ # Test without MAC
+ dumped_p12 = p12.export(maciter=-1, passphrase=passwd, iter=2)
+ recovered_key = Popen(["openssl", "pkcs12", '-nocerts', '-nodes', '-passin', 'pass:'+passwd, '-nomacver' ], \
+ stdin=PIPE, stdout=PIPE).communicate(input=str(dumped_p12))[0]
+ self.assertEqual(recovered_key[-len(server_key_pem):], server_key_pem)
+ # We can't load PKCS12 without MAC, because we use PCKS_parse()
+ #p12 = load_pkcs12(dumped_p12, passwd)
+
def test_get_notAfter(self):
"""