Avoid talking to verisign.com for the get_peer_cert_chain test; put it on the right test case, too. Also some other minor cleanups suggested by pyflakes.
diff --git a/OpenSSL/test/test_ssl.py b/OpenSSL/test/test_ssl.py
index 5752d13..61409b1 100644
--- a/OpenSSL/test/test_ssl.py
+++ b/OpenSSL/test/test_ssl.py
@@ -12,7 +12,7 @@
from os.path import join
from unittest import main
-from OpenSSL.crypto import TYPE_RSA, FILETYPE_PEM, FILETYPE_ASN1
+from OpenSSL.crypto import TYPE_RSA, FILETYPE_PEM
from OpenSSL.crypto import PKey, X509, X509Extension
from OpenSSL.crypto import dump_privatekey, load_privatekey
from OpenSSL.crypto import dump_certificate, load_certificate
@@ -22,14 +22,19 @@
from OpenSSL.SSL import SENT_SHUTDOWN, RECEIVED_SHUTDOWN
from OpenSSL.SSL import SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, TLSv1_METHOD
from OpenSSL.SSL import OP_NO_SSLv2, OP_NO_SSLv3, OP_SINGLE_DH_USE
-from OpenSSL.SSL import VERIFY_PEER, VERIFY_FAIL_IF_NO_PEER_CERT, VERIFY_CLIENT_ONCE
-from OpenSSL.SSL import Error, SysCallError, WantReadError, ZeroReturnError, SSLeay_version
+from OpenSSL.SSL import (
+ VERIFY_PEER, VERIFY_FAIL_IF_NO_PEER_CERT, VERIFY_CLIENT_ONCE, VERIFY_NONE)
+from OpenSSL.SSL import (
+ Error, SysCallError, WantReadError, ZeroReturnError, SSLeay_version)
from OpenSSL.SSL import Context, ContextType, Connection, ConnectionType
from OpenSSL.test.util import TestCase, bytes, b
-from OpenSSL.test.test_crypto import cleartextCertificatePEM, cleartextPrivateKeyPEM
-from OpenSSL.test.test_crypto import client_cert_pem, client_key_pem
-from OpenSSL.test.test_crypto import server_cert_pem, server_key_pem, root_cert_pem
+from OpenSSL.test.test_crypto import (
+ cleartextCertificatePEM, cleartextPrivateKeyPEM)
+from OpenSSL.test.test_crypto import (
+ client_cert_pem, client_key_pem, server_cert_pem, server_key_pem,
+ root_cert_pem)
+
try:
from OpenSSL.SSL import OP_NO_QUERY_MTU
except ImportError:
@@ -63,6 +68,7 @@
def verify_cb(conn, cert, errnum, depth, ok):
return ok
+
def socket_pair():
"""
Establish and return a pair of network sockets connected to each other.
@@ -105,6 +111,60 @@
conns.remove(conn)
+def _create_certificate_chain():
+ """
+ Construct and return a chain of certificates.
+
+ 1. A new self-signed certificate authority certificate (cacert)
+ 2. A new intermediate certificate signed by cacert (icert)
+ 3. A new server certificate signed by icert (scert)
+ """
+ caext = X509Extension(b('basicConstraints'), False, b('CA:true'))
+
+ # Step 1
+ cakey = PKey()
+ cakey.generate_key(TYPE_RSA, 512)
+ cacert = X509()
+ cacert.get_subject().commonName = "Authority Certificate"
+ cacert.set_issuer(cacert.get_subject())
+ cacert.set_pubkey(cakey)
+ cacert.set_notBefore(b("20000101000000Z"))
+ cacert.set_notAfter(b("20200101000000Z"))
+ cacert.add_extensions([caext])
+ cacert.set_serial_number(0)
+ cacert.sign(cakey, "sha1")
+
+ # Step 2
+ ikey = PKey()
+ ikey.generate_key(TYPE_RSA, 512)
+ icert = X509()
+ icert.get_subject().commonName = "Intermediate Certificate"
+ icert.set_issuer(cacert.get_subject())
+ icert.set_pubkey(ikey)
+ icert.set_notBefore(b("20000101000000Z"))
+ icert.set_notAfter(b("20200101000000Z"))
+ icert.add_extensions([caext])
+ icert.set_serial_number(0)
+ icert.sign(cakey, "sha1")
+
+ # Step 3
+ skey = PKey()
+ skey.generate_key(TYPE_RSA, 512)
+ scert = X509()
+ scert.get_subject().commonName = "Server Certificate"
+ scert.set_issuer(icert.get_subject())
+ scert.set_pubkey(skey)
+ scert.set_notBefore(b("20000101000000Z"))
+ scert.set_notAfter(b("20200101000000Z"))
+ scert.add_extensions([
+ X509Extension(b('basicConstraints'), True, b('CA:false'))])
+ scert.set_serial_number(0)
+ scert.sign(ikey, "sha1")
+
+ return [(cakey, cacert), (ikey, icert), (skey, scert)]
+
+
+
class _LoopbackMixin:
"""
Helper mixin which defines methods for creating a connected socket pair and
@@ -150,7 +210,7 @@
# Give the side a chance to generate some more bytes, or
# succeed.
try:
- bytes = read.recv(2 ** 16)
+ data = read.recv(2 ** 16)
except WantReadError:
# It didn't succeed, so we'll hope it generated some
# output.
@@ -158,7 +218,7 @@
else:
# It did succeed, so we'll stop now and let the caller deal
# with it.
- return (read, bytes)
+ return (read, data)
while True:
# Keep copying as long as there's more stuff there.
@@ -175,6 +235,7 @@
write.bio_write(dirty)
+
class VersionTests(TestCase):
"""
Tests for version information exposed by
@@ -307,37 +368,6 @@
context = Context(TLSv1_METHOD)
self.assertRaises(TypeError, context.get_timeout, None)
- def test_get_peer_cert_chain(self):
- """
- L{Connection.get_peer_cert_chain} returns the tuple of certificates
- which the connected server returned for the certification verification.
- """
- # Testing this requires a server with a certificate signed by one of
- # the CAs in the platform CA location. Getting one of those costs
- # money. Fortunately (or unfortunately, depending on your
- # perspective), it's easy to think of a public server on the
- # internet which has such a certificate. Connecting to the network
- # in a unit test is bad, but it's the only way I can think of to
- # really test this. -exarkun
-
- # Arg, verisign.com doesn't speak TLSv1
- context = Context(SSLv3_METHOD)
-
- client = socket()
- client.connect(('verisign.com', 443))
- clientSSL = Connection(context, client)
- clientSSL.set_connect_state()
- clientSSL.do_handshake()
- cert = clientSSL.get_peer_certificate()
- certs = clientSSL.get_peer_cert_chain()
- self.assertEqual(dump_certificate(FILETYPE_PEM, cert),
- dump_certificate(FILETYPE_PEM, certs[0]))
- self.assertEqual(certs[0].get_subject().CN, 'www.verisign.com')
- self.assertEqual(certs[1].get_subject().CN,
- 'VeriSign Class 3 Extended Validation SSL SGC CA')
- self.assertEqual(certs[2].get_subject().CN,
- 'VeriSign Class 3 Public Primary Certification Authority - G5')
-
def test_timeout(self):
"""
@@ -669,59 +699,6 @@
self.assertRaises(TypeError, context.add_extra_chain_cert, object(), object())
- def _create_certificate_chain(self):
- """
- Construct and return a chain of certificates.
-
- 1. A new self-signed certificate authority certificate (cacert)
- 2. A new intermediate certificate signed by cacert (icert)
- 3. A new server certificate signed by icert (scert)
- """
- caext = X509Extension(b('basicConstraints'), False, b('CA:true'))
-
- # Step 1
- cakey = PKey()
- cakey.generate_key(TYPE_RSA, 512)
- cacert = X509()
- cacert.get_subject().commonName = "Authority Certificate"
- cacert.set_issuer(cacert.get_subject())
- cacert.set_pubkey(cakey)
- cacert.set_notBefore(b("20000101000000Z"))
- cacert.set_notAfter(b("20200101000000Z"))
- cacert.add_extensions([caext])
- cacert.set_serial_number(0)
- cacert.sign(cakey, "sha1")
-
- # Step 2
- ikey = PKey()
- ikey.generate_key(TYPE_RSA, 512)
- icert = X509()
- icert.get_subject().commonName = "Intermediate Certificate"
- icert.set_issuer(cacert.get_subject())
- icert.set_pubkey(ikey)
- icert.set_notBefore(b("20000101000000Z"))
- icert.set_notAfter(b("20200101000000Z"))
- icert.add_extensions([caext])
- icert.set_serial_number(0)
- icert.sign(cakey, "sha1")
-
- # Step 3
- skey = PKey()
- skey.generate_key(TYPE_RSA, 512)
- scert = X509()
- scert.get_subject().commonName = "Server Certificate"
- scert.set_issuer(icert.get_subject())
- scert.set_pubkey(skey)
- scert.set_notBefore(b("20000101000000Z"))
- scert.set_notAfter(b("20200101000000Z"))
- scert.add_extensions([
- X509Extension(b('basicConstraints'), True, b('CA:false'))])
- scert.set_serial_number(0)
- scert.sign(ikey, "sha1")
-
- return [(cakey, cacert), (ikey, icert), (skey, scert)]
-
-
def _handshake_test(self, serverContext, clientContext):
"""
Verify that a client and server created with the given contexts can
@@ -757,7 +734,7 @@
to it with a client which trusts cacert and requires verification to
succeed.
"""
- chain = self._create_certificate_chain()
+ chain = _create_certificate_chain()
[(cakey, cacert), (ikey, icert), (skey, scert)] = chain
# Dump the CA certificate to a file because that's the only way to load
@@ -798,7 +775,7 @@
to it with a client which trusts cacert and requires verification to
succeed.
"""
- chain = self._create_certificate_chain()
+ chain = _create_certificate_chain()
[(cakey, cacert), (ikey, icert), (skey, scert)] = chain
# Write out the chain file.
@@ -1126,6 +1103,49 @@
self.assertRaises(NotImplementedError, conn.makefile)
+ def test_get_peer_cert_chain_wrong_args(self):
+ """
+ L{Connection.get_peer_cert_chain} raises L{TypeError} if called with any
+ arguments.
+ """
+ conn = Connection(Context(TLSv1_METHOD), None)
+ self.assertRaises(TypeError, conn.get_peer_cert_chain, 1)
+ self.assertRaises(TypeError, conn.get_peer_cert_chain, "foo")
+ self.assertRaises(TypeError, conn.get_peer_cert_chain, object())
+ self.assertRaises(TypeError, conn.get_peer_cert_chain, [])
+
+
+ def test_get_peer_cert_chain(self):
+ """
+ L{Connection.get_peer_cert_chain} returns a list of certificates which
+ the connected server returned for the certification verification.
+ """
+ chain = _create_certificate_chain()
+ [(cakey, cacert), (ikey, icert), (skey, scert)] = chain
+
+ serverContext = Context(TLSv1_METHOD)
+ serverContext.use_privatekey(skey)
+ serverContext.use_certificate(scert)
+ serverContext.add_extra_chain_cert(icert)
+ serverContext.add_extra_chain_cert(cacert)
+ server = Connection(serverContext, None)
+ server.set_accept_state()
+
+ # Create the client
+ clientContext = Context(TLSv1_METHOD)
+ clientContext.set_verify(VERIFY_NONE, verify_cb)
+ client = Connection(clientContext, None)
+ client.set_connect_state()
+
+ self._interactInMemory(client, server)
+
+ chain = client.get_peer_cert_chain()
+ self.assertEqual(len(chain), 3)
+ self.assertEqual(chain[0].get_subject().CN, "Server Certificate")
+ self.assertEqual(chain[1].get_subject().CN, "Intermediate Certificate")
+ self.assertEqual(chain[2].get_subject().CN, "Authority Certificate")
+
+
class ConnectionGetCipherListTests(TestCase):
"""