Issue #14204: The ssl module now has support for the Next Protocol Negotiation extension, if available in the underlying OpenSSL library.
Patch by Colin Marc.
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
index c6ce075..ada3c4b 100644
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -879,6 +879,7 @@
                 try:
                     self.sslconn = self.server.context.wrap_socket(
                         self.sock, server_side=True)
+                    self.server.selected_protocols.append(self.sslconn.selected_npn_protocol())
                 except ssl.SSLError as e:
                     # XXX Various errors can have happened here, for example
                     # a mismatching protocol version, an invalid certificate,
@@ -901,6 +902,8 @@
                     cipher = self.sslconn.cipher()
                     if support.verbose and self.server.chatty:
                         sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
+                        sys.stdout.write(" server: selected protocol is now "
+                                + str(self.sslconn.selected_npn_protocol()) + "\n")
                     return True
 
             def read(self):
@@ -979,7 +982,7 @@
         def __init__(self, certificate=None, ssl_version=None,
                      certreqs=None, cacerts=None,
                      chatty=True, connectionchatty=False, starttls_server=False,
-                     ciphers=None, context=None):
+                     npn_protocols=None, ciphers=None, context=None):
             if context:
                 self.context = context
             else:
@@ -992,6 +995,8 @@
                     self.context.load_verify_locations(cacerts)
                 if certificate:
                     self.context.load_cert_chain(certificate)
+                if npn_protocols:
+                    self.context.set_npn_protocols(npn_protocols)
                 if ciphers:
                     self.context.set_ciphers(ciphers)
             self.chatty = chatty
@@ -1001,6 +1006,7 @@
             self.port = support.bind_port(self.sock)
             self.flag = None
             self.active = False
+            self.selected_protocols = []
             self.conn_errors = []
             threading.Thread.__init__(self)
             self.daemon = True
@@ -1195,6 +1201,7 @@
         Launch a server, connect a client to it and try various reads
         and writes.
         """
+        stats = {}
         server = ThreadedEchoServer(context=server_context,
                                     chatty=chatty,
                                     connectionchatty=False)
@@ -1220,12 +1227,14 @@
                 if connectionchatty:
                     if support.verbose:
                         sys.stdout.write(" client:  closing connection.\n")
-                stats = {
+                stats.update({
                     'compression': s.compression(),
                     'cipher': s.cipher(),
-                }
+                    'client_npn_protocol': s.selected_npn_protocol()
+                })
                 s.close()
-                return stats
+            stats['server_npn_protocols'] = server.selected_protocols
+        return stats
 
     def try_protocol_combo(server_protocol, client_protocol, expect_success,
                            certsreqs=None, server_options=0, client_options=0):
@@ -1853,6 +1862,43 @@
             if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
                 self.fail("Non-DH cipher: " + cipher[0])
 
+        def test_selected_npn_protocol(self):
+            # selected_npn_protocol() is None unless NPN is used
+            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+            context.load_cert_chain(CERTFILE)
+            stats = server_params_test(context, context,
+                                       chatty=True, connectionchatty=True)
+            self.assertIs(stats['client_npn_protocol'], None)
+
+        @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
+        def test_npn_protocols(self):
+            server_protocols = ['http/1.1', 'spdy/2']
+            protocol_tests = [
+                (['http/1.1', 'spdy/2'], 'http/1.1'),
+                (['spdy/2', 'http/1.1'], 'http/1.1'),
+                (['spdy/2', 'test'], 'spdy/2'),
+                (['abc', 'def'], 'abc')
+            ]
+            for client_protocols, expected in protocol_tests:
+                server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+                server_context.load_cert_chain(CERTFILE)
+                server_context.set_npn_protocols(server_protocols)
+                client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+                client_context.load_cert_chain(CERTFILE)
+                client_context.set_npn_protocols(client_protocols)
+                stats = server_params_test(client_context, server_context,
+                                           chatty=True, connectionchatty=True)
+
+                msg = "failed trying %s (s) and %s (c).\n" \
+                      "was expecting %s, but got %%s from the %%s" \
+                          % (str(server_protocols), str(client_protocols),
+                             str(expected))
+                client_result = stats['client_npn_protocol']
+                self.assertEqual(client_result, expected, msg % (client_result, "client"))
+                server_result = stats['server_npn_protocols'][-1] \
+                    if len(stats['server_npn_protocols']) else 'nothing'
+                self.assertEqual(server_result, expected, msg % (server_result, "server"))
+
 
 def test_main(verbose=False):
     if support.verbose: