bpo-18233: Add internal methods to access peer chain (GH-25467)

The internal `_ssl._SSLSocket` object now provides methods to retrieve
the peer cert chain and verified cert chain as a list of Certificate
objects. Certificate objects have methods to convert the cert to a dict,
PEM, or DER (ASN.1).

These are private APIs for now. There is a slim chance to stabilize the
approach and provide a public API for 3.10. Otherwise I'll provide a
stable API in 3.11.

Signed-off-by: Christian Heimes <christian@python.org>
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
index 15d7bfa..f2b26c4 100644
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -32,6 +32,7 @@
     ctypes = None
 
 ssl = import_helper.import_module("ssl")
+import _ssl
 
 from ssl import TLSVersion, _TLSContentType, _TLSMessageType, _TLSAlertType
 
@@ -297,7 +298,7 @@ def test_wrap_socket(sock, *,
     return context.wrap_socket(sock, **kwargs)
 
 
-def testing_context(server_cert=SIGNED_CERTFILE):
+def testing_context(server_cert=SIGNED_CERTFILE, *, server_chain=True):
     """Create context
 
     client_context, server_context, hostname = testing_context()
@@ -316,7 +317,8 @@ def testing_context(server_cert=SIGNED_CERTFILE):
 
     server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
     server_context.load_cert_chain(server_cert)
-    server_context.load_verify_locations(SIGNING_CA)
+    if server_chain:
+        server_context.load_verify_locations(SIGNING_CA)
 
     return client_context, server_context, hostname
 
@@ -2482,6 +2484,12 @@ def run(self):
                     elif stripped == b'GETCERT':
                         cert = self.sslconn.getpeercert()
                         self.write(repr(cert).encode("us-ascii") + b"\n")
+                    elif stripped == b'VERIFIEDCHAIN':
+                        certs = self.sslconn._sslobj.get_verified_chain()
+                        self.write(len(certs).to_bytes(1, "big") + b"\n")
+                    elif stripped == b'UNVERIFIEDCHAIN':
+                        certs = self.sslconn._sslobj.get_unverified_chain()
+                        self.write(len(certs).to_bytes(1, "big") + b"\n")
                     else:
                         if (support.verbose and
                             self.server.connectionchatty):
@@ -4567,6 +4575,63 @@ def test_bpo37428_pha_cert_none(self):
                 # server cert has not been validated
                 self.assertEqual(s.getpeercert(), {})
 
+    def test_internal_chain_client(self):
+        client_context, server_context, hostname = testing_context(
+            server_chain=False
+        )
+        server = ThreadedEchoServer(context=server_context, chatty=False)
+        with server:
+            with client_context.wrap_socket(
+                socket.socket(),
+                server_hostname=hostname
+            ) as s:
+                s.connect((HOST, server.port))
+                vc = s._sslobj.get_verified_chain()
+                self.assertEqual(len(vc), 2)
+                ee, ca = vc
+                uvc = s._sslobj.get_unverified_chain()
+                self.assertEqual(len(uvc), 1)
+
+                self.assertEqual(ee, uvc[0])
+                self.assertEqual(hash(ee), hash(uvc[0]))
+                self.assertEqual(repr(ee), repr(uvc[0]))
+
+                self.assertNotEqual(ee, ca)
+                self.assertNotEqual(hash(ee), hash(ca))
+                self.assertNotEqual(repr(ee), repr(ca))
+                self.assertNotEqual(ee.get_info(), ca.get_info())
+                self.assertIn("CN=localhost", repr(ee))
+                self.assertIn("CN=our-ca-server", repr(ca))
+
+                pem = ee.public_bytes(_ssl.ENCODING_PEM)
+                der = ee.public_bytes(_ssl.ENCODING_DER)
+                self.assertIsInstance(pem, str)
+                self.assertIn("-----BEGIN CERTIFICATE-----", pem)
+                self.assertIsInstance(der, bytes)
+                self.assertEqual(
+                    ssl.PEM_cert_to_DER_cert(pem), der
+                )
+
+    def test_internal_chain_server(self):
+        client_context, server_context, hostname = testing_context()
+        client_context.load_cert_chain(SIGNED_CERTFILE)
+        server_context.verify_mode = ssl.CERT_REQUIRED
+        server_context.maximum_version = ssl.TLSVersion.TLSv1_2
+
+        server = ThreadedEchoServer(context=server_context, chatty=False)
+        with server:
+            with client_context.wrap_socket(
+                socket.socket(),
+                server_hostname=hostname
+            ) as s:
+                s.connect((HOST, server.port))
+                s.write(b'VERIFIEDCHAIN\n')
+                res = s.recv(1024)
+                self.assertEqual(res, b'\x02\n')
+                s.write(b'UNVERIFIEDCHAIN\n')
+                res = s.recv(1024)
+                self.assertEqual(res, b'\x02\n')
+
 
 HAS_KEYLOG = hasattr(ssl.SSLContext, 'keylog_filename')
 requires_keylog = unittest.skipUnless(