blob: a32b0bcc7e22b072866de15ac10bb0b0a0d424ad [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Christian Heimesc7f70692019-05-31 11:44:05 +02005import unittest.mock
Benjamin Petersonee8712c2008-05-20 21:35:26 +00006from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00007import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00008import select
Thomas Woutersed03b412007-08-28 21:37:11 +00009import time
Christian Heimes9424bb42013-06-17 15:32:57 +020010import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000011import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000012import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000013import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000014import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000015import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020016import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000017import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000018import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000019import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000020import platform
Christian Heimes892d66e2018-01-29 14:10:18 +010021import sysconfig
Christian Heimes888bbdc2017-09-07 14:18:21 -070022try:
23 import ctypes
24except ImportError:
25 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000026
Antoine Pitrou05d936d2010-10-13 11:38:36 +000027ssl = support.import_module("ssl")
28
Victor Stinner466e18e2019-07-01 19:01:52 +020029from ssl import TLSVersion, _TLSContentType, _TLSMessageType
Martin Panter3840b2a2016-03-27 01:53:46 +000030
Antoine Pitrou2463e5f2013-03-28 22:24:43 +010031PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
Benjamin Petersonee8712c2008-05-20 21:35:26 +000032HOST = support.HOST
Christian Heimes598894f2016-09-05 23:19:05 +020033IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
Christian Heimes05d9fe32018-02-27 08:55:39 +010034IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
35IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
Christian Heimes892d66e2018-01-29 14:10:18 +010036PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000037
# Pair each PROTOCOL_* constant named below with a TLSVersion member.
# A pair is left out when this build of the ssl module lacks either name.
PROTOCOL_TO_TLS_VERSION = {}
for proto_name, version_name in [
    ("PROTOCOL_SSLv23", "SSLv3"),
    ("PROTOCOL_TLSv1", "TLSv1"),
    ("PROTOCOL_TLSv1_1", "TLSv1_1"),
]:
    try:
        protocol = getattr(ssl, proto_name)
        tls_version = getattr(ssl.TLSVersion, version_name)
    except AttributeError:
        # One of the two names is not available; skip the pair.
        pass
    else:
        PROTOCOL_TO_TLS_VERSION[protocol] = tls_version
50
def data_file(*name):
    """Join the path components in *name* onto the directory of this file."""
    here = os.path.dirname(__file__)
    return os.path.join(here, *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000053
Antoine Pitrou81564092010-10-08 23:06:24 +000054# The custom key and certificate files used in test_ssl are generated
55# using Lib/test/make_ssl_certs.py.
56# Other certificates are simply fetched from the Internet servers they
57# are meant to authenticate.
58
Antoine Pitrou152efa22010-05-16 18:19:27 +000059CERTFILE = data_file("keycert.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000060BYTES_CERTFILE = os.fsencode(CERTFILE)
Antoine Pitrou152efa22010-05-16 18:19:27 +000061ONLYCERT = data_file("ssl_cert.pem")
62ONLYKEY = data_file("ssl_key.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000063BYTES_ONLYCERT = os.fsencode(ONLYCERT)
64BYTES_ONLYKEY = os.fsencode(ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +020065CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
66ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
67KEY_PASSWORD = "somepass"
Antoine Pitrou152efa22010-05-16 18:19:27 +000068CAPATH = data_file("capath")
Victor Stinner313a1202010-06-11 23:56:51 +000069BYTES_CAPATH = os.fsencode(CAPATH)
Christian Heimesefff7062013-11-21 03:35:02 +010070CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
71CAFILE_CACERT = data_file("capath", "5ed36f99.0")
72
Christian Heimesbd5c7d22018-01-20 15:16:30 +010073CERTFILE_INFO = {
74 'issuer': ((('countryName', 'XY'),),
75 (('localityName', 'Castle Anthrax'),),
76 (('organizationName', 'Python Software Foundation'),),
77 (('commonName', 'localhost'),)),
Christian Heimese6dac002018-08-30 07:25:49 +020078 'notAfter': 'Aug 26 14:23:15 2028 GMT',
79 'notBefore': 'Aug 29 14:23:15 2018 GMT',
80 'serialNumber': '98A7CF88C74A32ED',
Christian Heimesbd5c7d22018-01-20 15:16:30 +010081 'subject': ((('countryName', 'XY'),),
82 (('localityName', 'Castle Anthrax'),),
83 (('organizationName', 'Python Software Foundation'),),
84 (('commonName', 'localhost'),)),
85 'subjectAltName': (('DNS', 'localhost'),),
86 'version': 3
87}
Antoine Pitrou152efa22010-05-16 18:19:27 +000088
Christian Heimes22587792013-11-21 23:56:13 +010089# empty CRL
90CRLFILE = data_file("revocation.crl")
91
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010092# Two keys and certs signed by the same CA (for SNI tests)
93SIGNED_CERTFILE = data_file("keycert3.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +020094SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +010095
96SIGNED_CERTFILE_INFO = {
97 'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
98 'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
99 'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
100 'issuer': ((('countryName', 'XY'),),
101 (('organizationName', 'Python Software Foundation CA'),),
102 (('commonName', 'our-ca-server'),)),
Christian Heimese6dac002018-08-30 07:25:49 +0200103 'notAfter': 'Jul 7 14:23:16 2028 GMT',
104 'notBefore': 'Aug 29 14:23:16 2018 GMT',
105 'serialNumber': 'CB2D80995A69525C',
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100106 'subject': ((('countryName', 'XY'),),
107 (('localityName', 'Castle Anthrax'),),
108 (('organizationName', 'Python Software Foundation'),),
109 (('commonName', 'localhost'),)),
110 'subjectAltName': (('DNS', 'localhost'),),
111 'version': 3
112}
113
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100114SIGNED_CERTFILE2 = data_file("keycert4.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +0200115SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100116SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
117SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'
118
Martin Panter3840b2a2016-03-27 01:53:46 +0000119# Same certificate as pycacert.pem, but without extra text in file
120SIGNING_CA = data_file("capath", "ceff1710.0")
Christian Heimes1c03abd2016-09-06 23:25:35 +0200121# cert with all kinds of subject alt names
122ALLSANFILE = data_file("allsans.pem")
Christian Heimes66e57422018-01-29 14:25:13 +0100123IDNSANSFILE = data_file("idnsans.pem")
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100124
Martin Panter3d81d932016-01-14 09:36:00 +0000125REMOTE_HOST = "self-signed.pythontest.net"
Antoine Pitrou152efa22010-05-16 18:19:27 +0000126
127EMPTYCERT = data_file("nullcert.pem")
128BADCERT = data_file("badcert.pem")
Martin Panter407b62f2016-01-30 03:41:43 +0000129NONEXISTINGCERT = data_file("XXXnonexisting.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000130BADKEY = data_file("badkey.pem")
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200131NOKIACERT = data_file("nokia.pem")
Christian Heimes824f7f32013-08-17 00:54:47 +0200132NULLBYTECERT = data_file("nullbytecert.pem")
Christian Heimesa37f5242019-01-15 23:47:42 +0100133TALOS_INVALID_CRLDP = data_file("talos-2019-0758.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000134
Christian Heimes88bfd0b2018-08-14 12:54:19 +0200135DHFILE = data_file("ffdh3072.pem")
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100136BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000137
Christian Heimes358cfd42016-09-10 22:43:48 +0200138# Not defined in all versions of OpenSSL
139OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
140OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
141OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
142OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
Christian Heimes05d9fe32018-02-27 08:55:39 +0100143OP_ENABLE_MIDDLEBOX_COMPAT = getattr(ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
Christian Heimes358cfd42016-09-10 22:43:48 +0200144
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100145
def handle_error(prefix):
    """Write *prefix* plus the formatted current exception to stdout.

    Nothing is written unless support.verbose is set.
    """
    formatted = ' '.join(traceback.format_exception(*sys.exc_info()))
    if not support.verbose:
        return
    sys.stdout.write(prefix + formatted)
Thomas Woutersed03b412007-08-28 21:37:11 +0000150
def can_clear_options():
    """Return True when ssl._OPENSSL_API_VERSION is 0.9.8m or higher."""
    minimum = (0, 9, 8, 13, 15)  # 0.9.8m
    return ssl._OPENSSL_API_VERSION >= minimum
Antoine Pitroub5218772010-05-21 09:56:06 +0000154
def no_sslv2_implies_sslv3_hello():
    """Return True when ssl.OPENSSL_VERSION_INFO is 0.9.7h or higher."""
    minimum = (0, 9, 7, 8, 15)  # 0.9.7h
    return ssl.OPENSSL_VERSION_INFO >= minimum
158
def have_verify_flags():
    """Return True when ssl.OPENSSL_VERSION_INFO is 0.9.8 or higher."""
    minimum = (0, 9, 8, 0, 15)  # 0.9.8
    return ssl.OPENSSL_VERSION_INFO >= minimum
162
def _have_secp_curves():
    """Report whether a server-side context accepts the "secp384r1" curve.

    Returns False without trying when ssl.HAS_ECDH is false, and False
    when set_ecdh_curve() rejects the curve name with ValueError.
    """
    if ssl.HAS_ECDH:
        server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        try:
            server_ctx.set_ecdh_curve("secp384r1")
        except ValueError:
            pass
        else:
            return True
    return False


# Probed once, at import time.
HAVE_SECP_CURVES = _have_secp_curves()
176
177
def utc_offset():  # NOTE: ignore issues like #1647654
    """Return the local UTC offset in seconds.

    local time = utc time + utc offset
    """
    # time.localtime() is only consulted when the zone defines DST at all.
    dst_active = time.daylight and time.localtime().tm_isdst > 0
    seconds_west = time.altzone if dst_active else time.timezone
    return -seconds_west
183
def asn1time(cert_time):
    """Adjust an expected certificate time string for OpenSSL 0.9.8i.

    Some versions of OpenSSL ignore seconds, see #18207.  When
    ssl._OPENSSL_API_VERSION is exactly 0.9.8i, the seconds field is
    zeroed and a leading-zero day is rewritten with a leading space.
    For every other version *cert_time* is returned unchanged.
    """
    if ssl._OPENSSL_API_VERSION != (0, 9, 8, 9, 15):
        return cert_time

    layout = "%b %d %H:%M:%S %Y GMT"
    parsed = datetime.datetime.strptime(cert_time, layout)
    normalized = parsed.replace(second=0).strftime(layout)
    # %d adds leading zero but ASN1_TIME_print() uses leading space
    if normalized[4] == "0":
        normalized = normalized[:4] + " " + normalized[5:]
    return normalized
Thomas Woutersed03b412007-08-28 21:37:11 +0000197
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100198needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
199
Antoine Pitrou23df4832010-08-04 17:14:06 +0000200
def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
                     cert_reqs=ssl.CERT_NONE, ca_certs=None,
                     ciphers=None, certfile=None, keyfile=None,
                     **kwargs):
    """Wrap *sock* with a freshly built SSLContext and return the result.

    The context is created for *ssl_version*.  Each optional argument
    is applied to it only when it is not None.  Any remaining keyword
    arguments are forwarded to SSLContext.wrap_socket().
    """
    ctx = ssl.SSLContext(ssl_version)
    if cert_reqs is not None:
        # The hostname check is switched off first, then verify_mode set.
        if cert_reqs == ssl.CERT_NONE:
            ctx.check_hostname = False
        ctx.verify_mode = cert_reqs
    if ca_certs is not None:
        ctx.load_verify_locations(ca_certs)
    if not (certfile is None and keyfile is None):
        ctx.load_cert_chain(certfile, keyfile)
    if ciphers is not None:
        ctx.set_ciphers(ciphers)
    return ctx.wrap_socket(sock, **kwargs)
217
Christian Heimesa170fa12017-09-15 20:27:30 +0200218
def testing_context(server_cert=SIGNED_CERTFILE):
    """Create context

    client_context, server_context, hostname = testing_context()

    *server_cert* must be SIGNED_CERTFILE or SIGNED_CERTFILE2; any
    other value raises ValueError.  The returned hostname is the one
    paired with the chosen certificate.
    """
    if server_cert not in (SIGNED_CERTFILE, SIGNED_CERTFILE2):
        raise ValueError(server_cert)
    if server_cert == SIGNED_CERTFILE:
        hostname = SIGNED_CERTFILE_HOSTNAME
    else:
        hostname = SIGNED_CERTFILE2_HOSTNAME

    # Client side: load SIGNING_CA as a verify location.
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)

    # Server side: load the chosen certificate, plus the same CA.
    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(server_cert)
    server_context.load_verify_locations(SIGNING_CA)

    return client_context, server_context, hostname
239
240
Antoine Pitrou152efa22010-05-16 18:19:27 +0000241class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000242
def test_constants(self):
    # Every lookup below raises AttributeError when the ssl module does
    # not export the named constant.
    for name in ("CERT_NONE", "CERT_OPTIONAL", "CERT_REQUIRED",
                 "OP_CIPHER_SERVER_PREFERENCE", "OP_SINGLE_DH_USE"):
        getattr(ssl, name)
    if ssl.HAS_ECDH:
        ssl.OP_SINGLE_ECDH_USE
    if ssl.OPENSSL_VERSION_INFO >= (1, 0):
        ssl.OP_NO_COMPRESSION
    for flag in (ssl.HAS_SNI, ssl.HAS_ECDH):
        self.assertIn(flag, {True, False})
    for name in ("OP_NO_SSLv2", "OP_NO_SSLv3", "OP_NO_TLSv1",
                 "OP_NO_TLSv1_3"):
        getattr(ssl, name)
    if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
        ssl.OP_NO_TLSv1_1
        ssl.OP_NO_TLSv1_2
    self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000263
def test_private_init(self):
    # Calling the SSLSocket class directly must raise TypeError with a
    # message mentioning the "public constructor".
    with socket.socket() as plain_sock, \
            self.assertRaisesRegex(TypeError, "public constructor"):
        ssl.SSLSocket(plain_sock)
268
def test_str_for_enums(self):
    # The PROTOCOL_* constants must have enum-like string reprs, and a
    # context must hand back the very same object it was created with.
    protocol = ssl.PROTOCOL_TLS
    self.assertEqual(str(protocol), '_SSLMethod.PROTOCOL_TLS')
    context = ssl.SSLContext(protocol)
    self.assertIs(context.protocol, protocol)
276
def test_random(self):
    # Exercise the RAND_* helpers: status, byte generation, argument
    # validation and seeding.
    status = ssl.RAND_status()
    if support.verbose:
        if status:
            verdict = "sufficient randomness"
        else:
            verdict = "insufficient randomness"
        sys.stdout.write("\n RAND_status is %d (%s)\n" % (status, verdict))

    pseudo, is_cryptographic = ssl.RAND_pseudo_bytes(16)
    self.assertEqual(len(pseudo), 16)
    self.assertEqual(is_cryptographic, status == 1)
    if not status:
        self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
    else:
        strong = ssl.RAND_bytes(16)
        self.assertEqual(len(strong), 16)

    # negative num is invalid
    for generator in (ssl.RAND_bytes, ssl.RAND_pseudo_bytes):
        self.assertRaises(ValueError, generator, -5)

    if hasattr(ssl, 'RAND_egd'):
        for bad_args in ((1,), ('foo', 1)):
            self.assertRaises(TypeError, ssl.RAND_egd, *bad_args)
    # RAND_add is called with str, bytes and bytearray input.
    for seed in ("this is a random string",
                 b"this is a random bytes object",
                 bytearray(b"this is a random bytearray object")):
        ssl.RAND_add(seed, 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000303
@unittest.skipUnless(os.name == 'posix', 'requires posix')
def test_random_fork(self):
    # A forked child and its parent must not draw the same 16
    # pseudo-random bytes.
    if not ssl.RAND_status():
        self.fail("OpenSSL's PRNG has insufficient randomness")

    read_fd, write_fd = os.pipe()
    child_pid = os.fork()
    if child_pid == 0:
        # Child: send 16 bytes through the pipe.  Both paths end in
        # os._exit(), so the child never reaches the parent code below.
        try:
            os.close(read_fd)
            child_bytes = ssl.RAND_pseudo_bytes(16)[0]
            self.assertEqual(len(child_bytes), 16)
            os.write(write_fd, child_bytes)
            os.close(write_fd)
        except BaseException:
            os._exit(1)
        os._exit(0)

    # Parent: wait for the child, then compare against its bytes.
    os.close(write_fd)
    self.addCleanup(os.close, read_fd)
    _, exit_status = os.waitpid(child_pid, 0)
    self.assertEqual(exit_status, 0)

    child_bytes = os.read(read_fd, 16)
    self.assertEqual(len(child_bytes), 16)
    parent_bytes = ssl.RAND_pseudo_bytes(16)[0]
    self.assertEqual(len(parent_bytes), 16)

    self.assertNotEqual(child_bytes, parent_bytes)
335
Christian Heimese6dac002018-08-30 07:25:49 +0200336 maxDiff = None
337
def test_parse_cert(self):
    # note that this uses an 'unofficial' function in _ssl.c,
    # provided solely for this test, to exercise the certificate
    # parsing code
    decode = ssl._ssl._test_decode_cert
    for cert_path, expected in ((CERTFILE, CERTFILE_INFO),
                                (SIGNED_CERTFILE, SIGNED_CERTFILE_INFO)):
        self.assertEqual(decode(cert_path), expected)

    # Issue #13034: the subjectAltName in some certificates
    # (notably projects.developer.nokia.com:443) wasn't parsed
    nokia = decode(NOKIACERT)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(nokia) + "\n")
    expected_san = (('DNS', 'projects.developer.nokia.com'),
                    ('DNS', 'projects.forum.nokia.com'))
    self.assertEqual(nokia['subjectAltName'], expected_san)
    # extra OCSP and AIA fields
    self.assertEqual(nokia['OCSP'], ('http://ocsp.verisign.com',))
    self.assertEqual(
        nokia['caIssuers'],
        ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
    self.assertEqual(
        nokia['crlDistributionPoints'],
        ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000366
def test_parse_cert_CVE_2019_5010(self):
    # Decoding the TALOS_INVALID_CRLDP certificate must yield exactly
    # the fields listed below.
    decoded = ssl._ssl._test_decode_cert(TALOS_INVALID_CRLDP)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(decoded) + "\n")
    host = 'codenomicon-vm-2.test.lal.cisco.com'
    expected = {
        'issuer': ((('countryName', 'UK'),), (('commonName', 'cody-ca'),)),
        'notAfter': 'Jun 14 18:00:58 2028 GMT',
        'notBefore': 'Jun 18 18:00:58 2018 GMT',
        'serialNumber': '02',
        'subject': ((('countryName', 'UK'),), (('commonName', host),)),
        'subjectAltName': (('DNS', host),),
        'version': 3,
    }
    self.assertEqual(decoded, expected)
387
def test_parse_cert_CVE_2013_4238(self):
    # Embedded NUL bytes in NULLBYTECERT must survive decoding in the
    # subject, issuer and subjectAltName fields.
    decoded = ssl._ssl._test_decode_cert(NULLBYTECERT)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(decoded) + "\n")
    expected_name = (
        (('countryName', 'US'),),
        (('stateOrProvinceName', 'Oregon'),),
        (('localityName', 'Beaverton'),),
        (('organizationName', 'Python Software Foundation'),),
        (('organizationalUnitName', 'Python Core Development'),),
        (('commonName', 'null.python.org\x00example.org'),),
        (('emailAddress', 'python-dev@python.org'),),
    )
    self.assertEqual(decoded['subject'], expected_name)
    self.assertEqual(decoded['issuer'], expected_name)

    # The first four entries are the same on every OpenSSL version.
    shared_san = (
        ('DNS', 'altnull.python.org\x00example.com'),
        ('email', 'null@python.org\x00user@example.org'),
        ('URI', 'http://null.python.org\x00http://example.org'),
        ('IP Address', '192.0.2.1'),
    )
    if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
        ipv6_entry = ('IP Address', '2001:DB8:0:0:0:0:0:1\n')
    else:
        # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
        ipv6_entry = ('IP Address', '<invalid>')

    self.assertEqual(decoded['subjectAltName'], shared_san + (ipv6_entry,))
Christian Heimes824f7f32013-08-17 00:54:47 +0200416
def test_parse_all_sans(self):
    # ALLSANFILE carries every kind of subject alt name; check how each
    # kind is rendered by the decoder.
    decoded = ssl._ssl._test_decode_cert(ALLSANFILE)
    dirname_value = (
        (('countryName', 'XY'),),
        (('localityName', 'Castle Anthrax'),),
        (('organizationName', 'Python Software Foundation'),),
        (('commonName', 'dirname example'),),
    )
    expected_san = (
        ('DNS', 'allsans'),
        ('othername', '<unsupported>'),
        ('othername', '<unsupported>'),
        ('email', 'user@example.org'),
        ('DNS', 'www.example.org'),
        ('DirName', dirname_value),
        ('URI', 'https://www.python.org/'),
        ('IP Address', '127.0.0.1'),
        ('IP Address', '0:0:0:0:0:0:0:1\n'),
        ('Registered ID', '1.2.3.4.5'),
    )
    self.assertEqual(decoded['subjectAltName'], expected_san)
437
def test_DER_to_PEM(self):
    # Round-trip PEM -> DER -> PEM -> DER: the DER forms must match and
    # the regenerated PEM must carry the standard header and footer.
    with open(CAFILE_CACERT, 'r') as pem_file:
        pem_text = pem_file.read()
    der = ssl.PEM_cert_to_DER_cert(pem_text)
    pem_again = ssl.DER_cert_to_PEM_cert(der)
    der_again = ssl.PEM_cert_to_DER_cert(pem_again)
    self.assertEqual(der, der_again)

    expected_head = ssl.PEM_HEADER + '\n'
    expected_tail = '\n' + ssl.PEM_FOOTER + '\n'
    if not pem_again.startswith(expected_head):
        self.fail("DER-to-PEM didn't include correct header:\n%r\n" % pem_again)
    if not pem_again.endswith(expected_tail):
        self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % pem_again)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000449
def test_openssl_version(self):
    # Sanity-check the three OPENSSL_VERSION* constants: their types,
    # their value ranges, and the prefix of the version string.
    number = ssl.OPENSSL_VERSION_NUMBER
    info = ssl.OPENSSL_VERSION_INFO
    text = ssl.OPENSSL_VERSION
    self.assertIsInstance(number, int)
    self.assertIsInstance(info, tuple)
    self.assertIsInstance(text, str)

    # >= 0.9
    self.assertGreaterEqual(number, 0x900000)
    # < 3.0
    self.assertLess(number, 0x30000000)

    major, minor, fix, patch, status = info
    self.assertGreaterEqual(major, 0)
    self.assertLess(major, 3)
    for component in (minor, fix):
        self.assertGreaterEqual(component, 0)
        self.assertLess(component, 256)
    self.assertGreaterEqual(patch, 0)
    self.assertLessEqual(patch, 63)
    self.assertGreaterEqual(status, 0)
    self.assertLessEqual(status, 15)

    # Version string as returned by {Open,Libre}SSL, the format might change
    if IS_LIBRESSL:
        prefix = "LibreSSL {:d}".format(major)
    else:
        prefix = "OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)
    self.assertTrue(text.startswith(prefix), (text, info, hex(number)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000480
@support.cpython_only
def test_refcycle(self):
    # Issue #7943: an SSL object doesn't create reference cycles with
    # itself.  Dropping the only strong reference must therefore leave
    # the weak reference dead straight away.
    raw_sock = socket.socket(socket.AF_INET)
    wrapped = test_wrap_socket(raw_sock)
    ref = weakref.ref(wrapped)
    with support.check_warnings(("", ResourceWarning)):
        del wrapped
    self.assertEqual(ref(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000491
def test_wrapped_unconnected(self):
    # Methods on an unconnected SSLSocket propagate the original
    # OSError raised by the underlying socket object.
    plain_sock = socket.socket(socket.AF_INET)
    with test_wrap_socket(plain_sock) as ss:
        oserror_calls = (
            (ss.recv, (1,)),
            (ss.recv_into, (bytearray(b'x'),)),
            (ss.recvfrom, (1,)),
            (ss.recvfrom_into, (bytearray(b'x'), 1)),
            (ss.send, (b'x',)),
            (ss.sendto, (b'x', ('0.0.0.0', 0))),
        )
        for method, args in oserror_calls:
            self.assertRaises(OSError, method, *args)

        # These calls are expected to raise NotImplementedError.
        unsupported_calls = (
            (ss.dup, ()),
            (ss.sendmsg, ([b'x'], (), 0, ('0.0.0.0', 0))),
            (ss.recvmsg, (100,)),
            (ss.recvmsg_into, ([bytearray(100)],)),
        )
        for method, args in unsupported_calls:
            self.assertRaises(NotImplementedError, method, *args)
Antoine Pitroua468adc2010-09-14 14:43:44 +0000509
def test_timeout(self):
    # Issue #8524: when creating an SSL socket, the timeout of the
    # original socket should be retained.
    for expected_timeout in (None, 0.0, 5.0):
        plain_sock = socket.socket(socket.AF_INET)
        plain_sock.settimeout(expected_timeout)
        with test_wrap_socket(plain_sock) as wrapped:
            self.assertEqual(expected_timeout, wrapped.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000518
def test_errors_sslwrap(self):
    # Argument errors of ssl.wrap_socket(): missing certfile, connect()
    # on a server-side socket, and certificate/key paths that do not
    # exist.
    sock = socket.socket()
    server_msg = "certfile must be specified for server-side operations"
    rejected = (
        ("certfile must be specified", dict(keyfile=CERTFILE)),
        (server_msg, dict(server_side=True)),
        (server_msg, dict(server_side=True, certfile="")),
    )
    for message, kwargs in rejected:
        self.assertRaisesRegex(ValueError, message,
                               ssl.wrap_socket, sock, **kwargs)

    with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
        self.assertRaisesRegex(ValueError,
                               "can't connect in server-side mode",
                               s.connect, (HOST, 8080))

    # Each of these must fail with ENOENT.
    missing_files = (
        dict(certfile=NONEXISTINGCERT),
        dict(certfile=CERTFILE, keyfile=NONEXISTINGCERT),
        dict(certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT),
    )
    for kwargs in missing_files:
        with self.assertRaises(OSError) as cm:
            with socket.socket() as sock:
                ssl.wrap_socket(sock, **kwargs)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000547
def bad_cert_test(self, certfile):
    """Check that trying to use the given client certificate fails

    *certfile* is a file name looked up next to this module, or in the
    current directory when the module has no directory part.
    """
    base_dir = os.path.dirname(__file__) or os.curdir
    cert_path = os.path.join(base_dir, certfile)
    sock = socket.socket()
    self.addCleanup(sock.close)
    with self.assertRaises(ssl.SSLError):
        test_wrap_socket(sock, certfile=cert_path)
Martin Panter3464ea22016-02-01 21:58:11 +0000557
def test_empty_cert(self):
    """Wrapping with an empty cert file"""
    empty_cert = "nullcert.pem"
    self.bad_cert_test(empty_cert)
561
def test_malformed_cert(self):
    """Wrapping with a badly formatted certificate (syntax error)"""
    malformed_cert = "badcert.pem"
    self.bad_cert_test(malformed_cert)
565
def test_malformed_key(self):
    """Wrapping with a badly formatted key (syntax error)"""
    malformed_key = "badkey.pem"
    self.bad_cert_test(malformed_key)
569
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000570 def test_match_hostname(self):
571 def ok(cert, hostname):
572 ssl.match_hostname(cert, hostname)
573 def fail(cert, hostname):
574 self.assertRaises(ssl.CertificateError,
575 ssl.match_hostname, cert, hostname)
576
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100577 # -- Hostname matching --
578
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000579 cert = {'subject': ((('commonName', 'example.com'),),)}
580 ok(cert, 'example.com')
581 ok(cert, 'ExAmple.cOm')
582 fail(cert, 'www.example.com')
583 fail(cert, '.example.com')
584 fail(cert, 'example.org')
585 fail(cert, 'exampleXcom')
586
587 cert = {'subject': ((('commonName', '*.a.com'),),)}
588 ok(cert, 'foo.a.com')
589 fail(cert, 'bar.foo.a.com')
590 fail(cert, 'a.com')
591 fail(cert, 'Xa.com')
592 fail(cert, '.a.com')
593
Mandeep Singhede2ac92017-11-27 04:01:27 +0530594 # only match wildcards when they are the only thing
595 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000596 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530597 fail(cert, 'foo.com')
598 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000599 fail(cert, 'bar.com')
600 fail(cert, 'foo.a.com')
601 fail(cert, 'bar.foo.com')
602
Christian Heimes824f7f32013-08-17 00:54:47 +0200603 # NULL bytes are bad, CVE-2013-4073
604 cert = {'subject': ((('commonName',
605 'null.python.org\x00example.org'),),)}
606 ok(cert, 'null.python.org\x00example.org') # or raise an error?
607 fail(cert, 'example.org')
608 fail(cert, 'null.python.org')
609
Georg Brandl72c98d32013-10-27 07:16:53 +0100610 # error cases with wildcards
611 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
612 fail(cert, 'bar.foo.a.com')
613 fail(cert, 'a.com')
614 fail(cert, 'Xa.com')
615 fail(cert, '.a.com')
616
617 cert = {'subject': ((('commonName', 'a.*.com'),),)}
618 fail(cert, 'a.foo.com')
619 fail(cert, 'a..com')
620 fail(cert, 'a.com')
621
622 # wildcard doesn't match IDNA prefix 'xn--'
623 idna = 'püthon.python.org'.encode("idna").decode("ascii")
624 cert = {'subject': ((('commonName', idna),),)}
625 ok(cert, idna)
626 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
627 fail(cert, idna)
628 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
629 fail(cert, idna)
630
631 # wildcard in first fragment and IDNA A-labels in sequent fragments
632 # are supported.
633 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
634 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530635 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
636 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100637 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
638 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
639
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000640 # Slightly fake real-world example
641 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
642 'subject': ((('commonName', 'linuxfrz.org'),),),
643 'subjectAltName': (('DNS', 'linuxfr.org'),
644 ('DNS', 'linuxfr.com'),
645 ('othername', '<unsupported>'))}
646 ok(cert, 'linuxfr.org')
647 ok(cert, 'linuxfr.com')
648 # Not a "DNS" entry
649 fail(cert, '<unsupported>')
650 # When there is a subjectAltName, commonName isn't used
651 fail(cert, 'linuxfrz.org')
652
653 # A pristine real-world example
654 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
655 'subject': ((('countryName', 'US'),),
656 (('stateOrProvinceName', 'California'),),
657 (('localityName', 'Mountain View'),),
658 (('organizationName', 'Google Inc'),),
659 (('commonName', 'mail.google.com'),))}
660 ok(cert, 'mail.google.com')
661 fail(cert, 'gmail.com')
662 # Only commonName is considered
663 fail(cert, 'California')
664
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100665 # -- IPv4 matching --
666 cert = {'subject': ((('commonName', 'example.com'),),),
667 'subjectAltName': (('DNS', 'example.com'),
668 ('IP Address', '10.11.12.13'),
Miss Islington (bot)3cba3d32019-07-02 14:06:18 -0700669 ('IP Address', '14.15.16.17'),
670 ('IP Address', '127.0.0.1'))}
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100671 ok(cert, '10.11.12.13')
672 ok(cert, '14.15.16.17')
Miss Islington (bot)3cba3d32019-07-02 14:06:18 -0700673 # socket.inet_ntoa(socket.inet_aton('127.1')) == '127.0.0.1'
674 fail(cert, '127.1')
675 fail(cert, '14.15.16.17 ')
676 fail(cert, '14.15.16.17 extra data')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100677 fail(cert, '14.15.16.18')
678 fail(cert, 'example.net')
679
680 # -- IPv6 matching --
Miss Islington (bot)c2684c62019-06-30 08:42:22 -0700681 if support.IPV6_ENABLED:
Christian Heimesaef12832018-02-24 14:35:56 +0100682 cert = {'subject': ((('commonName', 'example.com'),),),
683 'subjectAltName': (
684 ('DNS', 'example.com'),
685 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
686 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
687 ok(cert, '2001::cafe')
688 ok(cert, '2003::baba')
Miss Islington (bot)3cba3d32019-07-02 14:06:18 -0700689 fail(cert, '2003::baba ')
690 fail(cert, '2003::baba extra data')
Christian Heimesaef12832018-02-24 14:35:56 +0100691 fail(cert, '2003::bebe')
692 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100693
694 # -- Miscellaneous --
695
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000696 # Neither commonName nor subjectAltName
697 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
698 'subject': ((('countryName', 'US'),),
699 (('stateOrProvinceName', 'California'),),
700 (('localityName', 'Mountain View'),),
701 (('organizationName', 'Google Inc'),))}
702 fail(cert, 'mail.google.com')
703
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200704 # No DNS entry in subjectAltName but a commonName
705 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
706 'subject': ((('countryName', 'US'),),
707 (('stateOrProvinceName', 'California'),),
708 (('localityName', 'Mountain View'),),
709 (('commonName', 'mail.google.com'),)),
710 'subjectAltName': (('othername', 'blabla'), )}
711 ok(cert, 'mail.google.com')
712
713 # No DNS entry subjectAltName and no commonName
714 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
715 'subject': ((('countryName', 'US'),),
716 (('stateOrProvinceName', 'California'),),
717 (('localityName', 'Mountain View'),),
718 (('organizationName', 'Google Inc'),)),
719 'subjectAltName': (('othername', 'blabla'),)}
720 fail(cert, 'google.com')
721
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000722 # Empty cert / no cert
723 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
724 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
725
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200726 # Issue #17980: avoid denials of service by refusing more than one
727 # wildcard per fragment.
Christian Heimesaef12832018-02-24 14:35:56 +0100728 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
729 with self.assertRaisesRegex(
730 ssl.CertificateError,
731 "partial wildcards in leftmost label are not supported"):
732 ssl.match_hostname(cert, 'axxb.example.com')
733
734 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
735 with self.assertRaisesRegex(
736 ssl.CertificateError,
737 "wildcard can only be present in the leftmost label"):
738 ssl.match_hostname(cert, 'www.sub.example.com')
739
740 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
741 with self.assertRaisesRegex(
742 ssl.CertificateError,
743 "too many wildcards"):
744 ssl.match_hostname(cert, 'axxbxxc.example.com')
745
746 cert = {'subject': ((('commonName', '*'),),)}
747 with self.assertRaisesRegex(
748 ssl.CertificateError,
749 "sole wildcard without additional labels are not support"):
750 ssl.match_hostname(cert, 'host')
751
752 cert = {'subject': ((('commonName', '*.com'),),)}
753 with self.assertRaisesRegex(
754 ssl.CertificateError,
755 r"hostname 'com' doesn't match '\*.com'"):
756 ssl.match_hostname(cert, 'com')
757
758 # extra checks for _inet_paton()
759 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
760 with self.assertRaises(ValueError):
761 ssl._inet_paton(invalid)
762 for ipaddr in ['127.0.0.1', '192.168.0.1']:
763 self.assertTrue(ssl._inet_paton(ipaddr))
Miss Islington (bot)c2684c62019-06-30 08:42:22 -0700764 if support.IPV6_ENABLED:
Christian Heimesaef12832018-02-24 14:35:56 +0100765 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
766 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200767
Antoine Pitroud5323212010-10-22 18:19:07 +0000768 def test_server_side(self):
769 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200770 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000771 with socket.socket() as sock:
772 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
773 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000774
def test_unknown_channel_binding(self):
    # should raise ValueError for unknown type
    s = socket.create_server(('127.0.0.1', 0))
    # Register the close up front so the listening socket is released
    # even when the assertion below fails.
    self.addCleanup(s.close)
    c = socket.socket(socket.AF_INET)
    c.connect(s.getsockname())
    # No handshake is performed; only the channel-binding type lookup
    # is being exercised.
    with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
        with self.assertRaises(ValueError):
            ss.get_channel_binding("unknown-type")
Antoine Pitroud6494802011-07-21 01:11:30 +0200784
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    # unconnected should return None for known type
    s = socket.socket(socket.AF_INET)
    with test_wrap_socket(s) as ss:
        self.assertIsNone(ss.get_channel_binding("tls-unique"))
    # the same for server-side
    s = socket.socket(socket.AF_INET)
    with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
        self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200796
def test_dealloc_warn(self):
    # Dropping the last reference to a wrapped socket without closing it
    # must emit a ResourceWarning whose message contains the socket's repr.
    ss = test_wrap_socket(socket.socket(socket.AF_INET))
    # Capture the repr before the object goes away.
    r = repr(ss)
    with self.assertWarns(ResourceWarning) as cm:
        ss = None
        support.gc_collect()
    self.assertIn(r, str(cm.warning.args[0]))
804
def test_get_default_verify_paths(self):
    # The result is a 6-field DefaultVerifyPaths tuple.
    paths = ssl.get_default_verify_paths()
    self.assertEqual(len(paths), 6)
    self.assertIsInstance(paths, ssl.DefaultVerifyPaths)

    # With SSL_CERT_DIR / SSL_CERT_FILE set, cafile and capath must
    # reflect those values; the guard restores the environment afterwards.
    with support.EnvironmentVarGuard() as env:
        env["SSL_CERT_DIR"] = CAPATH
        env["SSL_CERT_FILE"] = CERTFILE
        paths = ssl.get_default_verify_paths()
        self.assertEqual(paths.cafile, CERTFILE)
        self.assertEqual(paths.capath, CAPATH)
816
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
def test_enum_certificates(self):
    # Both system stores are expected to be non-empty.
    self.assertTrue(ssl.enum_certificates("CA"))
    self.assertTrue(ssl.enum_certificates("ROOT"))

    self.assertRaises(TypeError, ssl.enum_certificates)
    # An empty store name must raise OSError (WindowsError is only an
    # alias of OSError, so the portable name is used here).
    self.assertRaises(OSError, ssl.enum_certificates, "")

    # Collect every trust OID seen while validating the shape of each
    # (cert_bytes, encoding_type, trust) entry.
    trust_oids = set()
    for storename in ("CA", "ROOT"):
        store = ssl.enum_certificates(storename)
        self.assertIsInstance(store, list)
        for element in store:
            self.assertIsInstance(element, tuple)
            self.assertEqual(len(element), 3)
            cert, enc, trust = element
            self.assertIsInstance(cert, bytes)
            self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
            self.assertIsInstance(trust, (frozenset, set, bool))
            # trust may also be a bool; only set-like values carry OIDs.
            if isinstance(trust, (frozenset, set)):
                trust_oids.update(trust)

    # OID of the serverAuth extended key usage.
    serverAuth = "1.3.6.1.5.5.7.3.1"
    self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200841
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
def test_enum_crls(self):
    self.assertTrue(ssl.enum_crls("CA"))
    self.assertRaises(TypeError, ssl.enum_crls)
    # An empty store name must raise OSError (WindowsError is only an
    # alias of OSError, so the portable name is used here).
    self.assertRaises(OSError, ssl.enum_crls, "")

    # Every entry is a (crl_bytes, encoding_type) pair.
    crls = ssl.enum_crls("CA")
    self.assertIsInstance(crls, list)
    for element in crls:
        self.assertIsInstance(element, tuple)
        self.assertEqual(len(element), 2)
        self.assertIsInstance(element[0], bytes)
        self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200855
Christian Heimes46bebee2013-06-09 19:03:31 +0200856
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100857 def test_asn1object(self):
858 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
859 '1.3.6.1.5.5.7.3.1')
860
861 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
862 self.assertEqual(val, expected)
863 self.assertEqual(val.nid, 129)
864 self.assertEqual(val.shortname, 'serverAuth')
865 self.assertEqual(val.longname, 'TLS Web Server Authentication')
866 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
867 self.assertIsInstance(val, ssl._ASN1Object)
868 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
869
870 val = ssl._ASN1Object.fromnid(129)
871 self.assertEqual(val, expected)
872 self.assertIsInstance(val, ssl._ASN1Object)
873 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100874 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
875 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100876 for i in range(1000):
877 try:
878 obj = ssl._ASN1Object.fromnid(i)
879 except ValueError:
880 pass
881 else:
882 self.assertIsInstance(obj.nid, int)
883 self.assertIsInstance(obj.shortname, str)
884 self.assertIsInstance(obj.longname, str)
885 self.assertIsInstance(obj.oid, (str, type(None)))
886
887 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
888 self.assertEqual(val, expected)
889 self.assertIsInstance(val, ssl._ASN1Object)
890 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
891 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
892 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100893 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
894 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100895
Christian Heimes72d28502013-11-23 13:56:58 +0100896 def test_purpose_enum(self):
897 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
898 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
899 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
900 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
901 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
902 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
903 '1.3.6.1.5.5.7.3.1')
904
905 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
906 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
907 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
908 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
909 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
910 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
911 '1.3.6.1.5.5.7.3.2')
912
def test_unsupported_dtls(self):
    # Wrapping a datagram socket must raise NotImplementedError, both
    # through the module's test_wrap_socket helper and through
    # SSLContext.wrap_socket().
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.addCleanup(s.close)
    with self.assertRaises(NotImplementedError) as cx:
        test_wrap_socket(s, cert_reqs=ssl.CERT_NONE)
    self.assertEqual(str(cx.exception), "only stream sockets are supported")
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with self.assertRaises(NotImplementedError) as cx:
        ctx.wrap_socket(s)
    self.assertEqual(str(cx.exception), "only stream sockets are supported")
923
def cert_time_ok(self, timestring, timestamp):
    # Helper: assert that ssl.cert_time_to_seconds() converts
    # `timestring` to exactly `timestamp`.
    self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
926
def cert_time_fail(self, timestring):
    # Helper: assert that ssl.cert_time_to_seconds() rejects
    # `timestring` with ValueError.
    with self.assertRaises(ValueError):
        ssl.cert_time_to_seconds(timestring)
930
@unittest.skipUnless(utc_offset(),
                     'local time needs to be different from UTC')
def test_cert_time_to_seconds_timezone(self):
    # Issue #19940: ssl.cert_time_to_seconds() returns wrong
    # results if local timezone is not UTC
    self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
    self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
938
def test_cert_time_to_seconds(self):
    # Accepted and rejected input formats of ssl.cert_time_to_seconds(),
    # checked through the cert_time_ok / cert_time_fail helpers.
    timestring = "Jan 5 09:34:43 2018 GMT"
    ts = 1515144883.0
    self.cert_time_ok(timestring, ts)
    # accept keyword parameter, assert its name
    self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
    # accept both %e and %d (space or zero generated by strftime)
    self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
    # case-insensitive
    self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
    self.cert_time_fail("Jan 5 09:34 2018 GMT")     # no seconds
    self.cert_time_fail("Jan 5 09:34:43 2018")      # no GMT
    self.cert_time_fail("Jan 5 09:34:43 2018 UTC")  # not GMT timezone
    self.cert_time_fail("Jan 35 09:34:43 2018 GMT")  # invalid day
    self.cert_time_fail("Jon 5 09:34:43 2018 GMT")  # invalid month
    self.cert_time_fail("Jan 5 24:00:00 2018 GMT")  # invalid hour
    self.cert_time_fail("Jan 5 09:60:43 2018 GMT")  # invalid minute

    newyear_ts = 1230768000.0
    # leap seconds
    self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
    # same timestamp
    self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)

    self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
    # allow 60th second (even if it is not a leap second)
    self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
    # allow 2nd leap second for compatibility with time.strptime()
    self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
    self.cert_time_fail("Jan 5 09:34:62 2018 GMT")  # invalid seconds

    # no special treatment for the special value:
    #   99991231235959Z (rfc 5280)
    self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
973
@support.run_with_locale('LC_ALL', '')
def test_cert_time_to_seconds_locale(self):
    # `cert_time_to_seconds()` should be locale independent

    def local_february_name():
        # Abbreviated name of month 2 as rendered by strftime under the
        # locale active for this test.
        return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))

    # The check is only meaningful when the localized name differs
    # from the C-locale "Feb".
    if local_february_name().lower() == 'feb':
        self.skipTest("locale-specific month name needs to be "
                      "different from C locale")

    # locale-independent
    self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
    self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
988
def test_connect_ex_error(self):
    # connect_ex() against a port that is bound but not listening must
    # return one of the expected errno codes.
    server = socket.socket(socket.AF_INET)
    self.addCleanup(server.close)
    port = support.bind_port(server)  # Reserve port but don't listen
    s = test_wrap_socket(socket.socket(socket.AF_INET),
                         cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(s.close)
    rc = s.connect_ex((HOST, port))
    # Issue #19919: Windows machines or VMs hosted on Windows
    # machines sometimes return EWOULDBLOCK.
    errors = (
        errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
        errno.EWOULDBLOCK,
    )
    self.assertIn(rc, errors)
1004
Christian Heimesa6bc95a2013-11-17 19:59:14 +01001005
Antoine Pitrou152efa22010-05-16 18:19:27 +00001006class ContextTests(unittest.TestCase):
1007
def test_constructor(self):
    # Every protocol listed in PROTOCOLS must be accepted.
    for protocol in PROTOCOLS:
        ssl.SSLContext(protocol)
    # Without an argument the context's protocol is PROTOCOL_TLS.
    ctx = ssl.SSLContext()
    self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
    # Values that are not protocol constants are rejected.
    self.assertRaises(ValueError, ssl.SSLContext, -1)
    self.assertRaises(ValueError, ssl.SSLContext, 42)
1015
def test_protocol(self):
    # The protocol attribute reports the value passed to the constructor.
    for proto in PROTOCOLS:
        ctx = ssl.SSLContext(proto)
        self.assertEqual(ctx.protocol, proto)
1020
1021 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001022 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001023 ctx.set_ciphers("ALL")
1024 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +00001025 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +00001026 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +00001027
@unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
                     "Test applies only to Python default ciphers")
def test_python_ciphers(self):
    # None of the cipher suites enabled on a fresh client context may
    # mention PSK, SRP, MD5, RC4 or 3DES in its name.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ciphers = ctx.get_ciphers()
    for suite in ciphers:
        name = suite['name']
        self.assertNotIn("PSK", name)
        self.assertNotIn("SRP", name)
        self.assertNotIn("MD5", name)
        self.assertNotIn("RC4", name)
        self.assertNotIn("3DES", name)
1040
Christian Heimes25bfcd52016-09-06 00:04:45 +02001041 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
1042 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001043 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +02001044 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +02001045 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +02001046 self.assertIn('AES256-GCM-SHA384', names)
1047 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001048
def test_options(self):
    # Check the default option mask of a client context and that the
    # options attribute can be modified.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
    default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
    # SSLContext also enables these by default
    default |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
                OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE |
                OP_ENABLE_MIDDLEBOX_COMPAT)
    self.assertEqual(default, ctx.options)
    ctx.options |= ssl.OP_NO_TLSv1
    self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
    if can_clear_options():
        # Clearing a single flag restores the default mask.
        ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
        self.assertEqual(default, ctx.options)
        ctx.options = 0
        # Ubuntu has OP_NO_SSLv3 forced on by default
        self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
    else:
        # When options cannot be cleared, resetting them must fail.
        with self.assertRaises(ValueError):
            ctx.options = 0
1069
Christian Heimesa170fa12017-09-15 20:27:30 +02001070 def test_verify_mode_protocol(self):
1071 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001072 # Default value
1073 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1074 ctx.verify_mode = ssl.CERT_OPTIONAL
1075 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1076 ctx.verify_mode = ssl.CERT_REQUIRED
1077 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1078 ctx.verify_mode = ssl.CERT_NONE
1079 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1080 with self.assertRaises(TypeError):
1081 ctx.verify_mode = None
1082 with self.assertRaises(ValueError):
1083 ctx.verify_mode = 42
1084
Christian Heimesa170fa12017-09-15 20:27:30 +02001085 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1086 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1087 self.assertFalse(ctx.check_hostname)
1088
1089 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1090 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1091 self.assertTrue(ctx.check_hostname)
1092
Christian Heimes61d478c2018-01-27 15:51:38 +01001093 def test_hostname_checks_common_name(self):
1094 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1095 self.assertTrue(ctx.hostname_checks_common_name)
1096 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1097 ctx.hostname_checks_common_name = True
1098 self.assertTrue(ctx.hostname_checks_common_name)
1099 ctx.hostname_checks_common_name = False
1100 self.assertFalse(ctx.hostname_checks_common_name)
1101 ctx.hostname_checks_common_name = True
1102 self.assertTrue(ctx.hostname_checks_common_name)
1103 else:
1104 with self.assertRaises(AttributeError):
1105 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001106
@unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
                     "required OpenSSL 1.1.0g")
@unittest.skipIf(IS_LIBRESSL, "see bpo-34001")
def test_min_max_version(self):
    # Exercise the minimum_version / maximum_version properties of
    # SSLContext: defaults, round-tripping, and rejection of bad values.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # OpenSSL default is MINIMUM_SUPPORTED, however some vendors like
    # Fedora override the setting to TLS 1.0.
    self.assertIn(
        ctx.minimum_version,
        {ssl.TLSVersion.MINIMUM_SUPPORTED,
         # Fedora 29 uses TLS 1.0 by default
         ssl.TLSVersion.TLSv1,
         # RHEL 8 uses TLS 1.2 by default
         ssl.TLSVersion.TLSv1_2}
    )
    self.assertEqual(
        ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
    )

    # Explicit versions are reported back unchanged.
    ctx.minimum_version = ssl.TLSVersion.TLSv1_1
    ctx.maximum_version = ssl.TLSVersion.TLSv1_2
    self.assertEqual(
        ctx.minimum_version, ssl.TLSVersion.TLSv1_1
    )
    self.assertEqual(
        ctx.maximum_version, ssl.TLSVersion.TLSv1_2
    )

    ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
    ctx.maximum_version = ssl.TLSVersion.TLSv1
    self.assertEqual(
        ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
    )
    self.assertEqual(
        ctx.maximum_version, ssl.TLSVersion.TLSv1
    )

    ctx.maximum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
    self.assertEqual(
        ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
    )

    # Assigning the "opposite" sentinel is read back as a concrete
    # version rather than as the sentinel itself.
    ctx.maximum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
    self.assertIn(
        ctx.maximum_version,
        {ssl.TLSVersion.TLSv1, ssl.TLSVersion.SSLv3}
    )

    ctx.minimum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
    self.assertIn(
        ctx.minimum_version,
        {ssl.TLSVersion.TLSv1_2, ssl.TLSVersion.TLSv1_3}
    )

    with self.assertRaises(ValueError):
        ctx.minimum_version = 42

    # On a context created for one specific protocol version, both
    # properties report the sentinels and any assignment is rejected.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)

    self.assertEqual(
        ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
    )
    self.assertEqual(
        ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
    )
    with self.assertRaises(ValueError):
        ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
    with self.assertRaises(ValueError):
        ctx.maximum_version = ssl.TLSVersion.TLSv1
1176
1177
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_verify_flags(self):
    # verify_flags has a known default, round-trips assigned flag
    # combinations, and rejects None.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # default value
    # VERIFY_X509_TRUSTED_FIRST may be missing from the ssl module, in
    # which case it contributes nothing to the expected default.
    tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf)
    ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
    self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
    ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
    self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
    ctx.verify_flags = ssl.VERIFY_DEFAULT
    self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
    # supports any value
    ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
    self.assertEqual(ctx.verify_flags,
                     ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
    with self.assertRaises(TypeError):
        ctx.verify_flags = None
1197
def test_load_cert_chain(self):
    # Exercise SSLContext.load_cert_chain() with combined and separate
    # key/cert files, mismatching pairs, and every supported way of
    # supplying a password (str, bytes, bytearray, callables).
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # Combined key and cert in a single file
    ctx.load_cert_chain(CERTFILE, keyfile=None)
    ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
    self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
    with self.assertRaises(OSError) as cm:
        ctx.load_cert_chain(NONEXISTINGCERT)
    self.assertEqual(cm.exception.errno, errno.ENOENT)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        ctx.load_cert_chain(BADCERT)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        ctx.load_cert_chain(EMPTYCERT)
    # Separate key and cert
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ctx.load_cert_chain(ONLYCERT, ONLYKEY)
    ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
    ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        ctx.load_cert_chain(ONLYCERT)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        ctx.load_cert_chain(ONLYKEY)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
    # Mismatching key and cert
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
        ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
    # Password protected key and cert
    ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
    ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
    ctx.load_cert_chain(CERTFILE_PROTECTED,
                        password=bytearray(KEY_PASSWORD.encode()))
    ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
    ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
    ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
                        bytearray(KEY_PASSWORD.encode()))
    with self.assertRaisesRegex(TypeError, "should be a string"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
    with self.assertRaises(ssl.SSLError):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        # openssl has a fixed limit on the password buffer.
        # PEM_BUFSIZE is generally set to 1kb.
        # Return a string larger than this.
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)

    # Password callback
    def getpass_unicode():
        return KEY_PASSWORD

    def getpass_bytes():
        return KEY_PASSWORD.encode()

    def getpass_bytearray():
        return bytearray(KEY_PASSWORD.encode())

    def getpass_badpass():
        return "badpass"

    def getpass_huge():
        return b'a' * (1024 * 1024)

    def getpass_bad_type():
        return 9

    def getpass_exception():
        raise Exception('getpass error')

    class GetPassCallable:
        # Provides the password both via __call__ and via a bound method.
        def __call__(self):
            return KEY_PASSWORD

        def getpass(self):
            return KEY_PASSWORD

    ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
    ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
    ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
    ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
    ctx.load_cert_chain(CERTFILE_PROTECTED,
                        password=GetPassCallable().getpass)
    with self.assertRaises(ssl.SSLError):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
    with self.assertRaisesRegex(TypeError, "must return a string"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
    with self.assertRaisesRegex(Exception, "getpass error"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
    # Make sure the password function isn't called if it isn't needed
    ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001280
def test_load_verify_locations(self):
    # Exercise SSLContext.load_verify_locations() with str and bytes
    # paths, positional and keyword arguments, and invalid input.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ctx.load_verify_locations(CERTFILE)
    ctx.load_verify_locations(cafile=CERTFILE, capath=None)
    ctx.load_verify_locations(BYTES_CERTFILE)
    ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
    # At least one location has to be given.
    self.assertRaises(TypeError, ctx.load_verify_locations)
    self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
    with self.assertRaises(OSError) as cm:
        ctx.load_verify_locations(NONEXISTINGCERT)
    self.assertEqual(cm.exception.errno, errno.ENOENT)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        ctx.load_verify_locations(BADCERT)
    ctx.load_verify_locations(CERTFILE, CAPATH)
    ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)

    # Issue #10989: crash if the second argument type is invalid
    self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
1299
def test_load_verify_cadata(self):
    # Load CA certificates from in-memory PEM text and DER bytes.
    def read_pem_and_der(path):
        # Return the PEM text of *path* and its DER conversion.
        with open(path) as stream:
            pem_text = stream.read()
        return pem_text, ssl.PEM_cert_to_DER_cert(pem_text)

    def ca_count(context):
        # Number of CA certificates currently in the context's store.
        return context.cert_store_stats()["x509_ca"]

    cacert_pem, cacert_der = read_pem_and_der(CAFILE_CACERT)
    neuronio_pem, neuronio_der = read_pem_and_der(CAFILE_NEURONIO)

    # PEM, one certificate per call
    pem_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertEqual(ca_count(pem_ctx), 0)
    pem_ctx.load_verify_locations(cadata=cacert_pem)
    self.assertEqual(ca_count(pem_ctx), 1)
    pem_ctx.load_verify_locations(cadata=neuronio_pem)
    self.assertEqual(ca_count(pem_ctx), 2)
    # reloading a cert that is already stored keeps the count unchanged
    pem_ctx.load_verify_locations(cadata=neuronio_pem)
    self.assertEqual(ca_count(pem_ctx), 2)

    # both PEM certificates in a single string
    joined_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    joined_pem = "\n".join((cacert_pem, neuronio_pem))
    joined_ctx.load_verify_locations(cadata=joined_pem)
    self.assertEqual(ca_count(joined_ctx), 2)

    # PEM certificates surrounded by junk text (one of them duplicated)
    junk_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    pieces = ["head", cacert_pem, "other", neuronio_pem, "again",
              neuronio_pem, "tail"]
    junk_ctx.load_verify_locations(cadata="\n".join(pieces))
    self.assertEqual(ca_count(junk_ctx), 2)

    # DER, one certificate per call
    der_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    der_ctx.load_verify_locations(cadata=cacert_der)
    der_ctx.load_verify_locations(cadata=neuronio_der)
    self.assertEqual(ca_count(der_ctx), 2)
    # reloading a cert that is already stored keeps the count unchanged
    der_ctx.load_verify_locations(cadata=cacert_der)
    self.assertEqual(ca_count(der_ctx), 2)

    # both DER certificates concatenated
    joined_der_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    joined_der = b"".join((cacert_der, neuronio_der))
    joined_der_ctx.load_verify_locations(cadata=joined_der)
    self.assertEqual(ca_count(joined_der_ctx), 2)

    # error cases
    bad_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with self.assertRaises(TypeError):
        bad_ctx.load_verify_locations(cadata=object)

    with self.assertRaisesRegex(ssl.SSLError, "no start line"):
        bad_ctx.load_verify_locations(cadata="broken")
    with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
        bad_ctx.load_verify_locations(cadata=b"broken")
1356
1357
def test_load_dh_params(self):
    # load_dh_params() accepts a DH parameter file and rejects bad input.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    context.load_dh_params(DHFILE)
    # The bytes path variant is only exercised outside Windows.
    if os.name != 'nt':
        context.load_dh_params(BYTES_DHFILE)
    with self.assertRaises(TypeError):
        context.load_dh_params()
    with self.assertRaises(TypeError):
        context.load_dh_params(None)
    with self.assertRaises(FileNotFoundError) as missing:
        context.load_dh_params(NONEXISTINGCERT)
    self.assertEqual(missing.exception.errno, errno.ENOENT)
    # A certificate file is not a valid DH parameter file.
    with self.assertRaises(ssl.SSLError):
        context.load_dh_params(CERTFILE)
1370
def test_session_stats(self):
    # A freshly created context reports zero for every session counter.
    counter_names = (
        'number',
        'connect',
        'connect_good',
        'connect_renegotiate',
        'accept',
        'accept_good',
        'accept_renegotiate',
        'hits',
        'misses',
        'timeouts',
        'cache_full',
    )
    all_zero = dict.fromkeys(counter_names, 0)
    for protocol in PROTOCOLS:
        context = ssl.SSLContext(protocol)
        self.assertEqual(context.session_stats(), all_zero)
1387
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001388 def test_set_default_verify_paths(self):
1389 # There's not much we can do to test that it acts as expected,
1390 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001391 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001392 ctx.set_default_verify_paths()
1393
Antoine Pitrou501da612011-12-21 09:27:41 +01001394 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001395 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001396 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001397 ctx.set_ecdh_curve("prime256v1")
1398 ctx.set_ecdh_curve(b"prime256v1")
1399 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1400 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1401 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1402 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1403
@needs_sni
def test_sni_callback(self):
    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    # set_servername_callback expects a callable, or None
    with self.assertRaises(TypeError):
        server_context.set_servername_callback()
    for not_callable in (4, "", server_context):
        with self.assertRaises(TypeError):
            server_context.set_servername_callback(not_callable)

    def dummycallback(sock, servername, ctx):
        pass

    # Both None and a real callable are accepted.
    server_context.set_servername_callback(None)
    server_context.set_servername_callback(dummycallback)
1418
@needs_sni
def test_sni_callback_refcycle(self):
    # Reference cycles through the servername callback are detected
    # and cleared.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    # The default argument makes the callback reference the context, and
    # the context references the callback once it is installed.
    def dummycallback(sock, servername, ctx, cycle=ctx):
        pass

    ctx.set_servername_callback(dummycallback)
    context_ref = weakref.ref(ctx)
    del ctx, dummycallback
    gc.collect()
    # After collection nothing should keep the context alive.
    self.assertIsNone(context_ref())
1431
def test_cert_store_stats(self):
    # Follow cert_store_stats() as certificates are loaded step by step.
    def expect_stats(x509, x509_ca):
        self.assertEqual(context.cert_store_stats(),
                         {'x509_ca': x509_ca, 'crl': 0, 'x509': x509})

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    expect_stats(x509=0, x509_ca=0)
    # Loading a certificate chain does not touch the verification store.
    context.load_cert_chain(CERTFILE)
    expect_stats(x509=0, x509_ca=0)
    context.load_verify_locations(CERTFILE)
    expect_stats(x509=1, x509_ca=0)
    context.load_verify_locations(CAFILE_CACERT)
    expect_stats(x509=2, x509_ca=1)
1445
def test_get_ca_certs(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertEqual(context.get_ca_certs(), [])
    # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
    context.load_verify_locations(CERTFILE)
    self.assertEqual(context.get_ca_certs(), [])
    # but CAFILE_CACERT is a CA cert
    context.load_verify_locations(CAFILE_CACERT)

    # The expected issuer and subject are the same distinguished name.
    cacert_name = (
        (('organizationName', 'Root CA'),),
        (('organizationalUnitName', 'http://www.cacert.org'),),
        (('commonName', 'CA Cert Signing Authority'),),
        (('emailAddress', 'support@cacert.org'),),
    )
    expected_entry = {
        'issuer': cacert_name,
        'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
        'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
        'serialNumber': '00',
        'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
        'subject': cacert_name,
        'version': 3,
    }
    self.assertEqual(context.get_ca_certs(), [expected_entry])

    # In binary form the same certificate is returned as DER bytes.
    with open(CAFILE_CACERT) as stream:
        pem_text = stream.read()
    expected_der = ssl.PEM_cert_to_DER_cert(pem_text)
    self.assertEqual(context.get_ca_certs(True), [expected_der])
1473
Christian Heimes72d28502013-11-23 13:56:58 +01001474 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001475 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001476 ctx.load_default_certs()
1477
Christian Heimesa170fa12017-09-15 20:27:30 +02001478 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001479 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1480 ctx.load_default_certs()
1481
Christian Heimesa170fa12017-09-15 20:27:30 +02001482 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001483 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1484
Christian Heimesa170fa12017-09-15 20:27:30 +02001485 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001486 self.assertRaises(TypeError, ctx.load_default_certs, None)
1487 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1488
@unittest.skipIf(sys.platform == "win32", "not-Windows specific")
@unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
def test_load_default_certs_env(self):
    # With SSL_CERT_DIR / SSL_CERT_FILE set, the default certificates
    # come from those locations.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with support.EnvironmentVarGuard() as environ:
        environ["SSL_CERT_DIR"] = CAPATH
        environ["SSL_CERT_FILE"] = CERTFILE
        context.load_default_certs()
        expected = {"crl": 0, "x509": 1, "x509_ca": 0}
        self.assertEqual(context.cert_store_stats(), expected)
1498
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
@unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
def test_load_default_certs_env_windows(self):
    # Baseline: store statistics after loading the defaults without the
    # environment variables set by this test.
    baseline_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    baseline_ctx.load_default_certs()
    expected = baseline_ctx.cert_store_stats()

    env_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with support.EnvironmentVarGuard() as environ:
        environ["SSL_CERT_DIR"] = CAPATH
        environ["SSL_CERT_FILE"] = CERTFILE
        env_ctx.load_default_certs()
        # Exactly one extra certificate is expected on top of the baseline.
        expected["x509"] += 1
        self.assertEqual(env_ctx.cert_store_stats(), expected)
1513
def _assert_context_options(self, ctx):
    # Assert that the expected option bits are set in ctx.options.
    # OP_NO_SSLv2 is always checked.
    self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
    # The remaining flags are only checked when their value is non-zero.
    conditional_flags = (
        OP_NO_COMPRESSION,
        OP_SINGLE_DH_USE,
        OP_SINGLE_ECDH_USE,
        OP_CIPHER_SERVER_PREFERENCE,
    )
    for flag in conditional_flags:
        if flag != 0:
            self.assertEqual(ctx.options & flag, flag)
1528
def test_create_default_context(self):
    # create_default_context() settings for the supported purposes.
    def expect_protocol_and_mode(context, verify_mode):
        self.assertEqual(context.protocol, ssl.PROTOCOL_TLS)
        self.assertEqual(context.verify_mode, verify_mode)

    # Default purpose: certificates required, hostname checked.
    default_ctx = ssl.create_default_context()
    expect_protocol_and_mode(default_ctx, ssl.CERT_REQUIRED)
    self.assertTrue(default_ctx.check_hostname)
    self._assert_context_options(default_ctx)

    # Explicit CA sources passed in all three forms at once.
    with open(SIGNING_CA) as stream:
        ca_text = stream.read()
    ca_ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
                                        cadata=ca_text)
    expect_protocol_and_mode(ca_ctx, ssl.CERT_REQUIRED)
    self._assert_context_options(ca_ctx)

    # CLIENT_AUTH purpose: peer certificates are not required.
    server_side_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    expect_protocol_and_mode(server_side_ctx, ssl.CERT_NONE)
    self._assert_context_options(server_side_ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001549
def test__create_stdlib_context(self):
    # _create_stdlib_context() settings for several argument combinations.
    def expect_protocol_and_mode(context, protocol, verify_mode):
        self.assertEqual(context.protocol, protocol)
        self.assertEqual(context.verify_mode, verify_mode)

    # No arguments: no verification and no hostname check.
    plain = ssl._create_stdlib_context()
    expect_protocol_and_mode(plain, ssl.PROTOCOL_TLS, ssl.CERT_NONE)
    self.assertFalse(plain.check_hostname)
    self._assert_context_options(plain)

    # Explicit protocol only.
    tls1 = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
    expect_protocol_and_mode(tls1, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE)
    self._assert_context_options(tls1)

    # Explicit protocol with verification and hostname checking.
    strict = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
                                        cert_reqs=ssl.CERT_REQUIRED,
                                        check_hostname=True)
    expect_protocol_and_mode(strict, ssl.PROTOCOL_TLSv1, ssl.CERT_REQUIRED)
    self.assertTrue(strict.check_hostname)
    self._assert_context_options(strict)

    # CLIENT_AUTH purpose.
    client_auth = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
    expect_protocol_and_mode(client_auth, ssl.PROTOCOL_TLS, ssl.CERT_NONE)
    self._assert_context_options(client_auth)
Christian Heimes4c05b472013-11-23 15:58:30 +01001574
Christian Heimes1aa9a752013-12-02 02:41:19 +01001575 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001576 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001577 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001578 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001579
Christian Heimese82c0342017-09-15 20:29:57 +02001580 # Auto set CERT_REQUIRED
1581 ctx.check_hostname = True
1582 self.assertTrue(ctx.check_hostname)
1583 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1584 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001585 ctx.verify_mode = ssl.CERT_REQUIRED
1586 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001587 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001588
Christian Heimese82c0342017-09-15 20:29:57 +02001589 # Changing verify_mode does not affect check_hostname
1590 ctx.check_hostname = False
1591 ctx.verify_mode = ssl.CERT_NONE
1592 ctx.check_hostname = False
1593 self.assertFalse(ctx.check_hostname)
1594 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1595 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001596 ctx.check_hostname = True
1597 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001598 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1599
1600 ctx.check_hostname = False
1601 ctx.verify_mode = ssl.CERT_OPTIONAL
1602 ctx.check_hostname = False
1603 self.assertFalse(ctx.check_hostname)
1604 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1605 # keep CERT_OPTIONAL
1606 ctx.check_hostname = True
1607 self.assertTrue(ctx.check_hostname)
1608 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001609
1610 # Cannot set CERT_NONE with check_hostname enabled
1611 with self.assertRaises(ValueError):
1612 ctx.verify_mode = ssl.CERT_NONE
1613 ctx.check_hostname = False
1614 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001615 ctx.verify_mode = ssl.CERT_NONE
1616 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001617
Christian Heimes5fe668c2016-09-12 00:01:11 +02001618 def test_context_client_server(self):
1619 # PROTOCOL_TLS_CLIENT has sane defaults
1620 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1621 self.assertTrue(ctx.check_hostname)
1622 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1623
1624 # PROTOCOL_TLS_SERVER has different but also sane defaults
1625 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1626 self.assertFalse(ctx.check_hostname)
1627 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1628
Christian Heimes4df60f12017-09-15 20:26:05 +02001629 def test_context_custom_class(self):
1630 class MySSLSocket(ssl.SSLSocket):
1631 pass
1632
1633 class MySSLObject(ssl.SSLObject):
1634 pass
1635
1636 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1637 ctx.sslsocket_class = MySSLSocket
1638 ctx.sslobject_class = MySSLObject
1639
1640 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1641 self.assertIsInstance(sock, MySSLSocket)
1642 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1643 self.assertIsInstance(obj, MySSLObject)
1644
@unittest.skipUnless(IS_OPENSSL_1_1_1, "Test requires OpenSSL 1.1.1")
def test_num_tickest(self):
    # num_tickets starts at 2 and can be changed on a server context.
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    self.assertEqual(server_ctx.num_tickets, 2)
    for count in (1, 0):
        server_ctx.num_tickets = count
        self.assertEqual(server_ctx.num_tickets, count)
    # Negative or non-integer values are rejected.
    with self.assertRaises(ValueError):
        server_ctx.num_tickets = -1
    with self.assertRaises(TypeError):
        server_ctx.num_tickets = None

    # On a client context the value can be read but not assigned.
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertEqual(client_ctx.num_tickets, 2)
    with self.assertRaises(ValueError):
        client_ctx.num_tickets = 1
1662
Antoine Pitrou152efa22010-05-16 18:19:27 +00001663
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001664class SSLErrorTests(unittest.TestCase):
1665
1666 def test_str(self):
1667 # The str() of a SSLError doesn't include the errno
1668 e = ssl.SSLError(1, "foo")
1669 self.assertEqual(str(e), "foo")
1670 self.assertEqual(e.errno, 1)
1671 # Same for a subclass
1672 e = ssl.SSLZeroReturnError(1, "foo")
1673 self.assertEqual(str(e), "foo")
1674 self.assertEqual(e.errno, 1)
1675
1676 def test_lib_reason(self):
1677 # Test the library and reason attributes
Christian Heimesa170fa12017-09-15 20:27:30 +02001678 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001679 with self.assertRaises(ssl.SSLError) as cm:
1680 ctx.load_dh_params(CERTFILE)
1681 self.assertEqual(cm.exception.library, 'PEM')
1682 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1683 s = str(cm.exception)
1684 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1685
1686 def test_subclass(self):
1687 # Check that the appropriate SSLError subclass is raised
1688 # (this only tests one of them)
Christian Heimesa170fa12017-09-15 20:27:30 +02001689 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1690 ctx.check_hostname = False
1691 ctx.verify_mode = ssl.CERT_NONE
Giampaolo Rodolaeb7e29f2019-04-09 00:34:02 +02001692 with socket.create_server(("127.0.0.1", 0)) as s:
1693 c = socket.create_connection(s.getsockname())
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001694 c.setblocking(False)
1695 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001696 with self.assertRaises(ssl.SSLWantReadError) as cm:
1697 c.do_handshake()
1698 s = str(cm.exception)
1699 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1700 # For compatibility
1701 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1702
1703
Christian Heimes61d478c2018-01-27 15:51:38 +01001704 def test_bad_server_hostname(self):
1705 ctx = ssl.create_default_context()
1706 with self.assertRaises(ValueError):
1707 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1708 server_hostname="")
1709 with self.assertRaises(ValueError):
1710 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1711 server_hostname=".example.org")
Christian Heimesd02ac252018-03-25 12:36:13 +02001712 with self.assertRaises(TypeError):
1713 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1714 server_hostname="example.org\x00evil.com")
Christian Heimes61d478c2018-01-27 15:51:38 +01001715
1716
class MemoryBIOTests(unittest.TestCase):

    def test_read_write(self):
        # Written data is read back in order; an empty BIO reads as b''.
        buf = ssl.MemoryBIO()
        buf.write(b'foo')
        self.assertEqual(buf.read(), b'foo')
        self.assertEqual(buf.read(), b'')
        # Consecutive writes are concatenated.
        for chunk in (b'foo', b'bar'):
            buf.write(chunk)
        self.assertEqual(buf.read(), b'foobar')
        self.assertEqual(buf.read(), b'')
        # Sized reads return at most the requested number of bytes.
        buf.write(b'baz')
        for size, expected in ((2, b'ba'), (1, b'z'), (1, b'')):
            self.assertEqual(buf.read(size), expected)

    def test_eof(self):
        # eof becomes true only after write_eof() AND draining the data.
        buf = ssl.MemoryBIO()
        self.assertFalse(buf.eof)
        self.assertEqual(buf.read(), b'')
        self.assertFalse(buf.eof)
        buf.write(b'foo')
        self.assertFalse(buf.eof)
        buf.write_eof()
        self.assertFalse(buf.eof)
        self.assertEqual(buf.read(2), b'fo')
        self.assertFalse(buf.eof)
        self.assertEqual(buf.read(1), b'o')
        self.assertTrue(buf.eof)
        self.assertEqual(buf.read(), b'')
        self.assertTrue(buf.eof)

    def test_pending(self):
        # pending tracks the number of unread bytes.
        buf = ssl.MemoryBIO()
        self.assertEqual(buf.pending, 0)
        buf.write(b'foo')
        self.assertEqual(buf.pending, 3)
        for remaining in (2, 1, 0):
            buf.read(1)
            self.assertEqual(buf.pending, remaining)
        for written in (1, 2, 3):
            buf.write(b'x')
            self.assertEqual(buf.pending, written)
        buf.read()
        self.assertEqual(buf.pending, 0)

    def test_buffer_types(self):
        # bytes, bytearray and memoryview are all accepted by write().
        buf = ssl.MemoryBIO()
        samples = (
            (b'foo', b'foo'),
            (bytearray(b'bar'), b'bar'),
            (memoryview(b'baz'), b'baz'),
        )
        for data, expected in samples:
            buf.write(data)
            self.assertEqual(buf.read(), expected)

    def test_error_types(self):
        # write() rejects objects that are not bytes-like.
        buf = ssl.MemoryBIO()
        for not_bytes in ('foo', None, True, 1):
            with self.assertRaises(TypeError):
                buf.write(not_bytes)
1778
1779
Christian Heimes9d50ab52018-02-27 10:17:30 +01001780class SSLObjectTests(unittest.TestCase):
1781 def test_private_init(self):
1782 bio = ssl.MemoryBIO()
1783 with self.assertRaisesRegex(TypeError, "public constructor"):
1784 ssl.SSLObject(bio, bio)
1785
Nathaniel J. Smithc0da5822018-09-21 21:44:12 -07001786 def test_unwrap(self):
1787 client_ctx, server_ctx, hostname = testing_context()
1788 c_in = ssl.MemoryBIO()
1789 c_out = ssl.MemoryBIO()
1790 s_in = ssl.MemoryBIO()
1791 s_out = ssl.MemoryBIO()
1792 client = client_ctx.wrap_bio(c_in, c_out, server_hostname=hostname)
1793 server = server_ctx.wrap_bio(s_in, s_out, server_side=True)
1794
1795 # Loop on the handshake for a bit to get it settled
1796 for _ in range(5):
1797 try:
1798 client.do_handshake()
1799 except ssl.SSLWantReadError:
1800 pass
1801 if c_out.pending:
1802 s_in.write(c_out.read())
1803 try:
1804 server.do_handshake()
1805 except ssl.SSLWantReadError:
1806 pass
1807 if s_out.pending:
1808 c_in.write(s_out.read())
1809 # Now the handshakes should be complete (don't raise WantReadError)
1810 client.do_handshake()
1811 server.do_handshake()
1812
1813 # Now if we unwrap one side unilaterally, it should send close-notify
1814 # and raise WantReadError:
1815 with self.assertRaises(ssl.SSLWantReadError):
1816 client.unwrap()
1817
1818 # But server.unwrap() does not raise, because it reads the client's
1819 # close-notify:
1820 s_in.write(c_out.read())
1821 server.unwrap()
1822
1823 # And now that the client gets the server's close-notify, it doesn't
1824 # raise either.
1825 c_in.write(s_out.read())
1826 client.unwrap()
Christian Heimes9d50ab52018-02-27 10:17:30 +01001827
Martin Panter3840b2a2016-03-27 01:53:46 +00001828class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001829 """Tests that connect to a simple server running in the background"""
1830
def setUp(self):
    # Start an echo server for the test and record its address; the
    # server's __exit__ is registered as a cleanup.
    echo_server = ThreadedEchoServer(SIGNED_CERTFILE)
    self.server_addr = (HOST, echo_server.port)
    echo_server.__enter__()
    self.addCleanup(echo_server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001836
def test_connect(self):
    # Without certificate verification the peer certificate is empty.
    unverified = test_wrap_socket(socket.socket(socket.AF_INET),
                                  cert_reqs=ssl.CERT_NONE)
    with unverified as conn:
        conn.connect(self.server_addr)
        self.assertEqual({}, conn.getpeercert())
        self.assertFalse(conn.server_side)

    # this should succeed because we specify the root cert
    verified = test_wrap_socket(socket.socket(socket.AF_INET),
                                cert_reqs=ssl.CERT_REQUIRED,
                                ca_certs=SIGNING_CA)
    with verified as conn:
        conn.connect(self.server_addr)
        self.assertTrue(conn.getpeercert())
        self.assertFalse(conn.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001851
def test_connect_fail(self):
    # This should fail because we have no verification certs. Connection
    # failure crashes ThreadedEchoServer, so run this in an independent
    # test method.
    client = test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(client.close)
    with self.assertRaisesRegex(ssl.SSLError, "certificate verify failed"):
        client.connect(self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001861
def test_connect_ex(self):
    # Issue #11326: check connect_ex() implementation
    client = test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_REQUIRED,
                              ca_certs=SIGNING_CA)
    self.addCleanup(client.close)
    # connect_ex() returns 0 here and the peer certificate is available.
    result = client.connect_ex(self.server_addr)
    self.assertEqual(0, result)
    self.assertTrue(client.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001870
def test_non_blocking_connect_ex(self):
    # Issue #11326: non-blocking connect_ex() should allow handshake
    # to proceed after the socket gets ready.
    client = test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_REQUIRED,
                              ca_certs=SIGNING_CA,
                              do_handshake_on_connect=False)
    self.addCleanup(client.close)
    client.setblocking(False)
    status = client.connect_ex(self.server_addr)
    # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
    self.assertIn(status, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
    # Wait for connect to finish
    select.select([], [client], [], 5.0)
    # Non-blocking handshake: retry until it completes, waiting for the
    # socket to become readable or writable as requested.
    while True:
        try:
            client.do_handshake()
        except ssl.SSLWantReadError:
            select.select([client], [], [], 5.0)
        except ssl.SSLWantWriteError:
            select.select([], [client], [], 5.0)
        else:
            break
    # SSL established
    self.assertTrue(client.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001896
def test_connect_with_context(self):
    # Same as test_connect, but with a separately created context
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)

    def new_socket():
        return socket.socket(socket.AF_INET)

    with context.wrap_socket(new_socket()) as conn:
        conn.connect(self.server_addr)
        self.assertEqual({}, conn.getpeercert())
    # Same with a server hostname
    with context.wrap_socket(new_socket(), server_hostname="dummy") as conn:
        conn.connect(self.server_addr)
    context.verify_mode = ssl.CERT_REQUIRED
    # This should succeed because we specify the root cert
    context.load_verify_locations(SIGNING_CA)
    with context.wrap_socket(new_socket()) as conn:
        conn.connect(self.server_addr)
        peer_cert = conn.getpeercert()
        self.assertTrue(peer_cert)
1914
def test_connect_with_context_fail(self):
    # This should fail because we have no verification certs. Connection
    # failure crashes ThreadedEchoServer, so run this in an independent
    # test method.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    client = context.wrap_socket(socket.socket(socket.AF_INET))
    self.addCleanup(client.close)
    with self.assertRaisesRegex(ssl.SSLError, "certificate verify failed"):
        client.connect(self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001925
def test_connect_capath(self):
    """Verify server certificates using the `capath` argument.

    The same check runs twice: once with CAPATH and once with the bytes
    variant BYTES_CAPATH.
    """
    # NOTE: the subject hashing algorithm has been changed between
    # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
    # contain both versions of each certificate (same content, different
    # filename) for this test to be portable across OpenSSL releases.
    for capath in (CAPATH, BYTES_CAPATH):
        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(capath=capath)
        with context.wrap_socket(socket.socket(socket.AF_INET)) as conn:
            conn.connect(self.server_addr)
            peer_cert = conn.getpeercert()
            self.assertTrue(peer_cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001948
def test_connect_cadata(self):
    """Verify the server certificate with CA data passed via `cadata`.

    The signing CA is supplied first as PEM text and then as the DER
    bytes produced by ssl.PEM_cert_to_DER_cert().
    """
    with open(SIGNING_CA) as ca_file:
        pem = ca_file.read()
    der = ssl.PEM_cert_to_DER_cert(pem)

    for cadata in (pem, der):
        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(cadata=cadata)
        with context.wrap_socket(socket.socket(socket.AF_INET)) as conn:
            conn.connect(self.server_addr)
            peer_cert = conn.getpeercert()
            self.assertTrue(peer_cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001969
@unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
def test_makefile_close(self):
    """Closing a makefile() object must not delay closing the real socket."""
    # Issue #5238: creating a file-like object with makefile() shouldn't
    # delay closing the underlying "real socket" (here tested with its
    # file descriptor, hence skipping the test under Windows).
    tls_sock = test_wrap_socket(socket.socket(socket.AF_INET))
    tls_sock.connect(self.server_addr)
    raw_fd = tls_sock.fileno()

    file_obj = tls_sock.makefile()
    file_obj.close()
    # Closing only the file object leaves the descriptor usable.
    os.read(raw_fd, 0)

    # Closing the SSL socket should close the fd too.
    tls_sock.close()
    gc.collect()
    with self.assertRaises(OSError) as caught:
        os.read(raw_fd, 0)
    self.assertEqual(caught.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001988
def test_non_blocking_handshake(self):
    """Drive a TLS handshake by hand on a non-blocking, connected socket."""
    plain = socket.socket(socket.AF_INET)
    plain.connect(self.server_addr)
    plain.setblocking(False)
    tls_sock = test_wrap_socket(plain,
                                cert_reqs=ssl.CERT_NONE,
                                do_handshake_on_connect=False)
    self.addCleanup(tls_sock.close)

    attempts = 0
    finished = False
    while not finished:
        attempts += 1
        try:
            tls_sock.do_handshake()
        except ssl.SSLWantReadError:
            # Handshake needs more data from the peer.
            select.select([tls_sock], [], [])
        except ssl.SSLWantWriteError:
            # Handshake needs to send data but the socket is not writable.
            select.select([], [tls_sock], [])
        else:
            finished = True

    if support.verbose:
        sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % attempts)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002009
def test_get_server_certificate(self):
    """Fetch the test server's certificate, verifying against SIGNING_CA."""
    host, port = self.server_addr
    _test_get_server_certificate(self, host, port, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02002012
def test_get_server_certificate_fail(self):
    """Fetching the certificate with the wrong CA file must fail."""
    # Connection failure crashes ThreadedEchoServer, so run this in an
    # independent test method
    host, port = self.server_addr
    _test_get_server_certificate_fail(self, host, port)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002017
def test_ciphers(self):
    """Connect with valid cipher strings and reject an invalid one."""
    for cipher_spec in ("ALL", "DEFAULT"):
        with test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_NONE,
                              ciphers=cipher_spec) as conn:
            conn.connect(self.server_addr)

    # Error checking can happen at instantiation or when connecting
    with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"), \
            socket.socket(socket.AF_INET) as raw_sock:
        conn = test_wrap_socket(raw_sock,
                                cert_reqs=ssl.CERT_NONE,
                                ciphers="^$:,;?*'dorothyx")
        conn.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00002031
def test_get_ca_certs_capath(self):
    """get_ca_certs() is empty after loading a capath until a connection is made."""
    # capath certs are loaded on request
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.load_verify_locations(capath=CAPATH)
    self.assertEqual(context.get_ca_certs(), [])

    raw_sock = socket.socket(socket.AF_INET)
    with context.wrap_socket(raw_sock, server_hostname='localhost') as conn:
        conn.connect(self.server_addr)
        peer_cert = conn.getpeercert()
        self.assertTrue(peer_cert)

    # After the verified handshake exactly one CA certificate is reported.
    self.assertEqual(len(context.get_ca_certs()), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02002043
@needs_sni
def test_context_setget(self):
    """The context of a connected socket can be read and replaced."""
    first_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    first_ctx.load_verify_locations(capath=CAPATH)
    second_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    second_ctx.load_verify_locations(capath=CAPATH)

    raw_sock = socket.socket(socket.AF_INET)
    with first_ctx.wrap_socket(raw_sock, server_hostname='localhost') as conn:
        conn.connect(self.server_addr)
        # Both the socket and its underlying SSL object report the context
        # the socket was wrapped with ...
        self.assertIs(conn.context, first_ctx)
        self.assertIs(conn._sslobj.context, first_ctx)
        # ... and both follow an assignment to the `context` attribute.
        conn.context = second_ctx
        self.assertIs(conn.context, second_ctx)
        self.assertIs(conn._sslobj.context, second_ctx)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002059
def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
    """Run func(*args) to completion, shuttling data between sock and the BIOs.

    A simple IO loop: func is retried while it raises an SSLError whose
    errno is SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE; any other
    SSLError propagates.  After every attempt the contents of `outgoing`
    are sent to `sock`; on WANT_READ, data received from `sock` is fed
    into `incoming` (or EOF is written if the peer sent nothing).

    The only keyword argument consulted is ``timeout`` (seconds, default
    10); the test fails with "timeout" once it has elapsed.  Returns the
    value returned by func.
    """
    deadline = time.monotonic() + kwargs.get('timeout', 10)
    retryable = (ssl.SSL_ERROR_WANT_READ, ssl.SSL_ERROR_WANT_WRITE)
    attempts = 0
    while True:
        if time.monotonic() > deadline:
            self.fail("timeout")
        attempts += 1
        want = None
        try:
            ret = func(*args)
        except ssl.SSLError as exc:
            if exc.errno not in retryable:
                raise
            want = exc.errno
        # Get any data from the outgoing BIO irrespective of any error, and
        # send it to the socket.
        sock.sendall(outgoing.read())
        # If there's no error, we're done.
        if want is None:
            break
        # For WANT_READ, we need to get data from the socket and put it in
        # the incoming BIO.
        if want == ssl.SSL_ERROR_WANT_READ:
            received = sock.recv(32768)
            if received:
                incoming.write(received)
            else:
                incoming.write_eof()
    if support.verbose:
        sys.stdout.write("Needed %d calls to complete %s().\n"
                         % (attempts, func.__name__))
    return ret
2096
def test_bio_handshake(self):
    """Handshake and shut down a TLS session driven through MemoryBIOs."""
    sock = socket.socket(socket.AF_INET)
    self.addCleanup(sock.close)
    sock.connect(self.server_addr)

    incoming = ssl.MemoryBIO()
    outgoing = ssl.MemoryBIO()
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertTrue(context.check_hostname)
    self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
    context.load_verify_locations(SIGNING_CA)
    sslobj = context.wrap_bio(incoming, outgoing,
                              server_side=False,
                              server_hostname=SIGNED_CERTFILE_HOSTNAME)
    has_tls_unique = 'tls-unique' in ssl.CHANNEL_BINDING_TYPES

    # State before the handshake has been performed.
    self.assertIs(sslobj._sslobj.owner, sslobj)
    self.assertIsNone(sslobj.cipher())
    self.assertIsNone(sslobj.version())
    self.assertIsNotNone(sslobj.shared_ciphers())
    with self.assertRaises(ValueError):
        sslobj.getpeercert()
    if has_tls_unique:
        self.assertIsNone(sslobj.get_channel_binding('tls-unique'))

    self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)

    # State once the handshake has completed.
    self.assertTrue(sslobj.cipher())
    self.assertIsNotNone(sslobj.shared_ciphers())
    self.assertIsNotNone(sslobj.version())
    self.assertTrue(sslobj.getpeercert())
    if has_tls_unique:
        self.assertTrue(sslobj.get_channel_binding('tls-unique'))

    try:
        self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
    except ssl.SSLSyscallError:
        # If the server shuts down the TCP connection without sending a
        # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
        pass
    with self.assertRaises(ssl.SSLError):
        sslobj.write(b'foo')
2130
def test_bio_read_write_data(self):
    """Send a request through MemoryBIOs and read the lower-cased echo."""
    sock = socket.socket(socket.AF_INET)
    self.addCleanup(sock.close)
    sock.connect(self.server_addr)

    incoming = ssl.MemoryBIO()
    outgoing = ssl.MemoryBIO()
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_NONE
    sslobj = context.wrap_bio(incoming, outgoing, False)

    def pump(operation, *op_args):
        # Every step uses the same socket/BIO triple.
        return self.ssl_io_loop(sock, incoming, outgoing, operation, *op_args)

    pump(sslobj.do_handshake)
    request = b'FOO\n'
    pump(sslobj.write, request)
    reply = pump(sslobj.read, 1024)
    self.assertEqual(reply, b'foo\n')
    pump(sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002146
2147
class NetworkedTests(unittest.TestCase):
    """Tests that talk to remote hosts rather than a local test server."""

    def test_timeout_connect_ex(self):
        """connect_ex() reports the original errno when the connect times out."""
        # Issue #12065: on a timeout, connect_ex() should return the original
        # errno (mimicking the behaviour of non-SSL sockets).
        with support.transient_internet(REMOTE_HOST):
            conn = test_wrap_socket(socket.socket(socket.AF_INET),
                                    cert_reqs=ssl.CERT_REQUIRED,
                                    do_handshake_on_connect=False)
            self.addCleanup(conn.close)
            # A tiny timeout so the connect is expected not to finish in time.
            conn.settimeout(1e-7)
            result = conn.connect_ex((REMOTE_HOST, 443))
            if result == 0:
                self.skipTest("REMOTE_HOST responded too quickly")
            self.assertIn(result, (errno.EAGAIN, errno.EWOULDBLOCK))

    @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
    def test_get_server_certificate_ipv6(self):
        """Run both get_server_certificate helpers against an IPv6 host."""
        ipv6_host = 'ipv6.google.com'
        with support.transient_internet(ipv6_host):
            _test_get_server_certificate(self, ipv6_host, 443)
            _test_get_server_certificate_fail(self, ipv6_host, 443)
2169
Martin Panter3840b2a2016-03-27 01:53:46 +00002170
def _test_get_server_certificate(test, host, port, cert=None):
    """Fetch the certificate of host:port, without and then with `cert` as CA file.

    Fails `test` if either call returns an empty result.
    """
    address = (host, port)
    missing_msg = "No server certificate on %s:%s!" % address

    unverified = ssl.get_server_certificate(address)
    if not unverified:
        test.fail(missing_msg)

    pem = ssl.get_server_certificate(address, ca_certs=cert)
    if not pem:
        test.fail(missing_msg)
    if support.verbose:
        sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port, pem))
2181
def _test_get_server_certificate_fail(test, host, port):
    """Expect ssl.get_server_certificate() to raise SSLError with CERTFILE as CA file.

    Fails `test` if a certificate is returned instead.
    """
    try:
        pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
    except ssl.SSLError as exc:
        # should fail
        if support.verbose:
            sys.stdout.write("%s\n" % exc)
        return
    test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2191
2192
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002193from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002194
class ThreadedEchoServer(threading.Thread):
    """Echo server running in its own thread, used as the peer for client tests.

    The server accepts connections on a freshly bound port (``self.port``)
    and handles them one at a time: each connection gets a
    ConnectionHandler thread which is joined before the next accept().

    Per-connection results are collected on the server object in
    ``selected_npn_protocols``, ``selected_alpn_protocols``,
    ``shared_ciphers`` and ``conn_errors`` (the latter as strings).

    Usable as a context manager: entering starts the thread and waits until
    it is listening, leaving stops and joins it.
    """

    class ConnectionHandler(threading.Thread):

        """A mildly complicated class, because we want it to work both
        with and without the SSL wrapper around the socket connection, so
        that we can test the STARTTLS functionality."""

        def __init__(self, server, connsock, addr):
            self.server = server
            self.running = False
            self.sock = connsock
            self.addr = addr
            self.sock.setblocking(1)
            # Set by wrap_conn(); None while the connection is unencrypted.
            self.sslconn = None
            threading.Thread.__init__(self)
            self.daemon = True

        def wrap_conn(self):
            """Wrap the connection in TLS using the server's context.

            Returns True on success.  On failure the error is recorded (as
            a string) in server.conn_errors, the connection is closed and
            False is returned; for errors other than connection
            reset/abort/broken pipe the server is stopped as well.
            """
            try:
                self.sslconn = self.server.context.wrap_socket(
                    self.sock, server_side=True)
                self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
                self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
            except (ConnectionResetError, BrokenPipeError, ConnectionAbortedError) as e:
                # We treat ConnectionResetError as though it were an
                # SSLError - OpenSSL on Ubuntu abruptly closes the
                # connection when asked to use an unsupported protocol.
                #
                # BrokenPipeError is raised in TLS 1.3 mode, when OpenSSL
                # tries to send session tickets after handshake.
                # https://github.com/openssl/openssl/issues/6342
                #
                # ConnectionAbortedError is raised in TLS 1.3 mode, when OpenSSL
                # tries to send session tickets after handshake when using WinSock.
                self.server.conn_errors.append(str(e))
                if self.server.chatty:
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                self.close()
                return False
            except (ssl.SSLError, OSError) as e:
                # OSError may occur with wrong protocols, e.g. both
                # sides use PROTOCOL_TLS_SERVER.
                #
                # XXX Various errors can have happened here, for example
                # a mismatching protocol version, an invalid certificate,
                # or a low-level bug. This should be made more discriminating.
                #
                # bpo-31323: Store the exception as string to prevent
                # a reference leak: server -> conn_errors -> exception
                # -> traceback -> self (ConnectionHandler) -> server
                self.server.conn_errors.append(str(e))
                if self.server.chatty:
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                self.server.stop()
                self.close()
                return False
            else:
                self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
                if self.server.context.verify_mode == ssl.CERT_REQUIRED:
                    cert = self.sslconn.getpeercert()
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
                    cert_binary = self.sslconn.getpeercert(True)
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
                cipher = self.sslconn.cipher()
                if support.verbose and self.server.chatty:
                    sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
                    sys.stdout.write(" server: selected protocol is now "
                            + str(self.sslconn.selected_npn_protocol()) + "\n")
                return True

        def read(self):
            """Read from the TLS connection if wrapped, else from the raw socket."""
            if self.sslconn:
                return self.sslconn.read()
            else:
                return self.sock.recv(1024)

        def write(self, bytes):
            """Write to the TLS connection if wrapped, else to the raw socket."""
            if self.sslconn:
                return self.sslconn.write(bytes)
            else:
                return self.sock.send(bytes)

        def close(self):
            """Close the TLS connection if wrapped, else the raw socket."""
            if self.sslconn:
                self.sslconn.close()
            else:
                self.sock.close()

        def _abort_and_stop_server(self):
            """Report a failure, close this connection and stop the server.

            Must be called from inside an ``except`` block.
            """
            if self.server.chatty:
                handle_error("Test server failure:\n")
            self.close()
            self.running = False

            # normally, we'd just stop here, but for the test
            # harness, we want to stop the server
            self.server.stop()

        def run(self):
            """Serve the connection until EOF, b'over', or an error.

            Recognised commands (after stripping): 'over', 'STARTTLS' and
            'ENDTLS' (only when the server is a STARTTLS server),
            'CB tls-unique', 'PHA', 'HASCERT' and 'GETCERT'.  Anything
            else is echoed back lower-cased.
            """
            self.running = True
            if not self.server.starttls_server:
                if not self.wrap_conn():
                    return
            while self.running:
                try:
                    msg = self.read()
                    stripped = msg.strip()
                    if not stripped:
                        # eof, so quit this handler
                        self.running = False
                        try:
                            self.sock = self.sslconn.unwrap()
                        except OSError:
                            # Many tests shut the TCP connection down
                            # without an SSL shutdown. This causes
                            # unwrap() to raise OSError with errno=0!
                            pass
                        else:
                            self.sslconn = None
                        self.close()
                    elif stripped == b'over':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: client closed connection\n")
                        self.close()
                        return
                    elif (self.server.starttls_server and
                          stripped == b'STARTTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        if not self.wrap_conn():
                            return
                    elif (self.server.starttls_server and self.sslconn
                          and stripped == b'ENDTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        self.sock = self.sslconn.unwrap()
                        self.sslconn = None
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: connection is now unencrypted...\n")
                    elif stripped == b'CB tls-unique':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
                        data = self.sslconn.get_channel_binding("tls-unique")
                        self.write(repr(data).encode("us-ascii") + b"\n")
                    elif stripped == b'PHA':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: initiating post handshake auth\n")
                        try:
                            self.sslconn.verify_client_post_handshake()
                        except ssl.SSLError as e:
                            self.write(repr(e).encode("us-ascii") + b"\n")
                        else:
                            self.write(b"OK\n")
                    elif stripped == b'HASCERT':
                        if self.sslconn.getpeercert() is not None:
                            self.write(b'TRUE\n')
                        else:
                            self.write(b'FALSE\n')
                    elif stripped == b'GETCERT':
                        cert = self.sslconn.getpeercert()
                        self.write(repr(cert).encode("us-ascii") + b"\n")
                    else:
                        if (support.verbose and
                            self.server.connectionchatty):
                            ctype = (self.sslconn and "encrypted") or "unencrypted"
                            sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
                                             % (msg, ctype, msg.lower(), ctype))
                        self.write(msg.lower())
                except (ConnectionResetError, ConnectionAbortedError):
                    # XXX: OpenSSL 1.1.1 sometimes raises ConnectionResetError
                    # when connection is not shut down gracefully.
                    if self.server.chatty and support.verbose:
                        sys.stdout.write(
                            " Connection reset by peer: {}\n".format(
                                self.addr)
                        )
                    self.close()
                    self.running = False
                except ssl.SSLError as err:
                    # On Windows sometimes test_pha_required_nocert receives the
                    # PEER_DID_NOT_RETURN_A_CERTIFICATE exception
                    # before the 'tlsv13 alert certificate required' exception.
                    # If the server is stopped when PEER_DID_NOT_RETURN_A_CERTIFICATE
                    # is received test_pha_required_nocert fails with ConnectionResetError
                    # because the underlying socket is closed
                    if 'PEER_DID_NOT_RETURN_A_CERTIFICATE' == err.reason:
                        if self.server.chatty and support.verbose:
                            sys.stdout.write(err.args[1])
                        # test_pha_required_nocert is expecting this exception
                        raise ssl.SSLError('tlsv13 alert certificate required')
                    # ssl.SSLError is a subclass of OSError, so this clause
                    # shadows the generic OSError handler below.  Any other
                    # SSL failure must get the same treatment as a plain
                    # OSError; otherwise it would be silently swallowed and
                    # the loop would keep reading from a broken connection.
                    self._abort_and_stop_server()
                except OSError:
                    self._abort_and_stop_server()

    def __init__(self, certificate=None, ssl_version=None,
                 certreqs=None, cacerts=None,
                 chatty=True, connectionchatty=False, starttls_server=False,
                 npn_protocols=None, alpn_protocols=None,
                 ciphers=None, context=None):
        """Create the server and bind its listening socket.

        If `context` is given it is used as-is and `ssl_version`,
        `certreqs`, `cacerts` and `certificate` are ignored; otherwise a
        context is built from them (defaulting to PROTOCOL_TLS_SERVER and
        CERT_NONE).  `npn_protocols`, `alpn_protocols` and `ciphers` are
        applied to whichever context ends up being used.
        """
        if context:
            self.context = context
        else:
            self.context = ssl.SSLContext(ssl_version
                                          if ssl_version is not None
                                          else ssl.PROTOCOL_TLS_SERVER)
            self.context.verify_mode = (certreqs if certreqs is not None
                                        else ssl.CERT_NONE)
            if cacerts:
                self.context.load_verify_locations(cacerts)
            if certificate:
                self.context.load_cert_chain(certificate)
        if npn_protocols:
            self.context.set_npn_protocols(npn_protocols)
        if alpn_protocols:
            self.context.set_alpn_protocols(alpn_protocols)
        if ciphers:
            self.context.set_ciphers(ciphers)
        self.chatty = chatty
        self.connectionchatty = connectionchatty
        self.starttls_server = starttls_server
        self.sock = socket.socket()
        self.port = support.bind_port(self.sock)
        # Optional threading.Event set by run() once the server is listening.
        self.flag = None
        self.active = False
        self.selected_npn_protocols = []
        self.selected_alpn_protocols = []
        self.shared_ciphers = []
        self.conn_errors = []
        threading.Thread.__init__(self)
        self.daemon = True

    def __enter__(self):
        self.start(threading.Event())
        # Block until run() has started listening.
        self.flag.wait()
        return self

    def __exit__(self, *args):
        self.stop()
        self.join()

    def start(self, flag=None):
        """Start the server thread; `flag`, if given, is set once listening."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Accept and serve connections, one at a time, until stop() is called."""
        # Short accept timeout so that a stop() request is noticed promptly.
        self.sock.settimeout(0.05)
        self.sock.listen()
        self.active = True
        if self.flag:
            # signal an event
            self.flag.set()
        while self.active:
            try:
                newconn, connaddr = self.sock.accept()
                if support.verbose and self.chatty:
                    sys.stdout.write(' server: new connection from '
                                     + repr(connaddr) + '\n')
                handler = self.ConnectionHandler(self, newconn, connaddr)
                handler.start()
                handler.join()
            except socket.timeout:
                pass
            except KeyboardInterrupt:
                self.stop()
            except BaseException as e:
                if support.verbose and self.chatty:
                    sys.stdout.write(
                        ' connection handling failed: ' + repr(e) + '\n')

        self.sock.close()

    def stop(self):
        """Ask the accept loop in run() to terminate."""
        self.active = False
2471
class AsyncoreEchoServer(threading.Thread):
    """Echo server thread built on asyncore dispatchers.

    Usable as a context manager: entering starts the thread and waits for
    it to be running, leaving stops it, joins it and clears asyncore's
    socket map.
    """

    # this one's based on asyncore.dispatcher

    class EchoServer (asyncore.dispatcher):
        """Listening dispatcher that hands accepted sockets to ConnectionHandler."""

        class ConnectionHandler(asyncore.dispatcher_with_send):
            """Server side of one TLS connection; echoes data lower-cased."""

            def __init__(self, conn, certfile):
                self.socket = test_wrap_socket(conn, server_side=True,
                                               certfile=certfile,
                                               do_handshake_on_connect=False)
                asyncore.dispatcher_with_send.__init__(self, self.socket)
                # True until the handshake has completed.
                self._ssl_accepting = True
                self._do_ssl_handshake()

            def readable(self):
                # Drain data already buffered inside the SSL object before
                # reporting readiness.
                if isinstance(self.socket, ssl.SSLSocket):
                    while self.socket.pending() > 0:
                        self.handle_read_event()
                return True

            def _do_ssl_handshake(self):
                try:
                    self.socket.do_handshake()
                except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
                    # Not finished yet; a later read event retries.
                    return
                except ssl.SSLEOFError:
                    return self.handle_close()
                except ssl.SSLError:
                    # Listed before OSError (its base class) so that other
                    # SSL errors propagate instead of being handled below.
                    raise
                except OSError as exc:
                    if exc.args[0] == errno.ECONNABORTED:
                        return self.handle_close()
                else:
                    self._ssl_accepting = False

            def handle_read(self):
                if self._ssl_accepting:
                    self._do_ssl_handshake()
                    return
                payload = self.recv(1024)
                if support.verbose:
                    sys.stdout.write(" server: read %s from client\n" % repr(payload))
                if payload:
                    self.send(payload.lower())
                else:
                    self.close()

            def handle_close(self):
                self.close()
                if support.verbose:
                    sys.stdout.write(" server: closed connection %s\n" % self.socket)

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, certfile):
            self.certfile = certfile
            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = support.bind_port(listener, '')
            asyncore.dispatcher.__init__(self, listener)
            self.listen(5)

        def handle_accepted(self, sock_obj, addr):
            if support.verbose:
                sys.stdout.write(" server: new connection from %s:%s\n" %addr)
            self.ConnectionHandler(sock_obj, self.certfile)

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, certfile):
        self.flag = None
        self.active = False
        self.server = self.EchoServer(certfile)
        self.port = self.server.port
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def __enter__(self):
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        verbose = support.verbose
        if verbose:
            sys.stdout.write(" cleanup: stopping server.\n")
        self.stop()
        if verbose:
            sys.stdout.write(" cleanup: joining server thread.\n")
        self.join()
        if verbose:
            sys.stdout.write(" cleanup: successfully joined.\n")
        # make sure that ConnectionHandler is removed from socket_map
        asyncore.close_all(ignore_all=True)

    def start (self, flag=None):
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        self.active = True
        if self.flag:
            self.flag.set()
        while self.active:
            try:
                asyncore.loop(1)
            except BaseException:
                # Best effort: keep polling whatever the loop raised.
                pass

    def stop(self):
        self.active = False
        self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002589
def server_params_test(client_context, server_context, indata=b"FOO\n",
                       chatty=True, connectionchatty=False, sni_name=None,
                       session=None):
    """
    Run a ThreadedEchoServer built from *server_context*, connect a client
    socket wrapped by *client_context* and exchange *indata* with it.

    *indata* is sent three times (as given, as a bytearray and as a
    memoryview); every reply must equal ``indata.lower()``, otherwise an
    AssertionError is raised.  *sni_name* and *session* are forwarded to
    ``wrap_socket()``.  *connectionchatty* (together with
    ``support.verbose``) enables client-side progress output.

    Returns a dict describing the connection as seen by the client
    (compression, cipher, peer certificate, ALPN/NPN protocol, version,
    session data) plus the server-side ALPN/NPN/shared-cipher records.
    """
    def client_log(fmt, *values):
        # Only format and print when client chatter was requested.
        if connectionchatty:
            if support.verbose:
                sys.stdout.write(fmt % values if values else fmt)

    stats = {}
    # NOTE(review): the server is always created with connectionchatty=False;
    # the *connectionchatty* argument only affects the client-side output.
    server = ThreadedEchoServer(context=server_context,
                                chatty=chatty,
                                connectionchatty=False)
    with server:
        wrapped = client_context.wrap_socket(socket.socket(),
                                             server_hostname=sni_name,
                                             session=session)
        with wrapped as s:
            s.connect((HOST, server.port))
            for arg in [indata, bytearray(indata), memoryview(indata)]:
                client_log(" client: sending %r...\n", indata)
                s.write(arg)
                outdata = s.read()
                client_log(" client: read %r\n", outdata)
                if outdata != indata.lower():
                    raise AssertionError(
                        "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                        % (outdata[:20], len(outdata),
                           indata[:20].lower(), len(indata)))
            # "over" is the last message sent before the connection details
            # are collected and the socket is closed.
            s.write(b"over\n")
            client_log(" client: closing connection.\n")
            stats['compression'] = s.compression()
            stats['cipher'] = s.cipher()
            stats['peercert'] = s.getpeercert()
            stats['client_alpn_protocol'] = s.selected_alpn_protocol()
            stats['client_npn_protocol'] = s.selected_npn_protocol()
            stats['version'] = s.version()
            stats['session_reused'] = s.session_reused
            stats['session'] = s.session
            s.close()
        stats['server_alpn_protocols'] = server.selected_alpn_protocols
        stats['server_npn_protocols'] = server.selected_npn_protocols
        stats['server_shared_ciphers'] = server.shared_ciphers
    return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002639
def try_protocol_combo(server_protocol, client_protocol, expect_success,
                       certsreqs=None, server_options=0, client_options=0):
    """
    Handshake a *client_protocol* client against a *server_protocol* server
    and check the outcome against *expect_success*.

    A false *expect_success* means the connection has to fail (with an
    SSLError or a connection reset); a true value means it has to succeed.
    If *expect_success* is true but not the object ``True`` (e.g. a
    string such as 'TLSv1'), it must also equal the negotiated version.

    *certsreqs* is the verify mode applied to both contexts (defaults to
    ``ssl.CERT_NONE``); *server_options* and *client_options* are OR-ed
    into the respective context options.  Mismatches between expectation
    and outcome are reported by raising AssertionError.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    verify_mode_names = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }
    certtype = verify_mode_names[certsreqs]
    if support.verbose:
        # Combinations expected to fail are printed inside braces.
        if expect_success:
            formatstr = " %s->%s %s\n"
        else:
            formatstr = " {%s->%s} %s\n"
        sys.stdout.write(formatstr %
                         (ssl.get_protocol_name(client_protocol),
                          ssl.get_protocol_name(server_protocol),
                          certtype))
    client_context = ssl.SSLContext(client_protocol)
    client_context.options |= client_options
    server_context = ssl.SSLContext(server_protocol)
    server_context.options |= server_options

    min_version = PROTOCOL_TO_TLS_VERSION.get(client_protocol)
    # SSLContext.minimum_version is only available on recent OpenSSL
    # (setter added in OpenSSL 1.1.0, getter added in OpenSSL 1.1.1)
    if min_version is not None and hasattr(server_context, 'minimum_version'):
        if (server_protocol == ssl.PROTOCOL_TLS
                and server_context.minimum_version > min_version):
            # If OpenSSL configuration is strict and requires more recent TLS
            # version, we have to change the minimum to test old TLS versions.
            server_context.minimum_version = min_version

    # NOTE: we must enable "ALL" ciphers on the client, otherwise an
    # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
    # starting from OpenSSL 1.0.0 (see issue #8322).
    if client_context.protocol == ssl.PROTOCOL_TLS:
        client_context.set_ciphers("ALL")

    for ctx in (client_context, server_context):
        ctx.verify_mode = certsreqs
        ctx.load_cert_chain(SIGNED_CERTFILE)
        ctx.load_verify_locations(SIGNING_CA)

    # Protocol mismatch can result in either an SSLError, or a
    # "Connection reset by peer" error.
    try:
        stats = server_params_test(client_context, server_context,
                                   chatty=False, connectionchatty=False)
    except ssl.SSLError:
        if expect_success:
            raise
    except OSError as exc:
        tolerated = not expect_success and exc.errno == errno.ECONNRESET
        if not tolerated:
            raise
    else:
        if not expect_success:
            raise AssertionError(
                "Client protocol %s succeeded with server protocol %s!"
                % (ssl.get_protocol_name(client_protocol),
                   ssl.get_protocol_name(server_protocol)))
        if expect_success is not True and expect_success != stats['version']:
            raise AssertionError("version mismatch: expected %r, got %r"
                                 % (expect_success, stats['version']))
2709
2710
2711class ThreadedTests(unittest.TestCase):
2712
def test_echo(self):
    """Basic test of an SSL client connecting to a server"""
    if support.verbose:
        sys.stdout.write("\n")

    # Generic protocols: the same context plays both roles.  The
    # role-restricted TLS_CLIENT / TLS_SERVER protocols are covered below.
    role_restricted = {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}
    for protocol in PROTOCOLS:
        if protocol in role_restricted:
            continue
        with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
            context = ssl.SSLContext(protocol)
            context.load_cert_chain(CERTFILE)
            server_params_test(context, context,
                               chatty=True, connectionchatty=True)

    client_context, server_context, hostname = testing_context()

    # Proper pairing: client context on the client, server context on the
    # server.
    with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT,
                      server=ssl.PROTOCOL_TLS_SERVER):
        server_params_test(client_context=client_context,
                           server_context=server_context,
                           chatty=True, connectionchatty=True,
                           sni_name=hostname)

    client_context.check_hostname = False

    # Every pairing below must be rejected with the same OpenSSL error.
    # NOTE(review): the last entry is labelled CLIENT/CLIENT but passes the
    # same contexts as the first one (server_context as the client) --
    # confirm whether that is intended.
    misuse_message = 'called a function you should not call'
    rejected = [
        (ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT,
         server_context, client_context, {'sni_name': hostname}),
        (ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_SERVER,
         server_context, server_context, {}),
        (ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_CLIENT,
         server_context, client_context, {}),
    ]
    for client_label, server_label, as_client, as_server, extra in rejected:
        with self.subTest(client=client_label, server=server_label):
            with self.assertRaises(ssl.SSLError) as e:
                server_params_test(client_context=as_client,
                                   server_context=as_server,
                                   chatty=True, connectionchatty=True,
                                   **extra)
            self.assertIn(misuse_message, str(e.exception))
2759
def test_getpeercert(self):
    # getpeercert() must raise ValueError before the handshake and return a
    # certificate with the expected fields afterwards.
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()
    server = ThreadedEchoServer(context=server_context, chatty=False)
    with server, \
            client_context.wrap_socket(socket.socket(),
                                       do_handshake_on_connect=False,
                                       server_hostname=hostname) as s:
        s.connect((HOST, server.port))
        # getpeercert() raise ValueError while the handshake isn't
        # done.
        with self.assertRaises(ValueError):
            s.getpeercert()
        s.do_handshake()
        cert = s.getpeercert()
        self.assertTrue(cert, "Can't get peer certificate.")
        cipher = s.cipher()
        if support.verbose:
            sys.stdout.write(pprint.pformat(cert) + '\n')
            sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
        if 'subject' not in cert:
            self.fail("No subject field in certificate: %s." %
                      pprint.pformat(cert))
        # The subject is searched for this exact one-attribute entry.
        wanted_org = (('organizationName', 'Python Software Foundation'),)
        if wanted_org not in cert['subject']:
            self.fail(
                "Missing or invalid 'organizationName' field in certificate subject; "
                "should be 'Python Software Foundation'.")
        self.assertIn('notBefore', cert)
        self.assertIn('notAfter', cert)
        not_before = ssl.cert_time_to_seconds(cert['notBefore'])
        not_after = ssl.cert_time_to_seconds(cert['notAfter'])
        self.assertLess(not_before, not_after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002795
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_crl_check(self):
    # Three handshakes with the same contexts: default verify flags,
    # CRL checking without a CRL loaded, CRL checking with a CRL loaded.
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(client_context.verify_flags,
                     ssl.VERIFY_DEFAULT | trusted_first)

    # VERIFY_DEFAULT should pass
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, \
            client_context.wrap_socket(socket.socket(),
                                       server_hostname=hostname) as s:
        s.connect((HOST, server.port))
        cert = s.getpeercert()
        self.assertTrue(cert, "Can't get peer certificate.")

    # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
    client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF

    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, \
            client_context.wrap_socket(socket.socket(),
                                       server_hostname=hostname) as s:
        with self.assertRaisesRegex(ssl.SSLError,
                                    "certificate verify failed"):
            s.connect((HOST, server.port))

    # now load a CRL file. The CRL file is signed by the CA.
    client_context.load_verify_locations(CRLFILE)

    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, \
            client_context.wrap_socket(socket.socket(),
                                       server_hostname=hostname) as s:
        s.connect((HOST, server.port))
        cert = s.getpeercert()
        self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002837
def test_check_hostname(self):
    # Hostname checking: matching name, mismatching name, and no name.
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, \
            client_context.wrap_socket(socket.socket(),
                                       server_hostname=hostname) as s:
        s.connect((HOST, server.port))
        cert = s.getpeercert()
        self.assertTrue(cert, "Can't get peer certificate.")

    # incorrect hostname should raise an exception
    mismatch_pattern = (
        "Hostname mismatch, certificate is not valid for 'invalid'.")
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, \
            client_context.wrap_socket(socket.socket(),
                                       server_hostname="invalid") as s:
        with self.assertRaisesRegex(ssl.CertificateError, mismatch_pattern):
            s.connect((HOST, server.port))

    # missing server_hostname arg should cause an exception, too
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, socket.socket() as plain_sock:
        with self.assertRaisesRegex(ValueError,
                                    "check_hostname requires server_hostname"):
            client_context.wrap_socket(plain_sock)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002870
def test_ecc_cert(self):
    # A server that only has an ECC certificate must be reachable by a
    # client restricted to ECDHE/ECDSA cipher suites.
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
    hostname = SIGNED_CERTFILE_ECC_HOSTNAME

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # load ECC cert
    server_context.load_cert_chain(SIGNED_CERTFILE_ECC)

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()[0].split('-')
            # The previous assertTrue(cipher[:2], (...)) used the tuple as
            # the failure message and so verified nothing.  TLS 1.3 suite
            # names (e.g. TLS_AES_256_GCM_SHA384) carry no key-exchange or
            # authentication prefix, so only check older protocol versions.
            if s.version() != 'TLSv1.3':
                self.assertEqual(cipher[:2], ['ECDHE', 'ECDSA'])
2891
def test_dual_rsa_ecc(self):
    # A server holding both an ECC and an RSA key/cert pair must pick the
    # ECC one for a client that only offers ECDSA cipher suites.
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    # TODO: fix TLSv1.3 once SSLContext can restrict signature
    # algorithms.
    client_context.options |= ssl.OP_NO_TLSv1_3
    # only ECDSA certs
    client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
    hostname = SIGNED_CERTFILE_ECC_HOSTNAME

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # load ECC and RSA key/cert pairs
    server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()[0].split('-')
            # The previous assertTrue(cipher[:2], (...)) used the tuple as
            # the failure message and so verified nothing.  TLS 1.3 is
            # disabled above, so the suite name has the kx-auth prefix.
            self.assertEqual(cipher[:2], ['ECDHE', 'ECDSA'])
2917
def test_check_hostname_idn(self):
    # Hostname checks with internationalized names given as text, as an
    # A-label string and as A-label bytes.
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(IDNSANSFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.verify_mode = ssl.CERT_REQUIRED
    context.check_hostname = True
    context.load_verify_locations(SIGNING_CA)

    # correct hostname should verify, when specified in several
    # different ways
    koenig = 'xn--knig-5qa.idn.pythontest.net'
    idna2003 = 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'
    idna2008 = 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'
    # Each entry: (value passed as server_hostname, expected attribute).
    idn_hostnames = [
        ('könig.idn.pythontest.net', koenig),
        (koenig, koenig),
        (b'xn--knig-5qa.idn.pythontest.net', koenig),

        ('königsgäßchen.idna2003.pythontest.net', idna2003),
        (idna2003, idna2003),
        (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net', idna2003),

        # The text form of the IDNA 2008 name is not exercised:
        # ('königsgäßchen.idna2008.pythontest.net', idna2008),
        (idna2008, idna2008),
        (b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net', idna2008),
    ]
    for server_hostname, expected_hostname in idn_hostnames:
        server = ThreadedEchoServer(context=server_context, chatty=True)
        with server, \
                context.wrap_socket(socket.socket(),
                                    server_hostname=server_hostname) as s:
            # The attribute is checked both before and after the handshake.
            self.assertEqual(s.server_hostname, expected_hostname)
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertEqual(s.server_hostname, expected_hostname)
            self.assertTrue(cert, "Can't get peer certificate.")

    # incorrect hostname should raise an exception
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, \
            context.wrap_socket(socket.socket(),
                                server_hostname="python.example.org") as s:
        with self.assertRaises(ssl.CertificateError):
            s.connect((HOST, server.port))
2973
def test_wrong_cert_tls12(self):
    """Connecting when the server rejects the client's certificate

    Launch a server with CERT_REQUIRED, and check that trying to
    connect to it with a wrong client certificate fails.
    """
    client_context, server_context, hostname = testing_context()
    # load client cert that is not signed by trusted CA
    client_context.load_cert_chain(CERTFILE)
    # require TLS client authentication
    server_context.verify_mode = ssl.CERT_REQUIRED
    # TLS 1.3 has different handshake
    client_context.maximum_version = ssl.TLSVersion.TLSv1_2

    server = ThreadedEchoServer(
        context=server_context, chatty=True, connectionchatty=True,
    )

    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            # Expect either an SSL error about the server rejecting
            # the connection, or a low-level connection reset (which
            # sometimes happens on Windows)
            try:
                s.connect((HOST, server.port))
            except ssl.SSLError as ssl_exc:
                if support.verbose:
                    sys.stdout.write("\nSSLError is %r\n" % ssl_exc)
            except OSError as os_exc:
                if os_exc.errno != errno.ECONNRESET:
                    raise
                if support.verbose:
                    sys.stdout.write("\nsocket.error is %r\n" % os_exc)
            else:
                self.fail("Use of invalid cert should have failed!")
3010
@unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
def test_wrong_cert_tls13(self):
    # TLS 1.3 variant of the wrong-client-certificate test: the failure
    # is expected on the first data exchange rather than on connect().
    client_context, server_context, hostname = testing_context()
    # load client cert that is not signed by trusted CA
    client_context.load_cert_chain(CERTFILE)
    server_context.verify_mode = ssl.CERT_REQUIRED
    # Pin both ends to TLS 1.3.
    for ctx in (server_context, client_context):
        ctx.minimum_version = ssl.TLSVersion.TLSv1_3

    server = ThreadedEchoServer(
        context=server_context, chatty=True, connectionchatty=True,
    )
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            # TLS 1.3 perform client cert exchange after handshake
            s.connect((HOST, server.port))
            try:
                s.write(b'data')
                s.read(4)
            except ssl.SSLError as ssl_exc:
                if support.verbose:
                    sys.stdout.write("\nSSLError is %r\n" % ssl_exc)
            except OSError as os_exc:
                if os_exc.errno != errno.ECONNRESET:
                    raise
                if support.verbose:
                    sys.stdout.write("\nsocket.error is %r\n" % os_exc)
            else:
                self.fail("Use of invalid cert should have failed!")
3041
def test_rude_shutdown(self):
    """A brutal shutdown of an SSL server should raise an OSError
    in the client when attempting handshake.
    """
    listener_ready = threading.Event()
    listener_gone = threading.Event()

    listen_sock = socket.socket()
    port = support.bind_port(listen_sock, HOST)

    def listener():
        # Runs in a thread: accept the main thread's connection, rudely
        # close both sockets, then announce it through `listener_gone`.
        listen_sock.listen()
        listener_ready.set()
        accepted, _addr = listen_sock.accept()
        accepted.close()
        listen_sock.close()
        listener_gone.set()

    def connector():
        # Connect once the listener is up, wait until the peer is gone,
        # and only then attempt to wrap the socket.
        listener_ready.wait()
        with socket.socket() as c:
            c.connect((HOST, port))
            listener_gone.wait()
            try:
                ssl_sock = test_wrap_socket(c)
            except OSError:
                pass
            else:
                self.fail('connecting to closed SSL socket should have failed')

    listener_thread = threading.Thread(target=listener)
    listener_thread.start()
    try:
        connector()
    finally:
        listener_thread.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003082
def test_ssl_cert_verify_error(self):
    # A client context without any CA loaded must reject the server's
    # certificate with a fully populated SSLCertVerificationError.
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    # No load_verify_locations() call on the client context.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with context.wrap_socket(socket.socket(),
                                 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
            # assertRaises makes the test fail if the handshake succeeds;
            # the previous try/except let that case pass silently.
            with self.assertRaises(ssl.SSLError) as caught:
                s.connect((HOST, server.port))
            e = caught.exception
            msg = 'unable to get local issuer certificate'
            self.assertIsInstance(e, ssl.SSLCertVerificationError)
            self.assertEqual(e.verify_code, 20)
            self.assertEqual(e.verify_message, msg)
            self.assertIn(msg, repr(e))
            self.assertIn('certificate verify failed', repr(e))
3105
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
                     "OpenSSL is compiled without SSLv2 support")
def test_protocol_sslv2(self):
    """Connecting to an SSLv2 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    sslv2 = ssl.PROTOCOL_SSLv2
    any_tls = ssl.PROTOCOL_TLS

    # SSLv2 client, once per verify mode (None selects the default).
    for cert_mode in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(sslv2, sslv2, True, cert_mode)
    try_protocol_combo(sslv2, any_tls, False)
    if hasattr(ssl, 'PROTOCOL_SSLv3'):
        try_protocol_combo(sslv2, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(sslv2, ssl.PROTOCOL_TLSv1, False)
    # SSLv23 client with specific SSL options
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv2, any_tls, False,
                           client_options=ssl.OP_NO_SSLv2)
    for disabled in (ssl.OP_NO_SSLv3, ssl.OP_NO_TLSv1):
        try_protocol_combo(sslv2, any_tls, False,
                           client_options=disabled)
Antoine Pitrou242db722013-05-01 20:52:07 +02003128
def test_PROTOCOL_TLS(self):
    """Connecting to an SSLv23 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    any_tls = ssl.PROTOCOL_TLS
    has_sslv3 = hasattr(ssl, 'PROTOCOL_SSLv3')

    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try:
            try_protocol_combo(any_tls, ssl.PROTOCOL_SSLv2, True)
        except OSError as x:
            # this fails on some older versions of OpenSSL (0.9.7l, for instance)
            if support.verbose:
                sys.stdout.write(
                    " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
                    % str(x))

    # The same three clients, once per verify mode (None selects the
    # default).
    for cert_mode in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        if has_sslv3:
            try_protocol_combo(any_tls, ssl.PROTOCOL_SSLv3, False, cert_mode)
        try_protocol_combo(any_tls, any_tls, True, cert_mode)
        try_protocol_combo(any_tls, ssl.PROTOCOL_TLSv1, 'TLSv1', cert_mode)

    # Server with specific SSL options
    if has_sslv3:
        try_protocol_combo(any_tls, ssl.PROTOCOL_SSLv3, False,
                           server_options=ssl.OP_NO_SSLv3)
    # Will choose TLSv1
    try_protocol_combo(any_tls, any_tls, True,
                       server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
    try_protocol_combo(any_tls, ssl.PROTOCOL_TLSv1, False,
                       server_options=ssl.OP_NO_TLSv1)
3166
3167
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
                     "OpenSSL is compiled without SSLv3 support")
def test_protocol_sslv3(self):
    """Connecting to an SSLv3 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    sslv3 = ssl.PROTOCOL_SSLv3
    any_tls = ssl.PROTOCOL_TLS

    # SSLv3 client, once per verify mode (None selects the default).
    for cert_mode in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(sslv3, sslv3, 'SSLv3', cert_mode)
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(sslv3, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(sslv3, any_tls, False,
                       client_options=ssl.OP_NO_SSLv3)
    try_protocol_combo(sslv3, ssl.PROTOCOL_TLSv1, False)
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv3, any_tls,
                           False, client_options=ssl.OP_NO_SSLv2)
3186
def test_protocol_tlsv1(self):
    """Connecting to a TLSv1 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1 = ssl.PROTOCOL_TLSv1

    # TLSv1 client, once per verify mode (None selects the default).
    for cert_mode in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(tlsv1, tlsv1, 'TLSv1', cert_mode)
    # Older protocol clients are only tried when the build exposes them.
    for legacy_name in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy_name):
            try_protocol_combo(tlsv1, getattr(ssl, legacy_name), False)
    try_protocol_combo(tlsv1, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1)
3200
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
                     "TLS version 1.1 not supported.")
def test_protocol_tlsv1_1(self):
    """Connecting to a TLSv1.1 server with various client options.
    Testing against older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1_1 = ssl.PROTOCOL_TLSv1_1
    tlsv1 = ssl.PROTOCOL_TLSv1
    any_tls = ssl.PROTOCOL_TLS

    try_protocol_combo(tlsv1_1, tlsv1_1, 'TLSv1.1')
    # Older protocol clients are only tried when the build exposes them.
    for legacy_name in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy_name):
            try_protocol_combo(tlsv1_1, getattr(ssl, legacy_name), False)
    try_protocol_combo(tlsv1_1, any_tls, False,
                       client_options=ssl.OP_NO_TLSv1_1)

    try_protocol_combo(any_tls, tlsv1_1, 'TLSv1.1')
    # TLSv1 and TLSv1.1 must not interoperate in either direction.
    try_protocol_combo(tlsv1_1, tlsv1, False)
    try_protocol_combo(tlsv1, tlsv1_1, False)
3219
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
                     "TLS version 1.2 not supported.")
def test_protocol_tlsv1_2(self):
    """Connect to a TLSv1.2 server using a range of client settings.

    Combinations with older TLS versions are exercised as well.
    """
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1_2 = ssl.PROTOCOL_TLSv1_2
    no_legacy_ssl = ssl.OP_NO_SSLv3 | ssl.OP_NO_SSLv2
    try_protocol_combo(tlsv1_2, tlsv1_2, 'TLSv1.2',
                       server_options=no_legacy_ssl,
                       client_options=no_legacy_ssl)
    # Legacy SSL client protocols are only tried when this build of the
    # ssl module exposes them; the connection is expected to fail.
    for legacy_name in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy_name):
            try_protocol_combo(tlsv1_2, getattr(ssl, legacy_name), False)
    # Generic TLS client with TLSv1.2 disabled: expected to fail.
    try_protocol_combo(tlsv1_2, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1_2)

    # Generic TLS on the first side paired with TLSv1.2: expect 'TLSv1.2'.
    try_protocol_combo(ssl.PROTOCOL_TLS, tlsv1_2, 'TLSv1.2')
    # TLSv1.2 paired with an older fixed version is expected to fail in
    # both directions.  The attribute is looked up lazily, per iteration.
    for older_name in ('PROTOCOL_TLSv1', 'PROTOCOL_TLSv1_1'):
        older = getattr(ssl, older_name)
        try_protocol_combo(tlsv1_2, older, False)
        try_protocol_combo(older, tlsv1_2, False)
3242
def test_starttls(self):
    """Flip one connection from clear text to TLS and back again."""
    def note(template, *values):
        # Progress output is only produced in verbose runs.
        if support.verbose:
            sys.stdout.write(template % values)

    msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3",
            b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")

    server = ThreadedEchoServer(CERTFILE,
                                starttls_server=True,
                                chatty=True,
                                connectionchatty=True)
    secured = False
    with server:
        plain_sock = socket.socket()
        plain_sock.setblocking(1)
        plain_sock.connect((HOST, server.port))
        note("\n")
        for outgoing in msgs:
            note(" client: sending %r...\n", outgoing)
            # Use whichever channel is currently active.
            if secured:
                tls_sock.write(outgoing)
                reply = tls_sock.read()
            else:
                plain_sock.send(outgoing)
                reply = plain_sock.recv(1024)
            normalized = reply.strip().lower()
            acknowledged = normalized.startswith(b"ok")
            if outgoing == b"STARTTLS" and acknowledged:
                # STARTTLS accepted: wrap the plain socket.
                note(" client: read %r from server, starting TLS...\n",
                     normalized)
                tls_sock = test_wrap_socket(plain_sock)
                secured = True
            elif outgoing == b"ENDTLS" and acknowledged:
                # ENDTLS accepted: unwrap and continue in clear text.
                note(" client: read %r from server, ending TLS...\n",
                     normalized)
                plain_sock = tls_sock.unwrap()
                secured = False
            else:
                note(" client: read %r from server\n", normalized)
        note(" client: closing connection.\n")
        # Say goodbye on the active channel and close that same channel.
        if secured:
            tls_sock.write(b"over\n")
            tls_sock.close()
        else:
            plain_sock.send(b"over\n")
            plain_sock.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003299
def test_socketserver(self):
    """Using socketserver to create and manage SSL connections."""
    server = make_https_server(self, certfile=SIGNED_CERTFILE)
    # try to connect
    if support.verbose:
        sys.stdout.write('\n')
    # Reference copy of the file, read straight from disk.
    with open(CERTFILE, 'rb') as f:
        d1 = f.read()
    # Must be bytes like d1: comparing bytes with str triggers a
    # BytesWarning (an error under -bb) instead of a plain test failure
    # when the server sends no usable content-length.
    d2 = b''
    # now fetch the same data from the HTTPS server
    url = 'https://localhost:%d/%s' % (
        server.port, os.path.split(CERTFILE)[1])
    context = ssl.create_default_context(cafile=SIGNING_CA)
    # The response object is a context manager; it is closed on exit
    # even if reading fails.
    with urllib.request.urlopen(url, context=context) as response:
        dlen = response.info().get("content-length")
        if dlen and (int(dlen) > 0):
            d2 = response.read(int(dlen))
            if support.verbose:
                sys.stdout.write(
                    " client: read %d bytes from remote server '%s'\n"
                    % (len(d2), server))
    self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003325
def test_asyncore_server(self):
    """Exercise the asyncore-based echo server example."""
    def note(template, *values):
        # Progress output is only produced in verbose runs.
        if support.verbose:
            sys.stdout.write(template % values)

    note("\n")

    indata = b"FOO\n"
    # The test expects the reply to be the lower-cased payload.
    expected = indata.lower()
    server = AsyncoreEchoServer(CERTFILE)
    with server:
        s = test_wrap_socket(socket.socket())
        s.connect(('127.0.0.1', server.port))
        note(" client: sending %r...\n", indata)
        s.write(indata)
        outdata = s.read()
        note(" client: read %r\n", outdata)
        if outdata != expected:
            self.fail(
                "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                % (outdata[:20], len(outdata),
                   expected[:20], len(indata)))
        s.write(b"over\n")
        note(" client: closing connection.\n")
        s.close()
        note(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003354
def test_recv_send(self):
    """Test recv(), send() and friends."""
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = test_wrap_socket(socket.socket(),
                             server_side=False,
                             certfile=CERTFILE,
                             ca_certs=CERTFILE,
                             cert_reqs=ssl.CERT_NONE,
                             ssl_version=ssl.PROTOCOL_TLS_CLIENT)
        s.connect((HOST, server.port))

        # helper methods for standardising recv* method signatures
        def _recv_into():
            b = bytearray(b"\0"*100)
            count = s.recv_into(b)
            return b[:count]

        def _recvfrom_into():
            b = bytearray(b"\0"*100)
            count, addr = s.recvfrom_into(b)
            return b[:count]

        # (name, method, expect success?, *args, return value func)
        send_methods = [
            ('send', s.send, True, [], len),
            ('sendto', s.sendto, False, ["some.address"], len),
            ('sendall', s.sendall, True, [], lambda x: None),
        ]
        # (name, method, whether to expect success, *args)
        recv_methods = [
            ('recv', s.recv, True, []),
            ('recvfrom', s.recvfrom, False, ["some.address"]),
            ('recv_into', _recv_into, True, []),
            ('recvfrom_into', _recvfrom_into, False, []),
        ]
        data_prefix = "PREFIX_"

        # NOTE: the failure messages below use the "!r" / "!s"
        # conversions.  A ":r" or ":s" *format spec* on a bytes object or
        # an exception raises TypeError, which would mask the real
        # failure being reported.
        for (meth_name, send_meth, expect_success, args,
                ret_val_meth) in send_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                ret = send_meth(indata, *args)
                msg = "sending with {}".format(meth_name)
                self.assertEqual(ret, ret_val_meth(indata), msg=msg)
                # The test expects the reply to be the lower-cased
                # payload.
                outdata = s.read()
                if outdata != indata.lower():
                    self.fail(
                        "While sending with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                # Methods flagged expect_success=False are expected to
                # raise a ValueError whose message starts with their name.
                if expect_success:
                    self.fail(
                        "Failed to send with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )

        for meth_name, recv_meth, expect_success, args in recv_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                s.send(indata)
                outdata = recv_meth(*args)
                if outdata != indata.lower():
                    self.fail(
                        "While receiving with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to receive with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )
                # consume data: the payload was already sent, so drain
                # the pending reply before the next iteration
                s.read()

        # read(-1, buffer) is supported, even though read(-1) is not
        data = b"data"
        s.send(data)
        buffer = bytearray(len(data))
        self.assertEqual(s.read(-1, buffer), len(data))
        self.assertEqual(buffer, data)

        # sendall accepts bytes-like objects
        if ctypes is not None:
            ubyte = ctypes.c_ubyte * len(data)
            byteslike = ubyte.from_buffer_copy(data)
            s.sendall(byteslike)
            self.assertEqual(s.read(), data)

        # Make sure sendmsg et al are disallowed to avoid
        # inadvertent disclosure of data and/or corruption
        # of the encrypted data stream
        self.assertRaises(NotImplementedError, s.dup)
        self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
        self.assertRaises(NotImplementedError, s.recvmsg, 100)
        self.assertRaises(NotImplementedError,
                          s.recvmsg_into, [bytearray(100)])
        s.write(b"over\n")

        # Negative sizes without a buffer are rejected.
        self.assertRaises(ValueError, s.recv, -1)
        self.assertRaises(ValueError, s.read, -1)

        s.close()
3491
def test_recv_zero(self):
    # Zero-length reads on an SSL socket must return empty results.
    server = ThreadedEchoServer(CERTFILE)
    server.__enter__()
    self.addCleanup(server.__exit__, None, None)
    raw_sock = socket.create_connection((HOST, server.port))
    self.addCleanup(raw_sock.close)
    s = test_wrap_socket(raw_sock, suppress_ragged_eofs=False)
    self.addCleanup(s.close)

    # recv/read(0) should return no data, leaving the echoed payload
    # available to the following full read
    s.send(b"data")
    for zero_length_read in (s.recv, s.read):
        self.assertEqual(zero_length_read(0), b"")
    self.assertEqual(s.read(), b"data")

    # Should not block if the other end sends no data
    s.setblocking(False)
    self.assertEqual(s.recv(0), b"")
    empty_target = bytearray()
    self.assertEqual(s.recv_into(empty_target), 0)
3511
def test_nonblocking_send(self):
    # A non-blocking send() must raise a want-read/want-write error
    # rather than block once nothing more can be sent.
    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = test_wrap_socket(socket.socket(),
                             server_side=False,
                             certfile=CERTFILE,
                             ca_certs=CERTFILE,
                             cert_reqs=ssl.CERT_NONE,
                             ssl_version=ssl.PROTOCOL_TLS_CLIENT)
        s.connect((HOST, server.port))
        s.setblocking(False)

        # If we keep sending data, at some point the buffers
        # will be full and the call will block
        chunk = bytearray(8192)
        would_block = (ssl.SSLWantWriteError, ssl.SSLWantReadError)
        with self.assertRaises(would_block):
            while True:
                s.send(chunk)

        # Switch back to blocking mode before closing the connection.
        s.setblocking(True)
        s.close()
3541
def test_handshake_timeout(self):
    # Issue #5103: SSL handshake must respect the socket timeout
    server = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(server)
    started = threading.Event()
    stop_requested = threading.Event()

    def serve():
        # Accept connections but never speak TLS, so that a client
        # handshake can only end by timing out.
        server.listen()
        started.set()
        accepted = []
        while not stop_requested.is_set():
            readable, _, _ = select.select([server], [], [], 0.1)
            if server in readable:
                # Let the socket hang around rather than having
                # it closed by garbage collection.
                accepted.append(server.accept()[0])
        for held in accepted:
            held.close()

    worker = threading.Thread(target=serve)
    worker.start()
    started.wait()

    try:
        # Case 1: connect first, then wrap; the wrap call attempts the
        # handshake and times out.
        try:
            c = socket.socket(socket.AF_INET)
            c.settimeout(0.2)
            c.connect((host, port))
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                test_wrap_socket(c)
        finally:
            c.close()
        # Case 2: wrap first, then connect; the connect call attempts
        # the handshake and times out.
        try:
            c = socket.socket(socket.AF_INET)
            c = test_wrap_socket(c)
            c.settimeout(0.2)
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                c.connect((host, port))
        finally:
            c.close()
    finally:
        stop_requested.set()
        worker.join()
        server.close()
3590
def test_server_accept(self):
    # Issue #16357: accept() on a SSLSocket created through
    # SSLContext.wrap_socket().
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    context.load_cert_chain(SIGNED_CERTFILE)
    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listener)
    server = context.wrap_socket(listener, server_side=True)
    self.assertTrue(server.server_side)

    listening = threading.Event()
    remote = None
    peer = None

    def serve():
        nonlocal remote, peer
        server.listen()
        # Signal readiness, then block in accept() and echo four bytes.
        listening.set()
        remote, peer = server.accept()
        received = remote.recv(4)
        remote.send(received)

    worker = threading.Thread(target=serve)
    worker.start()
    # The client waits for the server to be set up, then connects.
    listening.wait()
    client = context.wrap_socket(socket.socket())
    client.connect((host, port))
    client.send(b'data')
    client.recv()
    client_addr = client.getsockname()
    client.close()
    worker.join()
    remote.close()
    server.close()
    # Sanity checks.
    self.assertIsInstance(remote, ssl.SSLSocket)
    self.assertEqual(peer, client_addr)
3631
3632 def test_getpeercert_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003633 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003634 with context.wrap_socket(socket.socket()) as sock:
3635 with self.assertRaises(OSError) as cm:
3636 sock.getpeercert()
3637 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3638
3639 def test_do_handshake_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003640 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003641 with context.wrap_socket(socket.socket()) as sock:
3642 with self.assertRaises(OSError) as cm:
3643 sock.do_handshake()
3644 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3645
def test_no_shared_ciphers(self):
    # A handshake between peers with disjoint cipher lists must fail.
    client_context, server_context, hostname = testing_context()
    # OpenSSL enables all TLS 1.3 ciphers, enforce TLS 1.2 for test
    client_context.options |= ssl.OP_NO_TLSv1_3
    # Force different suites on client and server
    for side_context, cipher_spec in ((client_context, "AES128"),
                                      (server_context, "AES256")):
        side_context.set_ciphers(cipher_spec)
    with ThreadedEchoServer(context=server_context) as server:
        client_sock = client_context.wrap_socket(socket.socket(),
                                                 server_hostname=hostname)
        with client_sock as s:
            with self.assertRaises(OSError):
                s.connect((HOST, server.port))
    # Checked only after the server context manager has exited.
    first_error = server.conn_errors[0]
    self.assertIn("no shared cipher", first_error)
3659
def test_version_basic(self):
    """
    Basic checks of SSLSocket.version() before, during and after a
    connection.  The test_protocol_*() methods cover more cases.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
    with ThreadedEchoServer(CERTFILE,
                            ssl_version=ssl.PROTOCOL_TLS_SERVER,
                            chatty=False) as server:
        with context.wrap_socket(socket.socket()) as s:
            # Nothing is negotiated before connecting.
            self.assertIs(s.version(), None)
            self.assertIs(s._sslobj, None)
            s.connect((HOST, server.port))
            # The expected version depends on the linked OpenSSL.
            if IS_OPENSSL_1_1_1 and ssl.HAS_TLSv1_3:
                self.assertEqual(s.version(), 'TLSv1.3')
            elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
                self.assertEqual(s.version(), 'TLSv1.2')
            else:
                # 0.9.8 to 1.0.1
                acceptable = ('TLSv1', 'TLSv1.2')
                self.assertIn(s.version(), acceptable)
        # Once the socket's context manager has exited, the SSL object
        # is gone and no version is reported.
        self.assertIs(s._sslobj, None)
        self.assertIs(s.version(), None)
3683
@unittest.skipUnless(ssl.HAS_TLSv1_3,
                     "test requires TLSv1.3 enabled OpenSSL")
def test_tls1_3(self):
    # With TLS 1.0-1.2 disabled, the connection must use TLSv1.3 and
    # one of the listed cipher suites.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.load_cert_chain(CERTFILE)
    older_tls_off = ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
    context.options |= older_tls_off
    tls13_suites = {
        'TLS_AES_256_GCM_SHA384',
        'TLS_CHACHA20_POLY1305_SHA256',
        'TLS_AES_128_GCM_SHA256',
    }
    # The same context is used for the server and the client socket.
    with ThreadedEchoServer(context=context) as server:
        with context.wrap_socket(socket.socket()) as s:
            s.connect((HOST, server.port))
            negotiated_cipher = s.cipher()[0]
            self.assertIn(negotiated_cipher, tls13_suites)
            self.assertEqual(s.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003701
@unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
                     "required OpenSSL 1.1.0g")
def test_min_max_version(self):
    # Check that minimum_version / maximum_version on both contexts
    # decide which protocol version is negotiated.
    client_context, server_context, hostname = testing_context()
    # client TLSv1.0 to 1.2
    client_context.minimum_version = ssl.TLSVersion.TLSv1
    client_context.maximum_version = ssl.TLSVersion.TLSv1_2
    # server only TLSv1.2
    server_context.minimum_version = ssl.TLSVersion.TLSv1_2
    server_context.maximum_version = ssl.TLSVersion.TLSv1_2

    with ThreadedEchoServer(context=server_context) as server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            self.assertEqual(s.version(), 'TLSv1.2')

    # client 1.0 to 1.2, server 1.0 to 1.1
    server_context.minimum_version = ssl.TLSVersion.TLSv1
    server_context.maximum_version = ssl.TLSVersion.TLSv1_1

    with ThreadedEchoServer(context=server_context) as server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            self.assertEqual(s.version(), 'TLSv1.1')

    # client 1.0, server 1.2 (mismatch)
    server_context.maximum_version = ssl.TLSVersion.TLSv1_2
    server_context.minimum_version = ssl.TLSVersion.TLSv1_2
    # Pin both client bounds explicitly.  The original assigned
    # maximum_version twice and relied on the minimum set at the top of
    # the test still being TLSv1.
    client_context.maximum_version = ssl.TLSVersion.TLSv1
    client_context.minimum_version = ssl.TLSVersion.TLSv1
    with ThreadedEchoServer(context=server_context) as server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            with self.assertRaises(ssl.SSLError) as e:
                s.connect((HOST, server.port))
            self.assertIn("alert", str(e.exception))
3740
3741
@unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
                     "required OpenSSL 1.1.0g")
@unittest.skipUnless(ssl.HAS_SSLv3, "requires SSLv3 support")
def test_min_max_version_sslv3(self):
    # A client pinned to SSLv3 must negotiate SSLv3 with a server whose
    # minimum version is lowered to SSLv3.
    client_context, server_context, hostname = testing_context()
    sslv3 = ssl.TLSVersion.SSLv3
    server_context.minimum_version = sslv3
    client_context.minimum_version = sslv3
    client_context.maximum_version = sslv3
    with ThreadedEchoServer(context=server_context) as server:
        client_sock = client_context.wrap_socket(socket.socket(),
                                                 server_hostname=hostname)
        with client_sock as s:
            s.connect((HOST, server.port))
            self.assertEqual(s.version(), 'SSLv3')
3755
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
def test_default_ecdh_curve(self):
    # Issue #21015: elliptic curve-based Diffie Hellman key exchange
    # should be enabled by default on SSL contexts.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.load_cert_chain(CERTFILE)
    # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
    # cipher name.
    context.options |= ssl.OP_NO_TLSv1_3
    # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
    # explicitly using the 'ECCdraft' cipher alias.  Otherwise,
    # our default cipher list should prefer ECDH-based ciphers
    # automatically.
    needs_explicit_ecdh = ssl.OPENSSL_VERSION_INFO < (1, 0, 0)
    if needs_explicit_ecdh:
        context.set_ciphers("ECCdraft:ECDH")
    # The same context is used for the server and the client socket.
    with ThreadedEchoServer(context=context) as server:
        with context.wrap_socket(socket.socket()) as s:
            s.connect((HOST, server.port))
            cipher_name = s.cipher()[0]
            self.assertIn("ECDH", cipher_name)
3775
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """Test tls-unique channel binding."""
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    def check_binding(sock, binding):
        # Sanity-check one connection's binding data: it must be
        # present, have the length this test expects for the negotiated
        # version, and match what the peer reports for the connection.
        self.assertIsNotNone(binding)
        if sock.version() == 'TLSv1.3':
            self.assertEqual(len(binding), 48)
        else:
            self.assertEqual(len(binding), 12)  # True for TLSv1
        # and compare with the peers version
        sock.write(b"CB tls-unique\n")
        peer_data_repr = sock.read().strip()
        self.assertEqual(peer_data_repr,
                         repr(binding).encode("us-ascii"))

    server = ThreadedEchoServer(context=server_context,
                                chatty=True,
                                connectionchatty=False)

    with server:
        with client_context.wrap_socket(
                socket.socket(),
                server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            # get the data
            cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(
                    " got channel binding data: {0!r}\n".format(cb_data))
            check_binding(s, cb_data)

        # now, again
        with client_context.wrap_socket(
                socket.socket(),
                server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            new_cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(
                    "got another channel binding data: {0!r}\n".format(
                        new_cb_data)
                )
            # is it really unique
            self.assertNotEqual(cb_data, new_cb_data)
            # Validate the *new* binding.  The original re-checked
            # cb_data here, so the second connection's data was never
            # tested for None-ness or length.
            check_binding(s, new_cb_data)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003835
def test_compression(self):
    # The reported compression must be None or one of the known names.
    client_context, server_context, hostname = testing_context()
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    if support.verbose:
        report = " got compression: {!r}\n".format(stats['compression'])
        sys.stdout.write(report)
    allowed = {None, 'ZLIB', 'RLE'}
    self.assertIn(stats['compression'], allowed)
3844
@unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
                     "ssl.OP_NO_COMPRESSION needed for this test")
def test_compression_disabled(self):
    # With OP_NO_COMPRESSION set on both contexts, no compression must
    # be reported.
    client_context, server_context, hostname = testing_context()
    for side_context in (client_context, server_context):
        side_context.options |= ssl.OP_NO_COMPRESSION
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    self.assertIs(stats['compression'], None)
3855
def test_dh_params(self):
    # Check we can get a connection with ephemeral Diffie-Hellman
    client_context, server_context, hostname = testing_context()
    # test scenario needs TLS <= 1.2
    client_context.options |= ssl.OP_NO_TLSv1_3
    server_context.load_dh_params(DHFILE)
    server_context.set_ciphers("kEDH")
    server_context.options |= ssl.OP_NO_TLSv1_3
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    # stats["cipher"][0] is the name of the negotiated cipher.
    cipher = stats["cipher"][0]
    parts = cipher.split("-")
    if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
        # Report the whole cipher name.  The original used cipher[0],
        # which is only the first character of the name.
        self.fail("Non-DH cipher: " + cipher)
3871
@unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
@unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
def test_ecdh_curve(self):
    # Exercise set_ecdh_curve() on the server side, on the client side,
    # and with a curve mismatch between the two.
    def handshake(client_ctx, server_ctx, sni):
        # Run one client/server exchange with the shared options.
        return server_params_test(client_ctx, server_ctx,
                                  chatty=True, connectionchatty=True,
                                  sni_name=sni)

    def restrict_server(server_ctx):
        # ECDHE suites only, with TLS 1.0 and 1.1 switched off.
        server_ctx.set_ciphers("ECDHE:!eNULL:!aNULL")
        server_ctx.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1

    # server secp384r1, client auto
    client_context, server_context, hostname = testing_context()

    server_context.set_ecdh_curve("secp384r1")
    restrict_server(server_context)
    handshake(client_context, server_context, hostname)

    # server auto, client secp384r1
    client_context, server_context, hostname = testing_context()
    client_context.set_ecdh_curve("secp384r1")
    restrict_server(server_context)
    handshake(client_context, server_context, hostname)

    # server / client curve mismatch
    client_context, server_context, hostname = testing_context()
    client_context.set_ecdh_curve("prime256v1")
    server_context.set_ecdh_curve("secp384r1")
    restrict_server(server_context)
    mismatch_rejected = False
    try:
        handshake(client_context, server_context, hostname)
    except ssl.SSLError:
        mismatch_rejected = True
    # OpenSSL 1.0.2 does not fail although it should.
    if not mismatch_rejected and IS_OPENSSL_1_1_0:
        self.fail("mismatch curve did not fail")
3910
def test_selected_alpn_protocol(self):
    # selected_alpn_protocol() is None unless ALPN is used.
    contexts = testing_context()
    client_context, server_context, hostname = contexts
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    client_choice = stats['client_alpn_protocol']
    self.assertIs(client_choice, None)
3918
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
def test_selected_alpn_protocol_if_server_uses_alpn(self):
    # selected_alpn_protocol() is None unless ALPN is used by the client.
    contexts = testing_context()
    client_context, server_context, hostname = contexts
    # Only the server advertises protocols here.
    server_context.set_alpn_protocols(['foo', 'bar'])
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    client_choice = stats['client_alpn_protocol']
    self.assertIs(client_choice, None)
3928
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
def test_alpn_protocols(self):
    # For each client protocol list, check which protocol the client
    # and the server report as selected.
    server_protocols = ['foo', 'bar', 'milkshake']
    # (client protocol list, protocol expected to be selected)
    protocol_tests = [
        (['foo', 'bar'], 'foo'),
        (['bar', 'foo'], 'foo'),
        (['milkshake'], 'milkshake'),
        (['http/3.0', 'http/4.0'], None)
    ]
    for client_protocols, expected in protocol_tests:
        client_context, server_context, hostname = testing_context()
        server_context.set_alpn_protocols(server_protocols)
        client_context.set_alpn_protocols(client_protocols)

        # A handshake failure is kept in place of the statistics so it
        # can be inspected below.
        try:
            stats = server_params_test(client_context,
                                       server_context,
                                       chatty=True,
                                       connectionchatty=True,
                                       sni_name=hostname)
        except ssl.SSLError as e:
            stats = e

        handshake_error_expected = (
            expected is None and IS_OPENSSL_1_1_0
            and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6))
        if handshake_error_expected:
            # OpenSSL 1.1.0 to 1.1.0e raises handshake error
            self.assertIsInstance(stats, ssl.SSLError)
            continue

        # The doubled %% leaves two placeholders for the per-side
        # result and side name filled in below.
        msg = "failed trying %s (s) and %s (c).\n" \
              "was expecting %s, but got %%s from the %%s" \
              % (server_protocols, client_protocols, expected)
        client_result = stats['client_alpn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        server_seen = stats['server_alpn_protocols']
        if len(server_seen):
            server_result = server_seen[-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3968
def test_selected_npn_protocol(self):
    """With no NPN configured, the client reports no NPN protocol."""
    client_ctx, server_ctx, sni = testing_context()
    handshake_options = dict(chatty=True, connectionchatty=True,
                             sni_name=sni)
    result = server_params_test(client_ctx, server_ctx,
                                **handshake_options)
    # selected_npn_protocol() is None unless NPN is used
    self.assertIs(result['client_npn_protocol'], None)
3976
@unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
def test_npn_protocols(self):
    """Check the negotiated NPN protocol for several client offers.

    Each case pairs the client's protocol list with the protocol that both
    the client and the server are expected to report.
    """
    server_protocols = ['http/1.1', 'spdy/2']
    cases = (
        (['http/1.1', 'spdy/2'], 'http/1.1'),
        (['spdy/2', 'http/1.1'], 'http/1.1'),
        (['spdy/2', 'test'], 'spdy/2'),
        (['abc', 'def'], 'abc'),
    )
    for client_protocols, expected in cases:
        client_ctx, server_ctx, sni = testing_context()
        server_ctx.set_npn_protocols(server_protocols)
        client_ctx.set_npn_protocols(client_protocols)
        result = server_params_test(client_ctx, server_ctx,
                                    chatty=True, connectionchatty=True,
                                    sni_name=sni)
        # The two remaining placeholders are filled in per assertion.
        template = ("failed trying %s (s) and %s (c).\n"
                    "was expecting %s, but got %%s from the %%s"
                    % (str(server_protocols), str(client_protocols),
                       str(expected)))

        from_client = result['client_npn_protocol']
        self.assertEqual(from_client, expected,
                         template % (from_client, "client"))

        recorded = result['server_npn_protocols']
        if len(recorded):
            from_server = recorded[-1]
        else:
            from_server = 'nothing'
        self.assertEqual(from_server, expected,
                         template % (from_server, "server"))
4002
def sni_contexts(self):
    """Return ``(server_context, other_context, client_context)``.

    Both server-side contexts use PROTOCOL_TLS_SERVER; the first loads
    SIGNED_CERTFILE and the second SIGNED_CERTFILE2.  The client context
    uses PROTOCOL_TLS_CLIENT and trusts SIGNING_CA.
    """
    def server_side(certfile):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ctx.load_cert_chain(certfile)
        return ctx

    primary = server_side(SIGNED_CERTFILE)
    alternate = server_side(SIGNED_CERTFILE2)

    client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client.load_verify_locations(SIGNING_CA)
    return primary, alternate, client
4011
def check_common_name(self, stats, name):
    """Assert that the peer certificate in *stats* has commonName *name*.

    *stats* must provide a ``'peercert'`` mapping with a ``'subject'``
    entry; the subject is searched for the RDN ``(('commonName', name),)``.
    """
    subject = stats['peercert']['subject']
    wanted_rdn = (('commonName', name),)
    self.assertIn(wanted_rdn, subject)
4015
@needs_sni
def test_sni_callback(self):
    """Exercise the servername callback: switch context, None name, removal."""
    seen = []
    server_context, other_context, client_context = self.sni_contexts()

    client_context.check_hostname = False

    def servername_cb(ssl_sock, server_name, initial_context):
        # Record every invocation; swap the context only when the client
        # actually sent a server name.
        seen.append((server_name, initial_context))
        if server_name is not None:
            ssl_sock.context = other_context
    server_context.set_servername_callback(servername_cb)

    def handshake(sni_name):
        return server_params_test(client_context, server_context,
                                  chatty=True,
                                  sni_name=sni_name)

    stats = handshake('supermessage')
    # The hostname was fetched properly, and the certificate was
    # changed for the connection.
    self.assertEqual(seen, [("supermessage", server_context)])
    # other_context (loaded with SIGNED_CERTFILE2 in sni_contexts) was used
    self.check_common_name(stats, 'fakehostname')

    seen.clear()
    # The callback is called with server_name=None
    stats = handshake(None)
    self.assertEqual(seen, [(None, server_context)])
    self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)

    # Check disabling the callback
    seen.clear()
    server_context.set_servername_callback(None)

    stats = handshake('notfunny')
    # Certificate didn't change
    self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
    self.assertEqual(seen, [])
4056
@needs_sni
def test_sni_callback_alert(self):
    """A TLS alert returned by the callback reaches the connecting client."""
    server_context, _other_context, client_context = self.sni_contexts()

    def cb_returning_alert(ssl_sock, server_name, initial_context):
        return ssl.ALERT_DESCRIPTION_ACCESS_DENIED

    server_context.set_servername_callback(cb_returning_alert)
    with self.assertRaises(ssl.SSLError) as caught:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    self.assertEqual(caught.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004070
@needs_sni
def test_sni_callback_raising(self):
    """An exception in the callback fails the handshake.

    The client sees a handshake-failure alert and the ZeroDivisionError
    is reported through the unraisable-exception hook.
    """
    server_context, _other_context, client_context = self.sni_contexts()

    def cb_raising(ssl_sock, server_name, initial_context):
        1/0

    server_context.set_servername_callback(cb_raising)

    with support.catch_unraisable_exception() as catch:
        with self.assertRaises(ssl.SSLError) as caught:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')

        # Checked while the catch context is still active.
        self.assertEqual(caught.exception.reason,
                         'SSLV3_ALERT_HANDSHAKE_FAILURE')
        self.assertEqual(catch.unraisable.exc_type, ZeroDivisionError)
Antoine Pitrou50b24d02013-04-11 20:48:42 +02004089
@needs_sni
def test_sni_callback_wrong_return_type(self):
    """A non-integer callback result ends the handshake.

    The client sees an internal-error alert and a TypeError is reported
    through the unraisable-exception hook.
    """
    server_context, _other_context, client_context = self.sni_contexts()

    def cb_wrong_return_type(ssl_sock, server_name, initial_context):
        return "foo"

    server_context.set_servername_callback(cb_wrong_return_type)

    with support.catch_unraisable_exception() as catch:
        with self.assertRaises(ssl.SSLError) as caught:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')

        # Checked while the catch context is still active.
        self.assertEqual(caught.exception.reason,
                         'TLSV1_ALERT_INTERNAL_ERROR')
        self.assertEqual(catch.unraisable.exc_type, TypeError)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004109
def test_shared_ciphers(self):
    """Every shared cipher reported by the server matches an expected family."""
    client_ctx, server_ctx, sni = testing_context()
    client_ctx.set_ciphers("AES128:AES256")
    server_ctx.set_ciphers("AES256")
    allowed_fragments = (
        "AES256", "AES-256",
        # TLS 1.3 ciphers are always enabled
        "TLS_CHACHA20", "TLS_AES",
    )

    result = server_params_test(client_ctx, server_ctx, sni_name=sni)
    shared = result['server_shared_ciphers'][0]
    self.assertGreater(len(shared), 0)
    for cipher_name, _tls_version, _bits in shared:
        if any(fragment in cipher_name for fragment in allowed_fragments):
            continue
        self.fail(cipher_name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004127
def test_read_write_after_close_raises_valuerror(self):
    """read() and write() on a closed SSL socket raise ValueError."""
    client_ctx, server_ctx, sni = testing_context()
    echo_server = ThreadedEchoServer(context=server_ctx, chatty=False)

    with echo_server:
        sock = client_ctx.wrap_socket(socket.socket(),
                                      server_hostname=sni)
        sock.connect((HOST, echo_server.port))
        sock.close()

        with self.assertRaises(ValueError):
            sock.read(1024)
        with self.assertRaises(ValueError):
            sock.write(b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004140
def test_sendfile(self):
    """sendfile() over an SSL socket delivers the file's bytes to the peer.

    A 512-byte file is sent to a ThreadedEchoServer and the data read back
    from the socket must equal the file content.
    """
    TEST_DATA = b"x" * 512
    # Register the cleanup before creating the file so that TESTFN is
    # removed even if writing it fails part-way.
    self.addCleanup(support.unlink, support.TESTFN)
    with open(support.TESTFN, 'wb') as f:
        f.write(TEST_DATA)
    # The same context serves as both the server and the client side.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    context.load_cert_chain(SIGNED_CERTFILE)
    server = ThreadedEchoServer(context=context, chatty=False)
    with server:
        with context.wrap_socket(socket.socket()) as s:
            s.connect((HOST, server.port))
            with open(support.TESTFN, 'rb') as file:
                s.sendfile(file)
                self.assertEqual(s.recv(1024), TEST_DATA)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004157
def test_session(self):
    """Check session attributes, reuse, and the server's accept/hit counters.

    Four handshakes are performed: fresh, resumed, fresh again, resumed
    again.  After each one the server context's ``session_stats()`` must
    show the expected 'accept' and 'hits' values.
    """
    client_context, server_context, hostname = testing_context()
    # TODO: sessions aren't compatible with TLSv1.3 yet
    client_context.options |= ssl.OP_NO_TLSv1_3

    def handshake(**extra):
        return server_params_test(client_context, server_context,
                                  sni_name=hostname, **extra)

    def check_counters(accept, hits):
        counters = server_context.session_stats()
        self.assertEqual(counters['accept'], accept)
        self.assertEqual(counters['hits'], hits)

    # first connection without session
    stats = handshake()
    session = stats['session']
    self.assertTrue(session.id)
    self.assertGreater(session.time, 0)
    self.assertGreater(session.timeout, 0)
    self.assertTrue(session.has_ticket)
    if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
        self.assertGreater(session.ticket_lifetime_hint, 0)
    self.assertFalse(stats['session_reused'])
    check_counters(accept=1, hits=0)

    # reuse session
    stats = handshake(session=session)
    check_counters(accept=2, hits=1)
    self.assertTrue(stats['session_reused'])
    resumed = stats['session']
    self.assertEqual(resumed.id, session.id)
    self.assertEqual(resumed, session)
    self.assertIsNot(resumed, session)
    self.assertGreaterEqual(resumed.time, session.time)
    self.assertGreaterEqual(resumed.timeout, session.timeout)

    # another one without session
    stats = handshake()
    self.assertFalse(stats['session_reused'])
    fresh = stats['session']
    self.assertNotEqual(fresh.id, session.id)
    self.assertNotEqual(fresh, session)
    check_counters(accept=3, hits=1)

    # reuse session again
    stats = handshake(session=session)
    self.assertTrue(stats['session_reused'])
    resumed_again = stats['session']
    self.assertEqual(resumed_again.id, session.id)
    self.assertEqual(resumed_again, session)
    self.assertGreaterEqual(resumed_again.time, session.time)
    self.assertGreaterEqual(resumed_again.timeout, session.timeout)
    check_counters(accept=4, hits=2)
Christian Heimes99a65702016-09-10 23:44:53 +02004215
def test_session_handling(self):
    """Check the rules for reading and assigning ``SSLSocket.session``."""
    client_context, server_context, hostname = testing_context()
    client_context2, _, _ = testing_context()

    # TODO: session reuse does not work with TLSv1.3
    client_context.options |= ssl.OP_NO_TLSv1_3
    client_context2.options |= ssl.OP_NO_TLSv1_3

    def wrapped(ctx):
        return ctx.wrap_socket(socket.socket(), server_hostname=hostname)

    server = ThreadedEchoServer(context=server_context, chatty=False)
    with server:
        address = (HOST, server.port)

        with wrapped(client_context) as s:
            # session is None before handshake
            self.assertEqual(s.session, None)
            self.assertEqual(s.session_reused, None)
            s.connect(address)
            session = s.session
            self.assertTrue(session)
            with self.assertRaises(TypeError) as e:
                s.session = object
            self.assertEqual(str(e.exception), 'Value is not a SSLSession.')

        with wrapped(client_context) as s:
            s.connect(address)
            # cannot set session after handshake
            with self.assertRaises(ValueError) as e:
                s.session = session
            self.assertEqual(str(e.exception),
                             'Cannot set session after handshake.')

        with wrapped(client_context) as s:
            # can set session before handshake and before the
            # connection was established
            s.session = session
            s.connect(address)
            self.assertEqual(s.session.id, session.id)
            self.assertEqual(s.session, session)
            self.assertEqual(s.session_reused, True)

        with wrapped(client_context2) as s:
            # cannot re-use session with a different SSLContext
            with self.assertRaises(ValueError) as e:
                s.session = session
                s.connect(address)
            self.assertEqual(str(e.exception),
                             'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02004265
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01004266
@unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
class TestPostHandshakeAuth(unittest.TestCase):
    """Tests for post-handshake authentication (PHA).

    The socket-level tests send short commands (b'HASCERT', b'PHA',
    b'GETCERT') to a ThreadedEchoServer and check its replies.
    """

    @staticmethod
    def _ask(sock, command, bufsize=1024):
        """Write *command* to *sock* and return the next chunk received."""
        sock.write(command)
        return sock.recv(bufsize)

    def test_pha_setter(self):
        """post_handshake_auth and verify_mode can be set independently."""
        for protocol in (ssl.PROTOCOL_TLS,
                         ssl.PROTOCOL_TLS_SERVER,
                         ssl.PROTOCOL_TLS_CLIENT):
            ctx = ssl.SSLContext(protocol)
            self.assertEqual(ctx.post_handshake_auth, False)

            ctx.post_handshake_auth = True
            self.assertEqual(ctx.post_handshake_auth, True)

            ctx.verify_mode = ssl.CERT_REQUIRED
            self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
            self.assertEqual(ctx.post_handshake_auth, True)

            ctx.post_handshake_auth = False
            self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
            self.assertEqual(ctx.post_handshake_auth, False)

            ctx.verify_mode = ssl.CERT_OPTIONAL
            ctx.post_handshake_auth = True
            self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
            self.assertEqual(ctx.post_handshake_auth, True)

    def test_pha_required(self):
        """With CERT_REQUIRED, the client cert appears only after PHA."""
        client_context, server_context, hostname = testing_context()
        server_context.post_handshake_auth = True
        server_context.verify_mode = ssl.CERT_REQUIRED
        client_context.post_handshake_auth = True
        client_context.load_cert_chain(SIGNED_CERTFILE)

        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server:
            with client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname) as s:
                s.connect((HOST, server.port))
                self.assertEqual(self._ask(s, b'HASCERT'), b'FALSE\n')
                self.assertEqual(self._ask(s, b'PHA'), b'OK\n')
                self.assertEqual(self._ask(s, b'HASCERT'), b'TRUE\n')
                # PHA method just returns true when cert is already available
                self.assertEqual(self._ask(s, b'PHA'), b'OK\n')
                cert_text = self._ask(s, b'GETCERT', 4096).decode('us-ascii')
                self.assertIn('Python Software Foundation CA', cert_text)

    def test_pha_required_nocert(self):
        """With CERT_REQUIRED and no client cert, PHA ends in a TLS alert."""
        client_context, server_context, hostname = testing_context()
        server_context.post_handshake_auth = True
        server_context.verify_mode = ssl.CERT_REQUIRED
        client_context.post_handshake_auth = True

        # Ignore expected SSLError in ConnectionHandler of ThreadedEchoServer
        # (it is only raised sometimes on Windows)
        with support.catch_threading_exception() as cm:
            server = ThreadedEchoServer(context=server_context, chatty=False)
            with server:
                with client_context.wrap_socket(socket.socket(),
                                                server_hostname=hostname) as s:
                    s.connect((HOST, server.port))
                    # receive CertificateRequest
                    self.assertEqual(self._ask(s, b'PHA'), b'OK\n')
                    # send empty Certificate + Finish
                    s.write(b'HASCERT')
                    # receive alert
                    with self.assertRaisesRegex(
                            ssl.SSLError,
                            'tlsv13 alert certificate required'):
                        s.recv(1024)

    def test_pha_optional(self):
        """With CERT_OPTIONAL, a client that has a cert presents it on PHA."""
        if support.verbose:
            sys.stdout.write("\n")

        client_context, server_context, hostname = testing_context()
        server_context.post_handshake_auth = True
        server_context.verify_mode = ssl.CERT_REQUIRED
        client_context.post_handshake_auth = True
        client_context.load_cert_chain(SIGNED_CERTFILE)

        # check CERT_OPTIONAL
        server_context.verify_mode = ssl.CERT_OPTIONAL
        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server:
            with client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname) as s:
                s.connect((HOST, server.port))
                self.assertEqual(self._ask(s, b'HASCERT'), b'FALSE\n')
                self.assertEqual(self._ask(s, b'PHA'), b'OK\n')
                self.assertEqual(self._ask(s, b'HASCERT'), b'TRUE\n')

    def test_pha_optional_nocert(self):
        """With CERT_OPTIONAL, PHA succeeds even without a client cert."""
        if support.verbose:
            sys.stdout.write("\n")

        client_context, server_context, hostname = testing_context()
        server_context.post_handshake_auth = True
        server_context.verify_mode = ssl.CERT_OPTIONAL
        client_context.post_handshake_auth = True

        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server:
            with client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname) as s:
                s.connect((HOST, server.port))
                self.assertEqual(self._ask(s, b'HASCERT'), b'FALSE\n')
                self.assertEqual(self._ask(s, b'PHA'), b'OK\n')
                # optional doesn't fail when client does not have a cert
                self.assertEqual(self._ask(s, b'HASCERT'), b'FALSE\n')

    def test_pha_no_pha_client(self):
        """PHA is rejected when the client did not enable it."""
        client_context, server_context, hostname = testing_context()
        server_context.post_handshake_auth = True
        server_context.verify_mode = ssl.CERT_REQUIRED
        client_context.load_cert_chain(SIGNED_CERTFILE)

        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server:
            with client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname) as s:
                s.connect((HOST, server.port))
                with self.assertRaisesRegex(ssl.SSLError, 'not server'):
                    s.verify_client_post_handshake()
                reply = self._ask(s, b'PHA')
                self.assertIn(b'extension not received', reply)

    def test_pha_no_pha_server(self):
        """Without server-side PHA the cert is requested in the handshake."""
        # server doesn't have PHA enabled, cert is requested in handshake
        client_context, server_context, hostname = testing_context()
        server_context.verify_mode = ssl.CERT_REQUIRED
        client_context.post_handshake_auth = True
        client_context.load_cert_chain(SIGNED_CERTFILE)

        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server:
            with client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname) as s:
                s.connect((HOST, server.port))
                self.assertEqual(self._ask(s, b'HASCERT'), b'TRUE\n')
                # PHA doesn't fail if there is already a cert
                self.assertEqual(self._ask(s, b'PHA'), b'OK\n')
                self.assertEqual(self._ask(s, b'HASCERT'), b'TRUE\n')

    def test_pha_not_tls13(self):
        """PHA fails when the connection is limited to TLS 1.2."""
        # TLS 1.2
        client_context, server_context, hostname = testing_context()
        server_context.verify_mode = ssl.CERT_REQUIRED
        client_context.maximum_version = ssl.TLSVersion.TLSv1_2
        client_context.post_handshake_auth = True
        client_context.load_cert_chain(SIGNED_CERTFILE)

        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server:
            with client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname) as s:
                s.connect((HOST, server.port))
                # PHA fails for TLS != 1.3
                reply = self._ask(s, b'PHA')
                self.assertIn(b'WRONG_SSL_VERSION', reply)

    def test_bpo37428_pha_cert_none(self):
        """Enabling PHA on the client must not enable cert validation."""
        # verify that post_handshake_auth does not implicitly enable cert
        # validation.
        hostname = SIGNED_CERTFILE_HOSTNAME
        client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        client_context.post_handshake_auth = True
        client_context.load_cert_chain(SIGNED_CERTFILE)
        # no cert validation and CA on client side
        client_context.check_hostname = False
        client_context.verify_mode = ssl.CERT_NONE

        server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        server_context.load_cert_chain(SIGNED_CERTFILE)
        server_context.load_verify_locations(SIGNING_CA)
        server_context.post_handshake_auth = True
        server_context.verify_mode = ssl.CERT_REQUIRED

        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server:
            with client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname) as s:
                s.connect((HOST, server.port))
                self.assertEqual(self._ask(s, b'HASCERT'), b'FALSE\n')
                self.assertEqual(self._ask(s, b'PHA'), b'OK\n')
                self.assertEqual(self._ask(s, b'HASCERT'), b'TRUE\n')
                # server cert has not been validated
                self.assertEqual(s.getpeercert(), {})
4472
Christian Heimes9fb051f2018-09-23 08:32:31 +02004473
# Keylog tests only run when SSLContext exposes a keylog_filename attribute.
HAS_KEYLOG = hasattr(ssl.SSLContext, 'keylog_filename')
requires_keylog = unittest.skipUnless(
    HAS_KEYLOG,
    'test requires OpenSSL 1.1.1 with keylog callback',
)
4477
class TestSSLDebug(unittest.TestCase):
    """Tests for the keylog file and message-callback debugging hooks."""

    def keylog_lines(self, fname=support.TESTFN):
        """Return the number of lines in the file *fname*."""
        with open(fname) as f:
            return sum(1 for _line in f)

    def _connect_once(self, client_context, server_context, hostname):
        """Start an echo server, connect one client to it, and close both."""
        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server:
            with client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname) as s:
                s.connect((HOST, server.port))

    @requires_keylog
    def test_keylog_defaults(self):
        """keylog_filename defaults to None and validates assigned values."""
        self.addCleanup(support.unlink, support.TESTFN)
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        self.assertEqual(ctx.keylog_filename, None)

        # Assigning a filename creates the file with a single line in it.
        self.assertFalse(os.path.isfile(support.TESTFN))
        ctx.keylog_filename = support.TESTFN
        self.assertEqual(ctx.keylog_filename, support.TESTFN)
        self.assertTrue(os.path.isfile(support.TESTFN))
        self.assertEqual(self.keylog_lines(), 1)

        ctx.keylog_filename = None
        self.assertEqual(ctx.keylog_filename, None)

        containing_dir = os.path.dirname(os.path.abspath(support.TESTFN))
        with self.assertRaises((IsADirectoryError, PermissionError)):
            # Windows raises PermissionError
            ctx.keylog_filename = containing_dir

        with self.assertRaises(TypeError):
            ctx.keylog_filename = 1

    @requires_keylog
    def test_keylog_filename(self):
        """Handshakes append lines to the configured keylog file."""
        self.addCleanup(support.unlink, support.TESTFN)
        client_context, server_context, hostname = testing_context()

        # Client side only.
        client_context.keylog_filename = support.TESTFN
        self._connect_once(client_context, server_context, hostname)
        # header, 5 lines for TLS 1.3
        self.assertEqual(self.keylog_lines(), 6)

        # Server side only.
        client_context.keylog_filename = None
        server_context.keylog_filename = support.TESTFN
        self._connect_once(client_context, server_context, hostname)
        self.assertGreaterEqual(self.keylog_lines(), 11)

        # Both sides write to the same file.
        client_context.keylog_filename = support.TESTFN
        server_context.keylog_filename = support.TESTFN
        self._connect_once(client_context, server_context, hostname)
        self.assertGreaterEqual(self.keylog_lines(), 21)

        client_context.keylog_filename = None
        server_context.keylog_filename = None

    @requires_keylog
    @unittest.skipIf(sys.flags.ignore_environment,
                     "test is not compatible with ignore_environment")
    def test_keylog_env(self):
        """SSLKEYLOGFILE is used by the context factories, not SSLContext()."""
        self.addCleanup(support.unlink, support.TESTFN)
        # patch.dict restores os.environ when the block exits.
        with unittest.mock.patch.dict(os.environ):
            os.environ['SSLKEYLOGFILE'] = support.TESTFN
            self.assertEqual(os.environ['SSLKEYLOGFILE'], support.TESTFN)

            plain_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            self.assertEqual(plain_ctx.keylog_filename, None)

            default_ctx = ssl.create_default_context()
            self.assertEqual(default_ctx.keylog_filename, support.TESTFN)

            stdlib_ctx = ssl._create_stdlib_context()
            self.assertEqual(stdlib_ctx.keylog_filename, support.TESTFN)

    def test_msg_callback(self):
        """_msg_callback accepts a function and rejects a plain object."""
        client_context, _server_context, _hostname = testing_context()

        def msg_cb(conn, direction, version, content_type, msg_type, data):
            pass

        self.assertIs(client_context._msg_callback, None)
        client_context._msg_callback = msg_cb
        self.assertIs(client_context._msg_callback, msg_cb)
        with self.assertRaises(TypeError):
            client_context._msg_callback = object()

    def test_msg_callback_tls12(self):
        """The message callback records expected TLS 1.2 handshake messages."""
        client_context, server_context, hostname = testing_context()
        client_context.options |= ssl.OP_NO_TLSv1_3

        recorded = []

        def msg_cb(conn, direction, version, content_type, msg_type, data):
            self.assertIsInstance(conn, ssl.SSLSocket)
            self.assertIsInstance(data, bytes)
            self.assertIn(direction, {'read', 'write'})
            recorded.append((direction, version, content_type, msg_type))

        client_context._msg_callback = msg_cb

        self._connect_once(client_context, server_context, hostname)

        server_key_exchange = (
            "read", TLSVersion.TLSv1_2, _TLSContentType.HANDSHAKE,
            _TLSMessageType.SERVER_KEY_EXCHANGE,
        )
        change_cipher_spec = (
            "write", TLSVersion.TLSv1_2, _TLSContentType.CHANGE_CIPHER_SPEC,
            _TLSMessageType.CHANGE_CIPHER_SPEC,
        )
        self.assertIn(server_key_exchange, recorded)
        self.assertIn(change_cipher_spec, recorded)
Christian Heimesc7f70692019-05-31 11:44:05 +02004602
4603
def test_main(verbose=False):
    """Run the SSL test classes after checking the certificate fixtures.

    When ``support.verbose`` is set, a short description of the OpenSSL
    build and the platform is printed first.  A missing fixture file
    raises ``support.TestFailed``.  NetworkedTests is added only when the
    'network' resource is enabled.
    """
    if support.verbose:
        version_probes = (
            ('Mac', platform.mac_ver),
            ('Windows', platform.win32_ver),
        )
        plat = None
        for label, probe in version_probes:
            info = probe()
            if info and info[0]:
                plat = '%s %r' % (label, info)
                break
        if plat is None:
            # Neither probe reported a version.
            plat = repr(platform.platform())
        print("test_ssl: testing with %r %r" %
              (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
        print(" under %s" % plat)
        print(" HAS_SNI = %r" % ssl.HAS_SNI)
        print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
        try:
            print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
        except AttributeError:
            pass

    required_files = [
        CERTFILE, BYTES_CERTFILE,
        ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
        SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
        BADCERT, BADKEY, EMPTYCERT,
    ]
    for filename in required_files:
        if os.path.exists(filename):
            continue
        raise support.TestFailed("Can't read certificate file %r" % filename)

    tests = [
        ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
        SSLObjectTests, SimpleBackgroundTests, ThreadedTests,
        TestPostHandshakeAuth, TestSSLDebug,
    ]

    if support.is_resource_enabled('network'):
        tests.append(NetworkedTests)

    thread_info = support.threading_setup()
    try:
        support.run_unittest(*tests)
    finally:
        support.threading_cleanup(*thread_info)


if __name__ == "__main__":
    test_main()