A few cleanups and simplifications to the mem bio tests
trial's unittest doesn't actually have a main function, so stop trying to use it (so stop having a __main__ - doh)
improve/add test docstrings in a few places
factor the buffer flushing logic into a general `_loopback` method and then use it in the test; add another app-level exchange
to the main mem bio test
diff --git a/test/test_ssl.py b/test/test_ssl.py
index 0b68838..8ade92e 100644
--- a/test/test_ssl.py
+++ b/test/test_ssl.py
@@ -12,10 +12,10 @@
try:
# Prefer Twisted's TestCase, since it supports things like skips.
- from twisted.trial.unittest import TestCase, main
+ from twisted.trial.unittest import TestCase
except ImportError:
# Fall back to the stdlib TestCase though, since it kind of works.
- from unittest import TestCase, main
+ from unittest import TestCase
from OpenSSL.crypto import TYPE_RSA, FILETYPE_PEM, PKey, dump_privatekey, load_certificate, load_privatekey
from OpenSSL.SSL import WantReadError, Context, Connection, Error
@@ -421,12 +421,66 @@
def verify_cb(conn, cert, errnum, depth, ok):
return ok
-class BioTests(TestCase):
+class MemoryBIOTests(TestCase):
"""
- Tests L{OpenSSL.SSL.bio_read} and L{OpenSSL.SSL.bio_write} by
- connecting to ourself.
+ Tests for L{OpenSSL.SSL.Connection} using a memory BIO.
"""
+ def _loopback(self, client_conn, server_conn):
+ """
+ Try to read application bytes from each of the two L{Connection}
+ objects. Copy bytes back and forth between their send/receive buffers
+ for as long as there is anything to copy. When there is nothing more
+ to copy, return C{None}. If one of them actually manages to deliver
+ some application bytes, return a two-tuple of the connection from which
+ the bytes were read and the bytes themselves.
+ """
+ wrote = True
+ while wrote:
+ # Loop until neither side has anything to say
+ wrote = False
+
+ # Copy stuff from each side's send buffer to the other side's
+ # receive buffer.
+ for (read, write) in [(client_conn, server_conn),
+ (server_conn, client_conn)]:
+
+ # Give the side a chance to generate some more bytes, or
+ # succeed.
+ try:
+ bytes = read.recv(1024)
+ except WantReadError:
+ # It didn't succeed, so we'll hope it generated some
+ # output.
+ pass
+ else:
+ # It did succeed, so we'll stop now and let the caller deal
+ # with it.
+ return (read, bytes)
+
+ while True:
+ # Keep copying as long as there's more stuff there.
+ try:
+ dirty = read.bio_read(4096)
+ except WantReadError:
+ # Okay, nothing more waiting to be sent. Stop
+ # processing this send buffer.
+ break
+ else:
+ # Keep track of the fact that someone generated some
+ # output.
+ wrote = True
+ write.bio_write(dirty)
+
+
def test_connect(self):
+ """
+ Two L{Connection}s which use memory BIOs can be manually connected by
+ reading from the output of each and writing those bytes to the input of
+ the other and in this way establish a connection and exchange
+ application-level bytes with each other.
+ """
+ # Create the server side Connection. This is mostly setup boilerplate
+ # - use TLSv1, use a particular certificate, etc.
server_ctx = Context(TLSv1_METHOD)
server_ctx.set_options(OP_NO_SSLv2 | OP_NO_SSLv3 | OP_SINGLE_DH_USE )
server_ctx.set_verify(VERIFY_PEER|VERIFY_FAIL_IF_NO_PEER_CERT|VERIFY_CLIENT_ONCE, verify_cb)
@@ -435,9 +489,12 @@
server_ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem))
server_ctx.check_privatekey()
server_store.add_cert(load_certificate(FILETYPE_PEM, root_cert_pem))
+ # Here the Connection is actually created. None is passed as the 2nd
+ # parameter, indicating a memory BIO should be created.
server_conn = Connection(server_ctx, None)
server_conn.set_accept_state()
+ # Now create the client side Connection. Similar boilerplate to the above.
client_ctx = Context(TLSv1_METHOD)
client_ctx.set_options(OP_NO_SSLv2 | OP_NO_SSLv3 | OP_SINGLE_DH_USE )
client_ctx.set_verify(VERIFY_PEER|VERIFY_FAIL_IF_NO_PEER_CERT|VERIFY_CLIENT_ONCE, verify_cb)
@@ -446,48 +503,39 @@
client_ctx.use_certificate(load_certificate(FILETYPE_PEM, client_cert_pem))
client_ctx.check_privatekey()
client_store.add_cert(load_certificate(FILETYPE_PEM, root_cert_pem))
+ # Again, None to create a new memory BIO.
client_conn = Connection(client_ctx, None)
client_conn.set_connect_state()
- self.assertEqual( server_conn.master_key() , None)
- self.assertEqual( server_conn.client_random(), None)
- self.assertEqual( server_conn.server_random(), None)
+ # There should be no key or nonces yet.
+ self.assertIdentical(server_conn.master_key(), None)
+ self.assertIdentical(server_conn.client_random(), None)
+ self.assertIdentical(server_conn.server_random(), None)
+ # First, the handshake needs to happen. We'll deliver bytes back and
+ # forth between the client and server until neither of them feels like
+ # speaking any more.
+ self.assertIdentical(self._loopback(client_conn, server_conn), None)
+
+ # Now that the handshake is done, there should be a key and nonces.
+ self.assertNotIdentical(server_conn.master_key(), None)
+ self.assertNotIdentical(server_conn.client_random(), None)
+ self.assertNotIdentical(server_conn.server_random(), None)
+ self.assertNotIdentical(server_conn.client_random(), client_conn.client_random())
+ self.assertNotIdentical(server_conn.server_random(), client_conn.server_random())
+
+ # Here are the bytes we'll try to send.
important_message = 'One if by land, two if by sea.'
- try:
- client_conn.recv(1024)
- self.assertTrue(False)
- except WantReadError:
- dirty1 = client_conn.bio_read(4096)
- server_conn.bio_write(dirty1)
- try:
- server_conn.recv(1024)
- self.assertTrue(False)
- except WantReadError:
- dirty2 = server_conn.bio_read(4096)
- client_conn.bio_write(dirty2)
- try:
- client_conn.recv(1024)
- self.assertTrue(False)
- except WantReadError:
- dirty3 = client_conn.bio_read(4096)
- server_conn.bio_write(dirty3)
- try:
- server_conn.write(important_message)
- server_conn.recv(4096)
- self.assertTrue(False)
- except WantReadError:
- dirty4 = server_conn.bio_read(4096)
- client_conn.bio_write(dirty4)
- delivered_message = client_conn.recv(1024)
- self.assertEqual(important_message, delivered_message)
- self.assertTrue( len(client_conn.master_key() ) > 10 )
- self.assertTrue( len(client_conn.client_random()) > 10 )
- self.assertTrue( len(client_conn.server_random()) > 10 )
- self.assertEqual(client_conn.master_key() , server_conn.master_key())
- self.assertEqual(client_conn.client_random(), server_conn.client_random())
- self.assertEqual(client_conn.server_random(), server_conn.server_random())
+ server_conn.write(important_message)
+ self.assertEquals(
+ self._loopback(client_conn, server_conn),
+ (client_conn, important_message))
+
+ client_conn.write(important_message[::-1])
+ self.assertEquals(
+ self._loopback(client_conn, server_conn),
+ (server_conn, important_message[::-1]))
def test_socketOverridesMemory(self):
@@ -500,7 +548,3 @@
clientSSL = Connection(context, client)
self.assertRaises( TypeError, clientSSL.bio_read, 100)
self.assertRaises( TypeError, clientSSL.bio_write, "foo")
-
-
-if __name__ == '__main__':
- main()