bpo-34670: Add TLS 1.3 post handshake auth (GH-9460)



Add SSLContext.post_handshake_auth and
SSLSocket.verify_client_post_handshake for TLS 1.3 post-handshake
authentication.

Signed-off-by: Christian Heimes <christian@python.org>q


https://bugs.python.org/issue34670
diff --git a/Modules/_ssl.c b/Modules/_ssl.c
index 5b5d7dd..99d4ecc 100644
--- a/Modules/_ssl.c
+++ b/Modules/_ssl.c
@@ -421,6 +421,9 @@
      */
     unsigned int hostflags;
     int protocol;
+#ifdef TLS1_3_VERSION
+    int post_handshake_auth;
+#endif
 } PySSLContext;
 
 typedef struct {
@@ -2643,6 +2646,30 @@
     return PyBytes_FromStringAndSize(buf, len);
 }
 
+/*[clinic input]
+_ssl._SSLSocket.verify_client_post_handshake
+
+Initiate TLS 1.3 post-handshake authentication
+[clinic start generated code]*/
+
+static PyObject *
+_ssl__SSLSocket_verify_client_post_handshake_impl(PySSLSocket *self)
+/*[clinic end generated code: output=532147f3b1341425 input=6bfa874810a3d889]*/
+{
+#ifdef TLS1_3_VERSION
+    int err = SSL_verify_client_post_handshake(self->ssl);
+    if (err == 0)
+        return _setSSLError(NULL, 0, __FILE__, __LINE__);
+    else
+        Py_RETURN_NONE;
+#else
+    PyErr_SetString(PyExc_NotImplementedError,
+                    "Post-handshake auth is not supported by your "
+                    "OpenSSL version.");
+    return NULL;
+#endif
+}
+
 #ifdef OPENSSL_VERSION_1_1
 
 static SSL_SESSION*
@@ -2819,6 +2846,7 @@
     _SSL__SSLSOCKET_SELECTED_ALPN_PROTOCOL_METHODDEF
     _SSL__SSLSOCKET_COMPRESSION_METHODDEF
     _SSL__SSLSOCKET_SHUTDOWN_METHODDEF
+    _SSL__SSLSOCKET_VERIFY_CLIENT_POST_HANDSHAKE_METHODDEF
     {NULL, NULL}
 };
 
@@ -2862,7 +2890,7 @@
  */
 
 static int
-_set_verify_mode(SSL_CTX *ctx, enum py_ssl_cert_requirements n)
+_set_verify_mode(PySSLContext *self, enum py_ssl_cert_requirements n)
 {
     int mode;
     int (*verify_cb)(int, X509_STORE_CTX *) = NULL;
@@ -2882,9 +2910,13 @@
                         "invalid value for verify_mode");
         return -1;
     }
+#ifdef TLS1_3_VERSION
+    if (self->post_handshake_auth)
+        mode |= SSL_VERIFY_POST_HANDSHAKE;
+#endif
     /* keep current verify cb */
-    verify_cb = SSL_CTX_get_verify_callback(ctx);
-    SSL_CTX_set_verify(ctx, mode, verify_cb);
+    verify_cb = SSL_CTX_get_verify_callback(self->ctx);
+    SSL_CTX_set_verify(self->ctx, mode, verify_cb);
     return 0;
 }
 
@@ -2966,13 +2998,13 @@
     /* Don't check host name by default */
     if (proto_version == PY_SSL_VERSION_TLS_CLIENT) {
         self->check_hostname = 1;
-        if (_set_verify_mode(self->ctx, PY_SSL_CERT_REQUIRED) == -1) {
+        if (_set_verify_mode(self, PY_SSL_CERT_REQUIRED) == -1) {
             Py_DECREF(self);
             return NULL;
         }
     } else {
         self->check_hostname = 0;
-        if (_set_verify_mode(self->ctx, PY_SSL_CERT_NONE) == -1) {
+        if (_set_verify_mode(self, PY_SSL_CERT_NONE) == -1) {
             Py_DECREF(self);
             return NULL;
         }
@@ -3065,6 +3097,11 @@
 #endif
     X509_VERIFY_PARAM_set_hostflags(params, self->hostflags);
 
+#ifdef TLS1_3_VERSION
+    self->post_handshake_auth = 0;
+    SSL_CTX_set_post_handshake_auth(self->ctx, self->post_handshake_auth);
+#endif
+
     return (PyObject *)self;
 }
 
@@ -3319,7 +3356,10 @@
 static PyObject *
 get_verify_mode(PySSLContext *self, void *c)
 {
-    switch (SSL_CTX_get_verify_mode(self->ctx)) {
+    /* ignore SSL_VERIFY_CLIENT_ONCE and SSL_VERIFY_POST_HANDSHAKE */
+    int mask = (SSL_VERIFY_NONE | SSL_VERIFY_PEER |
+                SSL_VERIFY_FAIL_IF_NO_PEER_CERT);
+    switch (SSL_CTX_get_verify_mode(self->ctx) & mask) {
     case SSL_VERIFY_NONE:
         return PyLong_FromLong(PY_SSL_CERT_NONE);
     case SSL_VERIFY_PEER:
@@ -3344,7 +3384,7 @@
                         "check_hostname is enabled.");
         return -1;
     }
-    return _set_verify_mode(self->ctx, n);
+    return _set_verify_mode(self, n);
 }
 
 static PyObject *
@@ -3550,7 +3590,7 @@
     if (check_hostname &&
             SSL_CTX_get_verify_mode(self->ctx) == SSL_VERIFY_NONE) {
         /* check_hostname = True sets verify_mode = CERT_REQUIRED */
-        if (_set_verify_mode(self->ctx, PY_SSL_CERT_REQUIRED) == -1) {
+        if (_set_verify_mode(self, PY_SSL_CERT_REQUIRED) == -1) {
             return -1;
         }
     }
@@ -3559,6 +3599,43 @@
 }
 
 static PyObject *
+get_post_handshake_auth(PySSLContext *self, void *c) {
+#if TLS1_3_VERSION
+    return PyBool_FromLong(self->post_handshake_auth);
+#else
+    Py_RETURN_NONE;
+#endif
+}
+
+#if TLS1_3_VERSION
+static int
+set_post_handshake_auth(PySSLContext *self, PyObject *arg, void *c) {
+    int (*verify_cb)(int, X509_STORE_CTX *) = NULL;
+    int mode = SSL_CTX_get_verify_mode(self->ctx);
+    int pha = PyObject_IsTrue(arg);
+
+    if (pha == -1) {
+        return -1;
+    }
+    self->post_handshake_auth = pha;
+
+    /* client-side socket setting, ignored by server-side */
+    SSL_CTX_set_post_handshake_auth(self->ctx, pha);
+
+    /* server-side socket setting, ignored by client-side */
+    verify_cb = SSL_CTX_get_verify_callback(self->ctx);
+    if (pha) {
+        mode |= SSL_VERIFY_POST_HANDSHAKE;
+    } else {
+        mode ^= SSL_VERIFY_POST_HANDSHAKE;
+    }
+    SSL_CTX_set_verify(self->ctx, mode, verify_cb);
+
+    return 0;
+}
+#endif
+
+static PyObject *
 get_protocol(PySSLContext *self, void *c) {
     return PyLong_FromLong(self->protocol);
 }
@@ -4461,6 +4538,13 @@
                      (setter) set_sni_callback, PySSLContext_sni_callback_doc},
     {"options", (getter) get_options,
                 (setter) set_options, NULL},
+    {"post_handshake_auth", (getter) get_post_handshake_auth,
+#ifdef TLS1_3_VERSION
+                            (setter) set_post_handshake_auth,
+#else
+                            NULL,
+#endif
+                            NULL},
     {"protocol", (getter) get_protocol,
                  NULL, NULL},
     {"verify_flags", (getter) get_verify_flags,