blob: ecb6049a6750f3c22655e6adc5700478d0c30481 [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Christian Heimesc7f70692019-05-31 11:44:05 +02005import unittest.mock
Benjamin Petersonee8712c2008-05-20 21:35:26 +00006from test import support
Serhiy Storchaka16994912020-04-25 10:06:29 +03007from test.support import socket_helper
Hai Shie80697d2020-05-28 06:10:27 +08008from test.support import threading_helper
Thomas Woutersed03b412007-08-28 21:37:11 +00009import socket
Bill Janssen6e027db2007-11-15 22:23:56 +000010import select
Thomas Woutersed03b412007-08-28 21:37:11 +000011import time
Christian Heimes9424bb42013-06-17 15:32:57 +020012import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000013import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000014import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000015import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000016import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000017import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020018import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000019import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000020import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000021import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000022import platform
Christian Heimes892d66e2018-01-29 14:10:18 +010023import sysconfig
Christian Heimesdf6ac7e2019-09-26 17:02:59 +020024import functools
Christian Heimes888bbdc2017-09-07 14:18:21 -070025try:
26 import ctypes
27except ImportError:
28 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000029
Antoine Pitrou05d936d2010-10-13 11:38:36 +000030ssl = support.import_module("ssl")
31
Victor Stinner8f4ef3b2019-07-01 18:28:25 +020032from ssl import TLSVersion, _TLSContentType, _TLSMessageType
Martin Panter3840b2a2016-03-27 01:53:46 +000033
Paul Monsonf3550692019-06-19 13:09:54 -070034Py_DEBUG = hasattr(sys, 'gettotalrefcount')
35Py_DEBUG_WIN32 = Py_DEBUG and sys.platform == 'win32'
36
Antoine Pitrou2463e5f2013-03-28 22:24:43 +010037PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
Serhiy Storchaka16994912020-04-25 10:06:29 +030038HOST = socket_helper.HOST
Christian Heimes598894f2016-09-05 23:19:05 +020039IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
Christian Heimes05d9fe32018-02-27 08:55:39 +010040IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
41IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
Christian Heimes892d66e2018-01-29 14:10:18 +010042PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000043
Victor Stinner3ef63442019-02-19 18:06:03 +010044PROTOCOL_TO_TLS_VERSION = {}
45for proto, ver in (
46 ("PROTOCOL_SSLv23", "SSLv3"),
47 ("PROTOCOL_TLSv1", "TLSv1"),
48 ("PROTOCOL_TLSv1_1", "TLSv1_1"),
49):
50 try:
51 proto = getattr(ssl, proto)
52 ver = getattr(ssl.TLSVersion, ver)
53 except AttributeError:
54 continue
55 PROTOCOL_TO_TLS_VERSION[proto] = ver
56
Christian Heimesefff7062013-11-21 03:35:02 +010057def data_file(*name):
58 return os.path.join(os.path.dirname(__file__), *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000059
Antoine Pitrou81564092010-10-08 23:06:24 +000060# The custom key and certificate files used in test_ssl are generated
61# using Lib/test/make_ssl_certs.py.
62# Other certificates are simply fetched from the Internet servers they
63# are meant to authenticate.
64
Antoine Pitrou152efa22010-05-16 18:19:27 +000065CERTFILE = data_file("keycert.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000066BYTES_CERTFILE = os.fsencode(CERTFILE)
Antoine Pitrou152efa22010-05-16 18:19:27 +000067ONLYCERT = data_file("ssl_cert.pem")
68ONLYKEY = data_file("ssl_key.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000069BYTES_ONLYCERT = os.fsencode(ONLYCERT)
70BYTES_ONLYKEY = os.fsencode(ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +020071CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
72ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
73KEY_PASSWORD = "somepass"
Antoine Pitrou152efa22010-05-16 18:19:27 +000074CAPATH = data_file("capath")
Victor Stinner313a1202010-06-11 23:56:51 +000075BYTES_CAPATH = os.fsencode(CAPATH)
Christian Heimesefff7062013-11-21 03:35:02 +010076CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
77CAFILE_CACERT = data_file("capath", "5ed36f99.0")
78
Christian Heimesbd5c7d22018-01-20 15:16:30 +010079CERTFILE_INFO = {
80 'issuer': ((('countryName', 'XY'),),
81 (('localityName', 'Castle Anthrax'),),
82 (('organizationName', 'Python Software Foundation'),),
83 (('commonName', 'localhost'),)),
Christian Heimese6dac002018-08-30 07:25:49 +020084 'notAfter': 'Aug 26 14:23:15 2028 GMT',
85 'notBefore': 'Aug 29 14:23:15 2018 GMT',
86 'serialNumber': '98A7CF88C74A32ED',
Christian Heimesbd5c7d22018-01-20 15:16:30 +010087 'subject': ((('countryName', 'XY'),),
88 (('localityName', 'Castle Anthrax'),),
89 (('organizationName', 'Python Software Foundation'),),
90 (('commonName', 'localhost'),)),
91 'subjectAltName': (('DNS', 'localhost'),),
92 'version': 3
93}
Antoine Pitrou152efa22010-05-16 18:19:27 +000094
Christian Heimes22587792013-11-21 23:56:13 +010095# empty CRL
96CRLFILE = data_file("revocation.crl")
97
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010098# Two keys and certs signed by the same CA (for SNI tests)
99SIGNED_CERTFILE = data_file("keycert3.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +0200100SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100101
102SIGNED_CERTFILE_INFO = {
103 'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
104 'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
105 'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
106 'issuer': ((('countryName', 'XY'),),
107 (('organizationName', 'Python Software Foundation CA'),),
108 (('commonName', 'our-ca-server'),)),
Christian Heimese6dac002018-08-30 07:25:49 +0200109 'notAfter': 'Jul 7 14:23:16 2028 GMT',
110 'notBefore': 'Aug 29 14:23:16 2018 GMT',
111 'serialNumber': 'CB2D80995A69525C',
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100112 'subject': ((('countryName', 'XY'),),
113 (('localityName', 'Castle Anthrax'),),
114 (('organizationName', 'Python Software Foundation'),),
115 (('commonName', 'localhost'),)),
116 'subjectAltName': (('DNS', 'localhost'),),
117 'version': 3
118}
119
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100120SIGNED_CERTFILE2 = data_file("keycert4.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +0200121SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100122SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
123SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'
124
Martin Panter3840b2a2016-03-27 01:53:46 +0000125# Same certificate as pycacert.pem, but without extra text in file
126SIGNING_CA = data_file("capath", "ceff1710.0")
Christian Heimes1c03abd2016-09-06 23:25:35 +0200127# cert with all kinds of subject alt names
128ALLSANFILE = data_file("allsans.pem")
Christian Heimes66e57422018-01-29 14:25:13 +0100129IDNSANSFILE = data_file("idnsans.pem")
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100130
Martin Panter3d81d932016-01-14 09:36:00 +0000131REMOTE_HOST = "self-signed.pythontest.net"
Antoine Pitrou152efa22010-05-16 18:19:27 +0000132
133EMPTYCERT = data_file("nullcert.pem")
134BADCERT = data_file("badcert.pem")
Martin Panter407b62f2016-01-30 03:41:43 +0000135NONEXISTINGCERT = data_file("XXXnonexisting.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000136BADKEY = data_file("badkey.pem")
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200137NOKIACERT = data_file("nokia.pem")
Christian Heimes824f7f32013-08-17 00:54:47 +0200138NULLBYTECERT = data_file("nullbytecert.pem")
Christian Heimesa37f5242019-01-15 23:47:42 +0100139TALOS_INVALID_CRLDP = data_file("talos-2019-0758.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000140
Christian Heimes88bfd0b2018-08-14 12:54:19 +0200141DHFILE = data_file("ffdh3072.pem")
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100142BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000143
Christian Heimes358cfd42016-09-10 22:43:48 +0200144# Not defined in all versions of OpenSSL
145OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
146OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
147OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
148OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
Christian Heimes05d9fe32018-02-27 08:55:39 +0100149OP_ENABLE_MIDDLEBOX_COMPAT = getattr(ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
Christian Heimes358cfd42016-09-10 22:43:48 +0200150
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100151
Christian Heimesdf6ac7e2019-09-26 17:02:59 +0200152def has_tls_protocol(protocol):
153 """Check if a TLS protocol is available and enabled
154
155 :param protocol: enum ssl._SSLMethod member or name
156 :return: bool
157 """
158 if isinstance(protocol, str):
159 assert protocol.startswith('PROTOCOL_')
160 protocol = getattr(ssl, protocol, None)
161 if protocol is None:
162 return False
163 if protocol in {
164 ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS_SERVER,
165 ssl.PROTOCOL_TLS_CLIENT
166 }:
167 # auto-negotiate protocols are always available
168 return True
169 name = protocol.name
170 return has_tls_version(name[len('PROTOCOL_'):])
171
172
173@functools.lru_cache
174def has_tls_version(version):
175 """Check if a TLS/SSL version is enabled
176
177 :param version: TLS version name or ssl.TLSVersion member
178 :return: bool
179 """
180 if version == "SSLv2":
181 # never supported and not even in TLSVersion enum
182 return False
183
184 if isinstance(version, str):
185 version = ssl.TLSVersion.__members__[version]
186
187 # check compile time flags like ssl.HAS_TLSv1_2
188 if not getattr(ssl, f'HAS_{version.name}'):
189 return False
190
191 # check runtime and dynamic crypto policy settings. A TLS version may
192 # be compiled in but disabled by a policy or config option.
193 ctx = ssl.SSLContext()
194 if (
Christian Heimes9f772682019-09-26 18:23:17 +0200195 hasattr(ctx, 'minimum_version') and
Christian Heimesdf6ac7e2019-09-26 17:02:59 +0200196 ctx.minimum_version != ssl.TLSVersion.MINIMUM_SUPPORTED and
197 version < ctx.minimum_version
198 ):
199 return False
200 if (
Christian Heimes9f772682019-09-26 18:23:17 +0200201 hasattr(ctx, 'maximum_version') and
Christian Heimesdf6ac7e2019-09-26 17:02:59 +0200202 ctx.maximum_version != ssl.TLSVersion.MAXIMUM_SUPPORTED and
203 version > ctx.maximum_version
204 ):
205 return False
206
207 return True
208
209
210def requires_tls_version(version):
211 """Decorator to skip tests when a required TLS version is not available
212
213 :param version: TLS version name or ssl.TLSVersion member
214 :return:
215 """
216 def decorator(func):
217 @functools.wraps(func)
218 def wrapper(*args, **kw):
219 if not has_tls_version(version):
220 raise unittest.SkipTest(f"{version} is not available.")
221 else:
222 return func(*args, **kw)
223 return wrapper
224 return decorator
225
226
227requires_minimum_version = unittest.skipUnless(
228 hasattr(ssl.SSLContext, 'minimum_version'),
229 "required OpenSSL >= 1.1.0g"
230)
231
232
Thomas Woutersed03b412007-08-28 21:37:11 +0000233def handle_error(prefix):
234 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000235 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000236 sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +0000237
Antoine Pitroub5218772010-05-21 09:56:06 +0000238def can_clear_options():
239 # 0.9.8m or higher
Antoine Pitroub9ac25d2011-07-08 18:47:06 +0200240 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
Antoine Pitroub5218772010-05-21 09:56:06 +0000241
242def no_sslv2_implies_sslv3_hello():
243 # 0.9.7h or higher
244 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
245
Christian Heimes2427b502013-11-23 11:24:32 +0100246def have_verify_flags():
247 # 0.9.8 or higher
248 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
249
Christian Heimesb7b92252018-02-25 09:49:31 +0100250def _have_secp_curves():
251 if not ssl.HAS_ECDH:
252 return False
253 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
254 try:
255 ctx.set_ecdh_curve("secp384r1")
256 except ValueError:
257 return False
258 else:
259 return True
260
261
262HAVE_SECP_CURVES = _have_secp_curves()
263
264
Antoine Pitrouc695c952014-04-28 20:57:36 +0200265def utc_offset(): #NOTE: ignore issues like #1647654
266 # local time = utc time + utc offset
267 if time.daylight and time.localtime().tm_isdst > 0:
268 return -time.altzone # seconds
269 return -time.timezone
270
Christian Heimes9424bb42013-06-17 15:32:57 +0200271def asn1time(cert_time):
272 # Some versions of OpenSSL ignore seconds, see #18207
273 # 0.9.8.i
274 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
275 fmt = "%b %d %H:%M:%S %Y GMT"
276 dt = datetime.datetime.strptime(cert_time, fmt)
277 dt = dt.replace(second=0)
278 cert_time = dt.strftime(fmt)
279 # %d adds leading zero but ASN1_TIME_print() uses leading space
280 if cert_time[4] == "0":
281 cert_time = cert_time[:4] + " " + cert_time[5:]
282
283 return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000284
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100285needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
286
Antoine Pitrou23df4832010-08-04 17:14:06 +0000287
Christian Heimesd0486372016-09-10 23:23:33 +0200288def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
289 cert_reqs=ssl.CERT_NONE, ca_certs=None,
290 ciphers=None, certfile=None, keyfile=None,
291 **kwargs):
292 context = ssl.SSLContext(ssl_version)
293 if cert_reqs is not None:
Christian Heimesa170fa12017-09-15 20:27:30 +0200294 if cert_reqs == ssl.CERT_NONE:
295 context.check_hostname = False
Christian Heimesd0486372016-09-10 23:23:33 +0200296 context.verify_mode = cert_reqs
297 if ca_certs is not None:
298 context.load_verify_locations(ca_certs)
299 if certfile is not None or keyfile is not None:
300 context.load_cert_chain(certfile, keyfile)
301 if ciphers is not None:
302 context.set_ciphers(ciphers)
303 return context.wrap_socket(sock, **kwargs)
304
Christian Heimesa170fa12017-09-15 20:27:30 +0200305
306def testing_context(server_cert=SIGNED_CERTFILE):
307 """Create context
308
309 client_context, server_context, hostname = testing_context()
310 """
311 if server_cert == SIGNED_CERTFILE:
312 hostname = SIGNED_CERTFILE_HOSTNAME
313 elif server_cert == SIGNED_CERTFILE2:
314 hostname = SIGNED_CERTFILE2_HOSTNAME
315 else:
316 raise ValueError(server_cert)
317
318 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
319 client_context.load_verify_locations(SIGNING_CA)
320
321 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
322 server_context.load_cert_chain(server_cert)
Christian Heimes9fb051f2018-09-23 08:32:31 +0200323 server_context.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +0200324
325 return client_context, server_context, hostname
326
327
Antoine Pitrou152efa22010-05-16 18:19:27 +0000328class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000329
Antoine Pitrou480a1242010-04-28 21:37:09 +0000330 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000331 ssl.CERT_NONE
332 ssl.CERT_OPTIONAL
333 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100334 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100335 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100336 if ssl.HAS_ECDH:
337 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100338 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
339 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000340 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100341 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700342 ssl.OP_NO_SSLv2
343 ssl.OP_NO_SSLv3
344 ssl.OP_NO_TLSv1
345 ssl.OP_NO_TLSv1_3
346 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
347 ssl.OP_NO_TLSv1_1
348 ssl.OP_NO_TLSv1_2
Christian Heimesa170fa12017-09-15 20:27:30 +0200349 self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000350
Christian Heimes9d50ab52018-02-27 10:17:30 +0100351 def test_private_init(self):
352 with self.assertRaisesRegex(TypeError, "public constructor"):
353 with socket.socket() as s:
354 ssl.SSLSocket(s)
355
Antoine Pitrou172f0252014-04-18 20:33:08 +0200356 def test_str_for_enums(self):
357 # Make sure that the PROTOCOL_* constants have enum-like string
358 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200359 proto = ssl.PROTOCOL_TLS
360 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200361 ctx = ssl.SSLContext(proto)
362 self.assertIs(ctx.protocol, proto)
363
Antoine Pitrou480a1242010-04-28 21:37:09 +0000364 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000365 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000366 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000367 sys.stdout.write("\n RAND_status is %d (%s)\n"
368 % (v, (v and "sufficient randomness") or
369 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200370
371 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
372 self.assertEqual(len(data), 16)
373 self.assertEqual(is_cryptographic, v == 1)
374 if v:
375 data = ssl.RAND_bytes(16)
376 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200377 else:
378 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200379
Victor Stinner1e81a392013-12-19 16:47:04 +0100380 # negative num is invalid
381 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
382 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
383
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100384 if hasattr(ssl, 'RAND_egd'):
385 self.assertRaises(TypeError, ssl.RAND_egd, 1)
386 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000387 ssl.RAND_add("this is a random string", 75.0)
Serhiy Storchaka8490f5a2015-03-20 09:00:36 +0200388 ssl.RAND_add(b"this is a random bytes object", 75.0)
389 ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000390
Christian Heimesf77b4b22013-08-21 13:26:05 +0200391 @unittest.skipUnless(os.name == 'posix', 'requires posix')
392 def test_random_fork(self):
393 status = ssl.RAND_status()
394 if not status:
395 self.fail("OpenSSL's PRNG has insufficient randomness")
396
397 rfd, wfd = os.pipe()
398 pid = os.fork()
399 if pid == 0:
400 try:
401 os.close(rfd)
402 child_random = ssl.RAND_pseudo_bytes(16)[0]
403 self.assertEqual(len(child_random), 16)
404 os.write(wfd, child_random)
405 os.close(wfd)
406 except BaseException:
407 os._exit(1)
408 else:
409 os._exit(0)
410 else:
411 os.close(wfd)
412 self.addCleanup(os.close, rfd)
Victor Stinner278c1e12020-03-31 20:08:12 +0200413 support.wait_process(pid, exitcode=0)
Christian Heimesf77b4b22013-08-21 13:26:05 +0200414
415 child_random = os.read(rfd, 16)
416 self.assertEqual(len(child_random), 16)
417 parent_random = ssl.RAND_pseudo_bytes(16)[0]
418 self.assertEqual(len(parent_random), 16)
419
420 self.assertNotEqual(child_random, parent_random)
421
Christian Heimese6dac002018-08-30 07:25:49 +0200422 maxDiff = None
423
Antoine Pitrou480a1242010-04-28 21:37:09 +0000424 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000425 # note that this uses an 'unofficial' function in _ssl.c,
426 # provided solely for this test, to exercise the certificate
427 # parsing code
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100428 self.assertEqual(
429 ssl._ssl._test_decode_cert(CERTFILE),
430 CERTFILE_INFO
431 )
432 self.assertEqual(
433 ssl._ssl._test_decode_cert(SIGNED_CERTFILE),
434 SIGNED_CERTFILE_INFO
435 )
436
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200437 # Issue #13034: the subjectAltName in some certificates
438 # (notably projects.developer.nokia.com:443) wasn't parsed
439 p = ssl._ssl._test_decode_cert(NOKIACERT)
440 if support.verbose:
441 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
442 self.assertEqual(p['subjectAltName'],
443 (('DNS', 'projects.developer.nokia.com'),
444 ('DNS', 'projects.forum.nokia.com'))
445 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100446 # extra OCSP and AIA fields
447 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
448 self.assertEqual(p['caIssuers'],
449 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
450 self.assertEqual(p['crlDistributionPoints'],
451 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000452
Christian Heimesa37f5242019-01-15 23:47:42 +0100453 def test_parse_cert_CVE_2019_5010(self):
454 p = ssl._ssl._test_decode_cert(TALOS_INVALID_CRLDP)
455 if support.verbose:
456 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
457 self.assertEqual(
458 p,
459 {
460 'issuer': (
461 (('countryName', 'UK'),), (('commonName', 'cody-ca'),)),
462 'notAfter': 'Jun 14 18:00:58 2028 GMT',
463 'notBefore': 'Jun 18 18:00:58 2018 GMT',
464 'serialNumber': '02',
465 'subject': ((('countryName', 'UK'),),
466 (('commonName',
467 'codenomicon-vm-2.test.lal.cisco.com'),)),
468 'subjectAltName': (
469 ('DNS', 'codenomicon-vm-2.test.lal.cisco.com'),),
470 'version': 3
471 }
472 )
473
Christian Heimes824f7f32013-08-17 00:54:47 +0200474 def test_parse_cert_CVE_2013_4238(self):
475 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
476 if support.verbose:
477 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
478 subject = ((('countryName', 'US'),),
479 (('stateOrProvinceName', 'Oregon'),),
480 (('localityName', 'Beaverton'),),
481 (('organizationName', 'Python Software Foundation'),),
482 (('organizationalUnitName', 'Python Core Development'),),
483 (('commonName', 'null.python.org\x00example.org'),),
484 (('emailAddress', 'python-dev@python.org'),))
485 self.assertEqual(p['subject'], subject)
486 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200487 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
488 san = (('DNS', 'altnull.python.org\x00example.com'),
489 ('email', 'null@python.org\x00user@example.org'),
490 ('URI', 'http://null.python.org\x00http://example.org'),
491 ('IP Address', '192.0.2.1'),
Christian Heimes2b7de662019-12-07 17:59:36 +0100492 ('IP Address', '2001:DB8:0:0:0:0:0:1'))
Christian Heimes157c9832013-08-25 14:12:41 +0200493 else:
494 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
495 san = (('DNS', 'altnull.python.org\x00example.com'),
496 ('email', 'null@python.org\x00user@example.org'),
497 ('URI', 'http://null.python.org\x00http://example.org'),
498 ('IP Address', '192.0.2.1'),
499 ('IP Address', '<invalid>'))
500
501 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200502
Christian Heimes1c03abd2016-09-06 23:25:35 +0200503 def test_parse_all_sans(self):
504 p = ssl._ssl._test_decode_cert(ALLSANFILE)
505 self.assertEqual(p['subjectAltName'],
506 (
507 ('DNS', 'allsans'),
508 ('othername', '<unsupported>'),
509 ('othername', '<unsupported>'),
510 ('email', 'user@example.org'),
511 ('DNS', 'www.example.org'),
512 ('DirName',
513 ((('countryName', 'XY'),),
514 (('localityName', 'Castle Anthrax'),),
515 (('organizationName', 'Python Software Foundation'),),
516 (('commonName', 'dirname example'),))),
517 ('URI', 'https://www.python.org/'),
518 ('IP Address', '127.0.0.1'),
Christian Heimes2b7de662019-12-07 17:59:36 +0100519 ('IP Address', '0:0:0:0:0:0:0:1'),
Christian Heimes1c03abd2016-09-06 23:25:35 +0200520 ('Registered ID', '1.2.3.4.5')
521 )
522 )
523
Antoine Pitrou480a1242010-04-28 21:37:09 +0000524 def test_DER_to_PEM(self):
Martin Panter3d81d932016-01-14 09:36:00 +0000525 with open(CAFILE_CACERT, 'r') as f:
Antoine Pitrou480a1242010-04-28 21:37:09 +0000526 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000527 d1 = ssl.PEM_cert_to_DER_cert(pem)
528 p2 = ssl.DER_cert_to_PEM_cert(d1)
529 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000530 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000531 if not p2.startswith(ssl.PEM_HEADER + '\n'):
532 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
533 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
534 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000535
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000536 def test_openssl_version(self):
537 n = ssl.OPENSSL_VERSION_NUMBER
538 t = ssl.OPENSSL_VERSION_INFO
539 s = ssl.OPENSSL_VERSION
540 self.assertIsInstance(n, int)
541 self.assertIsInstance(t, tuple)
542 self.assertIsInstance(s, str)
543 # Some sanity checks follow
544 # >= 0.9
545 self.assertGreaterEqual(n, 0x900000)
Christian Heimes2b7de662019-12-07 17:59:36 +0100546 # < 4.0
547 self.assertLess(n, 0x40000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000548 major, minor, fix, patch, status = t
Christian Heimes2b7de662019-12-07 17:59:36 +0100549 self.assertGreaterEqual(major, 1)
550 self.assertLess(major, 4)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000551 self.assertGreaterEqual(minor, 0)
552 self.assertLess(minor, 256)
553 self.assertGreaterEqual(fix, 0)
554 self.assertLess(fix, 256)
555 self.assertGreaterEqual(patch, 0)
Ned Deily05784a72015-02-05 17:20:13 +1100556 self.assertLessEqual(patch, 63)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000557 self.assertGreaterEqual(status, 0)
558 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400559 # Version string as returned by {Open,Libre}SSL, the format might change
Christian Heimes598894f2016-09-05 23:19:05 +0200560 if IS_LIBRESSL:
561 self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
Victor Stinner789b8052015-01-06 11:51:06 +0100562 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400563 else:
564 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100565 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000566
Antoine Pitrou9d543662010-04-23 23:10:32 +0000567 @support.cpython_only
568 def test_refcycle(self):
569 # Issue #7943: an SSL object doesn't create reference cycles with
570 # itself.
571 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200572 ss = test_wrap_socket(s)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000573 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100574 with support.check_warnings(("", ResourceWarning)):
575 del ss
Victor Stinnere0b75b72016-03-21 17:26:04 +0100576 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000577
Antoine Pitroua468adc2010-09-14 14:43:44 +0000578 def test_wrapped_unconnected(self):
579 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200580 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000581 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200582 with test_wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100583 self.assertRaises(OSError, ss.recv, 1)
584 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
585 self.assertRaises(OSError, ss.recvfrom, 1)
586 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
587 self.assertRaises(OSError, ss.send, b'x')
588 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Serhiy Storchaka42b1d612018-12-06 22:36:55 +0200589 self.assertRaises(NotImplementedError, ss.dup)
Christian Heimes141c5e82018-02-24 21:10:57 +0100590 self.assertRaises(NotImplementedError, ss.sendmsg,
591 [b'x'], (), 0, ('0.0.0.0', 0))
Serhiy Storchaka42b1d612018-12-06 22:36:55 +0200592 self.assertRaises(NotImplementedError, ss.recvmsg, 100)
593 self.assertRaises(NotImplementedError, ss.recvmsg_into,
594 [bytearray(100)])
Antoine Pitroua468adc2010-09-14 14:43:44 +0000595
Antoine Pitrou40f08742010-04-24 22:04:40 +0000596 def test_timeout(self):
597 # Issue #8524: when creating an SSL socket, the timeout of the
598 # original socket should be retained.
599 for timeout in (None, 0.0, 5.0):
600 s = socket.socket(socket.AF_INET)
601 s.settimeout(timeout)
Christian Heimesd0486372016-09-10 23:23:33 +0200602 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100603 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000604
Christian Heimesd0486372016-09-10 23:23:33 +0200605 def test_errors_sslwrap(self):
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000606 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000607 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000608 "certfile must be specified",
609 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000610 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000611 "certfile must be specified for server-side operations",
612 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000613 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000614 "certfile must be specified for server-side operations",
Christian Heimesd0486372016-09-10 23:23:33 +0200615 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100616 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
617 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
Christian Heimesd0486372016-09-10 23:23:33 +0200618 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200619 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000620 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000621 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000622 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200623 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000624 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000625 ssl.wrap_socket(sock,
626 certfile=CERTFILE, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000627 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200628 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000629 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000630 ssl.wrap_socket(sock,
631 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000632 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000633
Martin Panter3464ea22016-02-01 21:58:11 +0000634 def bad_cert_test(self, certfile):
635 """Check that trying to use the given client certificate fails"""
636 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
637 certfile)
638 sock = socket.socket()
639 self.addCleanup(sock.close)
640 with self.assertRaises(ssl.SSLError):
Christian Heimesd0486372016-09-10 23:23:33 +0200641 test_wrap_socket(sock,
Christian Heimesa170fa12017-09-15 20:27:30 +0200642 certfile=certfile)
Martin Panter3464ea22016-02-01 21:58:11 +0000643
644 def test_empty_cert(self):
645 """Wrapping with an empty cert file"""
646 self.bad_cert_test("nullcert.pem")
647
648 def test_malformed_cert(self):
649 """Wrapping with a badly formatted certificate (syntax error)"""
650 self.bad_cert_test("badcert.pem")
651
652 def test_malformed_key(self):
653 """Wrapping with a badly formatted key (syntax error)"""
654 self.bad_cert_test("badkey.pem")
655
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000656 def test_match_hostname(self):
657 def ok(cert, hostname):
658 ssl.match_hostname(cert, hostname)
659 def fail(cert, hostname):
660 self.assertRaises(ssl.CertificateError,
661 ssl.match_hostname, cert, hostname)
662
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100663 # -- Hostname matching --
664
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000665 cert = {'subject': ((('commonName', 'example.com'),),)}
666 ok(cert, 'example.com')
667 ok(cert, 'ExAmple.cOm')
668 fail(cert, 'www.example.com')
669 fail(cert, '.example.com')
670 fail(cert, 'example.org')
671 fail(cert, 'exampleXcom')
672
673 cert = {'subject': ((('commonName', '*.a.com'),),)}
674 ok(cert, 'foo.a.com')
675 fail(cert, 'bar.foo.a.com')
676 fail(cert, 'a.com')
677 fail(cert, 'Xa.com')
678 fail(cert, '.a.com')
679
Mandeep Singhede2ac92017-11-27 04:01:27 +0530680 # only match wildcards when they are the only thing
681 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000682 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530683 fail(cert, 'foo.com')
684 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000685 fail(cert, 'bar.com')
686 fail(cert, 'foo.a.com')
687 fail(cert, 'bar.foo.com')
688
Christian Heimes824f7f32013-08-17 00:54:47 +0200689 # NULL bytes are bad, CVE-2013-4073
690 cert = {'subject': ((('commonName',
691 'null.python.org\x00example.org'),),)}
692 ok(cert, 'null.python.org\x00example.org') # or raise an error?
693 fail(cert, 'example.org')
694 fail(cert, 'null.python.org')
695
Georg Brandl72c98d32013-10-27 07:16:53 +0100696 # error cases with wildcards
697 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
698 fail(cert, 'bar.foo.a.com')
699 fail(cert, 'a.com')
700 fail(cert, 'Xa.com')
701 fail(cert, '.a.com')
702
703 cert = {'subject': ((('commonName', 'a.*.com'),),)}
704 fail(cert, 'a.foo.com')
705 fail(cert, 'a..com')
706 fail(cert, 'a.com')
707
708 # wildcard doesn't match IDNA prefix 'xn--'
709 idna = 'püthon.python.org'.encode("idna").decode("ascii")
710 cert = {'subject': ((('commonName', idna),),)}
711 ok(cert, idna)
712 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
713 fail(cert, idna)
714 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
715 fail(cert, idna)
716
717 # wildcard in first fragment and IDNA A-labels in sequent fragments
718 # are supported.
719 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
720 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530721 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
722 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100723 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
724 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
725
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000726 # Slightly fake real-world example
727 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
728 'subject': ((('commonName', 'linuxfrz.org'),),),
729 'subjectAltName': (('DNS', 'linuxfr.org'),
730 ('DNS', 'linuxfr.com'),
731 ('othername', '<unsupported>'))}
732 ok(cert, 'linuxfr.org')
733 ok(cert, 'linuxfr.com')
734 # Not a "DNS" entry
735 fail(cert, '<unsupported>')
736 # When there is a subjectAltName, commonName isn't used
737 fail(cert, 'linuxfrz.org')
738
739 # A pristine real-world example
740 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
741 'subject': ((('countryName', 'US'),),
742 (('stateOrProvinceName', 'California'),),
743 (('localityName', 'Mountain View'),),
744 (('organizationName', 'Google Inc'),),
745 (('commonName', 'mail.google.com'),))}
746 ok(cert, 'mail.google.com')
747 fail(cert, 'gmail.com')
748 # Only commonName is considered
749 fail(cert, 'California')
750
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100751 # -- IPv4 matching --
752 cert = {'subject': ((('commonName', 'example.com'),),),
753 'subjectAltName': (('DNS', 'example.com'),
754 ('IP Address', '10.11.12.13'),
Christian Heimes477b1b22019-07-02 20:39:42 +0200755 ('IP Address', '14.15.16.17'),
756 ('IP Address', '127.0.0.1'))}
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100757 ok(cert, '10.11.12.13')
758 ok(cert, '14.15.16.17')
Christian Heimes477b1b22019-07-02 20:39:42 +0200759 # socket.inet_ntoa(socket.inet_aton('127.1')) == '127.0.0.1'
760 fail(cert, '127.1')
761 fail(cert, '14.15.16.17 ')
762 fail(cert, '14.15.16.17 extra data')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100763 fail(cert, '14.15.16.18')
764 fail(cert, 'example.net')
765
766 # -- IPv6 matching --
Serhiy Storchaka16994912020-04-25 10:06:29 +0300767 if socket_helper.IPV6_ENABLED:
Christian Heimesaef12832018-02-24 14:35:56 +0100768 cert = {'subject': ((('commonName', 'example.com'),),),
769 'subjectAltName': (
770 ('DNS', 'example.com'),
771 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
772 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
773 ok(cert, '2001::cafe')
774 ok(cert, '2003::baba')
Christian Heimes477b1b22019-07-02 20:39:42 +0200775 fail(cert, '2003::baba ')
776 fail(cert, '2003::baba extra data')
Christian Heimesaef12832018-02-24 14:35:56 +0100777 fail(cert, '2003::bebe')
778 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100779
780 # -- Miscellaneous --
781
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000782 # Neither commonName nor subjectAltName
783 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
784 'subject': ((('countryName', 'US'),),
785 (('stateOrProvinceName', 'California'),),
786 (('localityName', 'Mountain View'),),
787 (('organizationName', 'Google Inc'),))}
788 fail(cert, 'mail.google.com')
789
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200790 # No DNS entry in subjectAltName but a commonName
791 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
792 'subject': ((('countryName', 'US'),),
793 (('stateOrProvinceName', 'California'),),
794 (('localityName', 'Mountain View'),),
795 (('commonName', 'mail.google.com'),)),
796 'subjectAltName': (('othername', 'blabla'), )}
797 ok(cert, 'mail.google.com')
798
799 # No DNS entry subjectAltName and no commonName
800 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
801 'subject': ((('countryName', 'US'),),
802 (('stateOrProvinceName', 'California'),),
803 (('localityName', 'Mountain View'),),
804 (('organizationName', 'Google Inc'),)),
805 'subjectAltName': (('othername', 'blabla'),)}
806 fail(cert, 'google.com')
807
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000808 # Empty cert / no cert
809 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
810 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
811
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200812 # Issue #17980: avoid denials of service by refusing more than one
813 # wildcard per fragment.
Christian Heimesaef12832018-02-24 14:35:56 +0100814 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
815 with self.assertRaisesRegex(
816 ssl.CertificateError,
817 "partial wildcards in leftmost label are not supported"):
818 ssl.match_hostname(cert, 'axxb.example.com')
819
820 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
821 with self.assertRaisesRegex(
822 ssl.CertificateError,
823 "wildcard can only be present in the leftmost label"):
824 ssl.match_hostname(cert, 'www.sub.example.com')
825
826 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
827 with self.assertRaisesRegex(
828 ssl.CertificateError,
829 "too many wildcards"):
830 ssl.match_hostname(cert, 'axxbxxc.example.com')
831
832 cert = {'subject': ((('commonName', '*'),),)}
833 with self.assertRaisesRegex(
834 ssl.CertificateError,
835 "sole wildcard without additional labels are not support"):
836 ssl.match_hostname(cert, 'host')
837
838 cert = {'subject': ((('commonName', '*.com'),),)}
839 with self.assertRaisesRegex(
840 ssl.CertificateError,
841 r"hostname 'com' doesn't match '\*.com'"):
842 ssl.match_hostname(cert, 'com')
843
844 # extra checks for _inet_paton()
845 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
846 with self.assertRaises(ValueError):
847 ssl._inet_paton(invalid)
848 for ipaddr in ['127.0.0.1', '192.168.0.1']:
849 self.assertTrue(ssl._inet_paton(ipaddr))
Serhiy Storchaka16994912020-04-25 10:06:29 +0300850 if socket_helper.IPV6_ENABLED:
Christian Heimesaef12832018-02-24 14:35:56 +0100851 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
852 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200853
Antoine Pitroud5323212010-10-22 18:19:07 +0000854 def test_server_side(self):
855 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200856 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000857 with socket.socket() as sock:
858 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
859 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000860
Antoine Pitroud6494802011-07-21 01:11:30 +0200861 def test_unknown_channel_binding(self):
862 # should raise ValueError for unknown type
Giampaolo Rodolaeb7e29f2019-04-09 00:34:02 +0200863 s = socket.create_server(('127.0.0.1', 0))
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200864 c = socket.socket(socket.AF_INET)
865 c.connect(s.getsockname())
Christian Heimesd0486372016-09-10 23:23:33 +0200866 with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100867 with self.assertRaises(ValueError):
868 ss.get_channel_binding("unknown-type")
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200869 s.close()
Antoine Pitroud6494802011-07-21 01:11:30 +0200870
871 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
872 "'tls-unique' channel binding not available")
873 def test_tls_unique_channel_binding(self):
874 # unconnected should return None for known type
875 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200876 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100877 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200878 # the same for server-side
879 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200880 with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100881 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200882
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600883 def test_dealloc_warn(self):
Christian Heimesd0486372016-09-10 23:23:33 +0200884 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600885 r = repr(ss)
886 with self.assertWarns(ResourceWarning) as cm:
887 ss = None
888 support.gc_collect()
889 self.assertIn(r, str(cm.warning.args[0]))
890
Christian Heimes6d7ad132013-06-09 18:02:55 +0200891 def test_get_default_verify_paths(self):
892 paths = ssl.get_default_verify_paths()
893 self.assertEqual(len(paths), 6)
894 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
895
896 with support.EnvironmentVarGuard() as env:
897 env["SSL_CERT_DIR"] = CAPATH
898 env["SSL_CERT_FILE"] = CERTFILE
899 paths = ssl.get_default_verify_paths()
900 self.assertEqual(paths.cafile, CERTFILE)
901 self.assertEqual(paths.capath, CAPATH)
902
Christian Heimes44109d72013-11-22 01:51:30 +0100903 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
904 def test_enum_certificates(self):
905 self.assertTrue(ssl.enum_certificates("CA"))
906 self.assertTrue(ssl.enum_certificates("ROOT"))
907
908 self.assertRaises(TypeError, ssl.enum_certificates)
909 self.assertRaises(WindowsError, ssl.enum_certificates, "")
910
Christian Heimesc2d65e12013-11-22 16:13:55 +0100911 trust_oids = set()
912 for storename in ("CA", "ROOT"):
913 store = ssl.enum_certificates(storename)
914 self.assertIsInstance(store, list)
915 for element in store:
916 self.assertIsInstance(element, tuple)
917 self.assertEqual(len(element), 3)
918 cert, enc, trust = element
919 self.assertIsInstance(cert, bytes)
920 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
Christian Heimes915cd3f2019-09-09 18:06:55 +0200921 self.assertIsInstance(trust, (frozenset, set, bool))
922 if isinstance(trust, (frozenset, set)):
Christian Heimesc2d65e12013-11-22 16:13:55 +0100923 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100924
925 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100926 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200927
Christian Heimes46bebee2013-06-09 19:03:31 +0200928 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100929 def test_enum_crls(self):
930 self.assertTrue(ssl.enum_crls("CA"))
931 self.assertRaises(TypeError, ssl.enum_crls)
932 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200933
Christian Heimes44109d72013-11-22 01:51:30 +0100934 crls = ssl.enum_crls("CA")
935 self.assertIsInstance(crls, list)
936 for element in crls:
937 self.assertIsInstance(element, tuple)
938 self.assertEqual(len(element), 2)
939 self.assertIsInstance(element[0], bytes)
940 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200941
Christian Heimes46bebee2013-06-09 19:03:31 +0200942
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100943 def test_asn1object(self):
944 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
945 '1.3.6.1.5.5.7.3.1')
946
947 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
948 self.assertEqual(val, expected)
949 self.assertEqual(val.nid, 129)
950 self.assertEqual(val.shortname, 'serverAuth')
951 self.assertEqual(val.longname, 'TLS Web Server Authentication')
952 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
953 self.assertIsInstance(val, ssl._ASN1Object)
954 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
955
956 val = ssl._ASN1Object.fromnid(129)
957 self.assertEqual(val, expected)
958 self.assertIsInstance(val, ssl._ASN1Object)
959 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100960 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
961 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100962 for i in range(1000):
963 try:
964 obj = ssl._ASN1Object.fromnid(i)
965 except ValueError:
966 pass
967 else:
968 self.assertIsInstance(obj.nid, int)
969 self.assertIsInstance(obj.shortname, str)
970 self.assertIsInstance(obj.longname, str)
971 self.assertIsInstance(obj.oid, (str, type(None)))
972
973 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
974 self.assertEqual(val, expected)
975 self.assertIsInstance(val, ssl._ASN1Object)
976 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
977 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
978 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100979 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
980 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100981
Christian Heimes72d28502013-11-23 13:56:58 +0100982 def test_purpose_enum(self):
983 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
984 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
985 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
986 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
987 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
988 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
989 '1.3.6.1.5.5.7.3.1')
990
991 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
992 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
993 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
994 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
995 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
996 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
997 '1.3.6.1.5.5.7.3.2')
998
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100999 def test_unsupported_dtls(self):
1000 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
1001 self.addCleanup(s.close)
1002 with self.assertRaises(NotImplementedError) as cx:
Christian Heimesd0486372016-09-10 23:23:33 +02001003 test_wrap_socket(s, cert_reqs=ssl.CERT_NONE)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +01001004 self.assertEqual(str(cx.exception), "only stream sockets are supported")
Christian Heimesa170fa12017-09-15 20:27:30 +02001005 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +01001006 with self.assertRaises(NotImplementedError) as cx:
1007 ctx.wrap_socket(s)
1008 self.assertEqual(str(cx.exception), "only stream sockets are supported")
1009
Antoine Pitrouc695c952014-04-28 20:57:36 +02001010 def cert_time_ok(self, timestring, timestamp):
1011 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
1012
1013 def cert_time_fail(self, timestring):
1014 with self.assertRaises(ValueError):
1015 ssl.cert_time_to_seconds(timestring)
1016
1017 @unittest.skipUnless(utc_offset(),
1018 'local time needs to be different from UTC')
1019 def test_cert_time_to_seconds_timezone(self):
1020 # Issue #19940: ssl.cert_time_to_seconds() returns wrong
1021 # results if local timezone is not UTC
1022 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
1023 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
1024
1025 def test_cert_time_to_seconds(self):
1026 timestring = "Jan 5 09:34:43 2018 GMT"
1027 ts = 1515144883.0
1028 self.cert_time_ok(timestring, ts)
1029 # accept keyword parameter, assert its name
1030 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
1031 # accept both %e and %d (space or zero generated by strftime)
1032 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
1033 # case-insensitive
1034 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
1035 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
1036 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
1037 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
1038 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
1039 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
1040 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
1041 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
1042
1043 newyear_ts = 1230768000.0
1044 # leap seconds
1045 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
1046 # same timestamp
1047 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
1048
1049 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
1050 # allow 60th second (even if it is not a leap second)
1051 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
1052 # allow 2nd leap second for compatibility with time.strptime()
1053 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
1054 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
1055
Mike53f7a7c2017-12-14 14:04:53 +03001056 # no special treatment for the special value:
Antoine Pitrouc695c952014-04-28 20:57:36 +02001057 # 99991231235959Z (rfc 5280)
1058 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
1059
1060 @support.run_with_locale('LC_ALL', '')
1061 def test_cert_time_to_seconds_locale(self):
1062 # `cert_time_to_seconds()` should be locale independent
1063
1064 def local_february_name():
1065 return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
1066
1067 if local_february_name().lower() == 'feb':
1068 self.skipTest("locale-specific month name needs to be "
1069 "different from C locale")
1070
1071 # locale-independent
1072 self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
1073 self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
1074
Martin Panter3840b2a2016-03-27 01:53:46 +00001075 def test_connect_ex_error(self):
1076 server = socket.socket(socket.AF_INET)
1077 self.addCleanup(server.close)
Serhiy Storchaka16994912020-04-25 10:06:29 +03001078 port = socket_helper.bind_port(server) # Reserve port but don't listen
Christian Heimesd0486372016-09-10 23:23:33 +02001079 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001080 cert_reqs=ssl.CERT_REQUIRED)
1081 self.addCleanup(s.close)
1082 rc = s.connect_ex((HOST, port))
1083 # Issue #19919: Windows machines or VMs hosted on Windows
1084 # machines sometimes return EWOULDBLOCK.
1085 errors = (
1086 errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
1087 errno.EWOULDBLOCK,
1088 )
1089 self.assertIn(rc, errors)
1090
Christian Heimesa6bc95a2013-11-17 19:59:14 +01001091
Antoine Pitrou152efa22010-05-16 18:19:27 +00001092class ContextTests(unittest.TestCase):
1093
1094 def test_constructor(self):
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01001095 for protocol in PROTOCOLS:
1096 ssl.SSLContext(protocol)
Christian Heimes598894f2016-09-05 23:19:05 +02001097 ctx = ssl.SSLContext()
1098 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001099 self.assertRaises(ValueError, ssl.SSLContext, -1)
1100 self.assertRaises(ValueError, ssl.SSLContext, 42)
1101
1102 def test_protocol(self):
1103 for proto in PROTOCOLS:
1104 ctx = ssl.SSLContext(proto)
1105 self.assertEqual(ctx.protocol, proto)
1106
1107 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001108 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001109 ctx.set_ciphers("ALL")
1110 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +00001111 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +00001112 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +00001113
Christian Heimes892d66e2018-01-29 14:10:18 +01001114 @unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
1115 "Test applies only to Python default ciphers")
1116 def test_python_ciphers(self):
1117 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1118 ciphers = ctx.get_ciphers()
1119 for suite in ciphers:
1120 name = suite['name']
1121 self.assertNotIn("PSK", name)
1122 self.assertNotIn("SRP", name)
1123 self.assertNotIn("MD5", name)
1124 self.assertNotIn("RC4", name)
1125 self.assertNotIn("3DES", name)
1126
Christian Heimes25bfcd52016-09-06 00:04:45 +02001127 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
1128 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001129 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +02001130 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +02001131 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +02001132 self.assertIn('AES256-GCM-SHA384', names)
1133 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001134
Antoine Pitroub5218772010-05-21 09:56:06 +00001135 def test_options(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001136 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001137 # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
Christian Heimes598894f2016-09-05 23:19:05 +02001138 default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimes358cfd42016-09-10 22:43:48 +02001139 # SSLContext also enables these by default
1140 default |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
Christian Heimes05d9fe32018-02-27 08:55:39 +01001141 OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE |
1142 OP_ENABLE_MIDDLEBOX_COMPAT)
Christian Heimes598894f2016-09-05 23:19:05 +02001143 self.assertEqual(default, ctx.options)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001144 ctx.options |= ssl.OP_NO_TLSv1
Christian Heimes598894f2016-09-05 23:19:05 +02001145 self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001146 if can_clear_options():
Christian Heimes598894f2016-09-05 23:19:05 +02001147 ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
1148 self.assertEqual(default, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001149 ctx.options = 0
Matthias Klosef7c56242016-06-12 23:40:00 -07001150 # Ubuntu has OP_NO_SSLv3 forced on by default
1151 self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
Antoine Pitroub5218772010-05-21 09:56:06 +00001152 else:
1153 with self.assertRaises(ValueError):
1154 ctx.options = 0
1155
Christian Heimesa170fa12017-09-15 20:27:30 +02001156 def test_verify_mode_protocol(self):
1157 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001158 # Default value
1159 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1160 ctx.verify_mode = ssl.CERT_OPTIONAL
1161 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1162 ctx.verify_mode = ssl.CERT_REQUIRED
1163 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1164 ctx.verify_mode = ssl.CERT_NONE
1165 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1166 with self.assertRaises(TypeError):
1167 ctx.verify_mode = None
1168 with self.assertRaises(ValueError):
1169 ctx.verify_mode = 42
1170
Christian Heimesa170fa12017-09-15 20:27:30 +02001171 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1172 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1173 self.assertFalse(ctx.check_hostname)
1174
1175 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1176 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1177 self.assertTrue(ctx.check_hostname)
1178
Christian Heimes61d478c2018-01-27 15:51:38 +01001179 def test_hostname_checks_common_name(self):
1180 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1181 self.assertTrue(ctx.hostname_checks_common_name)
1182 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1183 ctx.hostname_checks_common_name = True
1184 self.assertTrue(ctx.hostname_checks_common_name)
1185 ctx.hostname_checks_common_name = False
1186 self.assertFalse(ctx.hostname_checks_common_name)
1187 ctx.hostname_checks_common_name = True
1188 self.assertTrue(ctx.hostname_checks_common_name)
1189 else:
1190 with self.assertRaises(AttributeError):
1191 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001192
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02001193 @requires_minimum_version
Christian Heimesc9bc49c2019-09-11 19:24:47 +02001194 @unittest.skipIf(IS_LIBRESSL, "see bpo-34001")
Christian Heimes698dde12018-02-27 11:54:43 +01001195 def test_min_max_version(self):
1196 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes34de2d32019-01-18 16:09:30 +01001197 # OpenSSL default is MINIMUM_SUPPORTED, however some vendors like
1198 # Fedora override the setting to TLS 1.0.
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02001199 minimum_range = {
1200 # stock OpenSSL
1201 ssl.TLSVersion.MINIMUM_SUPPORTED,
1202 # Fedora 29 uses TLS 1.0 by default
1203 ssl.TLSVersion.TLSv1,
1204 # RHEL 8 uses TLS 1.2 by default
1205 ssl.TLSVersion.TLSv1_2
1206 }
torsava34864d12019-12-02 17:15:42 +01001207 maximum_range = {
1208 # stock OpenSSL
1209 ssl.TLSVersion.MAXIMUM_SUPPORTED,
1210 # Fedora 32 uses TLS 1.3 by default
1211 ssl.TLSVersion.TLSv1_3
1212 }
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02001213
Christian Heimes34de2d32019-01-18 16:09:30 +01001214 self.assertIn(
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02001215 ctx.minimum_version, minimum_range
Christian Heimes698dde12018-02-27 11:54:43 +01001216 )
torsava34864d12019-12-02 17:15:42 +01001217 self.assertIn(
1218 ctx.maximum_version, maximum_range
Christian Heimes698dde12018-02-27 11:54:43 +01001219 )
1220
1221 ctx.minimum_version = ssl.TLSVersion.TLSv1_1
1222 ctx.maximum_version = ssl.TLSVersion.TLSv1_2
1223 self.assertEqual(
1224 ctx.minimum_version, ssl.TLSVersion.TLSv1_1
1225 )
1226 self.assertEqual(
1227 ctx.maximum_version, ssl.TLSVersion.TLSv1_2
1228 )
1229
1230 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1231 ctx.maximum_version = ssl.TLSVersion.TLSv1
1232 self.assertEqual(
1233 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1234 )
1235 self.assertEqual(
1236 ctx.maximum_version, ssl.TLSVersion.TLSv1
1237 )
1238
1239 ctx.maximum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1240 self.assertEqual(
1241 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1242 )
1243
1244 ctx.maximum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1245 self.assertIn(
1246 ctx.maximum_version,
1247 {ssl.TLSVersion.TLSv1, ssl.TLSVersion.SSLv3}
1248 )
1249
1250 ctx.minimum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1251 self.assertIn(
1252 ctx.minimum_version,
1253 {ssl.TLSVersion.TLSv1_2, ssl.TLSVersion.TLSv1_3}
1254 )
1255
1256 with self.assertRaises(ValueError):
1257 ctx.minimum_version = 42
1258
1259 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)
1260
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02001261 self.assertIn(
1262 ctx.minimum_version, minimum_range
Christian Heimes698dde12018-02-27 11:54:43 +01001263 )
1264 self.assertEqual(
1265 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1266 )
1267 with self.assertRaises(ValueError):
1268 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1269 with self.assertRaises(ValueError):
1270 ctx.maximum_version = ssl.TLSVersion.TLSv1
1271
1272
Christian Heimes2427b502013-11-23 11:24:32 +01001273 @unittest.skipUnless(have_verify_flags(),
1274 "verify_flags need OpenSSL > 0.9.8")
Christian Heimes22587792013-11-21 23:56:13 +01001275 def test_verify_flags(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001276 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Benjamin Peterson990fcaa2015-03-04 22:49:41 -05001277 # default value
1278 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
1279 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf)
Christian Heimes22587792013-11-21 23:56:13 +01001280 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
1281 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
1282 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
1283 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
1284 ctx.verify_flags = ssl.VERIFY_DEFAULT
1285 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
1286 # supports any value
1287 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
1288 self.assertEqual(ctx.verify_flags,
1289 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
1290 with self.assertRaises(TypeError):
1291 ctx.verify_flags = None
1292
Antoine Pitrou152efa22010-05-16 18:19:27 +00001293 def test_load_cert_chain(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001294 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001295 # Combined key and cert in a single file
Benjamin Peterson1ea070e2014-11-03 21:05:01 -05001296 ctx.load_cert_chain(CERTFILE, keyfile=None)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001297 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
1298 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001299 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001300 ctx.load_cert_chain(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001301 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001302 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001303 ctx.load_cert_chain(BADCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001304 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001305 ctx.load_cert_chain(EMPTYCERT)
1306 # Separate key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001307 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001308 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
1309 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
1310 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001311 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001312 ctx.load_cert_chain(ONLYCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001313 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001314 ctx.load_cert_chain(ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001315 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001316 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
1317 # Mismatching key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001318 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001319 with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
Martin Panter3d81d932016-01-14 09:36:00 +00001320 ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +02001321 # Password protected key and cert
1322 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
1323 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
1324 ctx.load_cert_chain(CERTFILE_PROTECTED,
1325 password=bytearray(KEY_PASSWORD.encode()))
1326 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
1327 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
1328 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
1329 bytearray(KEY_PASSWORD.encode()))
1330 with self.assertRaisesRegex(TypeError, "should be a string"):
1331 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
1332 with self.assertRaises(ssl.SSLError):
1333 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
1334 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1335 # openssl has a fixed limit on the password buffer.
1336 # PEM_BUFSIZE is generally set to 1kb.
1337 # Return a string larger than this.
1338 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
1339 # Password callback
1340 def getpass_unicode():
1341 return KEY_PASSWORD
1342 def getpass_bytes():
1343 return KEY_PASSWORD.encode()
1344 def getpass_bytearray():
1345 return bytearray(KEY_PASSWORD.encode())
1346 def getpass_badpass():
1347 return "badpass"
1348 def getpass_huge():
1349 return b'a' * (1024 * 1024)
1350 def getpass_bad_type():
1351 return 9
1352 def getpass_exception():
1353 raise Exception('getpass error')
1354 class GetPassCallable:
1355 def __call__(self):
1356 return KEY_PASSWORD
1357 def getpass(self):
1358 return KEY_PASSWORD
1359 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
1360 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
1361 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
1362 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
1363 ctx.load_cert_chain(CERTFILE_PROTECTED,
1364 password=GetPassCallable().getpass)
1365 with self.assertRaises(ssl.SSLError):
1366 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
1367 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1368 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
1369 with self.assertRaisesRegex(TypeError, "must return a string"):
1370 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
1371 with self.assertRaisesRegex(Exception, "getpass error"):
1372 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
1373 # Make sure the password function isn't called if it isn't needed
1374 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001375
1376 def test_load_verify_locations(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001377 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001378 ctx.load_verify_locations(CERTFILE)
1379 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
1380 ctx.load_verify_locations(BYTES_CERTFILE)
1381 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
1382 self.assertRaises(TypeError, ctx.load_verify_locations)
Christian Heimesefff7062013-11-21 03:35:02 +01001383 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001384 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001385 ctx.load_verify_locations(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001386 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001387 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001388 ctx.load_verify_locations(BADCERT)
1389 ctx.load_verify_locations(CERTFILE, CAPATH)
1390 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
1391
Victor Stinner80f75e62011-01-29 11:31:20 +00001392 # Issue #10989: crash if the second argument type is invalid
1393 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
1394
Christian Heimesefff7062013-11-21 03:35:02 +01001395 def test_load_verify_cadata(self):
1396 # test cadata
1397 with open(CAFILE_CACERT) as f:
1398 cacert_pem = f.read()
1399 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
1400 with open(CAFILE_NEURONIO) as f:
1401 neuronio_pem = f.read()
1402 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
1403
1404 # test PEM
Christian Heimesa170fa12017-09-15 20:27:30 +02001405 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001406 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
1407 ctx.load_verify_locations(cadata=cacert_pem)
1408 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
1409 ctx.load_verify_locations(cadata=neuronio_pem)
1410 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1411 # cert already in hash table
1412 ctx.load_verify_locations(cadata=neuronio_pem)
1413 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1414
1415 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001416 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001417 combined = "\n".join((cacert_pem, neuronio_pem))
1418 ctx.load_verify_locations(cadata=combined)
1419 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1420
1421 # with junk around the certs
Christian Heimesa170fa12017-09-15 20:27:30 +02001422 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001423 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
1424 neuronio_pem, "tail"]
1425 ctx.load_verify_locations(cadata="\n".join(combined))
1426 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1427
1428 # test DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001429 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001430 ctx.load_verify_locations(cadata=cacert_der)
1431 ctx.load_verify_locations(cadata=neuronio_der)
1432 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1433 # cert already in hash table
1434 ctx.load_verify_locations(cadata=cacert_der)
1435 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1436
1437 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001438 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001439 combined = b"".join((cacert_der, neuronio_der))
1440 ctx.load_verify_locations(cadata=combined)
1441 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1442
1443 # error cases
Christian Heimesa170fa12017-09-15 20:27:30 +02001444 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001445 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
1446
1447 with self.assertRaisesRegex(ssl.SSLError, "no start line"):
1448 ctx.load_verify_locations(cadata="broken")
1449 with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
1450 ctx.load_verify_locations(cadata=b"broken")
1451
1452
Paul Monsonf3550692019-06-19 13:09:54 -07001453 @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001454 def test_load_dh_params(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001455 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001456 ctx.load_dh_params(DHFILE)
1457 if os.name != 'nt':
1458 ctx.load_dh_params(BYTES_DHFILE)
1459 self.assertRaises(TypeError, ctx.load_dh_params)
1460 self.assertRaises(TypeError, ctx.load_dh_params, None)
1461 with self.assertRaises(FileNotFoundError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001462 ctx.load_dh_params(NONEXISTINGCERT)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001463 self.assertEqual(cm.exception.errno, errno.ENOENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001464 with self.assertRaises(ssl.SSLError) as cm:
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001465 ctx.load_dh_params(CERTFILE)
1466
Antoine Pitroub0182c82010-10-12 20:09:02 +00001467 def test_session_stats(self):
1468 for proto in PROTOCOLS:
1469 ctx = ssl.SSLContext(proto)
1470 self.assertEqual(ctx.session_stats(), {
1471 'number': 0,
1472 'connect': 0,
1473 'connect_good': 0,
1474 'connect_renegotiate': 0,
1475 'accept': 0,
1476 'accept_good': 0,
1477 'accept_renegotiate': 0,
1478 'hits': 0,
1479 'misses': 0,
1480 'timeouts': 0,
1481 'cache_full': 0,
1482 })
1483
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001484 def test_set_default_verify_paths(self):
1485 # There's not much we can do to test that it acts as expected,
1486 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001487 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001488 ctx.set_default_verify_paths()
1489
Antoine Pitrou501da612011-12-21 09:27:41 +01001490 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001491 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001492 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001493 ctx.set_ecdh_curve("prime256v1")
1494 ctx.set_ecdh_curve(b"prime256v1")
1495 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1496 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1497 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1498 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1499
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001500 @needs_sni
1501 def test_sni_callback(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001502 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001503
1504 # set_servername_callback expects a callable, or None
1505 self.assertRaises(TypeError, ctx.set_servername_callback)
1506 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
1507 self.assertRaises(TypeError, ctx.set_servername_callback, "")
1508 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
1509
1510 def dummycallback(sock, servername, ctx):
1511 pass
1512 ctx.set_servername_callback(None)
1513 ctx.set_servername_callback(dummycallback)
1514
1515 @needs_sni
1516 def test_sni_callback_refcycle(self):
1517 # Reference cycles through the servername callback are detected
1518 # and cleared.
Christian Heimesa170fa12017-09-15 20:27:30 +02001519 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001520 def dummycallback(sock, servername, ctx, cycle=ctx):
1521 pass
1522 ctx.set_servername_callback(dummycallback)
1523 wr = weakref.ref(ctx)
1524 del ctx, dummycallback
1525 gc.collect()
1526 self.assertIs(wr(), None)
1527
Christian Heimes9a5395a2013-06-17 15:44:12 +02001528 def test_cert_store_stats(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001529 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001530 self.assertEqual(ctx.cert_store_stats(),
1531 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1532 ctx.load_cert_chain(CERTFILE)
1533 self.assertEqual(ctx.cert_store_stats(),
1534 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1535 ctx.load_verify_locations(CERTFILE)
1536 self.assertEqual(ctx.cert_store_stats(),
1537 {'x509_ca': 0, 'crl': 0, 'x509': 1})
Martin Panterb55f8b72016-01-14 12:53:56 +00001538 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001539 self.assertEqual(ctx.cert_store_stats(),
1540 {'x509_ca': 1, 'crl': 0, 'x509': 2})
1541
1542 def test_get_ca_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001543 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001544 self.assertEqual(ctx.get_ca_certs(), [])
1545 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1546 ctx.load_verify_locations(CERTFILE)
1547 self.assertEqual(ctx.get_ca_certs(), [])
Martin Panterb55f8b72016-01-14 12:53:56 +00001548 # but CAFILE_CACERT is a CA cert
1549 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001550 self.assertEqual(ctx.get_ca_certs(),
1551 [{'issuer': ((('organizationName', 'Root CA'),),
1552 (('organizationalUnitName', 'http://www.cacert.org'),),
1553 (('commonName', 'CA Cert Signing Authority'),),
1554 (('emailAddress', 'support@cacert.org'),)),
1555 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1556 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1557 'serialNumber': '00',
Christian Heimesbd3a7f92013-11-21 03:40:15 +01001558 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
Christian Heimes9a5395a2013-06-17 15:44:12 +02001559 'subject': ((('organizationName', 'Root CA'),),
1560 (('organizationalUnitName', 'http://www.cacert.org'),),
1561 (('commonName', 'CA Cert Signing Authority'),),
1562 (('emailAddress', 'support@cacert.org'),)),
1563 'version': 3}])
1564
Martin Panterb55f8b72016-01-14 12:53:56 +00001565 with open(CAFILE_CACERT) as f:
Christian Heimes9a5395a2013-06-17 15:44:12 +02001566 pem = f.read()
1567 der = ssl.PEM_cert_to_DER_cert(pem)
1568 self.assertEqual(ctx.get_ca_certs(True), [der])
1569
Christian Heimes72d28502013-11-23 13:56:58 +01001570 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001571 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001572 ctx.load_default_certs()
1573
Christian Heimesa170fa12017-09-15 20:27:30 +02001574 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001575 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1576 ctx.load_default_certs()
1577
Christian Heimesa170fa12017-09-15 20:27:30 +02001578 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001579 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1580
Christian Heimesa170fa12017-09-15 20:27:30 +02001581 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001582 self.assertRaises(TypeError, ctx.load_default_certs, None)
1583 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1584
Benjamin Peterson91244e02014-10-03 18:17:15 -04001585 @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
Christian Heimes598894f2016-09-05 23:19:05 +02001586 @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001587 def test_load_default_certs_env(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001588 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001589 with support.EnvironmentVarGuard() as env:
1590 env["SSL_CERT_DIR"] = CAPATH
1591 env["SSL_CERT_FILE"] = CERTFILE
1592 ctx.load_default_certs()
1593 self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1594
Benjamin Peterson91244e02014-10-03 18:17:15 -04001595 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Steve Dower68d663c2017-07-17 11:15:48 +02001596 @unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
Benjamin Peterson91244e02014-10-03 18:17:15 -04001597 def test_load_default_certs_env_windows(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001598 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001599 ctx.load_default_certs()
1600 stats = ctx.cert_store_stats()
1601
Christian Heimesa170fa12017-09-15 20:27:30 +02001602 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001603 with support.EnvironmentVarGuard() as env:
1604 env["SSL_CERT_DIR"] = CAPATH
1605 env["SSL_CERT_FILE"] = CERTFILE
1606 ctx.load_default_certs()
1607 stats["x509"] += 1
1608 self.assertEqual(ctx.cert_store_stats(), stats)
1609
Christian Heimes358cfd42016-09-10 22:43:48 +02001610 def _assert_context_options(self, ctx):
1611 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1612 if OP_NO_COMPRESSION != 0:
1613 self.assertEqual(ctx.options & OP_NO_COMPRESSION,
1614 OP_NO_COMPRESSION)
1615 if OP_SINGLE_DH_USE != 0:
1616 self.assertEqual(ctx.options & OP_SINGLE_DH_USE,
1617 OP_SINGLE_DH_USE)
1618 if OP_SINGLE_ECDH_USE != 0:
1619 self.assertEqual(ctx.options & OP_SINGLE_ECDH_USE,
1620 OP_SINGLE_ECDH_USE)
1621 if OP_CIPHER_SERVER_PREFERENCE != 0:
1622 self.assertEqual(ctx.options & OP_CIPHER_SERVER_PREFERENCE,
1623 OP_CIPHER_SERVER_PREFERENCE)
1624
Christian Heimes4c05b472013-11-23 15:58:30 +01001625 def test_create_default_context(self):
1626 ctx = ssl.create_default_context()
Christian Heimes358cfd42016-09-10 22:43:48 +02001627
Christian Heimesa170fa12017-09-15 20:27:30 +02001628 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001629 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001630 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001631 self._assert_context_options(ctx)
1632
Christian Heimes4c05b472013-11-23 15:58:30 +01001633 with open(SIGNING_CA) as f:
1634 cadata = f.read()
1635 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1636 cadata=cadata)
Christian Heimesa170fa12017-09-15 20:27:30 +02001637 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001638 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes358cfd42016-09-10 22:43:48 +02001639 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001640
1641 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001642 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001643 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001644 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001645
Christian Heimes67986f92013-11-23 22:43:47 +01001646 def test__create_stdlib_context(self):
1647 ctx = ssl._create_stdlib_context()
Christian Heimesa170fa12017-09-15 20:27:30 +02001648 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001649 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001650 self.assertFalse(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001651 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001652
1653 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1654 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1655 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001656 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001657
1658 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
Christian Heimesa02c69a2013-12-02 20:59:28 +01001659 cert_reqs=ssl.CERT_REQUIRED,
1660 check_hostname=True)
Christian Heimes67986f92013-11-23 22:43:47 +01001661 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1662 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimesa02c69a2013-12-02 20:59:28 +01001663 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001664 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001665
1666 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001667 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001668 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001669 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001670
Christian Heimes1aa9a752013-12-02 02:41:19 +01001671 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001672 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001673 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001674 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001675
Christian Heimese82c0342017-09-15 20:29:57 +02001676 # Auto set CERT_REQUIRED
1677 ctx.check_hostname = True
1678 self.assertTrue(ctx.check_hostname)
1679 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1680 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001681 ctx.verify_mode = ssl.CERT_REQUIRED
1682 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001683 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001684
Christian Heimese82c0342017-09-15 20:29:57 +02001685 # Changing verify_mode does not affect check_hostname
1686 ctx.check_hostname = False
1687 ctx.verify_mode = ssl.CERT_NONE
1688 ctx.check_hostname = False
1689 self.assertFalse(ctx.check_hostname)
1690 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1691 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001692 ctx.check_hostname = True
1693 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001694 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1695
1696 ctx.check_hostname = False
1697 ctx.verify_mode = ssl.CERT_OPTIONAL
1698 ctx.check_hostname = False
1699 self.assertFalse(ctx.check_hostname)
1700 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1701 # keep CERT_OPTIONAL
1702 ctx.check_hostname = True
1703 self.assertTrue(ctx.check_hostname)
1704 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001705
1706 # Cannot set CERT_NONE with check_hostname enabled
1707 with self.assertRaises(ValueError):
1708 ctx.verify_mode = ssl.CERT_NONE
1709 ctx.check_hostname = False
1710 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001711 ctx.verify_mode = ssl.CERT_NONE
1712 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001713
Christian Heimes5fe668c2016-09-12 00:01:11 +02001714 def test_context_client_server(self):
1715 # PROTOCOL_TLS_CLIENT has sane defaults
1716 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1717 self.assertTrue(ctx.check_hostname)
1718 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1719
1720 # PROTOCOL_TLS_SERVER has different but also sane defaults
1721 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1722 self.assertFalse(ctx.check_hostname)
1723 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1724
Christian Heimes4df60f12017-09-15 20:26:05 +02001725 def test_context_custom_class(self):
1726 class MySSLSocket(ssl.SSLSocket):
1727 pass
1728
1729 class MySSLObject(ssl.SSLObject):
1730 pass
1731
1732 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1733 ctx.sslsocket_class = MySSLSocket
1734 ctx.sslobject_class = MySSLObject
1735
1736 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1737 self.assertIsInstance(sock, MySSLSocket)
1738 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1739 self.assertIsInstance(obj, MySSLObject)
1740
Christian Heimes78c7d522019-06-03 21:00:10 +02001741 @unittest.skipUnless(IS_OPENSSL_1_1_1, "Test requires OpenSSL 1.1.1")
1742 def test_num_tickest(self):
1743 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1744 self.assertEqual(ctx.num_tickets, 2)
1745 ctx.num_tickets = 1
1746 self.assertEqual(ctx.num_tickets, 1)
1747 ctx.num_tickets = 0
1748 self.assertEqual(ctx.num_tickets, 0)
1749 with self.assertRaises(ValueError):
1750 ctx.num_tickets = -1
1751 with self.assertRaises(TypeError):
1752 ctx.num_tickets = None
1753
1754 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1755 self.assertEqual(ctx.num_tickets, 2)
1756 with self.assertRaises(ValueError):
1757 ctx.num_tickets = 1
1758
Antoine Pitrou152efa22010-05-16 18:19:27 +00001759
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001760class SSLErrorTests(unittest.TestCase):
1761
1762 def test_str(self):
1763 # The str() of a SSLError doesn't include the errno
1764 e = ssl.SSLError(1, "foo")
1765 self.assertEqual(str(e), "foo")
1766 self.assertEqual(e.errno, 1)
1767 # Same for a subclass
1768 e = ssl.SSLZeroReturnError(1, "foo")
1769 self.assertEqual(str(e), "foo")
1770 self.assertEqual(e.errno, 1)
1771
Paul Monsonf3550692019-06-19 13:09:54 -07001772 @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001773 def test_lib_reason(self):
1774 # Test the library and reason attributes
Christian Heimesa170fa12017-09-15 20:27:30 +02001775 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001776 with self.assertRaises(ssl.SSLError) as cm:
1777 ctx.load_dh_params(CERTFILE)
1778 self.assertEqual(cm.exception.library, 'PEM')
1779 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1780 s = str(cm.exception)
1781 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1782
1783 def test_subclass(self):
1784 # Check that the appropriate SSLError subclass is raised
1785 # (this only tests one of them)
Christian Heimesa170fa12017-09-15 20:27:30 +02001786 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1787 ctx.check_hostname = False
1788 ctx.verify_mode = ssl.CERT_NONE
Giampaolo Rodolaeb7e29f2019-04-09 00:34:02 +02001789 with socket.create_server(("127.0.0.1", 0)) as s:
1790 c = socket.create_connection(s.getsockname())
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001791 c.setblocking(False)
1792 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001793 with self.assertRaises(ssl.SSLWantReadError) as cm:
1794 c.do_handshake()
1795 s = str(cm.exception)
1796 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1797 # For compatibility
1798 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1799
1800
Christian Heimes61d478c2018-01-27 15:51:38 +01001801 def test_bad_server_hostname(self):
1802 ctx = ssl.create_default_context()
1803 with self.assertRaises(ValueError):
1804 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1805 server_hostname="")
1806 with self.assertRaises(ValueError):
1807 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1808 server_hostname=".example.org")
Christian Heimesd02ac252018-03-25 12:36:13 +02001809 with self.assertRaises(TypeError):
1810 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1811 server_hostname="example.org\x00evil.com")
Christian Heimes61d478c2018-01-27 15:51:38 +01001812
1813
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001814class MemoryBIOTests(unittest.TestCase):
1815
1816 def test_read_write(self):
1817 bio = ssl.MemoryBIO()
1818 bio.write(b'foo')
1819 self.assertEqual(bio.read(), b'foo')
1820 self.assertEqual(bio.read(), b'')
1821 bio.write(b'foo')
1822 bio.write(b'bar')
1823 self.assertEqual(bio.read(), b'foobar')
1824 self.assertEqual(bio.read(), b'')
1825 bio.write(b'baz')
1826 self.assertEqual(bio.read(2), b'ba')
1827 self.assertEqual(bio.read(1), b'z')
1828 self.assertEqual(bio.read(1), b'')
1829
1830 def test_eof(self):
1831 bio = ssl.MemoryBIO()
1832 self.assertFalse(bio.eof)
1833 self.assertEqual(bio.read(), b'')
1834 self.assertFalse(bio.eof)
1835 bio.write(b'foo')
1836 self.assertFalse(bio.eof)
1837 bio.write_eof()
1838 self.assertFalse(bio.eof)
1839 self.assertEqual(bio.read(2), b'fo')
1840 self.assertFalse(bio.eof)
1841 self.assertEqual(bio.read(1), b'o')
1842 self.assertTrue(bio.eof)
1843 self.assertEqual(bio.read(), b'')
1844 self.assertTrue(bio.eof)
1845
1846 def test_pending(self):
1847 bio = ssl.MemoryBIO()
1848 self.assertEqual(bio.pending, 0)
1849 bio.write(b'foo')
1850 self.assertEqual(bio.pending, 3)
1851 for i in range(3):
1852 bio.read(1)
1853 self.assertEqual(bio.pending, 3-i-1)
1854 for i in range(3):
1855 bio.write(b'x')
1856 self.assertEqual(bio.pending, i+1)
1857 bio.read()
1858 self.assertEqual(bio.pending, 0)
1859
1860 def test_buffer_types(self):
1861 bio = ssl.MemoryBIO()
1862 bio.write(b'foo')
1863 self.assertEqual(bio.read(), b'foo')
1864 bio.write(bytearray(b'bar'))
1865 self.assertEqual(bio.read(), b'bar')
1866 bio.write(memoryview(b'baz'))
1867 self.assertEqual(bio.read(), b'baz')
1868
1869 def test_error_types(self):
1870 bio = ssl.MemoryBIO()
1871 self.assertRaises(TypeError, bio.write, 'foo')
1872 self.assertRaises(TypeError, bio.write, None)
1873 self.assertRaises(TypeError, bio.write, True)
1874 self.assertRaises(TypeError, bio.write, 1)
1875
1876
Christian Heimes9d50ab52018-02-27 10:17:30 +01001877class SSLObjectTests(unittest.TestCase):
1878 def test_private_init(self):
1879 bio = ssl.MemoryBIO()
1880 with self.assertRaisesRegex(TypeError, "public constructor"):
1881 ssl.SSLObject(bio, bio)
1882
Nathaniel J. Smithc0da5822018-09-21 21:44:12 -07001883 def test_unwrap(self):
1884 client_ctx, server_ctx, hostname = testing_context()
1885 c_in = ssl.MemoryBIO()
1886 c_out = ssl.MemoryBIO()
1887 s_in = ssl.MemoryBIO()
1888 s_out = ssl.MemoryBIO()
1889 client = client_ctx.wrap_bio(c_in, c_out, server_hostname=hostname)
1890 server = server_ctx.wrap_bio(s_in, s_out, server_side=True)
1891
1892 # Loop on the handshake for a bit to get it settled
1893 for _ in range(5):
1894 try:
1895 client.do_handshake()
1896 except ssl.SSLWantReadError:
1897 pass
1898 if c_out.pending:
1899 s_in.write(c_out.read())
1900 try:
1901 server.do_handshake()
1902 except ssl.SSLWantReadError:
1903 pass
1904 if s_out.pending:
1905 c_in.write(s_out.read())
1906 # Now the handshakes should be complete (don't raise WantReadError)
1907 client.do_handshake()
1908 server.do_handshake()
1909
1910 # Now if we unwrap one side unilaterally, it should send close-notify
1911 # and raise WantReadError:
1912 with self.assertRaises(ssl.SSLWantReadError):
1913 client.unwrap()
1914
1915 # But server.unwrap() does not raise, because it reads the client's
1916 # close-notify:
1917 s_in.write(c_out.read())
1918 server.unwrap()
1919
1920 # And now that the client gets the server's close-notify, it doesn't
1921 # raise either.
1922 c_in.write(s_out.read())
1923 client.unwrap()
Christian Heimes9d50ab52018-02-27 10:17:30 +01001924
Martin Panter3840b2a2016-03-27 01:53:46 +00001925class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001926 """Tests that connect to a simple server running in the background"""
1927
1928 def setUp(self):
1929 server = ThreadedEchoServer(SIGNED_CERTFILE)
1930 self.server_addr = (HOST, server.port)
1931 server.__enter__()
1932 self.addCleanup(server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001933
Antoine Pitrou480a1242010-04-28 21:37:09 +00001934 def test_connect(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001935 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001936 cert_reqs=ssl.CERT_NONE) as s:
1937 s.connect(self.server_addr)
1938 self.assertEqual({}, s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001939 self.assertFalse(s.server_side)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001940
Martin Panter3840b2a2016-03-27 01:53:46 +00001941 # this should succeed because we specify the root cert
Christian Heimesd0486372016-09-10 23:23:33 +02001942 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001943 cert_reqs=ssl.CERT_REQUIRED,
1944 ca_certs=SIGNING_CA) as s:
1945 s.connect(self.server_addr)
1946 self.assertTrue(s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001947 self.assertFalse(s.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001948
Martin Panter3840b2a2016-03-27 01:53:46 +00001949 def test_connect_fail(self):
1950 # This should fail because we have no verification certs. Connection
1951 # failure crashes ThreadedEchoServer, so run this in an independent
1952 # test method.
Christian Heimesd0486372016-09-10 23:23:33 +02001953 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001954 cert_reqs=ssl.CERT_REQUIRED)
1955 self.addCleanup(s.close)
1956 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1957 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001958
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001959 def test_connect_ex(self):
1960 # Issue #11326: check connect_ex() implementation
Christian Heimesd0486372016-09-10 23:23:33 +02001961 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001962 cert_reqs=ssl.CERT_REQUIRED,
1963 ca_certs=SIGNING_CA)
1964 self.addCleanup(s.close)
1965 self.assertEqual(0, s.connect_ex(self.server_addr))
1966 self.assertTrue(s.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001967
1968 def test_non_blocking_connect_ex(self):
1969 # Issue #11326: non-blocking connect_ex() should allow handshake
1970 # to proceed after the socket gets ready.
Christian Heimesd0486372016-09-10 23:23:33 +02001971 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001972 cert_reqs=ssl.CERT_REQUIRED,
1973 ca_certs=SIGNING_CA,
1974 do_handshake_on_connect=False)
1975 self.addCleanup(s.close)
1976 s.setblocking(False)
1977 rc = s.connect_ex(self.server_addr)
1978 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1979 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
1980 # Wait for connect to finish
1981 select.select([], [s], [], 5.0)
1982 # Non-blocking handshake
1983 while True:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001984 try:
Martin Panter3840b2a2016-03-27 01:53:46 +00001985 s.do_handshake()
1986 break
1987 except ssl.SSLWantReadError:
1988 select.select([s], [], [], 5.0)
1989 except ssl.SSLWantWriteError:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001990 select.select([], [s], [], 5.0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001991 # SSL established
1992 self.assertTrue(s.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001993
Antoine Pitrou152efa22010-05-16 18:19:27 +00001994 def test_connect_with_context(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001995 # Same as test_connect, but with a separately created context
Christian Heimesa170fa12017-09-15 20:27:30 +02001996 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001997 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1998 s.connect(self.server_addr)
1999 self.assertEqual({}, s.getpeercert())
2000 # Same with a server hostname
2001 with ctx.wrap_socket(socket.socket(socket.AF_INET),
2002 server_hostname="dummy") as s:
2003 s.connect(self.server_addr)
2004 ctx.verify_mode = ssl.CERT_REQUIRED
2005 # This should succeed because we specify the root cert
2006 ctx.load_verify_locations(SIGNING_CA)
2007 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
2008 s.connect(self.server_addr)
2009 cert = s.getpeercert()
2010 self.assertTrue(cert)
2011
2012 def test_connect_with_context_fail(self):
2013 # This should fail because we have no verification certs. Connection
2014 # failure crashes ThreadedEchoServer, so run this in an independent
2015 # test method.
Christian Heimesa170fa12017-09-15 20:27:30 +02002016 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002017 ctx.verify_mode = ssl.CERT_REQUIRED
2018 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
2019 self.addCleanup(s.close)
2020 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
2021 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00002022
2023 def test_connect_capath(self):
2024 # Verify server certificates using the `capath` argument
Antoine Pitrou467f28d2010-05-16 19:22:44 +00002025 # NOTE: the subject hashing algorithm has been changed between
2026 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
2027 # contain both versions of each certificate (same content, different
Antoine Pitroud7e4c1c2010-05-17 14:13:10 +00002028 # filename) for this test to be portable across OpenSSL releases.
Christian Heimesa170fa12017-09-15 20:27:30 +02002029 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002030 ctx.verify_mode = ssl.CERT_REQUIRED
2031 ctx.load_verify_locations(capath=CAPATH)
2032 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
2033 s.connect(self.server_addr)
2034 cert = s.getpeercert()
2035 self.assertTrue(cert)
Christian Heimes529525f2018-05-23 22:24:45 +02002036
Martin Panter3840b2a2016-03-27 01:53:46 +00002037 # Same with a bytes `capath` argument
Christian Heimesa170fa12017-09-15 20:27:30 +02002038 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002039 ctx.verify_mode = ssl.CERT_REQUIRED
2040 ctx.load_verify_locations(capath=BYTES_CAPATH)
2041 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
2042 s.connect(self.server_addr)
2043 cert = s.getpeercert()
2044 self.assertTrue(cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002045
Christian Heimesefff7062013-11-21 03:35:02 +01002046 def test_connect_cadata(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00002047 with open(SIGNING_CA) as f:
Christian Heimesefff7062013-11-21 03:35:02 +01002048 pem = f.read()
2049 der = ssl.PEM_cert_to_DER_cert(pem)
Christian Heimesa170fa12017-09-15 20:27:30 +02002050 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002051 ctx.verify_mode = ssl.CERT_REQUIRED
2052 ctx.load_verify_locations(cadata=pem)
2053 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
2054 s.connect(self.server_addr)
2055 cert = s.getpeercert()
2056 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01002057
Martin Panter3840b2a2016-03-27 01:53:46 +00002058 # same with DER
Christian Heimesa170fa12017-09-15 20:27:30 +02002059 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002060 ctx.verify_mode = ssl.CERT_REQUIRED
2061 ctx.load_verify_locations(cadata=der)
2062 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
2063 s.connect(self.server_addr)
2064 cert = s.getpeercert()
2065 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01002066
Antoine Pitroue3220242010-04-24 11:13:53 +00002067 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
2068 def test_makefile_close(self):
2069 # Issue #5238: creating a file-like object with makefile() shouldn't
2070 # delay closing the underlying "real socket" (here tested with its
2071 # file descriptor, hence skipping the test under Windows).
Christian Heimesd0486372016-09-10 23:23:33 +02002072 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Martin Panter3840b2a2016-03-27 01:53:46 +00002073 ss.connect(self.server_addr)
2074 fd = ss.fileno()
2075 f = ss.makefile()
2076 f.close()
2077 # The fd is still open
2078 os.read(fd, 0)
2079 # Closing the SSL socket should close the fd too
2080 ss.close()
2081 gc.collect()
2082 with self.assertRaises(OSError) as e:
Antoine Pitroue3220242010-04-24 11:13:53 +00002083 os.read(fd, 0)
Martin Panter3840b2a2016-03-27 01:53:46 +00002084 self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00002085
Antoine Pitrou480a1242010-04-28 21:37:09 +00002086 def test_non_blocking_handshake(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00002087 s = socket.socket(socket.AF_INET)
2088 s.connect(self.server_addr)
2089 s.setblocking(False)
Christian Heimesd0486372016-09-10 23:23:33 +02002090 s = test_wrap_socket(s,
Martin Panter3840b2a2016-03-27 01:53:46 +00002091 cert_reqs=ssl.CERT_NONE,
2092 do_handshake_on_connect=False)
2093 self.addCleanup(s.close)
2094 count = 0
2095 while True:
2096 try:
2097 count += 1
2098 s.do_handshake()
2099 break
2100 except ssl.SSLWantReadError:
2101 select.select([s], [], [])
2102 except ssl.SSLWantWriteError:
2103 select.select([], [s], [])
2104 if support.verbose:
2105 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002106
Antoine Pitrou480a1242010-04-28 21:37:09 +00002107 def test_get_server_certificate(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00002108 _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02002109
Martin Panter3840b2a2016-03-27 01:53:46 +00002110 def test_get_server_certificate_fail(self):
2111 # Connection failure crashes ThreadedEchoServer, so run this in an
2112 # independent test method
2113 _test_get_server_certificate_fail(self, *self.server_addr)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002114
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00002115 def test_ciphers(self):
Christian Heimesd0486372016-09-10 23:23:33 +02002116 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002117 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
2118 s.connect(self.server_addr)
Christian Heimesd0486372016-09-10 23:23:33 +02002119 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002120 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
2121 s.connect(self.server_addr)
2122 # Error checking can happen at instantiation or when connecting
2123 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
2124 with socket.socket(socket.AF_INET) as sock:
Christian Heimesd0486372016-09-10 23:23:33 +02002125 s = test_wrap_socket(sock,
Martin Panter3840b2a2016-03-27 01:53:46 +00002126 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
2127 s.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00002128
Christian Heimes9a5395a2013-06-17 15:44:12 +02002129 def test_get_ca_certs_capath(self):
2130 # capath certs are loaded on request
Christian Heimesa170fa12017-09-15 20:27:30 +02002131 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Martin Panter3840b2a2016-03-27 01:53:46 +00002132 ctx.load_verify_locations(capath=CAPATH)
2133 self.assertEqual(ctx.get_ca_certs(), [])
Christian Heimesa170fa12017-09-15 20:27:30 +02002134 with ctx.wrap_socket(socket.socket(socket.AF_INET),
2135 server_hostname='localhost') as s:
Martin Panter3840b2a2016-03-27 01:53:46 +00002136 s.connect(self.server_addr)
2137 cert = s.getpeercert()
2138 self.assertTrue(cert)
2139 self.assertEqual(len(ctx.get_ca_certs()), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02002140
Christian Heimes575596e2013-12-15 21:49:17 +01002141 @needs_sni
Christian Heimes8e7f3942013-12-05 07:41:08 +01002142 def test_context_setget(self):
2143 # Check that the context of a connected socket can be replaced.
Christian Heimesa170fa12017-09-15 20:27:30 +02002144 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2145 ctx1.load_verify_locations(capath=CAPATH)
2146 ctx2 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2147 ctx2.load_verify_locations(capath=CAPATH)
Martin Panter3840b2a2016-03-27 01:53:46 +00002148 s = socket.socket(socket.AF_INET)
Christian Heimesa170fa12017-09-15 20:27:30 +02002149 with ctx1.wrap_socket(s, server_hostname='localhost') as ss:
Martin Panter3840b2a2016-03-27 01:53:46 +00002150 ss.connect(self.server_addr)
2151 self.assertIs(ss.context, ctx1)
2152 self.assertIs(ss._sslobj.context, ctx1)
2153 ss.context = ctx2
2154 self.assertIs(ss.context, ctx2)
2155 self.assertIs(ss._sslobj.context, ctx2)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002156
2157 def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
2158 # A simple IO loop. Call func(*args) depending on the error we get
2159 # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
Victor Stinner0d63bac2019-12-11 11:30:03 +01002160 timeout = kwargs.get('timeout', support.SHORT_TIMEOUT)
Victor Stinner50a72af2017-09-11 09:34:24 -07002161 deadline = time.monotonic() + timeout
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002162 count = 0
2163 while True:
Victor Stinner50a72af2017-09-11 09:34:24 -07002164 if time.monotonic() > deadline:
2165 self.fail("timeout")
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002166 errno = None
2167 count += 1
2168 try:
2169 ret = func(*args)
2170 except ssl.SSLError as e:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002171 if e.errno not in (ssl.SSL_ERROR_WANT_READ,
Martin Panter40b97ec2016-01-14 13:05:46 +00002172 ssl.SSL_ERROR_WANT_WRITE):
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002173 raise
2174 errno = e.errno
2175 # Get any data from the outgoing BIO irrespective of any error, and
2176 # send it to the socket.
2177 buf = outgoing.read()
2178 sock.sendall(buf)
2179 # If there's no error, we're done. For WANT_READ, we need to get
2180 # data from the socket and put it in the incoming BIO.
2181 if errno is None:
2182 break
2183 elif errno == ssl.SSL_ERROR_WANT_READ:
2184 buf = sock.recv(32768)
2185 if buf:
2186 incoming.write(buf)
2187 else:
2188 incoming.write_eof()
2189 if support.verbose:
2190 sys.stdout.write("Needed %d calls to complete %s().\n"
2191 % (count, func.__name__))
2192 return ret
2193
Martin Panter3840b2a2016-03-27 01:53:46 +00002194 def test_bio_handshake(self):
2195 sock = socket.socket(socket.AF_INET)
2196 self.addCleanup(sock.close)
2197 sock.connect(self.server_addr)
2198 incoming = ssl.MemoryBIO()
2199 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002200 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2201 self.assertTrue(ctx.check_hostname)
2202 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Martin Panter3840b2a2016-03-27 01:53:46 +00002203 ctx.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +02002204 sslobj = ctx.wrap_bio(incoming, outgoing, False,
2205 SIGNED_CERTFILE_HOSTNAME)
Martin Panter3840b2a2016-03-27 01:53:46 +00002206 self.assertIs(sslobj._sslobj.owner, sslobj)
2207 self.assertIsNone(sslobj.cipher())
Christian Heimes68771112017-09-05 21:55:40 -07002208 self.assertIsNone(sslobj.version())
Christian Heimes01113fa2016-09-05 23:23:24 +02002209 self.assertIsNotNone(sslobj.shared_ciphers())
Martin Panter3840b2a2016-03-27 01:53:46 +00002210 self.assertRaises(ValueError, sslobj.getpeercert)
2211 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2212 self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
2213 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2214 self.assertTrue(sslobj.cipher())
Christian Heimes01113fa2016-09-05 23:23:24 +02002215 self.assertIsNotNone(sslobj.shared_ciphers())
Christian Heimes68771112017-09-05 21:55:40 -07002216 self.assertIsNotNone(sslobj.version())
Martin Panter3840b2a2016-03-27 01:53:46 +00002217 self.assertTrue(sslobj.getpeercert())
2218 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2219 self.assertTrue(sslobj.get_channel_binding('tls-unique'))
2220 try:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002221 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Martin Panter3840b2a2016-03-27 01:53:46 +00002222 except ssl.SSLSyscallError:
2223 # If the server shuts down the TCP connection without sending a
2224 # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
2225 pass
2226 self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
2227
2228 def test_bio_read_write_data(self):
2229 sock = socket.socket(socket.AF_INET)
2230 self.addCleanup(sock.close)
2231 sock.connect(self.server_addr)
2232 incoming = ssl.MemoryBIO()
2233 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002234 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002235 ctx.verify_mode = ssl.CERT_NONE
2236 sslobj = ctx.wrap_bio(incoming, outgoing, False)
2237 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2238 req = b'FOO\n'
2239 self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
2240 buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
2241 self.assertEqual(buf, b'foo\n')
2242 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002243
2244
Martin Panter3840b2a2016-03-27 01:53:46 +00002245class NetworkedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002246
Martin Panter3840b2a2016-03-27 01:53:46 +00002247 def test_timeout_connect_ex(self):
2248 # Issue #12065: on a timeout, connect_ex() should return the original
2249 # errno (mimicking the behaviour of non-SSL sockets).
Serhiy Storchakabfb1cf42020-04-29 10:36:20 +03002250 with socket_helper.transient_internet(REMOTE_HOST):
Christian Heimesd0486372016-09-10 23:23:33 +02002251 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002252 cert_reqs=ssl.CERT_REQUIRED,
2253 do_handshake_on_connect=False)
2254 self.addCleanup(s.close)
2255 s.settimeout(0.0000001)
2256 rc = s.connect_ex((REMOTE_HOST, 443))
2257 if rc == 0:
2258 self.skipTest("REMOTE_HOST responded too quickly")
2259 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
2260
Serhiy Storchaka16994912020-04-25 10:06:29 +03002261 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'Needs IPv6')
Martin Panter3840b2a2016-03-27 01:53:46 +00002262 def test_get_server_certificate_ipv6(self):
Serhiy Storchakabfb1cf42020-04-29 10:36:20 +03002263 with socket_helper.transient_internet('ipv6.google.com'):
Martin Panter3840b2a2016-03-27 01:53:46 +00002264 _test_get_server_certificate(self, 'ipv6.google.com', 443)
2265 _test_get_server_certificate_fail(self, 'ipv6.google.com', 443)
2266
Martin Panter3840b2a2016-03-27 01:53:46 +00002267
2268def _test_get_server_certificate(test, host, port, cert=None):
2269 pem = ssl.get_server_certificate((host, port))
2270 if not pem:
2271 test.fail("No server certificate on %s:%s!" % (host, port))
2272
2273 pem = ssl.get_server_certificate((host, port), ca_certs=cert)
2274 if not pem:
2275 test.fail("No server certificate on %s:%s!" % (host, port))
2276 if support.verbose:
2277 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
2278
2279def _test_get_server_certificate_fail(test, host, port):
2280 try:
2281 pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
2282 except ssl.SSLError as x:
2283 #should fail
2284 if support.verbose:
2285 sys.stdout.write("%s\n" % x)
2286 else:
2287 test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2288
2289
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002290from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002291
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002292class ThreadedEchoServer(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002293
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002294 class ConnectionHandler(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002295
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002296 """A mildly complicated class, because we want it to work both
2297 with and without the SSL wrapper around the socket connection, so
2298 that we can test the STARTTLS functionality."""
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002299
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002300 def __init__(self, server, connsock, addr):
2301 self.server = server
2302 self.running = False
2303 self.sock = connsock
2304 self.addr = addr
Serhiy Storchaka5eca7f32019-09-01 12:12:52 +03002305 self.sock.setblocking(True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002306 self.sslconn = None
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002307 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00002308 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002309
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002310 def wrap_conn(self):
2311 try:
2312 self.sslconn = self.server.context.wrap_socket(
2313 self.sock, server_side=True)
2314 self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
2315 self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
Paul Monsonfb7e7502019-05-15 15:38:55 -07002316 except (ConnectionResetError, BrokenPipeError, ConnectionAbortedError) as e:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002317 # We treat ConnectionResetError as though it were an
2318 # SSLError - OpenSSL on Ubuntu abruptly closes the
2319 # connection when asked to use an unsupported protocol.
2320 #
Christian Heimes529525f2018-05-23 22:24:45 +02002321 # BrokenPipeError is raised in TLS 1.3 mode, when OpenSSL
2322 # tries to send session tickets after handshake.
2323 # https://github.com/openssl/openssl/issues/6342
Paul Monsonfb7e7502019-05-15 15:38:55 -07002324 #
2325 # ConnectionAbortedError is raised in TLS 1.3 mode, when OpenSSL
2326 # tries to send session tickets after handshake when using WinSock.
Christian Heimes529525f2018-05-23 22:24:45 +02002327 self.server.conn_errors.append(str(e))
2328 if self.server.chatty:
2329 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2330 self.running = False
2331 self.close()
2332 return False
2333 except (ssl.SSLError, OSError) as e:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002334 # OSError may occur with wrong protocols, e.g. both
2335 # sides use PROTOCOL_TLS_SERVER.
2336 #
2337 # XXX Various errors can have happened here, for example
2338 # a mismatching protocol version, an invalid certificate,
2339 # or a low-level bug. This should be made more discriminating.
2340 #
2341 # bpo-31323: Store the exception as string to prevent
2342 # a reference leak: server -> conn_errors -> exception
2343 # -> traceback -> self (ConnectionHandler) -> server
2344 self.server.conn_errors.append(str(e))
2345 if self.server.chatty:
2346 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2347 self.running = False
2348 self.server.stop()
2349 self.close()
2350 return False
2351 else:
2352 self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
2353 if self.server.context.verify_mode == ssl.CERT_REQUIRED:
2354 cert = self.sslconn.getpeercert()
2355 if support.verbose and self.server.chatty:
2356 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
2357 cert_binary = self.sslconn.getpeercert(True)
2358 if support.verbose and self.server.chatty:
2359 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
2360 cipher = self.sslconn.cipher()
2361 if support.verbose and self.server.chatty:
2362 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
2363 sys.stdout.write(" server: selected protocol is now "
2364 + str(self.sslconn.selected_npn_protocol()) + "\n")
2365 return True
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002366
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002367 def read(self):
2368 if self.sslconn:
2369 return self.sslconn.read()
2370 else:
2371 return self.sock.recv(1024)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002372
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002373 def write(self, bytes):
2374 if self.sslconn:
2375 return self.sslconn.write(bytes)
2376 else:
2377 return self.sock.send(bytes)
2378
2379 def close(self):
2380 if self.sslconn:
2381 self.sslconn.close()
2382 else:
2383 self.sock.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002384
Antoine Pitrou480a1242010-04-28 21:37:09 +00002385 def run(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002386 self.running = True
2387 if not self.server.starttls_server:
2388 if not self.wrap_conn():
2389 return
2390 while self.running:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002391 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002392 msg = self.read()
2393 stripped = msg.strip()
2394 if not stripped:
2395 # eof, so quit this handler
2396 self.running = False
2397 try:
2398 self.sock = self.sslconn.unwrap()
2399 except OSError:
2400 # Many tests shut the TCP connection down
2401 # without an SSL shutdown. This causes
2402 # unwrap() to raise OSError with errno=0!
2403 pass
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002404 else:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002405 self.sslconn = None
2406 self.close()
2407 elif stripped == b'over':
2408 if support.verbose and self.server.connectionchatty:
2409 sys.stdout.write(" server: client closed connection\n")
2410 self.close()
2411 return
2412 elif (self.server.starttls_server and
2413 stripped == b'STARTTLS'):
2414 if support.verbose and self.server.connectionchatty:
2415 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
2416 self.write(b"OK\n")
2417 if not self.wrap_conn():
2418 return
2419 elif (self.server.starttls_server and self.sslconn
2420 and stripped == b'ENDTLS'):
2421 if support.verbose and self.server.connectionchatty:
2422 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
2423 self.write(b"OK\n")
2424 self.sock = self.sslconn.unwrap()
2425 self.sslconn = None
2426 if support.verbose and self.server.connectionchatty:
2427 sys.stdout.write(" server: connection is now unencrypted...\n")
2428 elif stripped == b'CB tls-unique':
2429 if support.verbose and self.server.connectionchatty:
2430 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
2431 data = self.sslconn.get_channel_binding("tls-unique")
2432 self.write(repr(data).encode("us-ascii") + b"\n")
Christian Heimes9fb051f2018-09-23 08:32:31 +02002433 elif stripped == b'PHA':
2434 if support.verbose and self.server.connectionchatty:
2435 sys.stdout.write(" server: initiating post handshake auth\n")
2436 try:
2437 self.sslconn.verify_client_post_handshake()
2438 except ssl.SSLError as e:
2439 self.write(repr(e).encode("us-ascii") + b"\n")
2440 else:
2441 self.write(b"OK\n")
2442 elif stripped == b'HASCERT':
2443 if self.sslconn.getpeercert() is not None:
2444 self.write(b'TRUE\n')
2445 else:
2446 self.write(b'FALSE\n')
2447 elif stripped == b'GETCERT':
2448 cert = self.sslconn.getpeercert()
2449 self.write(repr(cert).encode("us-ascii") + b"\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002450 else:
2451 if (support.verbose and
2452 self.server.connectionchatty):
2453 ctype = (self.sslconn and "encrypted") or "unencrypted"
2454 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
2455 % (msg, ctype, msg.lower(), ctype))
2456 self.write(msg.lower())
Paul Monsonfb7e7502019-05-15 15:38:55 -07002457 except (ConnectionResetError, ConnectionAbortedError):
Christian Heimes529525f2018-05-23 22:24:45 +02002458 # XXX: OpenSSL 1.1.1 sometimes raises ConnectionResetError
2459 # when connection is not shut down gracefully.
2460 if self.server.chatty and support.verbose:
2461 sys.stdout.write(
2462 " Connection reset by peer: {}\n".format(
2463 self.addr)
2464 )
2465 self.close()
2466 self.running = False
Paul Monsonfb7e7502019-05-15 15:38:55 -07002467 except ssl.SSLError as err:
2468 # On Windows sometimes test_pha_required_nocert receives the
2469 # PEER_DID_NOT_RETURN_A_CERTIFICATE exception
2470 # before the 'tlsv13 alert certificate required' exception.
2471 # If the server is stopped when PEER_DID_NOT_RETURN_A_CERTIFICATE
2472 # is received test_pha_required_nocert fails with ConnectionResetError
2473 # because the underlying socket is closed
2474 if 'PEER_DID_NOT_RETURN_A_CERTIFICATE' == err.reason:
2475 if self.server.chatty and support.verbose:
2476 sys.stdout.write(err.args[1])
2477 # test_pha_required_nocert is expecting this exception
2478 raise ssl.SSLError('tlsv13 alert certificate required')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002479 except OSError:
2480 if self.server.chatty:
2481 handle_error("Test server failure:\n")
Bill Janssen2f5799b2008-06-29 00:08:12 +00002482 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002483 self.running = False
Christian Heimes529525f2018-05-23 22:24:45 +02002484
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002485 # normally, we'd just stop here, but for the test
2486 # harness, we want to stop the server
2487 self.server.stop()
Bill Janssen54cc54c2007-12-14 22:08:56 +00002488
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002489 def __init__(self, certificate=None, ssl_version=None,
2490 certreqs=None, cacerts=None,
2491 chatty=True, connectionchatty=False, starttls_server=False,
2492 npn_protocols=None, alpn_protocols=None,
2493 ciphers=None, context=None):
2494 if context:
2495 self.context = context
2496 else:
2497 self.context = ssl.SSLContext(ssl_version
2498 if ssl_version is not None
Christian Heimesa170fa12017-09-15 20:27:30 +02002499 else ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002500 self.context.verify_mode = (certreqs if certreqs is not None
2501 else ssl.CERT_NONE)
2502 if cacerts:
2503 self.context.load_verify_locations(cacerts)
2504 if certificate:
2505 self.context.load_cert_chain(certificate)
2506 if npn_protocols:
2507 self.context.set_npn_protocols(npn_protocols)
2508 if alpn_protocols:
2509 self.context.set_alpn_protocols(alpn_protocols)
2510 if ciphers:
2511 self.context.set_ciphers(ciphers)
2512 self.chatty = chatty
2513 self.connectionchatty = connectionchatty
2514 self.starttls_server = starttls_server
2515 self.sock = socket.socket()
Serhiy Storchaka16994912020-04-25 10:06:29 +03002516 self.port = socket_helper.bind_port(self.sock)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002517 self.flag = None
2518 self.active = False
2519 self.selected_npn_protocols = []
2520 self.selected_alpn_protocols = []
2521 self.shared_ciphers = []
2522 self.conn_errors = []
2523 threading.Thread.__init__(self)
2524 self.daemon = True
2525
2526 def __enter__(self):
2527 self.start(threading.Event())
2528 self.flag.wait()
2529 return self
2530
2531 def __exit__(self, *args):
2532 self.stop()
2533 self.join()
2534
2535 def start(self, flag=None):
2536 self.flag = flag
2537 threading.Thread.start(self)
2538
2539 def run(self):
2540 self.sock.settimeout(0.05)
2541 self.sock.listen()
2542 self.active = True
2543 if self.flag:
2544 # signal an event
2545 self.flag.set()
2546 while self.active:
2547 try:
2548 newconn, connaddr = self.sock.accept()
2549 if support.verbose and self.chatty:
2550 sys.stdout.write(' server: new connection from '
2551 + repr(connaddr) + '\n')
2552 handler = self.ConnectionHandler(self, newconn, connaddr)
2553 handler.start()
2554 handler.join()
2555 except socket.timeout:
2556 pass
2557 except KeyboardInterrupt:
2558 self.stop()
Christian Heimes529525f2018-05-23 22:24:45 +02002559 except BaseException as e:
2560 if support.verbose and self.chatty:
2561 sys.stdout.write(
2562 ' connection handling failed: ' + repr(e) + '\n')
2563
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002564 self.sock.close()
2565
2566 def stop(self):
2567 self.active = False
2568
2569class AsyncoreEchoServer(threading.Thread):
2570
2571 # this one's based on asyncore.dispatcher
2572
2573 class EchoServer (asyncore.dispatcher):
2574
2575 class ConnectionHandler(asyncore.dispatcher_with_send):
2576
2577 def __init__(self, conn, certfile):
2578 self.socket = test_wrap_socket(conn, server_side=True,
2579 certfile=certfile,
2580 do_handshake_on_connect=False)
2581 asyncore.dispatcher_with_send.__init__(self, self.socket)
2582 self._ssl_accepting = True
2583 self._do_ssl_handshake()
2584
2585 def readable(self):
2586 if isinstance(self.socket, ssl.SSLSocket):
2587 while self.socket.pending() > 0:
2588 self.handle_read_event()
2589 return True
2590
2591 def _do_ssl_handshake(self):
2592 try:
2593 self.socket.do_handshake()
2594 except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
2595 return
2596 except ssl.SSLEOFError:
2597 return self.handle_close()
2598 except ssl.SSLError:
Bill Janssen54cc54c2007-12-14 22:08:56 +00002599 raise
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002600 except OSError as err:
2601 if err.args[0] == errno.ECONNABORTED:
2602 return self.handle_close()
2603 else:
2604 self._ssl_accepting = False
Bill Janssen54cc54c2007-12-14 22:08:56 +00002605
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002606 def handle_read(self):
2607 if self._ssl_accepting:
2608 self._do_ssl_handshake()
2609 else:
2610 data = self.recv(1024)
2611 if support.verbose:
2612 sys.stdout.write(" server: read %s from client\n" % repr(data))
2613 if not data:
2614 self.close()
2615 else:
2616 self.send(data.lower())
Bill Janssen54cc54c2007-12-14 22:08:56 +00002617
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002618 def handle_close(self):
2619 self.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002620 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002621 sys.stdout.write(" server: closed connection %s\n" % self.socket)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002622
2623 def handle_error(self):
2624 raise
2625
Trent Nelson78520002008-04-10 20:54:35 +00002626 def __init__(self, certfile):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002627 self.certfile = certfile
2628 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
Serhiy Storchaka16994912020-04-25 10:06:29 +03002629 self.port = socket_helper.bind_port(sock, '')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002630 asyncore.dispatcher.__init__(self, sock)
2631 self.listen(5)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002632
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002633 def handle_accepted(self, sock_obj, addr):
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002634 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002635 sys.stdout.write(" server: new connection from %s:%s\n" %addr)
2636 self.ConnectionHandler(sock_obj, self.certfile)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002637
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002638 def handle_error(self):
2639 raise
Bill Janssen54cc54c2007-12-14 22:08:56 +00002640
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002641 def __init__(self, certfile):
2642 self.flag = None
2643 self.active = False
2644 self.server = self.EchoServer(certfile)
2645 self.port = self.server.port
2646 threading.Thread.__init__(self)
2647 self.daemon = True
Bill Janssen54cc54c2007-12-14 22:08:56 +00002648
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002649 def __str__(self):
2650 return "<%s %s>" % (self.__class__.__name__, self.server)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002651
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002652 def __enter__(self):
2653 self.start(threading.Event())
2654 self.flag.wait()
2655 return self
Thomas Woutersed03b412007-08-28 21:37:11 +00002656
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002657 def __exit__(self, *args):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002658 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002659 sys.stdout.write(" cleanup: stopping server.\n")
2660 self.stop()
2661 if support.verbose:
2662 sys.stdout.write(" cleanup: joining server thread.\n")
2663 self.join()
2664 if support.verbose:
2665 sys.stdout.write(" cleanup: successfully joined.\n")
2666 # make sure that ConnectionHandler is removed from socket_map
2667 asyncore.close_all(ignore_all=True)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002668
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002669 def start (self, flag=None):
2670 self.flag = flag
2671 threading.Thread.start(self)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002672
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002673 def run(self):
2674 self.active = True
2675 if self.flag:
2676 self.flag.set()
2677 while self.active:
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002678 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002679 asyncore.loop(1)
2680 except:
2681 pass
Trent Nelson6b240cd2008-04-10 20:12:06 +00002682
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002683 def stop(self):
2684 self.active = False
2685 self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002686
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002687def server_params_test(client_context, server_context, indata=b"FOO\n",
2688 chatty=True, connectionchatty=False, sni_name=None,
2689 session=None):
2690 """
2691 Launch a server, connect a client to it and try various reads
2692 and writes.
2693 """
2694 stats = {}
2695 server = ThreadedEchoServer(context=server_context,
2696 chatty=chatty,
2697 connectionchatty=False)
2698 with server:
2699 with client_context.wrap_socket(socket.socket(),
2700 server_hostname=sni_name, session=session) as s:
2701 s.connect((HOST, server.port))
2702 for arg in [indata, bytearray(indata), memoryview(indata)]:
2703 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002704 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002705 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002706 " client: sending %r...\n" % indata)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002707 s.write(arg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002708 outdata = s.read()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002709 if connectionchatty:
2710 if support.verbose:
2711 sys.stdout.write(" client: read %r\n" % outdata)
Trent Nelson6b240cd2008-04-10 20:12:06 +00002712 if outdata != indata.lower():
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002713 raise AssertionError(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002714 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2715 % (outdata[:20], len(outdata),
2716 indata[:20].lower(), len(indata)))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002717 s.write(b"over\n")
2718 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002719 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002720 sys.stdout.write(" client: closing connection.\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002721 stats.update({
2722 'compression': s.compression(),
2723 'cipher': s.cipher(),
2724 'peercert': s.getpeercert(),
2725 'client_alpn_protocol': s.selected_alpn_protocol(),
2726 'client_npn_protocol': s.selected_npn_protocol(),
2727 'version': s.version(),
2728 'session_reused': s.session_reused,
2729 'session': s.session,
2730 })
2731 s.close()
2732 stats['server_alpn_protocols'] = server.selected_alpn_protocols
2733 stats['server_npn_protocols'] = server.selected_npn_protocols
2734 stats['server_shared_ciphers'] = server.shared_ciphers
2735 return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002736
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002737def try_protocol_combo(server_protocol, client_protocol, expect_success,
2738 certsreqs=None, server_options=0, client_options=0):
2739 """
2740 Try to SSL-connect using *client_protocol* to *server_protocol*.
2741 If *expect_success* is true, assert that the connection succeeds,
2742 if it's false, assert that the connection fails.
2743 Also, if *expect_success* is a string, assert that it is the protocol
2744 version actually used by the connection.
2745 """
2746 if certsreqs is None:
2747 certsreqs = ssl.CERT_NONE
2748 certtype = {
2749 ssl.CERT_NONE: "CERT_NONE",
2750 ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
2751 ssl.CERT_REQUIRED: "CERT_REQUIRED",
2752 }[certsreqs]
2753 if support.verbose:
2754 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
2755 sys.stdout.write(formatstr %
2756 (ssl.get_protocol_name(client_protocol),
2757 ssl.get_protocol_name(server_protocol),
2758 certtype))
2759 client_context = ssl.SSLContext(client_protocol)
2760 client_context.options |= client_options
2761 server_context = ssl.SSLContext(server_protocol)
2762 server_context.options |= server_options
2763
Victor Stinner3ef63442019-02-19 18:06:03 +01002764 min_version = PROTOCOL_TO_TLS_VERSION.get(client_protocol, None)
2765 if (min_version is not None
2766 # SSLContext.minimum_version is only available on recent OpenSSL
2767 # (setter added in OpenSSL 1.1.0, getter added in OpenSSL 1.1.1)
2768 and hasattr(server_context, 'minimum_version')
2769 and server_protocol == ssl.PROTOCOL_TLS
2770 and server_context.minimum_version > min_version):
2771 # If OpenSSL configuration is strict and requires more recent TLS
2772 # version, we have to change the minimum to test old TLS versions.
2773 server_context.minimum_version = min_version
2774
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002775 # NOTE: we must enable "ALL" ciphers on the client, otherwise an
2776 # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
2777 # starting from OpenSSL 1.0.0 (see issue #8322).
Christian Heimesa170fa12017-09-15 20:27:30 +02002778 if client_context.protocol == ssl.PROTOCOL_TLS:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002779 client_context.set_ciphers("ALL")
2780
2781 for ctx in (client_context, server_context):
2782 ctx.verify_mode = certsreqs
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002783 ctx.load_cert_chain(SIGNED_CERTFILE)
2784 ctx.load_verify_locations(SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002785 try:
2786 stats = server_params_test(client_context, server_context,
2787 chatty=False, connectionchatty=False)
2788 # Protocol mismatch can result in either an SSLError, or a
2789 # "Connection reset by peer" error.
2790 except ssl.SSLError:
2791 if expect_success:
2792 raise
2793 except OSError as e:
2794 if expect_success or e.errno != errno.ECONNRESET:
2795 raise
2796 else:
2797 if not expect_success:
2798 raise AssertionError(
2799 "Client protocol %s succeeded with server protocol %s!"
2800 % (ssl.get_protocol_name(client_protocol),
2801 ssl.get_protocol_name(server_protocol)))
2802 elif (expect_success is not True
2803 and expect_success != stats['version']):
2804 raise AssertionError("version mismatch: expected %r, got %r"
2805 % (expect_success, stats['version']))
2806
2807
2808class ThreadedTests(unittest.TestCase):
2809
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002810 def test_echo(self):
2811 """Basic test of an SSL client connecting to a server"""
2812 if support.verbose:
2813 sys.stdout.write("\n")
2814 for protocol in PROTOCOLS:
2815 if protocol in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}:
2816 continue
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02002817 if not has_tls_protocol(protocol):
2818 continue
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002819 with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
2820 context = ssl.SSLContext(protocol)
2821 context.load_cert_chain(CERTFILE)
2822 server_params_test(context, context,
2823 chatty=True, connectionchatty=True)
2824
Christian Heimesa170fa12017-09-15 20:27:30 +02002825 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002826
2827 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
2828 server_params_test(client_context=client_context,
2829 server_context=server_context,
2830 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002831 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002832
2833 client_context.check_hostname = False
2834 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_CLIENT):
2835 with self.assertRaises(ssl.SSLError) as e:
2836 server_params_test(client_context=server_context,
2837 server_context=client_context,
2838 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002839 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002840 self.assertIn('called a function you should not call',
2841 str(e.exception))
2842
2843 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_SERVER):
2844 with self.assertRaises(ssl.SSLError) as e:
2845 server_params_test(client_context=server_context,
2846 server_context=server_context,
2847 chatty=True, connectionchatty=True)
2848 self.assertIn('called a function you should not call',
2849 str(e.exception))
2850
2851 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_CLIENT):
2852 with self.assertRaises(ssl.SSLError) as e:
2853 server_params_test(client_context=server_context,
2854 server_context=client_context,
2855 chatty=True, connectionchatty=True)
2856 self.assertIn('called a function you should not call',
2857 str(e.exception))
2858
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002859 def test_getpeercert(self):
2860 if support.verbose:
2861 sys.stdout.write("\n")
Christian Heimesa170fa12017-09-15 20:27:30 +02002862
2863 client_context, server_context, hostname = testing_context()
2864 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002865 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002866 with client_context.wrap_socket(socket.socket(),
2867 do_handshake_on_connect=False,
2868 server_hostname=hostname) as s:
2869 s.connect((HOST, server.port))
2870 # getpeercert() raise ValueError while the handshake isn't
2871 # done.
2872 with self.assertRaises(ValueError):
2873 s.getpeercert()
2874 s.do_handshake()
2875 cert = s.getpeercert()
2876 self.assertTrue(cert, "Can't get peer certificate.")
2877 cipher = s.cipher()
2878 if support.verbose:
2879 sys.stdout.write(pprint.pformat(cert) + '\n')
2880 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2881 if 'subject' not in cert:
2882 self.fail("No subject field in certificate: %s." %
2883 pprint.pformat(cert))
2884 if ((('organizationName', 'Python Software Foundation'),)
2885 not in cert['subject']):
2886 self.fail(
2887 "Missing or invalid 'organizationName' field in certificate subject; "
2888 "should be 'Python Software Foundation'.")
2889 self.assertIn('notBefore', cert)
2890 self.assertIn('notAfter', cert)
2891 before = ssl.cert_time_to_seconds(cert['notBefore'])
2892 after = ssl.cert_time_to_seconds(cert['notAfter'])
2893 self.assertLess(before, after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002894
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002895 @unittest.skipUnless(have_verify_flags(),
2896 "verify_flags need OpenSSL > 0.9.8")
2897 def test_crl_check(self):
2898 if support.verbose:
2899 sys.stdout.write("\n")
2900
Christian Heimesa170fa12017-09-15 20:27:30 +02002901 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002902
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002903 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
Christian Heimesa170fa12017-09-15 20:27:30 +02002904 self.assertEqual(client_context.verify_flags, ssl.VERIFY_DEFAULT | tf)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002905
2906 # VERIFY_DEFAULT should pass
2907 server = ThreadedEchoServer(context=server_context, chatty=True)
2908 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002909 with client_context.wrap_socket(socket.socket(),
2910 server_hostname=hostname) as s:
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002911 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002912 cert = s.getpeercert()
2913 self.assertTrue(cert, "Can't get peer certificate.")
Bill Janssen58afe4c2008-09-08 16:45:19 +00002914
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002915 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
Christian Heimesa170fa12017-09-15 20:27:30 +02002916 client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
Bill Janssen58afe4c2008-09-08 16:45:19 +00002917
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002918 server = ThreadedEchoServer(context=server_context, chatty=True)
2919 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002920 with client_context.wrap_socket(socket.socket(),
2921 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002922 with self.assertRaisesRegex(ssl.SSLError,
2923 "certificate verify failed"):
2924 s.connect((HOST, server.port))
Bill Janssen58afe4c2008-09-08 16:45:19 +00002925
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002926 # now load a CRL file. The CRL file is signed by the CA.
Christian Heimesa170fa12017-09-15 20:27:30 +02002927 client_context.load_verify_locations(CRLFILE)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002928
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002929 server = ThreadedEchoServer(context=server_context, chatty=True)
2930 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002931 with client_context.wrap_socket(socket.socket(),
2932 server_hostname=hostname) as s:
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002933 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002934 cert = s.getpeercert()
2935 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002936
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002937 def test_check_hostname(self):
2938 if support.verbose:
2939 sys.stdout.write("\n")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002940
Christian Heimesa170fa12017-09-15 20:27:30 +02002941 client_context, server_context, hostname = testing_context()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002942
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002943 # correct hostname should verify
2944 server = ThreadedEchoServer(context=server_context, chatty=True)
2945 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002946 with client_context.wrap_socket(socket.socket(),
2947 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002948 s.connect((HOST, server.port))
2949 cert = s.getpeercert()
2950 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002951
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002952 # incorrect hostname should raise an exception
2953 server = ThreadedEchoServer(context=server_context, chatty=True)
2954 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002955 with client_context.wrap_socket(socket.socket(),
2956 server_hostname="invalid") as s:
Christian Heimes61d478c2018-01-27 15:51:38 +01002957 with self.assertRaisesRegex(
2958 ssl.CertificateError,
2959 "Hostname mismatch, certificate is not valid for 'invalid'."):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002960 s.connect((HOST, server.port))
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002961
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002962 # missing server_hostname arg should cause an exception, too
2963 server = ThreadedEchoServer(context=server_context, chatty=True)
2964 with server:
2965 with socket.socket() as s:
2966 with self.assertRaisesRegex(ValueError,
2967 "check_hostname requires server_hostname"):
Christian Heimesa170fa12017-09-15 20:27:30 +02002968 client_context.wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002969
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002970 def test_ecc_cert(self):
2971 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2972 client_context.load_verify_locations(SIGNING_CA)
Christian Heimese8eb6cb2018-05-22 22:50:12 +02002973 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002974 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2975
2976 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2977 # load ECC cert
2978 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2979
2980 # correct hostname should verify
2981 server = ThreadedEchoServer(context=server_context, chatty=True)
2982 with server:
2983 with client_context.wrap_socket(socket.socket(),
2984 server_hostname=hostname) as s:
2985 s.connect((HOST, server.port))
2986 cert = s.getpeercert()
2987 self.assertTrue(cert, "Can't get peer certificate.")
2988 cipher = s.cipher()[0].split('-')
2989 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2990
2991 def test_dual_rsa_ecc(self):
2992 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2993 client_context.load_verify_locations(SIGNING_CA)
Christian Heimes05d9fe32018-02-27 08:55:39 +01002994 # TODO: fix TLSv1.3 once SSLContext can restrict signature
2995 # algorithms.
2996 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002997 # only ECDSA certs
2998 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
2999 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
3000
3001 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
3002 # load ECC and RSA key/cert pairs
3003 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
3004 server_context.load_cert_chain(SIGNED_CERTFILE)
3005
3006 # correct hostname should verify
3007 server = ThreadedEchoServer(context=server_context, chatty=True)
3008 with server:
3009 with client_context.wrap_socket(socket.socket(),
3010 server_hostname=hostname) as s:
3011 s.connect((HOST, server.port))
3012 cert = s.getpeercert()
3013 self.assertTrue(cert, "Can't get peer certificate.")
3014 cipher = s.cipher()[0].split('-')
3015 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
3016
Christian Heimes66e57422018-01-29 14:25:13 +01003017 def test_check_hostname_idn(self):
3018 if support.verbose:
3019 sys.stdout.write("\n")
3020
Christian Heimes11a14932018-02-24 02:35:08 +01003021 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes66e57422018-01-29 14:25:13 +01003022 server_context.load_cert_chain(IDNSANSFILE)
3023
Christian Heimes11a14932018-02-24 02:35:08 +01003024 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes66e57422018-01-29 14:25:13 +01003025 context.verify_mode = ssl.CERT_REQUIRED
3026 context.check_hostname = True
3027 context.load_verify_locations(SIGNING_CA)
3028
3029 # correct hostname should verify, when specified in several
3030 # different ways
3031 idn_hostnames = [
3032 ('könig.idn.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01003033 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01003034 ('xn--knig-5qa.idn.pythontest.net',
3035 'xn--knig-5qa.idn.pythontest.net'),
3036 (b'xn--knig-5qa.idn.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01003037 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01003038
3039 ('königsgäßchen.idna2003.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01003040 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01003041 ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
3042 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
3043 (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01003044 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
3045
3046 # ('königsgäßchen.idna2008.pythontest.net',
3047 # 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
3048 ('xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
3049 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
3050 (b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
3051 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
3052
Christian Heimes66e57422018-01-29 14:25:13 +01003053 ]
3054 for server_hostname, expected_hostname in idn_hostnames:
3055 server = ThreadedEchoServer(context=server_context, chatty=True)
3056 with server:
3057 with context.wrap_socket(socket.socket(),
3058 server_hostname=server_hostname) as s:
3059 self.assertEqual(s.server_hostname, expected_hostname)
3060 s.connect((HOST, server.port))
3061 cert = s.getpeercert()
3062 self.assertEqual(s.server_hostname, expected_hostname)
3063 self.assertTrue(cert, "Can't get peer certificate.")
3064
Christian Heimes66e57422018-01-29 14:25:13 +01003065 # incorrect hostname should raise an exception
3066 server = ThreadedEchoServer(context=server_context, chatty=True)
3067 with server:
3068 with context.wrap_socket(socket.socket(),
3069 server_hostname="python.example.org") as s:
3070 with self.assertRaises(ssl.CertificateError):
3071 s.connect((HOST, server.port))
3072
Christian Heimes529525f2018-05-23 22:24:45 +02003073 def test_wrong_cert_tls12(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003074 """Connecting when the server rejects the client's certificate
3075
3076 Launch a server with CERT_REQUIRED, and check that trying to
3077 connect to it with a wrong client certificate fails.
3078 """
Christian Heimes05d9fe32018-02-27 08:55:39 +01003079 client_context, server_context, hostname = testing_context()
Christian Heimes88bfd0b2018-08-14 12:54:19 +02003080 # load client cert that is not signed by trusted CA
3081 client_context.load_cert_chain(CERTFILE)
Christian Heimes05d9fe32018-02-27 08:55:39 +01003082 # require TLS client authentication
3083 server_context.verify_mode = ssl.CERT_REQUIRED
Christian Heimes529525f2018-05-23 22:24:45 +02003084 # TLS 1.3 has different handshake
3085 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
Christian Heimes05d9fe32018-02-27 08:55:39 +01003086
3087 server = ThreadedEchoServer(
3088 context=server_context, chatty=True, connectionchatty=True,
3089 )
3090
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003091 with server, \
Christian Heimes05d9fe32018-02-27 08:55:39 +01003092 client_context.wrap_socket(socket.socket(),
3093 server_hostname=hostname) as s:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00003094 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003095 # Expect either an SSL error about the server rejecting
3096 # the connection, or a low-level connection reset (which
3097 # sometimes happens on Windows)
3098 s.connect((HOST, server.port))
3099 except ssl.SSLError as e:
3100 if support.verbose:
3101 sys.stdout.write("\nSSLError is %r\n" % e)
3102 except OSError as e:
3103 if e.errno != errno.ECONNRESET:
3104 raise
3105 if support.verbose:
3106 sys.stdout.write("\nsocket.error is %r\n" % e)
3107 else:
3108 self.fail("Use of invalid cert should have failed!")
3109
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003110 @requires_tls_version('TLSv1_3')
Christian Heimes529525f2018-05-23 22:24:45 +02003111 def test_wrong_cert_tls13(self):
3112 client_context, server_context, hostname = testing_context()
Christian Heimes88bfd0b2018-08-14 12:54:19 +02003113 # load client cert that is not signed by trusted CA
3114 client_context.load_cert_chain(CERTFILE)
Christian Heimes529525f2018-05-23 22:24:45 +02003115 server_context.verify_mode = ssl.CERT_REQUIRED
3116 server_context.minimum_version = ssl.TLSVersion.TLSv1_3
3117 client_context.minimum_version = ssl.TLSVersion.TLSv1_3
3118
3119 server = ThreadedEchoServer(
3120 context=server_context, chatty=True, connectionchatty=True,
3121 )
3122 with server, \
3123 client_context.wrap_socket(socket.socket(),
3124 server_hostname=hostname) as s:
3125 # TLS 1.3 perform client cert exchange after handshake
3126 s.connect((HOST, server.port))
3127 try:
3128 s.write(b'data')
3129 s.read(4)
3130 except ssl.SSLError as e:
3131 if support.verbose:
3132 sys.stdout.write("\nSSLError is %r\n" % e)
3133 except OSError as e:
3134 if e.errno != errno.ECONNRESET:
3135 raise
3136 if support.verbose:
3137 sys.stdout.write("\nsocket.error is %r\n" % e)
3138 else:
3139 self.fail("Use of invalid cert should have failed!")
3140
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003141 def test_rude_shutdown(self):
3142 """A brutal shutdown of an SSL server should raise an OSError
3143 in the client when attempting handshake.
3144 """
3145 listener_ready = threading.Event()
3146 listener_gone = threading.Event()
3147
3148 s = socket.socket()
Serhiy Storchaka16994912020-04-25 10:06:29 +03003149 port = socket_helper.bind_port(s, HOST)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003150
3151 # `listener` runs in a thread. It sits in an accept() until
3152 # the main thread connects. Then it rudely closes the socket,
3153 # and sets Event `listener_gone` to let the main thread know
3154 # the socket is gone.
3155 def listener():
3156 s.listen()
3157 listener_ready.set()
3158 newsock, addr = s.accept()
3159 newsock.close()
3160 s.close()
3161 listener_gone.set()
3162
3163 def connector():
3164 listener_ready.wait()
3165 with socket.socket() as c:
3166 c.connect((HOST, port))
3167 listener_gone.wait()
Antoine Pitrou40f08742010-04-24 22:04:40 +00003168 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003169 ssl_sock = test_wrap_socket(c)
3170 except OSError:
3171 pass
3172 else:
3173 self.fail('connecting to closed SSL socket should have failed')
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00003174
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003175 t = threading.Thread(target=listener)
3176 t.start()
3177 try:
3178 connector()
3179 finally:
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003180 t.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003181
Christian Heimesb3ad0e52017-09-08 12:00:19 -07003182 def test_ssl_cert_verify_error(self):
3183 if support.verbose:
3184 sys.stdout.write("\n")
3185
3186 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
3187 server_context.load_cert_chain(SIGNED_CERTFILE)
3188
3189 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3190
3191 server = ThreadedEchoServer(context=server_context, chatty=True)
3192 with server:
3193 with context.wrap_socket(socket.socket(),
Christian Heimesa170fa12017-09-15 20:27:30 +02003194 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
Christian Heimesb3ad0e52017-09-08 12:00:19 -07003195 try:
3196 s.connect((HOST, server.port))
3197 except ssl.SSLError as e:
3198 msg = 'unable to get local issuer certificate'
3199 self.assertIsInstance(e, ssl.SSLCertVerificationError)
3200 self.assertEqual(e.verify_code, 20)
3201 self.assertEqual(e.verify_message, msg)
3202 self.assertIn(msg, repr(e))
3203 self.assertIn('certificate verify failed', repr(e))
3204
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003205 @requires_tls_version('SSLv2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003206 def test_protocol_sslv2(self):
3207 """Connecting to an SSLv2 server with various client options"""
3208 if support.verbose:
3209 sys.stdout.write("\n")
3210 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
3211 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
3212 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
Christian Heimesa170fa12017-09-15 20:27:30 +02003213 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003214 if has_tls_version('SSLv3'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003215 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
3216 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
3217 # SSLv23 client with specific SSL options
3218 if no_sslv2_implies_sslv3_hello():
3219 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003220 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003221 client_options=ssl.OP_NO_SSLv2)
Christian Heimesa170fa12017-09-15 20:27:30 +02003222 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003223 client_options=ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02003224 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003225 client_options=ssl.OP_NO_TLSv1)
Antoine Pitrou242db722013-05-01 20:52:07 +02003226
Christian Heimesa170fa12017-09-15 20:27:30 +02003227 def test_PROTOCOL_TLS(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003228 """Connecting to an SSLv23 server with various client options"""
3229 if support.verbose:
3230 sys.stdout.write("\n")
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003231 if has_tls_version('SSLv2'):
Antoine Pitrou8f85f902012-01-03 22:46:48 +01003232 try:
Christian Heimesa170fa12017-09-15 20:27:30 +02003233 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv2, True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003234 except OSError as x:
3235 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
3236 if support.verbose:
3237 sys.stdout.write(
3238 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
3239 % str(x))
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003240 if has_tls_version('SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003241 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False)
3242 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003243 if has_tls_version('TLSv1'):
3244 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1')
Antoine Pitrou8f85f902012-01-03 22:46:48 +01003245
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003246 if has_tls_version('SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003247 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL)
3248 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_OPTIONAL)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003249 if has_tls_version('TLSv1'):
3250 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003251
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003252 if has_tls_version('SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003253 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED)
3254 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_REQUIRED)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003255 if has_tls_version('TLSv1'):
3256 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003257
3258 # Server with specific SSL options
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003259 if has_tls_version('SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003260 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003261 server_options=ssl.OP_NO_SSLv3)
3262 # Will choose TLSv1
Christian Heimesa170fa12017-09-15 20:27:30 +02003263 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003264 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003265 if has_tls_version('TLSv1'):
3266 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, False,
3267 server_options=ssl.OP_NO_TLSv1)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003268
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003269 @requires_tls_version('SSLv3')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003270 def test_protocol_sslv3(self):
3271 """Connecting to an SSLv3 server with various client options"""
3272 if support.verbose:
3273 sys.stdout.write("\n")
3274 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
3275 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
3276 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003277 if has_tls_version('SSLv2'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003278 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003279 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003280 client_options=ssl.OP_NO_SSLv3)
3281 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
3282 if no_sslv2_implies_sslv3_hello():
3283 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003284 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003285 False, client_options=ssl.OP_NO_SSLv2)
3286
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003287 @requires_tls_version('TLSv1')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003288 def test_protocol_tlsv1(self):
3289 """Connecting to a TLSv1 server with various client options"""
3290 if support.verbose:
3291 sys.stdout.write("\n")
3292 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
3293 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
3294 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003295 if has_tls_version('SSLv2'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003296 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003297 if has_tls_version('SSLv3'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003298 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003299 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003300 client_options=ssl.OP_NO_TLSv1)
3301
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003302 @requires_tls_version('TLSv1_1')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003303 def test_protocol_tlsv1_1(self):
3304 """Connecting to a TLSv1.1 server with various client options.
3305 Testing against older TLS versions."""
3306 if support.verbose:
3307 sys.stdout.write("\n")
3308 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003309 if has_tls_version('SSLv2'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003310 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003311 if has_tls_version('SSLv3'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003312 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003313 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003314 client_options=ssl.OP_NO_TLSv1_1)
3315
Christian Heimesa170fa12017-09-15 20:27:30 +02003316 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003317 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
3318 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003319
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003320 @requires_tls_version('TLSv1_2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003321 def test_protocol_tlsv1_2(self):
3322 """Connecting to a TLSv1.2 server with various client options.
3323 Testing against older TLS versions."""
3324 if support.verbose:
3325 sys.stdout.write("\n")
3326 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
3327 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
3328 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003329 if has_tls_version('SSLv2'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003330 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003331 if has_tls_version('SSLv3'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003332 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003333 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003334 client_options=ssl.OP_NO_TLSv1_2)
3335
Christian Heimesa170fa12017-09-15 20:27:30 +02003336 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003337 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
3338 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
3339 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
3340 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
3341
3342 def test_starttls(self):
3343 """Switching from clear text to encrypted and back again."""
3344 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
3345
3346 server = ThreadedEchoServer(CERTFILE,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003347 starttls_server=True,
3348 chatty=True,
3349 connectionchatty=True)
3350 wrapped = False
3351 with server:
3352 s = socket.socket()
Serhiy Storchaka5eca7f32019-09-01 12:12:52 +03003353 s.setblocking(True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003354 s.connect((HOST, server.port))
Antoine Pitroud6494802011-07-21 01:11:30 +02003355 if support.verbose:
3356 sys.stdout.write("\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003357 for indata in msgs:
Antoine Pitroud6494802011-07-21 01:11:30 +02003358 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003359 sys.stdout.write(
3360 " client: sending %r...\n" % indata)
3361 if wrapped:
3362 conn.write(indata)
3363 outdata = conn.read()
3364 else:
3365 s.send(indata)
3366 outdata = s.recv(1024)
3367 msg = outdata.strip().lower()
3368 if indata == b"STARTTLS" and msg.startswith(b"ok"):
3369 # STARTTLS ok, switch to secure mode
3370 if support.verbose:
3371 sys.stdout.write(
3372 " client: read %r from server, starting TLS...\n"
3373 % msg)
Christian Heimesa170fa12017-09-15 20:27:30 +02003374 conn = test_wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003375 wrapped = True
3376 elif indata == b"ENDTLS" and msg.startswith(b"ok"):
3377 # ENDTLS ok, switch back to clear text
3378 if support.verbose:
3379 sys.stdout.write(
3380 " client: read %r from server, ending TLS...\n"
3381 % msg)
3382 s = conn.unwrap()
3383 wrapped = False
3384 else:
3385 if support.verbose:
3386 sys.stdout.write(
3387 " client: read %r from server\n" % msg)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003388 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003389 sys.stdout.write(" client: closing connection.\n")
3390 if wrapped:
3391 conn.write(b"over\n")
3392 else:
3393 s.send(b"over\n")
3394 if wrapped:
3395 conn.close()
3396 else:
3397 s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003398
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003399 def test_socketserver(self):
3400 """Using socketserver to create and manage SSL connections."""
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003401 server = make_https_server(self, certfile=SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003402 # try to connect
3403 if support.verbose:
3404 sys.stdout.write('\n')
3405 with open(CERTFILE, 'rb') as f:
3406 d1 = f.read()
3407 d2 = ''
3408 # now fetch the same data from the HTTPS server
3409 url = 'https://localhost:%d/%s' % (
3410 server.port, os.path.split(CERTFILE)[1])
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003411 context = ssl.create_default_context(cafile=SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003412 f = urllib.request.urlopen(url, context=context)
3413 try:
3414 dlen = f.info().get("content-length")
3415 if dlen and (int(dlen) > 0):
3416 d2 = f.read(int(dlen))
3417 if support.verbose:
3418 sys.stdout.write(
3419 " client: read %d bytes from remote server '%s'\n"
3420 % (len(d2), server))
3421 finally:
3422 f.close()
3423 self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003424
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003425 def test_asyncore_server(self):
3426 """Check the example asyncore integration."""
3427 if support.verbose:
3428 sys.stdout.write("\n")
Antoine Pitrou0e576f12011-12-22 10:03:38 +01003429
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003430 indata = b"FOO\n"
3431 server = AsyncoreEchoServer(CERTFILE)
3432 with server:
3433 s = test_wrap_socket(socket.socket())
3434 s.connect(('127.0.0.1', server.port))
3435 if support.verbose:
3436 sys.stdout.write(
3437 " client: sending %r...\n" % indata)
3438 s.write(indata)
3439 outdata = s.read()
3440 if support.verbose:
3441 sys.stdout.write(" client: read %r\n" % outdata)
3442 if outdata != indata.lower():
3443 self.fail(
3444 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
3445 % (outdata[:20], len(outdata),
3446 indata[:20].lower(), len(indata)))
3447 s.write(b"over\n")
3448 if support.verbose:
3449 sys.stdout.write(" client: closing connection.\n")
3450 s.close()
3451 if support.verbose:
3452 sys.stdout.write(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003453
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003454 def test_recv_send(self):
3455 """Test recv(), send() and friends."""
3456 if support.verbose:
3457 sys.stdout.write("\n")
3458
3459 server = ThreadedEchoServer(CERTFILE,
3460 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003461 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003462 cacerts=CERTFILE,
3463 chatty=True,
3464 connectionchatty=False)
3465 with server:
3466 s = test_wrap_socket(socket.socket(),
3467 server_side=False,
3468 certfile=CERTFILE,
3469 ca_certs=CERTFILE,
3470 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003471 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003472 s.connect((HOST, server.port))
3473 # helper methods for standardising recv* method signatures
3474 def _recv_into():
3475 b = bytearray(b"\0"*100)
3476 count = s.recv_into(b)
3477 return b[:count]
3478
3479 def _recvfrom_into():
3480 b = bytearray(b"\0"*100)
3481 count, addr = s.recvfrom_into(b)
3482 return b[:count]
3483
3484 # (name, method, expect success?, *args, return value func)
3485 send_methods = [
3486 ('send', s.send, True, [], len),
3487 ('sendto', s.sendto, False, ["some.address"], len),
3488 ('sendall', s.sendall, True, [], lambda x: None),
3489 ]
3490 # (name, method, whether to expect success, *args)
3491 recv_methods = [
3492 ('recv', s.recv, True, []),
3493 ('recvfrom', s.recvfrom, False, ["some.address"]),
3494 ('recv_into', _recv_into, True, []),
3495 ('recvfrom_into', _recvfrom_into, False, []),
3496 ]
3497 data_prefix = "PREFIX_"
3498
3499 for (meth_name, send_meth, expect_success, args,
3500 ret_val_meth) in send_methods:
3501 indata = (data_prefix + meth_name).encode('ascii')
3502 try:
3503 ret = send_meth(indata, *args)
3504 msg = "sending with {}".format(meth_name)
3505 self.assertEqual(ret, ret_val_meth(indata), msg=msg)
3506 outdata = s.read()
3507 if outdata != indata.lower():
3508 self.fail(
3509 "While sending with <<{name:s}>> bad data "
3510 "<<{outdata:r}>> ({nout:d}) received; "
3511 "expected <<{indata:r}>> ({nin:d})\n".format(
3512 name=meth_name, outdata=outdata[:20],
3513 nout=len(outdata),
3514 indata=indata[:20], nin=len(indata)
3515 )
3516 )
3517 except ValueError as e:
3518 if expect_success:
3519 self.fail(
3520 "Failed to send with method <<{name:s}>>; "
3521 "expected to succeed.\n".format(name=meth_name)
3522 )
3523 if not str(e).startswith(meth_name):
3524 self.fail(
3525 "Method <<{name:s}>> failed with unexpected "
3526 "exception message: {exp:s}\n".format(
3527 name=meth_name, exp=e
3528 )
3529 )
3530
3531 for meth_name, recv_meth, expect_success, args in recv_methods:
3532 indata = (data_prefix + meth_name).encode('ascii')
3533 try:
3534 s.send(indata)
3535 outdata = recv_meth(*args)
3536 if outdata != indata.lower():
3537 self.fail(
3538 "While receiving with <<{name:s}>> bad data "
3539 "<<{outdata:r}>> ({nout:d}) received; "
3540 "expected <<{indata:r}>> ({nin:d})\n".format(
3541 name=meth_name, outdata=outdata[:20],
3542 nout=len(outdata),
3543 indata=indata[:20], nin=len(indata)
3544 )
3545 )
3546 except ValueError as e:
3547 if expect_success:
3548 self.fail(
3549 "Failed to receive with method <<{name:s}>>; "
3550 "expected to succeed.\n".format(name=meth_name)
3551 )
3552 if not str(e).startswith(meth_name):
3553 self.fail(
3554 "Method <<{name:s}>> failed with unexpected "
3555 "exception message: {exp:s}\n".format(
3556 name=meth_name, exp=e
3557 )
3558 )
3559 # consume data
3560 s.read()
3561
3562 # read(-1, buffer) is supported, even though read(-1) is not
3563 data = b"data"
3564 s.send(data)
3565 buffer = bytearray(len(data))
3566 self.assertEqual(s.read(-1, buffer), len(data))
3567 self.assertEqual(buffer, data)
3568
Christian Heimes888bbdc2017-09-07 14:18:21 -07003569 # sendall accepts bytes-like objects
3570 if ctypes is not None:
3571 ubyte = ctypes.c_ubyte * len(data)
3572 byteslike = ubyte.from_buffer_copy(data)
3573 s.sendall(byteslike)
3574 self.assertEqual(s.read(), data)
3575
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003576 # Make sure sendmsg et al are disallowed to avoid
3577 # inadvertent disclosure of data and/or corruption
3578 # of the encrypted data stream
Serhiy Storchaka42b1d612018-12-06 22:36:55 +02003579 self.assertRaises(NotImplementedError, s.dup)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003580 self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
3581 self.assertRaises(NotImplementedError, s.recvmsg, 100)
3582 self.assertRaises(NotImplementedError,
Serhiy Storchaka42b1d612018-12-06 22:36:55 +02003583 s.recvmsg_into, [bytearray(100)])
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003584 s.write(b"over\n")
3585
3586 self.assertRaises(ValueError, s.recv, -1)
3587 self.assertRaises(ValueError, s.read, -1)
3588
3589 s.close()
3590
3591 def test_recv_zero(self):
3592 server = ThreadedEchoServer(CERTFILE)
3593 server.__enter__()
3594 self.addCleanup(server.__exit__, None, None)
3595 s = socket.create_connection((HOST, server.port))
3596 self.addCleanup(s.close)
3597 s = test_wrap_socket(s, suppress_ragged_eofs=False)
3598 self.addCleanup(s.close)
3599
3600 # recv/read(0) should return no data
3601 s.send(b"data")
3602 self.assertEqual(s.recv(0), b"")
3603 self.assertEqual(s.read(0), b"")
3604 self.assertEqual(s.read(), b"data")
3605
3606 # Should not block if the other end sends no data
3607 s.setblocking(False)
3608 self.assertEqual(s.recv(0), b"")
3609 self.assertEqual(s.recv_into(bytearray()), 0)
3610
3611 def test_nonblocking_send(self):
3612 server = ThreadedEchoServer(CERTFILE,
3613 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003614 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003615 cacerts=CERTFILE,
3616 chatty=True,
3617 connectionchatty=False)
3618 with server:
3619 s = test_wrap_socket(socket.socket(),
3620 server_side=False,
3621 certfile=CERTFILE,
3622 ca_certs=CERTFILE,
3623 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003624 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003625 s.connect((HOST, server.port))
3626 s.setblocking(False)
3627
3628 # If we keep sending data, at some point the buffers
3629 # will be full and the call will block
3630 buf = bytearray(8192)
3631 def fill_buffer():
3632 while True:
3633 s.send(buf)
3634 self.assertRaises((ssl.SSLWantWriteError,
3635 ssl.SSLWantReadError), fill_buffer)
3636
3637 # Now read all the output and discard it
3638 s.setblocking(True)
3639 s.close()
3640
3641 def test_handshake_timeout(self):
3642 # Issue #5103: SSL handshake must respect the socket timeout
3643 server = socket.socket(socket.AF_INET)
3644 host = "127.0.0.1"
Serhiy Storchaka16994912020-04-25 10:06:29 +03003645 port = socket_helper.bind_port(server)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003646 started = threading.Event()
3647 finish = False
3648
3649 def serve():
3650 server.listen()
3651 started.set()
3652 conns = []
3653 while not finish:
3654 r, w, e = select.select([server], [], [], 0.1)
3655 if server in r:
3656 # Let the socket hang around rather than having
3657 # it closed by garbage collection.
3658 conns.append(server.accept()[0])
3659 for sock in conns:
3660 sock.close()
3661
3662 t = threading.Thread(target=serve)
3663 t.start()
3664 started.wait()
3665
3666 try:
3667 try:
3668 c = socket.socket(socket.AF_INET)
3669 c.settimeout(0.2)
3670 c.connect((host, port))
3671 # Will attempt handshake and time out
3672 self.assertRaisesRegex(socket.timeout, "timed out",
3673 test_wrap_socket, c)
3674 finally:
3675 c.close()
3676 try:
3677 c = socket.socket(socket.AF_INET)
3678 c = test_wrap_socket(c)
3679 c.settimeout(0.2)
3680 # Will attempt handshake and time out
3681 self.assertRaisesRegex(socket.timeout, "timed out",
3682 c.connect, (host, port))
3683 finally:
3684 c.close()
3685 finally:
3686 finish = True
3687 t.join()
3688 server.close()
3689
3690 def test_server_accept(self):
3691 # Issue #16357: accept() on a SSLSocket created through
3692 # SSLContext.wrap_socket().
Christian Heimesa170fa12017-09-15 20:27:30 +02003693 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003694 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003695 context.load_verify_locations(SIGNING_CA)
3696 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003697 server = socket.socket(socket.AF_INET)
3698 host = "127.0.0.1"
Serhiy Storchaka16994912020-04-25 10:06:29 +03003699 port = socket_helper.bind_port(server)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003700 server = context.wrap_socket(server, server_side=True)
3701 self.assertTrue(server.server_side)
3702
3703 evt = threading.Event()
3704 remote = None
3705 peer = None
3706 def serve():
3707 nonlocal remote, peer
3708 server.listen()
3709 # Block on the accept and wait on the connection to close.
3710 evt.set()
3711 remote, peer = server.accept()
Christian Heimes529525f2018-05-23 22:24:45 +02003712 remote.send(remote.recv(4))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003713
3714 t = threading.Thread(target=serve)
3715 t.start()
3716 # Client wait until server setup and perform a connect.
3717 evt.wait()
3718 client = context.wrap_socket(socket.socket())
3719 client.connect((host, port))
Christian Heimes529525f2018-05-23 22:24:45 +02003720 client.send(b'data')
3721 client.recv()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003722 client_addr = client.getsockname()
3723 client.close()
3724 t.join()
3725 remote.close()
3726 server.close()
3727 # Sanity checks.
3728 self.assertIsInstance(remote, ssl.SSLSocket)
3729 self.assertEqual(peer, client_addr)
3730
3731 def test_getpeercert_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003732 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003733 with context.wrap_socket(socket.socket()) as sock:
3734 with self.assertRaises(OSError) as cm:
3735 sock.getpeercert()
3736 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3737
3738 def test_do_handshake_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003739 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003740 with context.wrap_socket(socket.socket()) as sock:
3741 with self.assertRaises(OSError) as cm:
3742 sock.do_handshake()
3743 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3744
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003745 def test_no_shared_ciphers(self):
3746 client_context, server_context, hostname = testing_context()
3747 # OpenSSL enables all TLS 1.3 ciphers, enforce TLS 1.2 for test
3748 client_context.options |= ssl.OP_NO_TLSv1_3
Victor Stinner5e922652018-09-07 17:30:33 +02003749 # Force different suites on client and server
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003750 client_context.set_ciphers("AES128")
3751 server_context.set_ciphers("AES256")
3752 with ThreadedEchoServer(context=server_context) as server:
3753 with client_context.wrap_socket(socket.socket(),
3754 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003755 with self.assertRaises(OSError):
3756 s.connect((HOST, server.port))
3757 self.assertIn("no shared cipher", server.conn_errors[0])
3758
3759 def test_version_basic(self):
3760 """
3761 Basic tests for SSLSocket.version().
3762 More tests are done in the test_protocol_*() methods.
3763 """
Christian Heimesa170fa12017-09-15 20:27:30 +02003764 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3765 context.check_hostname = False
3766 context.verify_mode = ssl.CERT_NONE
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003767 with ThreadedEchoServer(CERTFILE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003768 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003769 chatty=False) as server:
3770 with context.wrap_socket(socket.socket()) as s:
3771 self.assertIs(s.version(), None)
Christian Heimes141c5e82018-02-24 21:10:57 +01003772 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003773 s.connect((HOST, server.port))
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003774 if IS_OPENSSL_1_1_1 and has_tls_version('TLSv1_3'):
Christian Heimes05d9fe32018-02-27 08:55:39 +01003775 self.assertEqual(s.version(), 'TLSv1.3')
3776 elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
Christian Heimesa170fa12017-09-15 20:27:30 +02003777 self.assertEqual(s.version(), 'TLSv1.2')
3778 else: # 0.9.8 to 1.0.1
3779 self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
Christian Heimes141c5e82018-02-24 21:10:57 +01003780 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003781 self.assertIs(s.version(), None)
3782
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003783 @requires_tls_version('TLSv1_3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003784 def test_tls1_3(self):
3785 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
3786 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003787 context.options |= (
3788 ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
3789 )
3790 with ThreadedEchoServer(context=context) as server:
3791 with context.wrap_socket(socket.socket()) as s:
3792 s.connect((HOST, server.port))
Christian Heimes05d9fe32018-02-27 08:55:39 +01003793 self.assertIn(s.cipher()[0], {
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003794 'TLS_AES_256_GCM_SHA384',
3795 'TLS_CHACHA20_POLY1305_SHA256',
3796 'TLS_AES_128_GCM_SHA256',
Christian Heimes05d9fe32018-02-27 08:55:39 +01003797 })
3798 self.assertEqual(s.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003799
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003800 @requires_minimum_version
3801 @requires_tls_version('TLSv1_2')
3802 def test_min_max_version_tlsv1_2(self):
Christian Heimes698dde12018-02-27 11:54:43 +01003803 client_context, server_context, hostname = testing_context()
3804 # client TLSv1.0 to 1.2
3805 client_context.minimum_version = ssl.TLSVersion.TLSv1
3806 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
3807 # server only TLSv1.2
3808 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
3809 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
3810
3811 with ThreadedEchoServer(context=server_context) as server:
3812 with client_context.wrap_socket(socket.socket(),
3813 server_hostname=hostname) as s:
3814 s.connect((HOST, server.port))
3815 self.assertEqual(s.version(), 'TLSv1.2')
3816
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003817 @requires_minimum_version
3818 @requires_tls_version('TLSv1_1')
3819 def test_min_max_version_tlsv1_1(self):
3820 client_context, server_context, hostname = testing_context()
Christian Heimes698dde12018-02-27 11:54:43 +01003821 # client 1.0 to 1.2, server 1.0 to 1.1
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003822 client_context.minimum_version = ssl.TLSVersion.TLSv1
3823 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
Christian Heimes698dde12018-02-27 11:54:43 +01003824 server_context.minimum_version = ssl.TLSVersion.TLSv1
3825 server_context.maximum_version = ssl.TLSVersion.TLSv1_1
3826
3827 with ThreadedEchoServer(context=server_context) as server:
3828 with client_context.wrap_socket(socket.socket(),
3829 server_hostname=hostname) as s:
3830 s.connect((HOST, server.port))
3831 self.assertEqual(s.version(), 'TLSv1.1')
3832
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003833 @requires_minimum_version
3834 @requires_tls_version('TLSv1_2')
3835 def test_min_max_version_mismatch(self):
3836 client_context, server_context, hostname = testing_context()
Christian Heimes698dde12018-02-27 11:54:43 +01003837 # client 1.0, server 1.2 (mismatch)
Christian Heimes698dde12018-02-27 11:54:43 +01003838 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
Christian Heimesc9bc49c2019-09-11 19:24:47 +02003839 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
Christian Heimese35d1ba2019-06-03 20:40:15 +02003840 client_context.maximum_version = ssl.TLSVersion.TLSv1
Christian Heimesde606ea2019-09-11 19:48:58 +02003841 client_context.minimum_version = ssl.TLSVersion.TLSv1
Christian Heimes698dde12018-02-27 11:54:43 +01003842 with ThreadedEchoServer(context=server_context) as server:
3843 with client_context.wrap_socket(socket.socket(),
3844 server_hostname=hostname) as s:
3845 with self.assertRaises(ssl.SSLError) as e:
3846 s.connect((HOST, server.port))
3847 self.assertIn("alert", str(e.exception))
3848
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003849 @requires_minimum_version
3850 @requires_tls_version('SSLv3')
Christian Heimes698dde12018-02-27 11:54:43 +01003851 def test_min_max_version_sslv3(self):
3852 client_context, server_context, hostname = testing_context()
3853 server_context.minimum_version = ssl.TLSVersion.SSLv3
3854 client_context.minimum_version = ssl.TLSVersion.SSLv3
3855 client_context.maximum_version = ssl.TLSVersion.SSLv3
3856 with ThreadedEchoServer(context=server_context) as server:
3857 with client_context.wrap_socket(socket.socket(),
3858 server_hostname=hostname) as s:
3859 s.connect((HOST, server.port))
3860 self.assertEqual(s.version(), 'SSLv3')
3861
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003862 @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
3863 def test_default_ecdh_curve(self):
3864 # Issue #21015: elliptic curve-based Diffie Hellman key exchange
3865 # should be enabled by default on SSL contexts.
Christian Heimesa170fa12017-09-15 20:27:30 +02003866 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003867 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003868 # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
3869 # cipher name.
3870 context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003871 # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
3872 # explicitly using the 'ECCdraft' cipher alias. Otherwise,
3873 # our default cipher list should prefer ECDH-based ciphers
3874 # automatically.
3875 if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
3876 context.set_ciphers("ECCdraft:ECDH")
3877 with ThreadedEchoServer(context=context) as server:
3878 with context.wrap_socket(socket.socket()) as s:
3879 s.connect((HOST, server.port))
3880 self.assertIn("ECDH", s.cipher()[0])
3881
3882 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
3883 "'tls-unique' channel binding not available")
3884 def test_tls_unique_channel_binding(self):
3885 """Test tls-unique channel binding."""
3886 if support.verbose:
3887 sys.stdout.write("\n")
3888
Christian Heimes05d9fe32018-02-27 08:55:39 +01003889 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003890
3891 server = ThreadedEchoServer(context=server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003892 chatty=True,
3893 connectionchatty=False)
Christian Heimes05d9fe32018-02-27 08:55:39 +01003894
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003895 with server:
Christian Heimes05d9fe32018-02-27 08:55:39 +01003896 with client_context.wrap_socket(
3897 socket.socket(),
3898 server_hostname=hostname) as s:
3899 s.connect((HOST, server.port))
3900 # get the data
3901 cb_data = s.get_channel_binding("tls-unique")
3902 if support.verbose:
3903 sys.stdout.write(
3904 " got channel binding data: {0!r}\n".format(cb_data))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003905
Christian Heimes05d9fe32018-02-27 08:55:39 +01003906 # check if it is sane
3907 self.assertIsNotNone(cb_data)
Christian Heimes529525f2018-05-23 22:24:45 +02003908 if s.version() == 'TLSv1.3':
3909 self.assertEqual(len(cb_data), 48)
3910 else:
3911 self.assertEqual(len(cb_data), 12) # True for TLSv1
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003912
Christian Heimes05d9fe32018-02-27 08:55:39 +01003913 # and compare with the peers version
3914 s.write(b"CB tls-unique\n")
3915 peer_data_repr = s.read().strip()
3916 self.assertEqual(peer_data_repr,
3917 repr(cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003918
3919 # now, again
Christian Heimes05d9fe32018-02-27 08:55:39 +01003920 with client_context.wrap_socket(
3921 socket.socket(),
3922 server_hostname=hostname) as s:
3923 s.connect((HOST, server.port))
3924 new_cb_data = s.get_channel_binding("tls-unique")
3925 if support.verbose:
3926 sys.stdout.write(
3927 "got another channel binding data: {0!r}\n".format(
3928 new_cb_data)
3929 )
3930 # is it really unique
3931 self.assertNotEqual(cb_data, new_cb_data)
3932 self.assertIsNotNone(cb_data)
Christian Heimes529525f2018-05-23 22:24:45 +02003933 if s.version() == 'TLSv1.3':
3934 self.assertEqual(len(cb_data), 48)
3935 else:
3936 self.assertEqual(len(cb_data), 12) # True for TLSv1
Christian Heimes05d9fe32018-02-27 08:55:39 +01003937 s.write(b"CB tls-unique\n")
3938 peer_data_repr = s.read().strip()
3939 self.assertEqual(peer_data_repr,
3940 repr(new_cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003941
3942 def test_compression(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003943 client_context, server_context, hostname = testing_context()
3944 stats = server_params_test(client_context, server_context,
3945 chatty=True, connectionchatty=True,
3946 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003947 if support.verbose:
3948 sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
3949 self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
3950
3951 @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
3952 "ssl.OP_NO_COMPRESSION needed for this test")
3953 def test_compression_disabled(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003954 client_context, server_context, hostname = testing_context()
3955 client_context.options |= ssl.OP_NO_COMPRESSION
3956 server_context.options |= ssl.OP_NO_COMPRESSION
3957 stats = server_params_test(client_context, server_context,
3958 chatty=True, connectionchatty=True,
3959 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003960 self.assertIs(stats['compression'], None)
3961
Paul Monsonf3550692019-06-19 13:09:54 -07003962 @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003963 def test_dh_params(self):
3964 # Check we can get a connection with ephemeral Diffie-Hellman
Christian Heimesa170fa12017-09-15 20:27:30 +02003965 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003966 # test scenario needs TLS <= 1.2
3967 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003968 server_context.load_dh_params(DHFILE)
3969 server_context.set_ciphers("kEDH")
Christian Heimes05d9fe32018-02-27 08:55:39 +01003970 server_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003971 stats = server_params_test(client_context, server_context,
3972 chatty=True, connectionchatty=True,
3973 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003974 cipher = stats["cipher"][0]
3975 parts = cipher.split("-")
3976 if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
3977 self.fail("Non-DH cipher: " + cipher[0])
3978
Christian Heimesb7b92252018-02-25 09:49:31 +01003979 @unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
Christian Heimes05d9fe32018-02-27 08:55:39 +01003980 @unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
Christian Heimesb7b92252018-02-25 09:49:31 +01003981 def test_ecdh_curve(self):
3982 # server secp384r1, client auto
3983 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003984
Christian Heimesb7b92252018-02-25 09:49:31 +01003985 server_context.set_ecdh_curve("secp384r1")
3986 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3987 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3988 stats = server_params_test(client_context, server_context,
3989 chatty=True, connectionchatty=True,
3990 sni_name=hostname)
3991
3992 # server auto, client secp384r1
3993 client_context, server_context, hostname = testing_context()
3994 client_context.set_ecdh_curve("secp384r1")
3995 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3996 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3997 stats = server_params_test(client_context, server_context,
3998 chatty=True, connectionchatty=True,
3999 sni_name=hostname)
4000
4001 # server / client curve mismatch
4002 client_context, server_context, hostname = testing_context()
4003 client_context.set_ecdh_curve("prime256v1")
4004 server_context.set_ecdh_curve("secp384r1")
4005 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
4006 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
4007 try:
4008 stats = server_params_test(client_context, server_context,
4009 chatty=True, connectionchatty=True,
4010 sni_name=hostname)
4011 except ssl.SSLError:
4012 pass
4013 else:
4014 # OpenSSL 1.0.2 does not fail although it should.
Christian Heimes05d9fe32018-02-27 08:55:39 +01004015 if IS_OPENSSL_1_1_0:
Christian Heimesb7b92252018-02-25 09:49:31 +01004016 self.fail("mismatch curve did not fail")
4017
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004018 def test_selected_alpn_protocol(self):
4019 # selected_alpn_protocol() is None unless ALPN is used.
Christian Heimesa170fa12017-09-15 20:27:30 +02004020 client_context, server_context, hostname = testing_context()
4021 stats = server_params_test(client_context, server_context,
4022 chatty=True, connectionchatty=True,
4023 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004024 self.assertIs(stats['client_alpn_protocol'], None)
4025
4026 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
4027 def test_selected_alpn_protocol_if_server_uses_alpn(self):
4028 # selected_alpn_protocol() is None unless ALPN is used by the client.
Christian Heimesa170fa12017-09-15 20:27:30 +02004029 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004030 server_context.set_alpn_protocols(['foo', 'bar'])
4031 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02004032 chatty=True, connectionchatty=True,
4033 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004034 self.assertIs(stats['client_alpn_protocol'], None)
4035
4036 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
4037 def test_alpn_protocols(self):
4038 server_protocols = ['foo', 'bar', 'milkshake']
4039 protocol_tests = [
4040 (['foo', 'bar'], 'foo'),
4041 (['bar', 'foo'], 'foo'),
4042 (['milkshake'], 'milkshake'),
4043 (['http/3.0', 'http/4.0'], None)
4044 ]
4045 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02004046 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004047 server_context.set_alpn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004048 client_context.set_alpn_protocols(client_protocols)
4049
4050 try:
4051 stats = server_params_test(client_context,
4052 server_context,
4053 chatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02004054 connectionchatty=True,
4055 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004056 except ssl.SSLError as e:
4057 stats = e
4058
Christian Heimes05d9fe32018-02-27 08:55:39 +01004059 if (expected is None and IS_OPENSSL_1_1_0
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004060 and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)):
4061 # OpenSSL 1.1.0 to 1.1.0e raises handshake error
4062 self.assertIsInstance(stats, ssl.SSLError)
4063 else:
4064 msg = "failed trying %s (s) and %s (c).\n" \
4065 "was expecting %s, but got %%s from the %%s" \
4066 % (str(server_protocols), str(client_protocols),
4067 str(expected))
4068 client_result = stats['client_alpn_protocol']
4069 self.assertEqual(client_result, expected,
4070 msg % (client_result, "client"))
4071 server_result = stats['server_alpn_protocols'][-1] \
4072 if len(stats['server_alpn_protocols']) else 'nothing'
4073 self.assertEqual(server_result, expected,
4074 msg % (server_result, "server"))
4075
4076 def test_selected_npn_protocol(self):
4077 # selected_npn_protocol() is None unless NPN is used
Christian Heimesa170fa12017-09-15 20:27:30 +02004078 client_context, server_context, hostname = testing_context()
4079 stats = server_params_test(client_context, server_context,
4080 chatty=True, connectionchatty=True,
4081 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004082 self.assertIs(stats['client_npn_protocol'], None)
4083
4084 @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
4085 def test_npn_protocols(self):
4086 server_protocols = ['http/1.1', 'spdy/2']
4087 protocol_tests = [
4088 (['http/1.1', 'spdy/2'], 'http/1.1'),
4089 (['spdy/2', 'http/1.1'], 'http/1.1'),
4090 (['spdy/2', 'test'], 'spdy/2'),
4091 (['abc', 'def'], 'abc')
4092 ]
4093 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02004094 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004095 server_context.set_npn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004096 client_context.set_npn_protocols(client_protocols)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004097 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02004098 chatty=True, connectionchatty=True,
4099 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004100 msg = "failed trying %s (s) and %s (c).\n" \
4101 "was expecting %s, but got %%s from the %%s" \
4102 % (str(server_protocols), str(client_protocols),
4103 str(expected))
4104 client_result = stats['client_npn_protocol']
4105 self.assertEqual(client_result, expected, msg % (client_result, "client"))
4106 server_result = stats['server_npn_protocols'][-1] \
4107 if len(stats['server_npn_protocols']) else 'nothing'
4108 self.assertEqual(server_result, expected, msg % (server_result, "server"))
4109
4110 def sni_contexts(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004111 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004112 server_context.load_cert_chain(SIGNED_CERTFILE)
Christian Heimesa170fa12017-09-15 20:27:30 +02004113 other_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004114 other_context.load_cert_chain(SIGNED_CERTFILE2)
Christian Heimesa170fa12017-09-15 20:27:30 +02004115 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004116 client_context.load_verify_locations(SIGNING_CA)
4117 return server_context, other_context, client_context
4118
4119 def check_common_name(self, stats, name):
4120 cert = stats['peercert']
4121 self.assertIn((('commonName', name),), cert['subject'])
4122
4123 @needs_sni
4124 def test_sni_callback(self):
4125 calls = []
4126 server_context, other_context, client_context = self.sni_contexts()
4127
Christian Heimesa170fa12017-09-15 20:27:30 +02004128 client_context.check_hostname = False
4129
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004130 def servername_cb(ssl_sock, server_name, initial_context):
4131 calls.append((server_name, initial_context))
4132 if server_name is not None:
4133 ssl_sock.context = other_context
4134 server_context.set_servername_callback(servername_cb)
4135
4136 stats = server_params_test(client_context, server_context,
4137 chatty=True,
4138 sni_name='supermessage')
4139 # The hostname was fetched properly, and the certificate was
4140 # changed for the connection.
4141 self.assertEqual(calls, [("supermessage", server_context)])
4142 # CERTFILE4 was selected
4143 self.check_common_name(stats, 'fakehostname')
4144
4145 calls = []
4146 # The callback is called with server_name=None
4147 stats = server_params_test(client_context, server_context,
4148 chatty=True,
4149 sni_name=None)
4150 self.assertEqual(calls, [(None, server_context)])
Christian Heimesa170fa12017-09-15 20:27:30 +02004151 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004152
4153 # Check disabling the callback
4154 calls = []
4155 server_context.set_servername_callback(None)
4156
4157 stats = server_params_test(client_context, server_context,
4158 chatty=True,
4159 sni_name='notfunny')
4160 # Certificate didn't change
Christian Heimesa170fa12017-09-15 20:27:30 +02004161 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004162 self.assertEqual(calls, [])
4163
4164 @needs_sni
4165 def test_sni_callback_alert(self):
4166 # Returning a TLS alert is reflected to the connecting client
4167 server_context, other_context, client_context = self.sni_contexts()
4168
4169 def cb_returning_alert(ssl_sock, server_name, initial_context):
4170 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
4171 server_context.set_servername_callback(cb_returning_alert)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004172 with self.assertRaises(ssl.SSLError) as cm:
4173 stats = server_params_test(client_context, server_context,
4174 chatty=False,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004175 sni_name='supermessage')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004176 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004177
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004178 @needs_sni
4179 def test_sni_callback_raising(self):
4180 # Raising fails the connection with a TLS handshake failure alert.
4181 server_context, other_context, client_context = self.sni_contexts()
4182
4183 def cb_raising(ssl_sock, server_name, initial_context):
4184 1/0
4185 server_context.set_servername_callback(cb_raising)
4186
Victor Stinner00253502019-06-03 03:51:43 +02004187 with support.catch_unraisable_exception() as catch:
4188 with self.assertRaises(ssl.SSLError) as cm:
4189 stats = server_params_test(client_context, server_context,
4190 chatty=False,
4191 sni_name='supermessage')
4192
4193 self.assertEqual(cm.exception.reason,
4194 'SSLV3_ALERT_HANDSHAKE_FAILURE')
4195 self.assertEqual(catch.unraisable.exc_type, ZeroDivisionError)
Antoine Pitrou50b24d02013-04-11 20:48:42 +02004196
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004197 @needs_sni
4198 def test_sni_callback_wrong_return_type(self):
4199 # Returning the wrong return type terminates the TLS connection
4200 # with an internal error alert.
4201 server_context, other_context, client_context = self.sni_contexts()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004202
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004203 def cb_wrong_return_type(ssl_sock, server_name, initial_context):
4204 return "foo"
4205 server_context.set_servername_callback(cb_wrong_return_type)
4206
Victor Stinner00253502019-06-03 03:51:43 +02004207 with support.catch_unraisable_exception() as catch:
4208 with self.assertRaises(ssl.SSLError) as cm:
4209 stats = server_params_test(client_context, server_context,
4210 chatty=False,
4211 sni_name='supermessage')
4212
4213
4214 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
4215 self.assertEqual(catch.unraisable.exc_type, TypeError)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004216
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004217 def test_shared_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004218 client_context, server_context, hostname = testing_context()
Christian Heimese8eb6cb2018-05-22 22:50:12 +02004219 client_context.set_ciphers("AES128:AES256")
4220 server_context.set_ciphers("AES256")
4221 expected_algs = [
4222 "AES256", "AES-256",
4223 # TLS 1.3 ciphers are always enabled
4224 "TLS_CHACHA20", "TLS_AES",
4225 ]
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004226
Christian Heimesa170fa12017-09-15 20:27:30 +02004227 stats = server_params_test(client_context, server_context,
4228 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004229 ciphers = stats['server_shared_ciphers'][0]
4230 self.assertGreater(len(ciphers), 0)
4231 for name, tls_version, bits in ciphers:
Christian Heimese8eb6cb2018-05-22 22:50:12 +02004232 if not any(alg in name for alg in expected_algs):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004233 self.fail(name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004234
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004235 def test_read_write_after_close_raises_valuerror(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004236 client_context, server_context, hostname = testing_context()
4237 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004238
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004239 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02004240 s = client_context.wrap_socket(socket.socket(),
4241 server_hostname=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004242 s.connect((HOST, server.port))
4243 s.close()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004244
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004245 self.assertRaises(ValueError, s.read, 1024)
4246 self.assertRaises(ValueError, s.write, b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004247
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004248 def test_sendfile(self):
4249 TEST_DATA = b"x" * 512
4250 with open(support.TESTFN, 'wb') as f:
4251 f.write(TEST_DATA)
4252 self.addCleanup(support.unlink, support.TESTFN)
Christian Heimesa170fa12017-09-15 20:27:30 +02004253 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004254 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01004255 context.load_verify_locations(SIGNING_CA)
4256 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004257 server = ThreadedEchoServer(context=context, chatty=False)
4258 with server:
4259 with context.wrap_socket(socket.socket()) as s:
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004260 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004261 with open(support.TESTFN, 'rb') as file:
4262 s.sendfile(file)
4263 self.assertEqual(s.recv(1024), TEST_DATA)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004264
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004265 def test_session(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004266 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01004267 # TODO: sessions aren't compatible with TLSv1.3 yet
4268 client_context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004269
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004270 # first connection without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004271 stats = server_params_test(client_context, server_context,
4272 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004273 session = stats['session']
4274 self.assertTrue(session.id)
4275 self.assertGreater(session.time, 0)
4276 self.assertGreater(session.timeout, 0)
4277 self.assertTrue(session.has_ticket)
4278 if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
4279 self.assertGreater(session.ticket_lifetime_hint, 0)
4280 self.assertFalse(stats['session_reused'])
4281 sess_stat = server_context.session_stats()
4282 self.assertEqual(sess_stat['accept'], 1)
4283 self.assertEqual(sess_stat['hits'], 0)
Giampaolo Rodola'915d1412014-06-11 03:54:30 +02004284
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004285 # reuse session
Christian Heimesa170fa12017-09-15 20:27:30 +02004286 stats = server_params_test(client_context, server_context,
4287 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004288 sess_stat = server_context.session_stats()
4289 self.assertEqual(sess_stat['accept'], 2)
4290 self.assertEqual(sess_stat['hits'], 1)
4291 self.assertTrue(stats['session_reused'])
4292 session2 = stats['session']
4293 self.assertEqual(session2.id, session.id)
4294 self.assertEqual(session2, session)
4295 self.assertIsNot(session2, session)
4296 self.assertGreaterEqual(session2.time, session.time)
4297 self.assertGreaterEqual(session2.timeout, session.timeout)
Christian Heimes99a65702016-09-10 23:44:53 +02004298
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004299 # another one without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004300 stats = server_params_test(client_context, server_context,
4301 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004302 self.assertFalse(stats['session_reused'])
4303 session3 = stats['session']
4304 self.assertNotEqual(session3.id, session.id)
4305 self.assertNotEqual(session3, session)
4306 sess_stat = server_context.session_stats()
4307 self.assertEqual(sess_stat['accept'], 3)
4308 self.assertEqual(sess_stat['hits'], 1)
Christian Heimes99a65702016-09-10 23:44:53 +02004309
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004310 # reuse session again
Christian Heimesa170fa12017-09-15 20:27:30 +02004311 stats = server_params_test(client_context, server_context,
4312 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004313 self.assertTrue(stats['session_reused'])
4314 session4 = stats['session']
4315 self.assertEqual(session4.id, session.id)
4316 self.assertEqual(session4, session)
4317 self.assertGreaterEqual(session4.time, session.time)
4318 self.assertGreaterEqual(session4.timeout, session.timeout)
4319 sess_stat = server_context.session_stats()
4320 self.assertEqual(sess_stat['accept'], 4)
4321 self.assertEqual(sess_stat['hits'], 2)
Christian Heimes99a65702016-09-10 23:44:53 +02004322
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004323 def test_session_handling(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004324 client_context, server_context, hostname = testing_context()
4325 client_context2, _, _ = testing_context()
Christian Heimes99a65702016-09-10 23:44:53 +02004326
Christian Heimes05d9fe32018-02-27 08:55:39 +01004327 # TODO: session reuse does not work with TLSv1.3
Christian Heimesa170fa12017-09-15 20:27:30 +02004328 client_context.options |= ssl.OP_NO_TLSv1_3
4329 client_context2.options |= ssl.OP_NO_TLSv1_3
Christian Heimescb5b68a2017-09-07 18:07:00 -07004330
Christian Heimesa170fa12017-09-15 20:27:30 +02004331 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004332 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02004333 with client_context.wrap_socket(socket.socket(),
4334 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004335 # session is None before handshake
4336 self.assertEqual(s.session, None)
4337 self.assertEqual(s.session_reused, None)
4338 s.connect((HOST, server.port))
4339 session = s.session
4340 self.assertTrue(session)
4341 with self.assertRaises(TypeError) as e:
4342 s.session = object
Ned Deily4531ec72018-06-11 20:26:28 -04004343 self.assertEqual(str(e.exception), 'Value is not a SSLSession.')
Christian Heimes99a65702016-09-10 23:44:53 +02004344
Christian Heimesa170fa12017-09-15 20:27:30 +02004345 with client_context.wrap_socket(socket.socket(),
4346 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004347 s.connect((HOST, server.port))
4348 # cannot set session after handshake
4349 with self.assertRaises(ValueError) as e:
4350 s.session = session
4351 self.assertEqual(str(e.exception),
4352 'Cannot set session after handshake.')
Christian Heimes99a65702016-09-10 23:44:53 +02004353
Christian Heimesa170fa12017-09-15 20:27:30 +02004354 with client_context.wrap_socket(socket.socket(),
4355 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004356 # can set session before handshake and before the
4357 # connection was established
4358 s.session = session
4359 s.connect((HOST, server.port))
4360 self.assertEqual(s.session.id, session.id)
4361 self.assertEqual(s.session, session)
4362 self.assertEqual(s.session_reused, True)
Christian Heimes99a65702016-09-10 23:44:53 +02004363
Christian Heimesa170fa12017-09-15 20:27:30 +02004364 with client_context2.wrap_socket(socket.socket(),
4365 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004366 # cannot re-use session with a different SSLContext
4367 with self.assertRaises(ValueError) as e:
Christian Heimes99a65702016-09-10 23:44:53 +02004368 s.session = session
4369 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004370 self.assertEqual(str(e.exception),
4371 'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02004372
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01004373
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02004374@unittest.skipUnless(has_tls_version('TLSv1_3'), "Test needs TLS 1.3")
Christian Heimes9fb051f2018-09-23 08:32:31 +02004375class TestPostHandshakeAuth(unittest.TestCase):
4376 def test_pha_setter(self):
4377 protocols = [
4378 ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT
4379 ]
4380 for protocol in protocols:
4381 ctx = ssl.SSLContext(protocol)
4382 self.assertEqual(ctx.post_handshake_auth, False)
4383
4384 ctx.post_handshake_auth = True
4385 self.assertEqual(ctx.post_handshake_auth, True)
4386
4387 ctx.verify_mode = ssl.CERT_REQUIRED
4388 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
4389 self.assertEqual(ctx.post_handshake_auth, True)
4390
4391 ctx.post_handshake_auth = False
4392 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
4393 self.assertEqual(ctx.post_handshake_auth, False)
4394
4395 ctx.verify_mode = ssl.CERT_OPTIONAL
4396 ctx.post_handshake_auth = True
4397 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
4398 self.assertEqual(ctx.post_handshake_auth, True)
4399
4400 def test_pha_required(self):
4401 client_context, server_context, hostname = testing_context()
4402 server_context.post_handshake_auth = True
4403 server_context.verify_mode = ssl.CERT_REQUIRED
4404 client_context.post_handshake_auth = True
4405 client_context.load_cert_chain(SIGNED_CERTFILE)
4406
4407 server = ThreadedEchoServer(context=server_context, chatty=False)
4408 with server:
4409 with client_context.wrap_socket(socket.socket(),
4410 server_hostname=hostname) as s:
4411 s.connect((HOST, server.port))
4412 s.write(b'HASCERT')
4413 self.assertEqual(s.recv(1024), b'FALSE\n')
4414 s.write(b'PHA')
4415 self.assertEqual(s.recv(1024), b'OK\n')
4416 s.write(b'HASCERT')
4417 self.assertEqual(s.recv(1024), b'TRUE\n')
4418 # PHA method just returns true when cert is already available
4419 s.write(b'PHA')
4420 self.assertEqual(s.recv(1024), b'OK\n')
4421 s.write(b'GETCERT')
4422 cert_text = s.recv(4096).decode('us-ascii')
4423 self.assertIn('Python Software Foundation CA', cert_text)
4424
4425 def test_pha_required_nocert(self):
4426 client_context, server_context, hostname = testing_context()
4427 server_context.post_handshake_auth = True
4428 server_context.verify_mode = ssl.CERT_REQUIRED
4429 client_context.post_handshake_auth = True
4430
Victor Stinner73ea5462019-07-09 14:33:49 +02004431 # Ignore expected SSLError in ConnectionHandler of ThreadedEchoServer
4432 # (it is only raised sometimes on Windows)
Hai Shie80697d2020-05-28 06:10:27 +08004433 with threading_helper.catch_threading_exception() as cm:
Victor Stinner73ea5462019-07-09 14:33:49 +02004434 server = ThreadedEchoServer(context=server_context, chatty=False)
4435 with server:
4436 with client_context.wrap_socket(socket.socket(),
4437 server_hostname=hostname) as s:
4438 s.connect((HOST, server.port))
4439 s.write(b'PHA')
4440 # receive CertificateRequest
4441 self.assertEqual(s.recv(1024), b'OK\n')
4442 # send empty Certificate + Finish
4443 s.write(b'HASCERT')
4444 # receive alert
4445 with self.assertRaisesRegex(
4446 ssl.SSLError,
4447 'tlsv13 alert certificate required'):
4448 s.recv(1024)
Christian Heimes9fb051f2018-09-23 08:32:31 +02004449
4450 def test_pha_optional(self):
4451 if support.verbose:
4452 sys.stdout.write("\n")
4453
4454 client_context, server_context, hostname = testing_context()
4455 server_context.post_handshake_auth = True
4456 server_context.verify_mode = ssl.CERT_REQUIRED
4457 client_context.post_handshake_auth = True
4458 client_context.load_cert_chain(SIGNED_CERTFILE)
4459
4460 # check CERT_OPTIONAL
4461 server_context.verify_mode = ssl.CERT_OPTIONAL
4462 server = ThreadedEchoServer(context=server_context, chatty=False)
4463 with server:
4464 with client_context.wrap_socket(socket.socket(),
4465 server_hostname=hostname) as s:
4466 s.connect((HOST, server.port))
4467 s.write(b'HASCERT')
4468 self.assertEqual(s.recv(1024), b'FALSE\n')
4469 s.write(b'PHA')
4470 self.assertEqual(s.recv(1024), b'OK\n')
4471 s.write(b'HASCERT')
4472 self.assertEqual(s.recv(1024), b'TRUE\n')
4473
4474 def test_pha_optional_nocert(self):
4475 if support.verbose:
4476 sys.stdout.write("\n")
4477
4478 client_context, server_context, hostname = testing_context()
4479 server_context.post_handshake_auth = True
4480 server_context.verify_mode = ssl.CERT_OPTIONAL
4481 client_context.post_handshake_auth = True
4482
4483 server = ThreadedEchoServer(context=server_context, chatty=False)
4484 with server:
4485 with client_context.wrap_socket(socket.socket(),
4486 server_hostname=hostname) as s:
4487 s.connect((HOST, server.port))
4488 s.write(b'HASCERT')
4489 self.assertEqual(s.recv(1024), b'FALSE\n')
4490 s.write(b'PHA')
4491 self.assertEqual(s.recv(1024), b'OK\n')
penguindustin96466302019-05-06 14:57:17 -04004492 # optional doesn't fail when client does not have a cert
Christian Heimes9fb051f2018-09-23 08:32:31 +02004493 s.write(b'HASCERT')
4494 self.assertEqual(s.recv(1024), b'FALSE\n')
4495
4496 def test_pha_no_pha_client(self):
4497 client_context, server_context, hostname = testing_context()
4498 server_context.post_handshake_auth = True
4499 server_context.verify_mode = ssl.CERT_REQUIRED
4500 client_context.load_cert_chain(SIGNED_CERTFILE)
4501
4502 server = ThreadedEchoServer(context=server_context, chatty=False)
4503 with server:
4504 with client_context.wrap_socket(socket.socket(),
4505 server_hostname=hostname) as s:
4506 s.connect((HOST, server.port))
4507 with self.assertRaisesRegex(ssl.SSLError, 'not server'):
4508 s.verify_client_post_handshake()
4509 s.write(b'PHA')
4510 self.assertIn(b'extension not received', s.recv(1024))
4511
4512 def test_pha_no_pha_server(self):
4513 # server doesn't have PHA enabled, cert is requested in handshake
4514 client_context, server_context, hostname = testing_context()
4515 server_context.verify_mode = ssl.CERT_REQUIRED
4516 client_context.post_handshake_auth = True
4517 client_context.load_cert_chain(SIGNED_CERTFILE)
4518
4519 server = ThreadedEchoServer(context=server_context, chatty=False)
4520 with server:
4521 with client_context.wrap_socket(socket.socket(),
4522 server_hostname=hostname) as s:
4523 s.connect((HOST, server.port))
4524 s.write(b'HASCERT')
4525 self.assertEqual(s.recv(1024), b'TRUE\n')
4526 # PHA doesn't fail if there is already a cert
4527 s.write(b'PHA')
4528 self.assertEqual(s.recv(1024), b'OK\n')
4529 s.write(b'HASCERT')
4530 self.assertEqual(s.recv(1024), b'TRUE\n')
4531
4532 def test_pha_not_tls13(self):
4533 # TLS 1.2
4534 client_context, server_context, hostname = testing_context()
4535 server_context.verify_mode = ssl.CERT_REQUIRED
4536 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
4537 client_context.post_handshake_auth = True
4538 client_context.load_cert_chain(SIGNED_CERTFILE)
4539
4540 server = ThreadedEchoServer(context=server_context, chatty=False)
4541 with server:
4542 with client_context.wrap_socket(socket.socket(),
4543 server_hostname=hostname) as s:
4544 s.connect((HOST, server.port))
4545 # PHA fails for TLS != 1.3
4546 s.write(b'PHA')
4547 self.assertIn(b'WRONG_SSL_VERSION', s.recv(1024))
4548
Christian Heimesf0f59302019-07-01 08:29:17 +02004549 def test_bpo37428_pha_cert_none(self):
4550 # verify that post_handshake_auth does not implicitly enable cert
4551 # validation.
4552 hostname = SIGNED_CERTFILE_HOSTNAME
4553 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
4554 client_context.post_handshake_auth = True
4555 client_context.load_cert_chain(SIGNED_CERTFILE)
4556 # no cert validation and CA on client side
4557 client_context.check_hostname = False
4558 client_context.verify_mode = ssl.CERT_NONE
4559
4560 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
4561 server_context.load_cert_chain(SIGNED_CERTFILE)
4562 server_context.load_verify_locations(SIGNING_CA)
4563 server_context.post_handshake_auth = True
4564 server_context.verify_mode = ssl.CERT_REQUIRED
4565
4566 server = ThreadedEchoServer(context=server_context, chatty=False)
4567 with server:
4568 with client_context.wrap_socket(socket.socket(),
4569 server_hostname=hostname) as s:
4570 s.connect((HOST, server.port))
4571 s.write(b'HASCERT')
4572 self.assertEqual(s.recv(1024), b'FALSE\n')
4573 s.write(b'PHA')
4574 self.assertEqual(s.recv(1024), b'OK\n')
4575 s.write(b'HASCERT')
4576 self.assertEqual(s.recv(1024), b'TRUE\n')
4577 # server cert has not been validated
4578 self.assertEqual(s.getpeercert(), {})
4579
Christian Heimes9fb051f2018-09-23 08:32:31 +02004580
Christian Heimesc7f70692019-05-31 11:44:05 +02004581HAS_KEYLOG = hasattr(ssl.SSLContext, 'keylog_filename')
4582requires_keylog = unittest.skipUnless(
4583 HAS_KEYLOG, 'test requires OpenSSL 1.1.1 with keylog callback')
4584
4585class TestSSLDebug(unittest.TestCase):
4586
4587 def keylog_lines(self, fname=support.TESTFN):
4588 with open(fname) as f:
4589 return len(list(f))
4590
4591 @requires_keylog
Paul Monsonf3550692019-06-19 13:09:54 -07004592 @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
Christian Heimesc7f70692019-05-31 11:44:05 +02004593 def test_keylog_defaults(self):
4594 self.addCleanup(support.unlink, support.TESTFN)
4595 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
4596 self.assertEqual(ctx.keylog_filename, None)
4597
4598 self.assertFalse(os.path.isfile(support.TESTFN))
4599 ctx.keylog_filename = support.TESTFN
4600 self.assertEqual(ctx.keylog_filename, support.TESTFN)
4601 self.assertTrue(os.path.isfile(support.TESTFN))
4602 self.assertEqual(self.keylog_lines(), 1)
4603
4604 ctx.keylog_filename = None
4605 self.assertEqual(ctx.keylog_filename, None)
4606
4607 with self.assertRaises((IsADirectoryError, PermissionError)):
4608 # Windows raises PermissionError
4609 ctx.keylog_filename = os.path.dirname(
4610 os.path.abspath(support.TESTFN))
4611
4612 with self.assertRaises(TypeError):
4613 ctx.keylog_filename = 1
4614
4615 @requires_keylog
Paul Monsonf3550692019-06-19 13:09:54 -07004616 @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
Christian Heimesc7f70692019-05-31 11:44:05 +02004617 def test_keylog_filename(self):
4618 self.addCleanup(support.unlink, support.TESTFN)
4619 client_context, server_context, hostname = testing_context()
4620
4621 client_context.keylog_filename = support.TESTFN
4622 server = ThreadedEchoServer(context=server_context, chatty=False)
4623 with server:
4624 with client_context.wrap_socket(socket.socket(),
4625 server_hostname=hostname) as s:
4626 s.connect((HOST, server.port))
4627 # header, 5 lines for TLS 1.3
4628 self.assertEqual(self.keylog_lines(), 6)
4629
4630 client_context.keylog_filename = None
4631 server_context.keylog_filename = support.TESTFN
4632 server = ThreadedEchoServer(context=server_context, chatty=False)
4633 with server:
4634 with client_context.wrap_socket(socket.socket(),
4635 server_hostname=hostname) as s:
4636 s.connect((HOST, server.port))
4637 self.assertGreaterEqual(self.keylog_lines(), 11)
4638
4639 client_context.keylog_filename = support.TESTFN
4640 server_context.keylog_filename = support.TESTFN
4641 server = ThreadedEchoServer(context=server_context, chatty=False)
4642 with server:
4643 with client_context.wrap_socket(socket.socket(),
4644 server_hostname=hostname) as s:
4645 s.connect((HOST, server.port))
4646 self.assertGreaterEqual(self.keylog_lines(), 21)
4647
4648 client_context.keylog_filename = None
4649 server_context.keylog_filename = None
4650
4651 @requires_keylog
4652 @unittest.skipIf(sys.flags.ignore_environment,
4653 "test is not compatible with ignore_environment")
Paul Monsonf3550692019-06-19 13:09:54 -07004654 @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
Christian Heimesc7f70692019-05-31 11:44:05 +02004655 def test_keylog_env(self):
4656 self.addCleanup(support.unlink, support.TESTFN)
4657 with unittest.mock.patch.dict(os.environ):
4658 os.environ['SSLKEYLOGFILE'] = support.TESTFN
4659 self.assertEqual(os.environ['SSLKEYLOGFILE'], support.TESTFN)
4660
4661 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
4662 self.assertEqual(ctx.keylog_filename, None)
4663
4664 ctx = ssl.create_default_context()
4665 self.assertEqual(ctx.keylog_filename, support.TESTFN)
4666
4667 ctx = ssl._create_stdlib_context()
4668 self.assertEqual(ctx.keylog_filename, support.TESTFN)
4669
4670 def test_msg_callback(self):
4671 client_context, server_context, hostname = testing_context()
4672
4673 def msg_cb(conn, direction, version, content_type, msg_type, data):
4674 pass
4675
4676 self.assertIs(client_context._msg_callback, None)
4677 client_context._msg_callback = msg_cb
4678 self.assertIs(client_context._msg_callback, msg_cb)
4679 with self.assertRaises(TypeError):
4680 client_context._msg_callback = object()
4681
4682 def test_msg_callback_tls12(self):
4683 client_context, server_context, hostname = testing_context()
4684 client_context.options |= ssl.OP_NO_TLSv1_3
4685
4686 msg = []
4687
4688 def msg_cb(conn, direction, version, content_type, msg_type, data):
4689 self.assertIsInstance(conn, ssl.SSLSocket)
4690 self.assertIsInstance(data, bytes)
4691 self.assertIn(direction, {'read', 'write'})
4692 msg.append((direction, version, content_type, msg_type))
4693
4694 client_context._msg_callback = msg_cb
4695
4696 server = ThreadedEchoServer(context=server_context, chatty=False)
4697 with server:
4698 with client_context.wrap_socket(socket.socket(),
4699 server_hostname=hostname) as s:
4700 s.connect((HOST, server.port))
4701
Christian Heimese35d1ba2019-06-03 20:40:15 +02004702 self.assertIn(
Christian Heimesc7f70692019-05-31 11:44:05 +02004703 ("read", TLSVersion.TLSv1_2, _TLSContentType.HANDSHAKE,
4704 _TLSMessageType.SERVER_KEY_EXCHANGE),
Christian Heimese35d1ba2019-06-03 20:40:15 +02004705 msg
4706 )
4707 self.assertIn(
Christian Heimesc7f70692019-05-31 11:44:05 +02004708 ("write", TLSVersion.TLSv1_2, _TLSContentType.CHANGE_CIPHER_SPEC,
4709 _TLSMessageType.CHANGE_CIPHER_SPEC),
Christian Heimese35d1ba2019-06-03 20:40:15 +02004710 msg
4711 )
Christian Heimesc7f70692019-05-31 11:44:05 +02004712
4713
Thomas Woutersed03b412007-08-28 21:37:11 +00004714def test_main(verbose=False):
Antoine Pitrou15cee622010-08-04 16:45:21 +00004715 if support.verbose:
4716 plats = {
Antoine Pitrou15cee622010-08-04 16:45:21 +00004717 'Mac': platform.mac_ver,
4718 'Windows': platform.win32_ver,
4719 }
Petr Viktorin8b94b412018-05-16 11:51:18 -04004720 for name, func in plats.items():
4721 plat = func()
4722 if plat and plat[0]:
4723 plat = '%s %r' % (name, plat)
4724 break
4725 else:
4726 plat = repr(platform.platform())
Antoine Pitrou15cee622010-08-04 16:45:21 +00004727 print("test_ssl: testing with %r %r" %
4728 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
4729 print(" under %s" % plat)
Antoine Pitroud5323212010-10-22 18:19:07 +00004730 print(" HAS_SNI = %r" % ssl.HAS_SNI)
Antoine Pitrou609ef012013-03-29 18:09:06 +01004731 print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
4732 try:
4733 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
4734 except AttributeError:
4735 pass
Antoine Pitrou15cee622010-08-04 16:45:21 +00004736
Antoine Pitrou152efa22010-05-16 18:19:27 +00004737 for filename in [
Martin Panter3840b2a2016-03-27 01:53:46 +00004738 CERTFILE, BYTES_CERTFILE,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004739 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004740 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004741 BADCERT, BADKEY, EMPTYCERT]:
4742 if not os.path.exists(filename):
4743 raise support.TestFailed("Can't read certificate file %r" % filename)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00004744
Martin Panter3840b2a2016-03-27 01:53:46 +00004745 tests = [
4746 ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
Christian Heimes9d50ab52018-02-27 10:17:30 +01004747 SSLObjectTests, SimpleBackgroundTests, ThreadedTests,
Christian Heimesc7f70692019-05-31 11:44:05 +02004748 TestPostHandshakeAuth, TestSSLDebug
Martin Panter3840b2a2016-03-27 01:53:46 +00004749 ]
Thomas Woutersed03b412007-08-28 21:37:11 +00004750
Benjamin Petersonee8712c2008-05-20 21:35:26 +00004751 if support.is_resource_enabled('network'):
Bill Janssen6e027db2007-11-15 22:23:56 +00004752 tests.append(NetworkedTests)
Thomas Woutersed03b412007-08-28 21:37:11 +00004753
Hai Shie80697d2020-05-28 06:10:27 +08004754 thread_info = threading_helper.threading_setup()
Antoine Pitrou480a1242010-04-28 21:37:09 +00004755 try:
4756 support.run_unittest(*tests)
4757 finally:
Hai Shie80697d2020-05-28 06:10:27 +08004758 threading_helper.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00004759
4760if __name__ == "__main__":
4761 test_main()