Issue #12551: Provide a get_channel_binding() method on SSL sockets so as
to get channel binding data for the current SSL session (only the
"tls-unique" channel binding is implemented).  This allows the
implementation of certain authentication mechanisms such as SCRAM-SHA-1-PLUS.

Patch by Jacek Konieczny.
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
index fd1cd2d..f3f0c54 100644
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -321,6 +321,25 @@
             self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
                               server_hostname="some.hostname")
 
+    def test_unknown_channel_binding(self):
+        # unknown binding type should raise ValueError; socket closed at cleanup
+        ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
+        self.addCleanup(ss.close)
+        with self.assertRaises(ValueError):
+            ss.get_channel_binding("unknown-type")
+
+    @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
+                         "'tls-unique' channel binding not available")
+    def test_tls_unique_channel_binding(self):
+        # An unconnected socket should return None (rather than raise) for
+        # a known binding type.  Check this for a client-side socket and
+        # for a server-side one; each wrapped socket is closed during test
+        # cleanup so that it is not leaked.
+        for kwargs in ({}, {"server_side": True, "certfile": CERTFILE}):
+            ss = ssl.wrap_socket(socket.socket(socket.AF_INET), **kwargs)
+            self.addCleanup(ss.close)
+            self.assertIsNone(ss.get_channel_binding("tls-unique"))
+
 class ContextTests(unittest.TestCase):
 
     @skip_if_broken_ubuntu_ssl
@@ -826,6 +845,11 @@
                             self.sslconn = None
                             if support.verbose and self.server.connectionchatty:
                                 sys.stdout.write(" server: connection is now unencrypted...\n")
+                        elif stripped == b'CB tls-unique':
+                            if support.verbose and self.server.connectionchatty:
+                                sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
+                            data = self.sslconn.get_channel_binding("tls-unique")
+                            self.write(repr(data).encode("us-ascii") + b"\n")
                         else:
                             if (support.verbose and
                                 self.server.connectionchatty):
@@ -1625,6 +1649,73 @@
                 t.join()
                 server.close()
 
+        @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
+                             "'tls-unique' channel binding not available")
+        def test_tls_unique_channel_binding(self):
+            """Test tls-unique channel binding."""
+            if support.verbose:
+                sys.stdout.write("\n")
+
+            server = ThreadedEchoServer(CERTFILE,
+                                        certreqs=ssl.CERT_NONE,
+                                        ssl_version=ssl.PROTOCOL_TLSv1,
+                                        cacerts=CERTFILE,
+                                        chatty=True,
+                                        connectionchatty=False)
+            flag = threading.Event()
+            server.start(flag)
+            # wait for it to start
+            flag.wait()
+            # client socket; connected inside try so cleanup always runs
+            s = ssl.wrap_socket(socket.socket(),
+                                server_side=False,
+                                certfile=CERTFILE,
+                                ca_certs=CERTFILE,
+                                cert_reqs=ssl.CERT_NONE,
+                                ssl_version=ssl.PROTOCOL_TLSv1)
+            try:
+                s.connect((HOST, server.port))
+                # get the data
+                cb_data = s.get_channel_binding("tls-unique")
+                if support.verbose:
+                    sys.stdout.write(" got channel binding data: {0!r}\n"
+                                     .format(cb_data))
+
+                # check if it is sane
+                self.assertIsNotNone(cb_data)
+                self.assertEqual(len(cb_data), 12) # True for TLSv1
+
+                # and compare with the peer's version
+                s.write(b"CB tls-unique\n")
+                peer_data_repr = s.read().strip()
+                self.assertEqual(peer_data_repr,
+                                 repr(cb_data).encode("us-ascii"))
+                s.close()
+
+                # now, again, over a fresh connection
+                s = ssl.wrap_socket(socket.socket(),
+                                    server_side=False,
+                                    certfile=CERTFILE,
+                                    ca_certs=CERTFILE,
+                                    cert_reqs=ssl.CERT_NONE,
+                                    ssl_version=ssl.PROTOCOL_TLSv1)
+                s.connect((HOST, server.port))
+                new_cb_data = s.get_channel_binding("tls-unique")
+                if support.verbose:
+                    sys.stdout.write(" got another channel binding data: {0!r}\n"
+                                     .format(new_cb_data))
+                # is it really unique?  (validate the *new* value this time)
+                self.assertNotEqual(cb_data, new_cb_data)
+                self.assertIsNotNone(new_cb_data)
+                self.assertEqual(len(new_cb_data), 12) # True for TLSv1
+                s.write(b"CB tls-unique\n")
+                peer_data_repr = s.read().strip()
+                self.assertEqual(peer_data_repr,
+                                 repr(new_cb_data).encode("us-ascii"))
+            finally:
+                s.close()  # also reached when an assertion above fails
+                server.stop()
+                server.join()
 
 def test_main(verbose=False):
     if support.verbose: