Add a new test case of connecting to ourself over an actual socket to pass data.
diff --git a/test/test_ssl.py b/test/test_ssl.py
index a51cfc4..69008a6 100644
--- a/test/test_ssl.py
+++ b/test/test_ssl.py
@@ -31,6 +31,31 @@
OP_NO_TICKET = None
+def socket_pair():
+ ''' Establish and return a pair of network sockets connected
+ to each other. '''
+ # Connect a pair of sockets
+ port = socket()
+ port.bind(('', 0))
+ port.listen(1)
+ client = socket()
+ client.setblocking(False)
+ client.connect_ex(port.getsockname())
+ server = port.accept()[0]
+ server.setblocking(False)
+
+ # Let's pass some unencrypted data to make sure our
+ # socket connection is fine.
+ stuff = "I've got a bad feeling about this."
+ server.send(stuff)
+ assert client.recv(1024) == stuff
+ stuff = "What?!?! The First Bank of Alderaan!"
+ client.send(stuff)
+ assert server.recv(1024) == stuff
+
+ return (server, client)
+
+
class ContextTests(TestCase):
"""
Unit tests for L{OpenSSL.SSL.Context}.
@@ -88,13 +113,7 @@
L{Context.set_info_callback} accepts a callable which will be invoked
when certain information about an SSL connection is available.
"""
- port = socket()
- port.bind(('', 0))
- port.listen(1)
-
- client = socket()
- client.setblocking(False)
- client.connect_ex(port.getsockname())
+ (server, client) = socket_pair()
clientSSL = Connection(Context(TLSv1_METHOD), client)
clientSSL.set_connect_state()
@@ -109,9 +128,6 @@
context.use_privatekey(
load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM))
- server, ignored = port.accept()
- server.setblocking(False)
-
serverSSL = Connection(context, server)
serverSSL.set_accept_state()
@@ -127,13 +143,7 @@
def _load_verify_locations_test(self, *args):
- port = socket()
- port.bind(('', 0))
- port.listen(1)
-
- client = socket()
- client.setblocking(False)
- client.connect_ex(port.getsockname())
+ (server, client) = socket_pair()
clientContext = Context(TLSv1_METHOD)
clientContext.load_verify_locations(*args)
@@ -146,9 +156,6 @@
clientSSL = Connection(clientContext, client)
clientSSL.set_connect_state()
- server, _ = port.accept()
- server.setblocking(False)
-
serverContext = Context(TLSv1_METHOD)
serverContext.use_certificate(
load_certificate(FILETYPE_PEM, cleartextCertificatePEM))
@@ -412,7 +419,7 @@
"""
Tests for L{OpenSSL.SSL.Connection} using a memory BIO.
"""
- def _server(self):
+ def _server(self, sock=None):
# Create the server side Connection. This is mostly setup boilerplate
# - use TLSv1, use a particular certificate, etc.
server_ctx = Context(TLSv1_METHOD)
@@ -423,14 +430,14 @@
server_ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem))
server_ctx.check_privatekey()
server_store.add_cert(load_certificate(FILETYPE_PEM, root_cert_pem))
- # Here the Connection is actually created. None is passed as the 2nd
- # parameter, indicating a memory BIO should be created.
- server_conn = Connection(server_ctx, None)
+ # Here the Connection is actually created. If None is passed as the 2nd
+ # parameter, it indicates a memory BIO should be created.
+ server_conn = Connection(server_ctx, sock)
server_conn.set_accept_state()
return server_conn
- def _client(self):
+ def _client(self, sock=None):
# Now create the client side Connection. Similar boilerplate to the above.
client_ctx = Context(TLSv1_METHOD)
client_ctx.set_options(OP_NO_SSLv2 | OP_NO_SSLv3 | OP_SINGLE_DH_USE )
@@ -440,8 +447,7 @@
client_ctx.use_certificate(load_certificate(FILETYPE_PEM, client_cert_pem))
client_ctx.check_privatekey()
client_store.add_cert(load_certificate(FILETYPE_PEM, root_cert_pem))
- # Again, None to create a new memory BIO.
- client_conn = Connection(client_ctx, None)
+ client_conn = Connection(client_ctx, sock)
client_conn.set_connect_state()
return client_conn
@@ -536,6 +542,42 @@
(server_conn, important_message[::-1]))
+ def test_socket_connect(self):
+ """
+ Just like test_connect() but with an actual socket.
+ """
+ (server, client) = socket_pair()
+
+ # Let the encryption begin...
+ client_conn = self._client(client)
+ client_conn.set_connect_state()
+ server_conn = self._server(server)
+ server_conn.set_accept_state()
+ # Establish the connection
+ established = False
+ while not established:
+ established = True # assume the best
+ for ssl in client_conn, server_conn:
+ try:
+ # Generally a recv() or send() could also work instead
+ # of do_handshake(), and we would stop on the first
+ # non-exception.
+ ssl.do_handshake()
+ except WantReadError:
+ established = False
+
+ important_message = "Help me Obi Wan Kenobi, you're my only hope."
+ client_conn.send(important_message)
+ msg = server_conn.recv(1024)
+ self.assertEqual(msg, important_message)
+
+ # Again in the other direction, just for fun.
+ important_message = important_message[::-1]
+ server_conn.send(important_message)
+ msg = client_conn.recv(1024)
+ self.assertEqual(msg, important_message)
+
+
def test_socketOverridesMemory(self):
"""
Test that L{OpenSSL.SSL.bio_read} and L{OpenSSL.SSL.bio_write} don't