Add support for Next Protocol Negotiation.
diff --git a/OpenSSL/test/test_ssl.py b/OpenSSL/test/test_ssl.py
index 6409b8e..404f8b9 100644
--- a/OpenSSL/test/test_ssl.py
+++ b/OpenSSL/test/test_ssl.py
@@ -1434,6 +1434,84 @@
self.assertEqual([(server, b("foo1.example.com"))], args)
+class NextProtoNegotiationTests(TestCase, _LoopbackMixin):
+ """
+ Test for Next Protocol Negotiation in PyOpenSSL.
+ """
+ def test_npn_success(self):
+ advertise_args =[]
+ select_args = []
+ def advertise(conn):
+ advertise_args.append((conn,))
+ return b('\x08http/1.1\x06spdy/2')
+ def select(conn, options):
+ select_args.append((conn, options))
+ return b('spdy/2')
+
+ server_context = Context(TLSv1_METHOD)
+ server_context.set_npn_advertise_callback(advertise)
+
+ client_context = Context(TLSv1_METHOD)
+ client_context.set_npn_select_callback(select)
+
+ # Necessary to actually accept the connection
+ server_context.use_privatekey(
+ load_privatekey(FILETYPE_PEM, server_key_pem))
+ server_context.use_certificate(
+ load_certificate(FILETYPE_PEM, server_cert_pem))
+
+ # Do a little connection to trigger the logic
+ server = Connection(server_context, None)
+ server.set_accept_state()
+
+ client = Connection(client_context, None)
+ client.set_connect_state()
+
+ self._interactInMemory(server, client)
+
+ self.assertEqual([(server,)], advertise_args)
+ self.assertEqual([(client, b('\x08http/1.1\x06spdy/2'))], select_args)
+
+ self.assertEqual(server.get_next_proto_negotiated(), b('spdy/2'))
+ self.assertEqual(client.get_next_proto_negotiated(), b('spdy/2'))
+
+
+ def test_npn_client_fail(self):
+ advertise_args =[]
+ select_args = []
+ def advertise(conn):
+ advertise_args.append((conn,))
+ return b('\x08http/1.1\x06spdy/2')
+ def select(conn, options):
+ select_args.append((conn, options))
+ return b('')
+
+ server_context = Context(TLSv1_METHOD)
+ server_context.set_npn_advertise_callback(advertise)
+
+ client_context = Context(TLSv1_METHOD)
+ client_context.set_npn_select_callback(select)
+
+ # Necessary to actually accept the connection
+ server_context.use_privatekey(
+ load_privatekey(FILETYPE_PEM, server_key_pem))
+ server_context.use_certificate(
+ load_certificate(FILETYPE_PEM, server_cert_pem))
+
+ # Do a little connection to trigger the logic
+ server = Connection(server_context, None)
+ server.set_accept_state()
+
+ client = Connection(client_context, None)
+ client.set_connect_state()
+
+ # If the client doesn't return anything, the connection will fail.
+ self.assertRaises(Error, self._interactInMemory, server, client)
+
+ self.assertEqual([(server,)], advertise_args)
+ self.assertEqual([(client, b('\x08http/1.1\x06spdy/2'))], select_args)
+
+
class SessionTests(TestCase):
"""