Issue #12551: Provide a get_channel_binding() method on SSL sockets so as
to get channel binding data for the current SSL session (only the
"tls-unique" channel binding is implemented).  This allows the
implementation of certain authentication mechanisms such as SCRAM-SHA-1-PLUS.

Patch by Jacek Konieczny.
diff --git a/Doc/library/ssl.rst b/Doc/library/ssl.rst
index 5342e78..0ac0ac1 100644
--- a/Doc/library/ssl.rst
+++ b/Doc/library/ssl.rst
@@ -386,6 +386,13 @@
 
    .. versionadded:: 3.2
 
+.. data:: CHANNEL_BINDING_TYPES
+
+   List of supported TLS channel binding types.  Strings in this list
+   can be used as arguments to :meth:`SSLSocket.get_channel_binding`.
+
+   .. versionadded:: 3.3
+
 .. data:: OPENSSL_VERSION
 
    The version string of the OpenSSL library loaded by the interpreter::
@@ -495,6 +502,18 @@
    version of the SSL protocol that defines its use, and the number of secret
    bits being used.  If no connection has been established, returns ``None``.
 
+.. method:: SSLSocket.get_channel_binding(cb_type="tls-unique")
+
+   Get channel binding data for current connection, as a bytes object.  Returns
+   ``None`` if not connected or the handshake has not been completed.
+
+   The *cb_type* parameter allow selection of the desired channel binding
+   type. Valid channel binding types are listed in the
+   :data:`CHANNEL_BINDING_TYPES` list.  Currently only the 'tls-unique' channel
+   binding, defined by :rfc:`5929`, is supported.  :exc:`ValueError` will be
+   raised if an unsupported channel binding type is requested.
+
+   .. versionadded:: 3.3
 
 .. method:: SSLSocket.unwrap()
 
diff --git a/Lib/ssl.py b/Lib/ssl.py
index cde99fc..914e749 100644
--- a/Lib/ssl.py
+++ b/Lib/ssl.py
@@ -99,6 +99,10 @@
 import traceback
 import errno
 
+if _ssl.HAS_TLS_UNIQUE:
+    CHANNEL_BINDING_TYPES = ['tls-unique']
+else:
+    CHANNEL_BINDING_TYPES = []
 
 class CertificateError(ValueError):
     pass
@@ -495,6 +499,21 @@
                               self.do_handshake_on_connect),
                 addr)
 
+    def get_channel_binding(self, cb_type="tls-unique"):
+        """Get channel binding data for current connection.  Raise ValueError
+        if the requested `cb_type` is not supported.  Return bytes of the data
+        or None if the data is not available (e.g. before the handshake).
+        """
+        if cb_type not in CHANNEL_BINDING_TYPES:
+            raise ValueError("Unsupported channel binding type")
+        if cb_type != "tls-unique":
+            raise NotImplementedError(
+                            "{0} channel binding type not implemented"
+                            .format(cb_type))
+        if self._sslobj is None:
+            return None
+        return self._sslobj.tls_unique_cb()
+
     def __del__(self):
         # sys.stderr.write("__del__ on %s\n" % repr(self))
         self._real_close()
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
index fd1cd2d..f3f0c54 100644
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -321,6 +321,25 @@
             self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
                               server_hostname="some.hostname")
 
+    def test_unknown_channel_binding(self):
+        # should raise ValueError for unknown type
+        s = socket.socket(socket.AF_INET)
+        ss = ssl.wrap_socket(s)
+        with self.assertRaises(ValueError):
+            ss.get_channel_binding("unknown-type")
+
+    @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
+                         "'tls-unique' channel binding not available")
+    def test_tls_unique_channel_binding(self):
+        # unconnected should return None for known type
+        s = socket.socket(socket.AF_INET)
+        ss = ssl.wrap_socket(s)
+        self.assertIsNone(ss.get_channel_binding("tls-unique"))
+        # the same for server-side
+        s = socket.socket(socket.AF_INET)
+        ss = ssl.wrap_socket(s, server_side=True, certfile=CERTFILE)
+        self.assertIsNone(ss.get_channel_binding("tls-unique"))
+
 class ContextTests(unittest.TestCase):
 
     @skip_if_broken_ubuntu_ssl
@@ -826,6 +845,11 @@
                             self.sslconn = None
                             if support.verbose and self.server.connectionchatty:
                                 sys.stdout.write(" server: connection is now unencrypted...\n")
+                        elif stripped == b'CB tls-unique':
+                            if support.verbose and self.server.connectionchatty:
+                                sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
+                            data = self.sslconn.get_channel_binding("tls-unique")
+                            self.write(repr(data).encode("us-ascii") + b"\n")
                         else:
                             if (support.verbose and
                                 self.server.connectionchatty):
@@ -1625,6 +1649,73 @@
                 t.join()
                 server.close()
 
+        @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
+                             "'tls-unique' channel binding not available")
+        def test_tls_unique_channel_binding(self):
+            """Test tls-unique channel binding."""
+            if support.verbose:
+                sys.stdout.write("\n")
+
+            server = ThreadedEchoServer(CERTFILE,
+                                        certreqs=ssl.CERT_NONE,
+                                        ssl_version=ssl.PROTOCOL_TLSv1,
+                                        cacerts=CERTFILE,
+                                        chatty=True,
+                                        connectionchatty=False)
+            flag = threading.Event()
+            server.start(flag)
+            # wait for it to start
+            flag.wait()
+            # try to connect
+            s = ssl.wrap_socket(socket.socket(),
+                                server_side=False,
+                                certfile=CERTFILE,
+                                ca_certs=CERTFILE,
+                                cert_reqs=ssl.CERT_NONE,
+                                ssl_version=ssl.PROTOCOL_TLSv1)
+            s.connect((HOST, server.port))
+            try:
+                # get the data
+                cb_data = s.get_channel_binding("tls-unique")
+                if support.verbose:
+                    sys.stdout.write(" got channel binding data: {0!r}\n"
+                                     .format(cb_data))
+
+                # check if it is sane
+                self.assertIsNotNone(cb_data)
+                self.assertEqual(len(cb_data), 12) # True for TLSv1
+
+                # and compare with the peers version
+                s.write(b"CB tls-unique\n")
+                peer_data_repr = s.read().strip()
+                self.assertEqual(peer_data_repr,
+                                 repr(cb_data).encode("us-ascii"))
+                s.close()
+
+                # now, again
+                s = ssl.wrap_socket(socket.socket(),
+                                    server_side=False,
+                                    certfile=CERTFILE,
+                                    ca_certs=CERTFILE,
+                                    cert_reqs=ssl.CERT_NONE,
+                                    ssl_version=ssl.PROTOCOL_TLSv1)
+                s.connect((HOST, server.port))
+                new_cb_data = s.get_channel_binding("tls-unique")
+                if support.verbose:
+                    sys.stdout.write(" got another channel binding data: {0!r}\n"
+                                     .format(new_cb_data))
+                # is it really unique
+                self.assertNotEqual(cb_data, new_cb_data)
+                self.assertIsNotNone(cb_data)
+                self.assertEqual(len(cb_data), 12) # True for TLSv1
+                s.write(b"CB tls-unique\n")
+                peer_data_repr = s.read().strip()
+                self.assertEqual(peer_data_repr,
+                                 repr(new_cb_data).encode("us-ascii"))
+                s.close()
+            finally:
+                server.stop()
+                server.join()
 
 def test_main(verbose=False):
     if support.verbose:
diff --git a/Misc/ACKS b/Misc/ACKS
index 1e9a721..8a63f50 100644
--- a/Misc/ACKS
+++ b/Misc/ACKS
@@ -516,6 +516,7 @@
 Pat Knight
 Greg Kochanski
 Damon Kohler
+Jacek Konieczny
 Марк Коренберг
 Vlad Korolev
 Joseph Koshy
diff --git a/Misc/NEWS b/Misc/NEWS
index d648501..c3e0f52 100644
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -234,6 +234,12 @@
 Library
 -------
 
+- Issue #12551: Provide a get_channel_binding() method on SSL sockets so as
+  to get channel binding data for the current SSL session (only the
+  "tls-unique" channel binding is implemented).  This allows the implementation
+  of certain authentication mechanisms such as SCRAM-SHA-1-PLUS.  Patch by
+  Jacek Konieczny.
+
 - Issue #665194: email.utils now has format_datetime and parsedate_to_datetime
   functions, allowing for round tripping of RFC2822 format dates.
 
diff --git a/Modules/_ssl.c b/Modules/_ssl.c
index d2d2480..1a367f2 100644
--- a/Modules/_ssl.c
+++ b/Modules/_ssl.c
@@ -124,6 +124,17 @@
 # undef HAVE_SSL_CTX_CLEAR_OPTIONS
 #endif
 
+/* In case of 'tls-unique' it will be 12 bytes for TLS, 36 bytes for
+ * older SSL, but let's be safe */
+#define PySSL_CB_MAXLEN 128
+
+/* SSL_get_finished got added to OpenSSL in 0.9.5 */
+#if OPENSSL_VERSION_NUMBER >= 0x0090500fL
+# define HAVE_OPENSSL_FINISHED 1
+#else
+# define HAVE_OPENSSL_FINISHED 0
+#endif
+
 typedef struct {
     PyObject_HEAD
     SSL_CTX *ctx;
@@ -135,6 +146,7 @@
     SSL *ssl;
     X509 *peer_cert;
     int shutdown_seen_zero;
+    enum py_ssl_server_or_client socket_type;
 } PySSLSocket;
 
 static PyTypeObject PySSLContext_Type;
@@ -328,6 +340,7 @@
         SSL_set_accept_state(self->ssl);
     PySSL_END_ALLOW_THREADS
 
+    self->socket_type = socket_type;
     self->Socket = PyWeakref_NewRef((PyObject *) sock, NULL);
     return self;
 }
@@ -1377,6 +1390,41 @@
 Does the SSL shutdown handshake with the remote end, and returns\n\
 the underlying socket object.");
 
+#if HAVE_OPENSSL_FINISHED
+static PyObject *
+PySSL_tls_unique_cb(PySSLSocket *self)
+{
+    PyObject *retval = NULL;
+    char buf[PySSL_CB_MAXLEN];
+    int len;
+
+    if (SSL_session_reused(self->ssl) ^ !self->socket_type) {
+        /* if session is resumed XOR we are the client */
+        len = SSL_get_finished(self->ssl, buf, PySSL_CB_MAXLEN);
+    }
+    else {
+        /* if a new session XOR we are the server */
+        len = SSL_get_peer_finished(self->ssl, buf, PySSL_CB_MAXLEN);
+    }
+
+    /* It cannot be negative in current OpenSSL version as of July 2011 */
+    assert(len >= 0);
+    if (len == 0)
+        Py_RETURN_NONE;
+
+    retval = PyBytes_FromStringAndSize(buf, len);
+
+    return retval;
+}
+
+PyDoc_STRVAR(PySSL_tls_unique_cb_doc,
+"tls_unique_cb() -> bytes\n\
+\n\
+Returns the 'tls-unique' channel binding data, as defined by RFC 5929.\n\
+\n\
+If the TLS handshake is not yet complete, None is returned");
+
+#endif /* HAVE_OPENSSL_FINISHED */
 
 static PyMethodDef PySSLMethods[] = {
     {"do_handshake", (PyCFunction)PySSL_SSLdo_handshake, METH_NOARGS},
@@ -1391,6 +1439,10 @@
     {"cipher", (PyCFunction)PySSL_cipher, METH_NOARGS},
     {"shutdown", (PyCFunction)PySSL_SSLshutdown, METH_NOARGS,
      PySSL_SSLshutdown_doc},
+#if HAVE_OPENSSL_FINISHED
+    {"tls_unique_cb", (PyCFunction)PySSL_tls_unique_cb, METH_NOARGS,
+     PySSL_tls_unique_cb_doc},
+#endif
     {NULL, NULL}
 };
 
@@ -2221,6 +2273,14 @@
     Py_INCREF(r);
     PyModule_AddObject(m, "HAS_SNI", r);
 
+#if HAVE_OPENSSL_FINISHED
+    r = Py_True;
+#else
+    r = Py_False;
+#endif
+    Py_INCREF(r);
+    PyModule_AddObject(m, "HAS_TLS_UNIQUE", r);
+
     /* OpenSSL version */
     /* SSLeay() gives us the version of the library linked against,
        which could be different from the headers version.