blob: de778d34658b7a00ed0da5629dd9ef33d08921d4 [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Christian Heimesc7f70692019-05-31 11:44:05 +02005import unittest.mock
Benjamin Petersonee8712c2008-05-20 21:35:26 +00006from test import support
Serhiy Storchaka16994912020-04-25 10:06:29 +03007from test.support import socket_helper
Hai Shie80697d2020-05-28 06:10:27 +08008from test.support import threading_helper
Thomas Woutersed03b412007-08-28 21:37:11 +00009import socket
Bill Janssen6e027db2007-11-15 22:23:56 +000010import select
Thomas Woutersed03b412007-08-28 21:37:11 +000011import time
Christian Heimes9424bb42013-06-17 15:32:57 +020012import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000013import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000014import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000015import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000016import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000017import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020018import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000019import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000020import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000021import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000022import platform
Christian Heimes892d66e2018-01-29 14:10:18 +010023import sysconfig
Christian Heimesdf6ac7e2019-09-26 17:02:59 +020024import functools
Christian Heimes888bbdc2017-09-07 14:18:21 -070025try:
26 import ctypes
27except ImportError:
28 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000029
Antoine Pitrou05d936d2010-10-13 11:38:36 +000030ssl = support.import_module("ssl")
31
Victor Stinner8f4ef3b2019-07-01 18:28:25 +020032from ssl import TLSVersion, _TLSContentType, _TLSMessageType
Martin Panter3840b2a2016-03-27 01:53:46 +000033
Paul Monsonf3550692019-06-19 13:09:54 -070034Py_DEBUG = hasattr(sys, 'gettotalrefcount')
35Py_DEBUG_WIN32 = Py_DEBUG and sys.platform == 'win32'
36
Antoine Pitrou2463e5f2013-03-28 22:24:43 +010037PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
Serhiy Storchaka16994912020-04-25 10:06:29 +030038HOST = socket_helper.HOST
Christian Heimes598894f2016-09-05 23:19:05 +020039IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
Christian Heimes05d9fe32018-02-27 08:55:39 +010040IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
41IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
Christian Heimes892d66e2018-01-29 14:10:18 +010042PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000043
Victor Stinner3ef63442019-02-19 18:06:03 +010044PROTOCOL_TO_TLS_VERSION = {}
45for proto, ver in (
46 ("PROTOCOL_SSLv23", "SSLv3"),
47 ("PROTOCOL_TLSv1", "TLSv1"),
48 ("PROTOCOL_TLSv1_1", "TLSv1_1"),
49):
50 try:
51 proto = getattr(ssl, proto)
52 ver = getattr(ssl.TLSVersion, ver)
53 except AttributeError:
54 continue
55 PROTOCOL_TO_TLS_VERSION[proto] = ver
56
def data_file(*name):
    """Return the path of a test data file stored next to this module.

    The components given in *name* are joined onto the directory that
    contains this file.
    """
    here = os.path.dirname(__file__)
    return os.path.join(here, *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000059
Antoine Pitrou81564092010-10-08 23:06:24 +000060# The custom key and certificate files used in test_ssl are generated
61# using Lib/test/make_ssl_certs.py.
62# Other certificates are simply fetched from the Internet servers they
63# are meant to authenticate.
64
Antoine Pitrou152efa22010-05-16 18:19:27 +000065CERTFILE = data_file("keycert.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000066BYTES_CERTFILE = os.fsencode(CERTFILE)
Antoine Pitrou152efa22010-05-16 18:19:27 +000067ONLYCERT = data_file("ssl_cert.pem")
68ONLYKEY = data_file("ssl_key.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000069BYTES_ONLYCERT = os.fsencode(ONLYCERT)
70BYTES_ONLYKEY = os.fsencode(ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +020071CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
72ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
73KEY_PASSWORD = "somepass"
Antoine Pitrou152efa22010-05-16 18:19:27 +000074CAPATH = data_file("capath")
Victor Stinner313a1202010-06-11 23:56:51 +000075BYTES_CAPATH = os.fsencode(CAPATH)
Christian Heimesefff7062013-11-21 03:35:02 +010076CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
77CAFILE_CACERT = data_file("capath", "5ed36f99.0")
78
Christian Heimesbd5c7d22018-01-20 15:16:30 +010079CERTFILE_INFO = {
80 'issuer': ((('countryName', 'XY'),),
81 (('localityName', 'Castle Anthrax'),),
82 (('organizationName', 'Python Software Foundation'),),
83 (('commonName', 'localhost'),)),
Christian Heimese6dac002018-08-30 07:25:49 +020084 'notAfter': 'Aug 26 14:23:15 2028 GMT',
85 'notBefore': 'Aug 29 14:23:15 2018 GMT',
86 'serialNumber': '98A7CF88C74A32ED',
Christian Heimesbd5c7d22018-01-20 15:16:30 +010087 'subject': ((('countryName', 'XY'),),
88 (('localityName', 'Castle Anthrax'),),
89 (('organizationName', 'Python Software Foundation'),),
90 (('commonName', 'localhost'),)),
91 'subjectAltName': (('DNS', 'localhost'),),
92 'version': 3
93}
Antoine Pitrou152efa22010-05-16 18:19:27 +000094
Christian Heimes22587792013-11-21 23:56:13 +010095# empty CRL
96CRLFILE = data_file("revocation.crl")
97
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010098# Two keys and certs signed by the same CA (for SNI tests)
99SIGNED_CERTFILE = data_file("keycert3.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +0200100SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100101
102SIGNED_CERTFILE_INFO = {
103 'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
104 'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
105 'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
106 'issuer': ((('countryName', 'XY'),),
107 (('organizationName', 'Python Software Foundation CA'),),
108 (('commonName', 'our-ca-server'),)),
Christian Heimese6dac002018-08-30 07:25:49 +0200109 'notAfter': 'Jul 7 14:23:16 2028 GMT',
110 'notBefore': 'Aug 29 14:23:16 2018 GMT',
111 'serialNumber': 'CB2D80995A69525C',
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100112 'subject': ((('countryName', 'XY'),),
113 (('localityName', 'Castle Anthrax'),),
114 (('organizationName', 'Python Software Foundation'),),
115 (('commonName', 'localhost'),)),
116 'subjectAltName': (('DNS', 'localhost'),),
117 'version': 3
118}
119
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100120SIGNED_CERTFILE2 = data_file("keycert4.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +0200121SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100122SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
123SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'
124
Martin Panter3840b2a2016-03-27 01:53:46 +0000125# Same certificate as pycacert.pem, but without extra text in file
126SIGNING_CA = data_file("capath", "ceff1710.0")
Christian Heimes1c03abd2016-09-06 23:25:35 +0200127# cert with all kinds of subject alt names
128ALLSANFILE = data_file("allsans.pem")
Christian Heimes66e57422018-01-29 14:25:13 +0100129IDNSANSFILE = data_file("idnsans.pem")
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100130
Martin Panter3d81d932016-01-14 09:36:00 +0000131REMOTE_HOST = "self-signed.pythontest.net"
Antoine Pitrou152efa22010-05-16 18:19:27 +0000132
133EMPTYCERT = data_file("nullcert.pem")
134BADCERT = data_file("badcert.pem")
Martin Panter407b62f2016-01-30 03:41:43 +0000135NONEXISTINGCERT = data_file("XXXnonexisting.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000136BADKEY = data_file("badkey.pem")
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200137NOKIACERT = data_file("nokia.pem")
Christian Heimes824f7f32013-08-17 00:54:47 +0200138NULLBYTECERT = data_file("nullbytecert.pem")
Christian Heimesa37f5242019-01-15 23:47:42 +0100139TALOS_INVALID_CRLDP = data_file("talos-2019-0758.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000140
Christian Heimes88bfd0b2018-08-14 12:54:19 +0200141DHFILE = data_file("ffdh3072.pem")
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100142BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000143
Christian Heimes358cfd42016-09-10 22:43:48 +0200144# Not defined in all versions of OpenSSL
145OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
146OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
147OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
148OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
Christian Heimes05d9fe32018-02-27 08:55:39 +0100149OP_ENABLE_MIDDLEBOX_COMPAT = getattr(ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
Christian Heimes358cfd42016-09-10 22:43:48 +0200150
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100151
def has_tls_protocol(protocol):
    """Check if a TLS protocol is available and enabled

    :param protocol: enum ssl._SSLMethod member or name
    :return: bool
    """
    prefix = 'PROTOCOL_'
    if isinstance(protocol, str):
        assert protocol.startswith(prefix)
        protocol = getattr(ssl, protocol, None)
        if protocol is None:
            # the ssl module does not define a constant with that name
            return False

    # auto-negotiate protocols are always available
    auto_negotiating = {
        ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS_SERVER,
        ssl.PROTOCOL_TLS_CLIENT
    }
    if protocol in auto_negotiating:
        return True

    # drop the prefix; what remains is handed to has_tls_version()
    version_name = protocol.name[len(prefix):]
    return has_tls_version(version_name)
171
172
@functools.lru_cache
def has_tls_version(version):
    """Check if a TLS/SSL version is enabled

    The result is cached per argument by functools.lru_cache.

    :param version: TLS version name or ssl.TLSVersion member
    :return: bool
    """
    if version == "SSLv2":
        # never supported and not even in TLSVersion enum
        return False

    if isinstance(version, str):
        # raises KeyError for names that are not TLSVersion members
        version = ssl.TLSVersion.__members__[version]

    # check compile time flags like ssl.HAS_TLSv1_2
    if not getattr(ssl, f'HAS_{version.name}'):
        return False

    # check runtime and dynamic crypto policy settings. A TLS version may
    # be compiled in but disabled by a policy or config option.
    # The protocol is passed explicitly: creating an SSLContext without a
    # protocol argument is deprecated and emits a DeprecationWarning on
    # newer Python versions.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    if (
            hasattr(ctx, 'minimum_version') and
            ctx.minimum_version != ssl.TLSVersion.MINIMUM_SUPPORTED and
            version < ctx.minimum_version
    ):
        return False
    if (
            hasattr(ctx, 'maximum_version') and
            ctx.maximum_version != ssl.TLSVersion.MAXIMUM_SUPPORTED and
            version > ctx.maximum_version
    ):
        return False

    return True
208
209
def requires_tls_version(version):
    """Decorator to skip tests when a required TLS version is not available

    The availability check happens each time the decorated callable is
    invoked, not when the decorator is applied.

    :param version: TLS version name or ssl.TLSVersion member
    :return: decorator that wraps a test callable
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kw):
            if has_tls_version(version):
                return func(*args, **kw)
            raise unittest.SkipTest(f"{version} is not available.")
        return wrapper
    return decorator
225
226
227requires_minimum_version = unittest.skipUnless(
228 hasattr(ssl.SSLContext, 'minimum_version'),
229 "required OpenSSL >= 1.1.0g"
230)
231
232
def handle_error(prefix):
    """Write *prefix* plus the current exception's traceback to stdout.

    Nothing is written unless support.verbose is set.
    """
    formatted_lines = traceback.format_exception(*sys.exc_info())
    exc_format = ' '.join(formatted_lines)
    if not support.verbose:
        return
    sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +0000237
def can_clear_options():
    """Return True if the OpenSSL API version is 0.9.8m or higher."""
    # 0.9.8m or higher
    required = (0, 9, 8, 13, 15)
    return ssl._OPENSSL_API_VERSION >= required
Antoine Pitroub5218772010-05-21 09:56:06 +0000241
def no_sslv2_implies_sslv3_hello():
    """Return True if the linked OpenSSL is version 0.9.7h or higher."""
    # 0.9.7h or higher
    required = (0, 9, 7, 8, 15)
    return ssl.OPENSSL_VERSION_INFO >= required
245
def have_verify_flags():
    """Return True if the linked OpenSSL is version 0.9.8 or higher."""
    # 0.9.8 or higher
    required = (0, 9, 8, 0, 15)
    return ssl.OPENSSL_VERSION_INFO >= required
249
Christian Heimesb7b92252018-02-25 09:49:31 +0100250def _have_secp_curves():
251 if not ssl.HAS_ECDH:
252 return False
253 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
254 try:
255 ctx.set_ecdh_curve("secp384r1")
256 except ValueError:
257 return False
258 else:
259 return True
260
261
262HAVE_SECP_CURVES = _have_secp_curves()
263
264
def utc_offset(): #NOTE: ignore issues like #1647654
    """Return the local UTC offset in seconds.

    local time = utc time + utc offset
    """
    dst_active = time.daylight and time.localtime().tm_isdst > 0
    seconds_west = time.altzone if dst_active else time.timezone
    return -seconds_west
270
def asn1time(cert_time):
    """Adjust an expected certificate time string for OpenSSL 0.9.8i.

    For every other OpenSSL API version *cert_time* is returned unchanged.
    """
    # Some versions of OpenSSL ignore seconds, see #18207
    # 0.9.8.i
    if ssl._OPENSSL_API_VERSION != (0, 9, 8, 9, 15):
        return cert_time

    fmt = "%b %d %H:%M:%S %Y GMT"
    parsed = datetime.datetime.strptime(cert_time, fmt)
    cert_time = parsed.replace(second=0).strftime(fmt)
    # %d adds leading zero but ASN1_TIME_print() uses leading space
    if cert_time[4] == "0":
        cert_time = cert_time[:4] + " " + cert_time[5:]
    return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000284
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100285needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
286
Antoine Pitrou23df4832010-08-04 17:14:06 +0000287
Christian Heimesd0486372016-09-10 23:23:33 +0200288def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
289 cert_reqs=ssl.CERT_NONE, ca_certs=None,
290 ciphers=None, certfile=None, keyfile=None,
291 **kwargs):
292 context = ssl.SSLContext(ssl_version)
293 if cert_reqs is not None:
Christian Heimesa170fa12017-09-15 20:27:30 +0200294 if cert_reqs == ssl.CERT_NONE:
295 context.check_hostname = False
Christian Heimesd0486372016-09-10 23:23:33 +0200296 context.verify_mode = cert_reqs
297 if ca_certs is not None:
298 context.load_verify_locations(ca_certs)
299 if certfile is not None or keyfile is not None:
300 context.load_cert_chain(certfile, keyfile)
301 if ciphers is not None:
302 context.set_ciphers(ciphers)
303 return context.wrap_socket(sock, **kwargs)
304
Christian Heimesa170fa12017-09-15 20:27:30 +0200305
def testing_context(server_cert=SIGNED_CERTFILE):
    """Create context

    client_context, server_context, hostname = testing_context()

    :param server_cert: SIGNED_CERTFILE or SIGNED_CERTFILE2
    :raises ValueError: for any other *server_cert*
    """
    known_certs = (
        (SIGNED_CERTFILE, SIGNED_CERTFILE_HOSTNAME),
        (SIGNED_CERTFILE2, SIGNED_CERTFILE2_HOSTNAME),
    )
    for certfile, certname in known_certs:
        if server_cert == certfile:
            hostname = certname
            break
    else:
        raise ValueError(server_cert)

    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(server_cert)
    server_context.load_verify_locations(SIGNING_CA)

    return client_context, server_context, hostname
326
327
Antoine Pitrou152efa22010-05-16 18:19:27 +0000328class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000329
Antoine Pitrou480a1242010-04-28 21:37:09 +0000330 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000331 ssl.CERT_NONE
332 ssl.CERT_OPTIONAL
333 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100334 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100335 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100336 if ssl.HAS_ECDH:
337 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100338 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
339 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000340 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100341 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700342 ssl.OP_NO_SSLv2
343 ssl.OP_NO_SSLv3
344 ssl.OP_NO_TLSv1
345 ssl.OP_NO_TLSv1_3
346 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
347 ssl.OP_NO_TLSv1_1
348 ssl.OP_NO_TLSv1_2
Christian Heimesa170fa12017-09-15 20:27:30 +0200349 self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000350
Christian Heimes9d50ab52018-02-27 10:17:30 +0100351 def test_private_init(self):
352 with self.assertRaisesRegex(TypeError, "public constructor"):
353 with socket.socket() as s:
354 ssl.SSLSocket(s)
355
Antoine Pitrou172f0252014-04-18 20:33:08 +0200356 def test_str_for_enums(self):
357 # Make sure that the PROTOCOL_* constants have enum-like string
358 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200359 proto = ssl.PROTOCOL_TLS
360 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200361 ctx = ssl.SSLContext(proto)
362 self.assertIs(ctx.protocol, proto)
363
def test_random(self):
    """Exercise the ssl.RAND_* functions."""
    v = ssl.RAND_status()
    if support.verbose:
        verdict = "sufficient randomness" if v else "insufficient randomness"
        sys.stdout.write("\n RAND_status is %d (%s)\n" % (v, verdict))

    pseudo, is_cryptographic = ssl.RAND_pseudo_bytes(16)
    self.assertEqual(len(pseudo), 16)
    self.assertEqual(is_cryptographic, v == 1)
    if not v:
        # without enough entropy RAND_bytes() is expected to fail
        self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
    else:
        strong = ssl.RAND_bytes(16)
        self.assertEqual(len(strong), 16)

    # negative num is invalid
    for rand_func in (ssl.RAND_bytes, ssl.RAND_pseudo_bytes):
        self.assertRaises(ValueError, rand_func, -5)

    if hasattr(ssl, 'RAND_egd'):
        for bad_args in ((1,), ('foo', 1)):
            self.assertRaises(TypeError, ssl.RAND_egd, *bad_args)

    # RAND_add() accepts str, bytes and bytearray
    entropy_samples = (
        "this is a random string",
        b"this is a random bytes object",
        bytearray(b"this is a random bytearray object"),
    )
    for sample in entropy_samples:
        ssl.RAND_add(sample, 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000390
@unittest.skipUnless(os.name == 'posix', 'requires posix')
def test_random_fork(self):
    """Parent and forked child must produce different random bytes."""
    if not ssl.RAND_status():
        self.fail("OpenSSL's PRNG has insufficient randomness")

    rfd, wfd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: send 16 random bytes through the pipe, then leave via
        # os._exit() in every case so it never returns into the test run.
        try:
            os.close(rfd)
            child_random = ssl.RAND_pseudo_bytes(16)[0]
            self.assertEqual(len(child_random), 16)
            os.write(wfd, child_random)
            os.close(wfd)
        except BaseException:
            os._exit(1)
        os._exit(0)

    # Parent: only reached when pid != 0.
    os.close(wfd)
    self.addCleanup(os.close, rfd)
    support.wait_process(pid, exitcode=0)

    from_child = os.read(rfd, 16)
    self.assertEqual(len(from_child), 16)
    from_parent = ssl.RAND_pseudo_bytes(16)[0]
    self.assertEqual(len(from_parent), 16)

    self.assertNotEqual(from_child, from_parent)
421
Christian Heimese6dac002018-08-30 07:25:49 +0200422 maxDiff = None
423
def test_parse_cert(self):
    """Decode known certificates and compare with the expected dicts."""
    # note that this uses an 'unofficial' function in _ssl.c,
    # provided solely for this test, to exercise the certificate
    # parsing code
    decode = ssl._ssl._test_decode_cert
    known_certs = (
        (CERTFILE, CERTFILE_INFO),
        (SIGNED_CERTFILE, SIGNED_CERTFILE_INFO),
    )
    for certfile, expected in known_certs:
        self.assertEqual(decode(certfile), expected)

    # Issue #13034: the subjectAltName in some certificates
    # (notably projects.developer.nokia.com:443) wasn't parsed
    nokia = decode(NOKIACERT)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(nokia) + "\n")
    expected_san = (('DNS', 'projects.developer.nokia.com'),
                    ('DNS', 'projects.forum.nokia.com'))
    self.assertEqual(nokia['subjectAltName'], expected_san)
    # extra OCSP and AIA fields
    self.assertEqual(nokia['OCSP'], ('http://ocsp.verisign.com',))
    self.assertEqual(
        nokia['caIssuers'],
        ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
    self.assertEqual(
        nokia['crlDistributionPoints'],
        ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000452
def test_parse_cert_CVE_2019_5010(self):
    """Decode the TALOS_INVALID_CRLDP cert and compare the whole result."""
    decoded = ssl._ssl._test_decode_cert(TALOS_INVALID_CRLDP)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(decoded) + "\n")
    host = 'codenomicon-vm-2.test.lal.cisco.com'
    expected = {
        'issuer': (
            (('countryName', 'UK'),), (('commonName', 'cody-ca'),)),
        'notAfter': 'Jun 14 18:00:58 2028 GMT',
        'notBefore': 'Jun 18 18:00:58 2018 GMT',
        'serialNumber': '02',
        'subject': ((('countryName', 'UK'),),
                    (('commonName', host),)),
        'subjectAltName': (('DNS', host),),
        'version': 3
    }
    self.assertEqual(decoded, expected)
473
def test_parse_cert_CVE_2013_4238(self):
    """Embedded NUL bytes in certificate names must be preserved."""
    decoded = ssl._ssl._test_decode_cert(NULLBYTECERT)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(decoded) + "\n")
    subject = ((('countryName', 'US'),),
               (('stateOrProvinceName', 'Oregon'),),
               (('localityName', 'Beaverton'),),
               (('organizationName', 'Python Software Foundation'),),
               (('organizationalUnitName', 'Python Core Development'),),
               (('commonName', 'null.python.org\x00example.org'),),
               (('emailAddress', 'python-dev@python.org'),))
    # subject and issuer are expected to be identical for this cert
    self.assertEqual(decoded['subject'], subject)
    self.assertEqual(decoded['issuer'], subject)

    # Only the last (IPv6) entry depends on the OpenSSL version.
    common_san = (('DNS', 'altnull.python.org\x00example.com'),
                  ('email', 'null@python.org\x00user@example.org'),
                  ('URI', 'http://null.python.org\x00http://example.org'),
                  ('IP Address', '192.0.2.1'))
    if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
        ipv6_entry = ('IP Address', '2001:DB8:0:0:0:0:0:1')
    else:
        # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
        ipv6_entry = ('IP Address', '<invalid>')
    san = common_san + (ipv6_entry,)

    self.assertEqual(decoded['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200502
def test_parse_all_sans(self):
    """Decode a certificate carrying all kinds of subject alt names."""
    decoded = ssl._ssl._test_decode_cert(ALLSANFILE)
    dirname_value = ((('countryName', 'XY'),),
                     (('localityName', 'Castle Anthrax'),),
                     (('organizationName', 'Python Software Foundation'),),
                     (('commonName', 'dirname example'),))
    expected_san = (
        ('DNS', 'allsans'),
        ('othername', '<unsupported>'),
        ('othername', '<unsupported>'),
        ('email', 'user@example.org'),
        ('DNS', 'www.example.org'),
        ('DirName', dirname_value),
        ('URI', 'https://www.python.org/'),
        ('IP Address', '127.0.0.1'),
        ('IP Address', '0:0:0:0:0:0:0:1'),
        ('Registered ID', '1.2.3.4.5')
    )
    self.assertEqual(decoded['subjectAltName'], expected_san)
523
def test_DER_to_PEM(self):
    """PEM -> DER -> PEM -> DER must round-trip with proper framing."""
    with open(CAFILE_CACERT, 'r') as pem_file:
        original_pem = pem_file.read()
    der_first = ssl.PEM_cert_to_DER_cert(original_pem)
    pem_again = ssl.DER_cert_to_PEM_cert(der_first)
    der_second = ssl.PEM_cert_to_DER_cert(pem_again)
    self.assertEqual(der_first, der_second)

    expected_start = ssl.PEM_HEADER + '\n'
    expected_end = '\n' + ssl.PEM_FOOTER + '\n'
    if not pem_again.startswith(expected_start):
        self.fail("DER-to-PEM didn't include correct header:\n%r\n" % pem_again)
    if not pem_again.endswith(expected_end):
        self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % pem_again)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000535
def test_openssl_version(self):
    """Sanity-check the three OpenSSL version constants of the ssl module."""
    number = ssl.OPENSSL_VERSION_NUMBER
    info = ssl.OPENSSL_VERSION_INFO
    text = ssl.OPENSSL_VERSION
    for value, expected_type in ((number, int), (info, tuple), (text, str)):
        self.assertIsInstance(value, expected_type)
    # Some sanity checks follow
    # >= 0.9
    self.assertGreaterEqual(number, 0x900000)
    # < 4.0
    self.assertLess(number, 0x40000000)
    major, minor, fix, patch, status = info
    self.assertGreaterEqual(major, 1)
    self.assertLess(major, 4)
    for component in (minor, fix):
        self.assertGreaterEqual(component, 0)
        self.assertLess(component, 256)
    self.assertGreaterEqual(patch, 0)
    self.assertLessEqual(patch, 63)
    self.assertGreaterEqual(status, 0)
    self.assertLessEqual(status, 15)
    # Version string as returned by {Open,Libre}SSL, the format might change
    if IS_LIBRESSL:
        expected_prefix = "LibreSSL {:d}".format(major)
    else:
        expected_prefix = "OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)
    self.assertTrue(text.startswith(expected_prefix),
                    (text, info, hex(number)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000566
@support.cpython_only
def test_refcycle(self):
    """Issue #7943: an SSL object doesn't create reference cycles with itself."""
    plain_sock = socket.socket(socket.AF_INET)
    wrapped = test_wrap_socket(plain_sock)
    ref = weakref.ref(wrapped)
    # dropping the only strong reference may emit a ResourceWarning
    with support.check_warnings(("", ResourceWarning)):
        del wrapped
    # the weak reference must be dead once the object has been deleted
    self.assertEqual(ref(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000577
def test_wrapped_unconnected(self):
    """Check the errors raised by I/O methods of an unconnected SSLSocket."""
    # Methods on an unconnected SSLSocket propagate the original
    # OSError raised by the underlying socket object.
    plain_sock = socket.socket(socket.AF_INET)
    any_addr = ('0.0.0.0', 0)
    with test_wrap_socket(plain_sock) as ss:
        expectations = (
            (OSError, ss.recv, (1,)),
            (OSError, ss.recv_into, (bytearray(b'x'),)),
            (OSError, ss.recvfrom, (1,)),
            (OSError, ss.recvfrom_into, (bytearray(b'x'), 1)),
            (OSError, ss.send, (b'x',)),
            (OSError, ss.sendto, (b'x', any_addr)),
            (NotImplementedError, ss.dup, ()),
            (NotImplementedError, ss.sendmsg, ([b'x'], (), 0, any_addr)),
            (NotImplementedError, ss.recvmsg, (100,)),
            (NotImplementedError, ss.recvmsg_into, ([bytearray(100)],)),
        )
        for expected_exc, method, args in expectations:
            self.assertRaises(expected_exc, method, *args)
Antoine Pitroua468adc2010-09-14 14:43:44 +0000595
def test_timeout(self):
    """The wrapped socket keeps the timeout of the original socket."""
    # Issue #8524: when creating an SSL socket, the timeout of the
    # original socket should be retained.
    timeouts = (None, 0.0, 5.0)
    for timeout in timeouts:
        plain_sock = socket.socket(socket.AF_INET)
        plain_sock.settimeout(timeout)
        with test_wrap_socket(plain_sock) as ss:
            self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000604
def test_errors_sslwrap(self):
    """Check the errors raised by ssl.wrap_socket() for bad arguments."""
    sock = socket.socket()
    server_side_msg = "certfile must be specified for server-side operations"
    self.assertRaisesRegex(ValueError,
                           "certfile must be specified",
                           ssl.wrap_socket, sock, keyfile=CERTFILE)
    self.assertRaisesRegex(ValueError, server_side_msg,
                           ssl.wrap_socket, sock, server_side=True)
    self.assertRaisesRegex(ValueError, server_side_msg,
                           ssl.wrap_socket, sock, server_side=True,
                           certfile="")
    with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
        self.assertRaisesRegex(ValueError,
                               "can't connect in server-side mode",
                               s.connect, (HOST, 8080))

    # A missing certificate and/or key file must surface as ENOENT.
    missing_file_cases = (
        {'certfile': NONEXISTINGCERT},
        {'certfile': CERTFILE, 'keyfile': NONEXISTINGCERT},
        {'certfile': NONEXISTINGCERT, 'keyfile': NONEXISTINGCERT},
    )
    for wrap_kwargs in missing_file_cases:
        with self.assertRaises(OSError) as cm:
            with socket.socket() as sock:
                ssl.wrap_socket(sock, **wrap_kwargs)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000633
def bad_cert_test(self, certfile):
    """Check that trying to use the given client certificate fails"""
    # *certfile* is a file name relative to this module's directory
    base_dir = os.path.dirname(__file__) or os.curdir
    cert_path = os.path.join(base_dir, certfile)
    sock = socket.socket()
    self.addCleanup(sock.close)
    with self.assertRaises(ssl.SSLError):
        test_wrap_socket(sock, certfile=cert_path)
Martin Panter3464ea22016-02-01 21:58:11 +0000643
644 def test_empty_cert(self):
645 """Wrapping with an empty cert file"""
646 self.bad_cert_test("nullcert.pem")
647
648 def test_malformed_cert(self):
649 """Wrapping with a badly formatted certificate (syntax error)"""
650 self.bad_cert_test("badcert.pem")
651
652 def test_malformed_key(self):
653 """Wrapping with a badly formatted key (syntax error)"""
654 self.bad_cert_test("badkey.pem")
655
def test_match_hostname(self):
    # Exercise ssl.match_hostname() against hand-built certificate dicts.

    def cn_cert(common_name):
        # Minimal certificate dict that carries only a commonName.
        return {'subject': ((('commonName', common_name),),)}

    def ok(cert, hostname):
        ssl.match_hostname(cert, hostname)

    def fail(cert, hostname):
        with self.assertRaises(ssl.CertificateError):
            ssl.match_hostname(cert, hostname)

    def fail_all(cert, hostnames):
        # Every hostname in the sequence must be rejected, in order.
        for hostname in hostnames:
            fail(cert, hostname)

    # -- Hostname matching --

    cert = cn_cert('example.com')
    ok(cert, 'example.com')
    ok(cert, 'ExAmple.cOm')
    fail_all(cert, ['www.example.com', '.example.com',
                    'example.org', 'exampleXcom'])

    cert = cn_cert('*.a.com')
    ok(cert, 'foo.a.com')
    fail_all(cert, ['bar.foo.a.com', 'a.com', 'Xa.com', '.a.com'])

    # a wildcard only matches when it is the whole left-most segment
    cert = cn_cert('f*.com')
    fail_all(cert, ['foo.com', 'f.com', 'bar.com',
                    'foo.a.com', 'bar.foo.com'])

    # NULL bytes are bad, CVE-2013-4073
    cert = cn_cert('null.python.org\x00example.org')
    ok(cert, 'null.python.org\x00example.org')  # or raise an error?
    fail_all(cert, ['example.org', 'null.python.org'])

    # error cases with wildcards
    cert = cn_cert('*.*.a.com')
    fail_all(cert, ['bar.foo.a.com', 'a.com', 'Xa.com', '.a.com'])

    cert = cn_cert('a.*.com')
    fail_all(cert, ['a.foo.com', 'a..com', 'a.com'])

    # wildcard doesn't match IDNA prefix 'xn--'
    idna = 'püthon.python.org'.encode("idna").decode("ascii")
    ok(cn_cert(idna), idna)
    fail(cn_cert('x*.python.org'), idna)
    fail(cn_cert('xn--p*.python.org'), idna)

    # partial wildcard in the first fragment combined with IDNA A-labels
    # in the following fragments: none of these hostnames match.
    idna = 'www*.pythön.org'.encode("idna").decode("ascii")
    cert = cn_cert(idna)
    for unicode_name in ('www.pythön.org', 'www1.pythön.org',
                         'ftp.pythön.org', 'pythön.org'):
        fail(cert, unicode_name.encode("idna").decode("ascii"))

    # Slightly fake real-world example
    cert = dict(
        cn_cert('linuxfrz.org'),
        notAfter='Jun 26 21:41:46 2011 GMT',
        subjectAltName=(('DNS', 'linuxfr.org'),
                        ('DNS', 'linuxfr.com'),
                        ('othername', '<unsupported>')),
    )
    ok(cert, 'linuxfr.org')
    ok(cert, 'linuxfr.com')
    # Not a "DNS" entry
    fail(cert, '<unsupported>')
    # When there is a subjectAltName, commonName isn't used
    fail(cert, 'linuxfrz.org')

    # Subject shared by several of the certificates below (no commonName).
    org_subject = ((('countryName', 'US'),),
                   (('stateOrProvinceName', 'California'),),
                   (('localityName', 'Mountain View'),),
                   (('organizationName', 'Google Inc'),))

    # A pristine real-world example
    cert = {
        'notAfter': 'Dec 18 23:59:59 2011 GMT',
        'subject': org_subject + ((('commonName', 'mail.google.com'),),),
    }
    ok(cert, 'mail.google.com')
    fail(cert, 'gmail.com')
    # Only commonName is considered
    fail(cert, 'California')

    # -- IPv4 matching --
    cert = dict(
        cn_cert('example.com'),
        subjectAltName=(('DNS', 'example.com'),
                        ('IP Address', '10.11.12.13'),
                        ('IP Address', '14.15.16.17'),
                        ('IP Address', '127.0.0.1')),
    )
    ok(cert, '10.11.12.13')
    ok(cert, '14.15.16.17')
    # socket.inet_ntoa(socket.inet_aton('127.1')) == '127.0.0.1'
    fail_all(cert, ['127.1', '14.15.16.17 ', '14.15.16.17 extra data',
                    '14.15.16.18', 'example.net'])

    # -- IPv6 matching --
    if socket_helper.IPV6_ENABLED:
        cert = dict(
            cn_cert('example.com'),
            subjectAltName=(
                ('DNS', 'example.com'),
                ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
                ('IP Address', '2003:0:0:0:0:0:0:BABA\n')),
        )
        ok(cert, '2001::cafe')
        ok(cert, '2003::baba')
        fail_all(cert, ['2003::baba ', '2003::baba extra data',
                        '2003::bebe', 'example.net'])

    # -- Miscellaneous --

    # Neither commonName nor subjectAltName
    cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
            'subject': org_subject}
    fail(cert, 'mail.google.com')

    # No DNS entry in subjectAltName but a commonName
    cert = {
        'notAfter': 'Dec 18 23:59:59 2099 GMT',
        'subject': ((('countryName', 'US'),),
                    (('stateOrProvinceName', 'California'),),
                    (('localityName', 'Mountain View'),),
                    (('commonName', 'mail.google.com'),)),
        'subjectAltName': (('othername', 'blabla'),),
    }
    ok(cert, 'mail.google.com')

    # No DNS entry subjectAltName and no commonName
    cert = {
        'notAfter': 'Dec 18 23:59:59 2099 GMT',
        'subject': org_subject,
        'subjectAltName': (('othername', 'blabla'),),
    }
    fail(cert, 'google.com')

    # Empty cert / no cert
    for missing_cert in (None, {}):
        with self.assertRaises(ValueError):
            ssl.match_hostname(missing_cert, 'example.com')

    # Issue #17980: avoid denials of service by refusing more than one
    # wildcard per fragment.
    wildcard_errors = (
        ('a*b.example.com', 'axxb.example.com',
         "partial wildcards in leftmost label are not supported"),
        ('www.*.example.com', 'www.sub.example.com',
         "wildcard can only be present in the leftmost label"),
        ('a*b*.example.com', 'axxbxxc.example.com',
         "too many wildcards"),
        ('*', 'host',
         "sole wildcard without additional labels are not support"),
        ('*.com', 'com',
         r"hostname 'com' doesn't match '\*.com'"),
    )
    for common_name, hostname, message in wildcard_errors:
        with self.assertRaisesRegex(ssl.CertificateError, message):
            ssl.match_hostname(cn_cert(common_name), hostname)

    # extra checks for _inet_paton()
    for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
        with self.assertRaises(ValueError):
            ssl._inet_paton(invalid)
    valid_addresses = ['127.0.0.1', '192.168.0.1']
    if socket_helper.IPV6_ENABLED:
        valid_addresses += ['::1', '2001:db8:85a3::8a2e:370:7334']
    for ipaddr in valid_addresses:
        self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200853
Antoine Pitroud5323212010-10-22 18:19:07 +0000854 def test_server_side(self):
855 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200856 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000857 with socket.socket() as sock:
858 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
859 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000860
def test_unknown_channel_binding(self):
    # get_channel_binding() should raise ValueError for an unknown type.
    listener = socket.create_server(('127.0.0.1', 0))
    # Register the cleanups immediately so neither socket is leaked when
    # connecting, wrapping or the assertion below fails; closing a socket
    # a second time is harmless.
    self.addCleanup(listener.close)
    client = socket.socket(socket.AF_INET)
    self.addCleanup(client.close)
    client.connect(listener.getsockname())
    with test_wrap_socket(client, do_handshake_on_connect=False) as ss:
        with self.assertRaises(ValueError):
            ss.get_channel_binding("unknown-type")
Antoine Pitroud6494802011-07-21 01:11:30 +0200870
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    # An unconnected socket should return None for a known binding type;
    # check the client side first, then the same for the server side.
    for wrap_kwargs in ({}, {'server_side': True, 'certfile': CERTFILE}):
        raw_sock = socket.socket(socket.AF_INET)
        with test_wrap_socket(raw_sock, **wrap_kwargs) as ss:
            binding = ss.get_channel_binding("tls-unique")
            self.assertIsNone(binding)
Antoine Pitroud6494802011-07-21 01:11:30 +0200882
def test_dealloc_warn(self):
    # Dropping an unclosed wrapped socket should emit a ResourceWarning
    # whose message contains the socket's repr.
    ss = test_wrap_socket(socket.socket(socket.AF_INET))
    expected_repr = repr(ss)
    with self.assertWarns(ResourceWarning) as caught:
        # Drop the local reference and force a collection so the warning
        # is raised inside the assertWarns block.
        ss = None
        support.gc_collect()
    warning_text = str(caught.warning.args[0])
    self.assertIn(expected_repr, warning_text)
890
def test_get_default_verify_paths(self):
    # The result is a 6-field DefaultVerifyPaths, and the SSL_CERT_DIR /
    # SSL_CERT_FILE environment variables show up as capath / cafile.
    default_paths = ssl.get_default_verify_paths()
    self.assertEqual(len(default_paths), 6)
    self.assertIsInstance(default_paths, ssl.DefaultVerifyPaths)

    with support.EnvironmentVarGuard() as env:
        env["SSL_CERT_DIR"] = CAPATH
        env["SSL_CERT_FILE"] = CERTFILE
        overridden = ssl.get_default_verify_paths()
        self.assertEqual(overridden.cafile, CERTFILE)
        self.assertEqual(overridden.capath, CAPATH)
902
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
def test_enum_certificates(self):
    # Check the shape of the entries returned for the "CA" and "ROOT"
    # stores and that serverAuth is among the collected trust OIDs.
    store_names = ("CA", "ROOT")
    for storename in store_names:
        self.assertTrue(ssl.enum_certificates(storename))

    with self.assertRaises(TypeError):
        ssl.enum_certificates()
    with self.assertRaises(WindowsError):
        ssl.enum_certificates("")

    trust_oids = set()
    for storename in store_names:
        store = ssl.enum_certificates(storename)
        self.assertIsInstance(store, list)
        for entry in store:
            self.assertIsInstance(entry, tuple)
            self.assertEqual(len(entry), 3)
            cert, enc, trust = entry
            self.assertIsInstance(cert, bytes)
            self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
            self.assertIsInstance(trust, (frozenset, set, bool))
            # Only set-like trust values contribute OIDs.
            if isinstance(trust, (frozenset, set)):
                trust_oids.update(trust)

    serverAuth = "1.3.6.1.5.5.7.3.1"
    self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200927
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
def test_enum_crls(self):
    # Check argument validation and the shape of the entries returned
    # for the "CA" store.
    self.assertTrue(ssl.enum_crls("CA"))
    with self.assertRaises(TypeError):
        ssl.enum_crls()
    with self.assertRaises(WindowsError):
        ssl.enum_crls("")

    crls = ssl.enum_crls("CA")
    self.assertIsInstance(crls, list)
    for entry in crls:
        self.assertIsInstance(entry, tuple)
        self.assertEqual(len(entry), 2)
        crl_data, encoding = entry
        self.assertIsInstance(crl_data, bytes)
        self.assertIn(encoding, {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200941
Christian Heimes46bebee2013-06-09 19:03:31 +0200942
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100943 def test_asn1object(self):
944 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
945 '1.3.6.1.5.5.7.3.1')
946
947 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
948 self.assertEqual(val, expected)
949 self.assertEqual(val.nid, 129)
950 self.assertEqual(val.shortname, 'serverAuth')
951 self.assertEqual(val.longname, 'TLS Web Server Authentication')
952 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
953 self.assertIsInstance(val, ssl._ASN1Object)
954 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
955
956 val = ssl._ASN1Object.fromnid(129)
957 self.assertEqual(val, expected)
958 self.assertIsInstance(val, ssl._ASN1Object)
959 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100960 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
961 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100962 for i in range(1000):
963 try:
964 obj = ssl._ASN1Object.fromnid(i)
965 except ValueError:
966 pass
967 else:
968 self.assertIsInstance(obj.nid, int)
969 self.assertIsInstance(obj.shortname, str)
970 self.assertIsInstance(obj.longname, str)
971 self.assertIsInstance(obj.oid, (str, type(None)))
972
973 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
974 self.assertEqual(val, expected)
975 self.assertIsInstance(val, ssl._ASN1Object)
976 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
977 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
978 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100979 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
980 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100981
Christian Heimes72d28502013-11-23 13:56:58 +0100982 def test_purpose_enum(self):
983 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
984 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
985 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
986 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
987 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
988 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
989 '1.3.6.1.5.5.7.3.1')
990
991 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
992 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
993 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
994 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
995 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
996 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
997 '1.3.6.1.5.5.7.3.2')
998
def test_unsupported_dtls(self):
    # Wrapping a datagram socket must raise NotImplementedError, both
    # through test_wrap_socket() and through SSLContext.wrap_socket().
    expected_message = "only stream sockets are supported"
    udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.addCleanup(udp_sock.close)

    with self.assertRaises(NotImplementedError) as cx:
        test_wrap_socket(udp_sock, cert_reqs=ssl.CERT_NONE)
    self.assertEqual(str(cx.exception), expected_message)

    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with self.assertRaises(NotImplementedError) as cx:
        client_ctx.wrap_socket(udp_sock)
    self.assertEqual(str(cx.exception), expected_message)
1009
def cert_time_ok(self, timestring, timestamp):
    """Assert that *timestring* converts to exactly *timestamp*."""
    seconds = ssl.cert_time_to_seconds(timestring)
    self.assertEqual(seconds, timestamp)
1012
def cert_time_fail(self, timestring):
    """Assert that converting *timestring* raises ValueError."""
    self.assertRaises(ValueError, ssl.cert_time_to_seconds, timestring)
1016
@unittest.skipUnless(utc_offset(),
                     'local time needs to be different from UTC')
def test_cert_time_to_seconds_timezone(self):
    # Issue #19940: ssl.cert_time_to_seconds() returns wrong
    # results if local timezone is not UTC
    known_conversions = (
        ("May 9 00:00:00 2007 GMT", 1178668800.0),
        ("Jan 5 09:34:43 2018 GMT", 1515144883.0),
    )
    for timestring, timestamp in known_conversions:
        self.cert_time_ok(timestring, timestamp)
1024
1025 def test_cert_time_to_seconds(self):
1026 timestring = "Jan 5 09:34:43 2018 GMT"
1027 ts = 1515144883.0
1028 self.cert_time_ok(timestring, ts)
1029 # accept keyword parameter, assert its name
1030 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
1031 # accept both %e and %d (space or zero generated by strftime)
1032 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
1033 # case-insensitive
1034 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
1035 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
1036 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
1037 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
1038 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
1039 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
1040 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
1041 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
1042
1043 newyear_ts = 1230768000.0
1044 # leap seconds
1045 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
1046 # same timestamp
1047 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
1048
1049 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
1050 # allow 60th second (even if it is not a leap second)
1051 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
1052 # allow 2nd leap second for compatibility with time.strptime()
1053 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
1054 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
1055
Mike53f7a7c2017-12-14 14:04:53 +03001056 # no special treatment for the special value:
Antoine Pitrouc695c952014-04-28 20:57:36 +02001057 # 99991231235959Z (rfc 5280)
1058 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
1059
@support.run_with_locale('LC_ALL', '')
def test_cert_time_to_seconds_locale(self):
    # `cert_time_to_seconds()` should be locale independent

    # Abbreviated name of February as rendered by strftime() here.
    february_tuple = (1, 2, 3, 4, 5, 6, 0, 0, 0)
    local_february = time.strftime('%b', february_tuple)

    if local_february.lower() == 'feb':
        self.skipTest("locale-specific month name needs to be "
                      "different from C locale")

    # locale-independent
    self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
    self.cert_time_fail(local_february + " 9 00:00:00 2007 GMT")
1074
def test_connect_ex_error(self):
    # connect_ex() to a bound but non-listening port must return one of
    # the expected error codes.
    placeholder = socket.socket(socket.AF_INET)
    self.addCleanup(placeholder.close)
    # Reserve port but don't listen
    port = socket_helper.bind_port(placeholder)
    client = test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(client.close)
    rc = client.connect_ex((HOST, port))
    # Issue #19919: Windows machines or VMs hosted on Windows
    # machines sometimes return EWOULDBLOCK.
    acceptable_codes = (
        errno.ECONNREFUSED,
        errno.EHOSTUNREACH,
        errno.ETIMEDOUT,
        errno.EWOULDBLOCK,
    )
    self.assertIn(rc, acceptable_codes)
1090
Christian Heimesa6bc95a2013-11-17 19:59:14 +01001091
Antoine Pitrou152efa22010-05-16 18:19:27 +00001092class ContextTests(unittest.TestCase):
1093
def test_constructor(self):
    # Every protocol in PROTOCOLS is accepted, the default protocol is
    # PROTOCOL_TLS, and out-of-range protocol numbers raise ValueError.
    for protocol in PROTOCOLS:
        ssl.SSLContext(protocol)
    default_ctx = ssl.SSLContext()
    self.assertEqual(default_ctx.protocol, ssl.PROTOCOL_TLS)
    for bogus_protocol in (-1, 42):
        with self.assertRaises(ValueError):
            ssl.SSLContext(bogus_protocol)
1101
def test_protocol(self):
    # The protocol attribute reports the value given to the constructor.
    for requested in PROTOCOLS:
        context = ssl.SSLContext(requested)
        self.assertEqual(context.protocol, requested)
1106
1107 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001108 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001109 ctx.set_ciphers("ALL")
1110 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +00001111 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +00001112 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +00001113
@unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
                     "Test applies only to Python default ciphers")
def test_python_ciphers(self):
    # None of the enabled cipher suite names may mention these tokens.
    excluded_tokens = ("PSK", "SRP", "MD5", "RC4", "3DES")
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    for suite in ctx.get_ciphers():
        name = suite['name']
        for token in excluded_tokens:
            self.assertNotIn(token, name)
1126
Christian Heimes25bfcd52016-09-06 00:04:45 +02001127 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
1128 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001129 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +02001130 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +02001131 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +02001132 self.assertIn('AES256-GCM-SHA384', names)
1133 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001134
def test_options(self):
    # Check the default option mask and that options can be set and,
    # where supported, cleared again.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value, and
    # SSLContext also enables the remaining flags by default
    default = (
        ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
        | OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE
        | OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE
        | OP_ENABLE_MIDDLEBOX_COMPAT
    )
    self.assertEqual(default, ctx.options)

    ctx.options = ctx.options | ssl.OP_NO_TLSv1
    self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)

    if not can_clear_options():
        with self.assertRaises(ValueError):
            ctx.options = 0
        return

    ctx.options &= ~ssl.OP_NO_TLSv1
    self.assertEqual(default, ctx.options)
    ctx.options = 0
    # Ubuntu has OP_NO_SSLv3 forced on by default
    self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
1155
Christian Heimesa170fa12017-09-15 20:27:30 +02001156 def test_verify_mode_protocol(self):
1157 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001158 # Default value
1159 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1160 ctx.verify_mode = ssl.CERT_OPTIONAL
1161 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1162 ctx.verify_mode = ssl.CERT_REQUIRED
1163 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1164 ctx.verify_mode = ssl.CERT_NONE
1165 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1166 with self.assertRaises(TypeError):
1167 ctx.verify_mode = None
1168 with self.assertRaises(ValueError):
1169 ctx.verify_mode = 42
1170
Christian Heimesa170fa12017-09-15 20:27:30 +02001171 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1172 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1173 self.assertFalse(ctx.check_hostname)
1174
1175 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1176 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1177 self.assertTrue(ctx.check_hostname)
1178
Christian Heimes61d478c2018-01-27 15:51:38 +01001179 def test_hostname_checks_common_name(self):
1180 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1181 self.assertTrue(ctx.hostname_checks_common_name)
1182 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1183 ctx.hostname_checks_common_name = True
1184 self.assertTrue(ctx.hostname_checks_common_name)
1185 ctx.hostname_checks_common_name = False
1186 self.assertFalse(ctx.hostname_checks_common_name)
1187 ctx.hostname_checks_common_name = True
1188 self.assertTrue(ctx.hostname_checks_common_name)
1189 else:
1190 with self.assertRaises(AttributeError):
1191 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001192
@requires_minimum_version
@unittest.skipIf(IS_LIBRESSL, "see bpo-34001")
def test_min_max_version(self):
    # Check defaults and assignment of minimum_version/maximum_version.
    versions = ssl.TLSVersion
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # OpenSSL default is MINIMUM_SUPPORTED, however some vendors like
    # Fedora override the setting to TLS 1.0.
    minimum_range = {
        versions.MINIMUM_SUPPORTED,  # stock OpenSSL
        versions.TLSv1,              # Fedora 29 uses TLS 1.0 by default
        versions.TLSv1_2,            # RHEL 8 uses TLS 1.2 by default
    }
    maximum_range = {
        versions.MAXIMUM_SUPPORTED,  # stock OpenSSL
        versions.TLSv1_3,            # Fedora 32 uses TLS 1.3 by default
    }

    self.assertIn(ctx.minimum_version, minimum_range)
    self.assertIn(ctx.maximum_version, maximum_range)

    ctx.minimum_version = versions.TLSv1_1
    ctx.maximum_version = versions.TLSv1_2
    self.assertEqual(ctx.minimum_version, versions.TLSv1_1)
    self.assertEqual(ctx.maximum_version, versions.TLSv1_2)

    ctx.minimum_version = versions.MINIMUM_SUPPORTED
    ctx.maximum_version = versions.TLSv1
    self.assertEqual(ctx.minimum_version, versions.MINIMUM_SUPPORTED)
    self.assertEqual(ctx.maximum_version, versions.TLSv1)

    ctx.maximum_version = versions.MAXIMUM_SUPPORTED
    self.assertEqual(ctx.maximum_version, versions.MAXIMUM_SUPPORTED)

    # Assigning the opposite sentinel to each bound reads back as one
    # of the concrete versions listed below.
    ctx.maximum_version = versions.MINIMUM_SUPPORTED
    self.assertIn(ctx.maximum_version,
                  {versions.TLSv1, versions.SSLv3})

    ctx.minimum_version = versions.MAXIMUM_SUPPORTED
    self.assertIn(ctx.minimum_version,
                  {versions.TLSv1_2, versions.TLSv1_3})

    with self.assertRaises(ValueError):
        ctx.minimum_version = 42

    # A context created for one fixed protocol version rejects
    # assignments to either bound.
    pinned_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)

    self.assertIn(pinned_ctx.minimum_version, minimum_range)
    self.assertEqual(pinned_ctx.maximum_version,
                     versions.MAXIMUM_SUPPORTED)
    with self.assertRaises(ValueError):
        pinned_ctx.minimum_version = versions.MINIMUM_SUPPORTED
    with self.assertRaises(ValueError):
        pinned_ctx.maximum_version = versions.TLSv1
1271
1272
matthewhughes9348e836bb2020-07-17 09:59:15 +01001273 @unittest.skipUnless(
1274 hasattr(ssl.SSLContext, 'security_level'),
1275 "requires OpenSSL >= 1.1.0"
1276 )
1277 def test_security_level(self):
1278 ctx = ssl.SSLContext()
1279 # The default security callback allows for levels between 0-5
1280 # with OpenSSL defaulting to 1, however some vendors override the
1281 # default value (e.g. Debian defaults to 2)
1282 security_level_range = {
1283 0,
1284 1, # OpenSSL default
1285 2, # Debian
1286 3,
1287 4,
1288 5,
1289 }
1290 self.assertIn(ctx.security_level, security_level_range)
1291
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_verify_flags(self):
    # Check the default verify_flags value and that new values can be
    # assigned and read back.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # default value
    trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | trusted_first)

    assignable = (
        ssl.VERIFY_CRL_CHECK_LEAF,
        ssl.VERIFY_CRL_CHECK_CHAIN,
        ssl.VERIFY_DEFAULT,
        # supports any value
        ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT,
    )
    for flags in assignable:
        ctx.verify_flags = flags
        self.assertEqual(ctx.verify_flags, flags)

    with self.assertRaises(TypeError):
        ctx.verify_flags = None
1311
def test_load_cert_chain(self):
    # Password providers; used both to produce literal password values
    # and as password callbacks further down.
    def getpass_unicode():
        return KEY_PASSWORD
    def getpass_bytes():
        return KEY_PASSWORD.encode()
    def getpass_bytearray():
        return bytearray(KEY_PASSWORD.encode())
    def getpass_badpass():
        return "badpass"
    def getpass_huge():
        return b'a' * (1024 * 1024)
    def getpass_bad_type():
        return 9
    def getpass_exception():
        raise Exception('getpass error')
    class GetPassCallable:
        def __call__(self):
            return KEY_PASSWORD
        def getpass(self):
            return KEY_PASSWORD
    valid_getters = (getpass_unicode, getpass_bytes, getpass_bytearray)

    # Combined key and cert in a single file
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    context.load_cert_chain(CERTFILE, keyfile=None)
    context.load_cert_chain(CERTFILE, keyfile=CERTFILE)
    with self.assertRaises(TypeError):
        context.load_cert_chain(keyfile=CERTFILE)
    with self.assertRaises(OSError) as caught:
        context.load_cert_chain(NONEXISTINGCERT)
    self.assertEqual(caught.exception.errno, errno.ENOENT)
    for unusable in (BADCERT, EMPTYCERT):
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            context.load_cert_chain(unusable)

    # Separate key and cert
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    context.load_cert_chain(ONLYCERT, ONLYKEY)
    context.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
    context.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        context.load_cert_chain(ONLYCERT)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        context.load_cert_chain(ONLYKEY)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        context.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)

    # Mismatching key and cert
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
        context.load_cert_chain(CAFILE_CACERT, ONLYKEY)

    # Password protected key and cert: str, bytes and bytearray
    # passwords, first by keyword and then positionally.
    for getter in valid_getters:
        context.load_cert_chain(CERTFILE_PROTECTED, password=getter())
    for getter in valid_getters:
        context.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, getter())
    with self.assertRaisesRegex(TypeError, "should be a string"):
        context.load_cert_chain(CERTFILE_PROTECTED, password=True)
    with self.assertRaises(ssl.SSLError):
        context.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        # openssl has a fixed limit on the password buffer.
        # PEM_BUFSIZE is generally set to 1kb.
        # Return a string larger than this.
        context.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)

    # Password callback
    for getter in valid_getters:
        context.load_cert_chain(CERTFILE_PROTECTED, password=getter)
    context.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
    context.load_cert_chain(CERTFILE_PROTECTED,
                            password=GetPassCallable().getpass)
    with self.assertRaises(ssl.SSLError):
        context.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        context.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
    with self.assertRaisesRegex(TypeError, "must return a string"):
        context.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
    with self.assertRaisesRegex(Exception, "getpass error"):
        context.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
    # Make sure the password function isn't called if it isn't needed
    context.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001394
def test_load_verify_locations(self):
    # load_verify_locations() with str and bytes paths, plus the
    # argument and file errors it is expected to raise.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    for cafile in (CERTFILE, BYTES_CERTFILE):
        context.load_verify_locations(cafile)
        context.load_verify_locations(cafile=cafile, capath=None)
    with self.assertRaises(TypeError):
        context.load_verify_locations()
    with self.assertRaises(TypeError):
        context.load_verify_locations(None, None, None)
    with self.assertRaises(OSError) as caught:
        context.load_verify_locations(NONEXISTINGCERT)
    self.assertEqual(caught.exception.errno, errno.ENOENT)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        context.load_verify_locations(BADCERT)
    context.load_verify_locations(CERTFILE, CAPATH)
    context.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)

    # Issue #10989: crash if the second argument type is invalid
    with self.assertRaises(TypeError):
        context.load_verify_locations(None, True)
1413
def test_load_verify_cadata(self):
    # test cadata
    def read_text(path):
        with open(path) as stream:
            return stream.read()

    def new_client_context():
        return ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    def assert_ca_count(context, expected):
        self.assertEqual(context.cert_store_stats()["x509_ca"], expected)

    cacert_pem = read_text(CAFILE_CACERT)
    cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
    neuronio_pem = read_text(CAFILE_NEURONIO)
    neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)

    # test PEM
    context = new_client_context()
    assert_ca_count(context, 0)
    context.load_verify_locations(cadata=cacert_pem)
    assert_ca_count(context, 1)
    context.load_verify_locations(cadata=neuronio_pem)
    assert_ca_count(context, 2)
    # cert already in hash table
    context.load_verify_locations(cadata=neuronio_pem)
    assert_ca_count(context, 2)

    # combined
    context = new_client_context()
    context.load_verify_locations(
        cadata="\n".join((cacert_pem, neuronio_pem)))
    assert_ca_count(context, 2)

    # with junk around the certs
    context = new_client_context()
    pieces = ["head", cacert_pem, "other", neuronio_pem, "again",
              neuronio_pem, "tail"]
    context.load_verify_locations(cadata="\n".join(pieces))
    assert_ca_count(context, 2)

    # test DER
    context = new_client_context()
    context.load_verify_locations(cadata=cacert_der)
    context.load_verify_locations(cadata=neuronio_der)
    assert_ca_count(context, 2)
    # cert already in hash table
    context.load_verify_locations(cadata=cacert_der)
    assert_ca_count(context, 2)

    # combined
    context = new_client_context()
    context.load_verify_locations(
        cadata=b"".join((cacert_der, neuronio_der)))
    assert_ca_count(context, 2)

    # error cases
    context = new_client_context()
    with self.assertRaises(TypeError):
        context.load_verify_locations(cadata=object)

    with self.assertRaisesRegex(ssl.SSLError, "no start line"):
        context.load_verify_locations(cadata="broken")
    with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
        context.load_verify_locations(cadata=b"broken")
1470
1471
@unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
def test_load_dh_params(self):
    # load_dh_params() loads DHFILE and raises for missing arguments,
    # a nonexistent path, and CERTFILE.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    context.load_dh_params(DHFILE)
    # The bytes path variant is only exercised off Windows.
    if os.name != 'nt':
        context.load_dh_params(BYTES_DHFILE)
    with self.assertRaises(TypeError):
        context.load_dh_params()
    with self.assertRaises(TypeError):
        context.load_dh_params(None)
    with self.assertRaises(FileNotFoundError) as caught:
        context.load_dh_params(NONEXISTINGCERT)
    self.assertEqual(caught.exception.errno, errno.ENOENT)
    with self.assertRaises(ssl.SSLError):
        context.load_dh_params(CERTFILE)
1485
def test_session_stats(self):
    # A newly created context reports zero for every session counter,
    # whatever protocol it was created with.
    counter_names = (
        'number',
        'connect',
        'connect_good',
        'connect_renegotiate',
        'accept',
        'accept_good',
        'accept_renegotiate',
        'hits',
        'misses',
        'timeouts',
        'cache_full',
    )
    all_zero = dict.fromkeys(counter_names, 0)
    for proto in PROTOCOLS:
        context = ssl.SSLContext(proto)
        self.assertEqual(context.session_stats(), all_zero)
1502
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001503 def test_set_default_verify_paths(self):
1504 # There's not much we can do to test that it acts as expected,
1505 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001506 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001507 ctx.set_default_verify_paths()
1508
Antoine Pitrou501da612011-12-21 09:27:41 +01001509 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001510 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001511 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001512 ctx.set_ecdh_curve("prime256v1")
1513 ctx.set_ecdh_curve(b"prime256v1")
1514 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1515 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1516 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1517 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1518
@needs_sni
def test_sni_callback(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    # set_servername_callback expects a callable, or None
    with self.assertRaises(TypeError):
        context.set_servername_callback()
    for not_callable in (4, "", context):
        with self.assertRaises(TypeError):
            context.set_servername_callback(not_callable)

    def dummycallback(sock, servername, ctx):
        pass
    # Both None and a real callable are accepted.
    context.set_servername_callback(None)
    context.set_servername_callback(dummycallback)
1533
@needs_sni
def test_sni_callback_refcycle(self):
    # Reference cycles through the servername callback are detected
    # and cleared.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # The default argument makes the callback reference the context,
    # and registering the callback makes the context reference it back.
    def callback(sock, servername, ctx, cycle=context):
        pass
    context.set_servername_callback(callback)
    context_ref = weakref.ref(context)
    # Drop the only direct references, then let the collector run.
    del context, callback
    gc.collect()
    self.assertIs(context_ref(), None)
1546
def test_cert_store_stats(self):
    # cert_store_stats() counters after each loading step.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    def assert_stats(x509_ca, x509):
        self.assertEqual(context.cert_store_stats(),
                         {'x509_ca': x509_ca, 'crl': 0, 'x509': x509})

    assert_stats(x509_ca=0, x509=0)
    # load_cert_chain() is expected to leave the counters untouched
    context.load_cert_chain(CERTFILE)
    assert_stats(x509_ca=0, x509=0)
    context.load_verify_locations(CERTFILE)
    assert_stats(x509_ca=0, x509=1)
    context.load_verify_locations(CAFILE_CACERT)
    assert_stats(x509_ca=1, x509=2)
1560
def test_get_ca_certs(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertEqual(context.get_ca_certs(), [])
    # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
    context.load_verify_locations(CERTFILE)
    self.assertEqual(context.get_ca_certs(), [])
    # but CAFILE_CACERT is a CA cert
    context.load_verify_locations(CAFILE_CACERT)
    # Issuer and subject are expected to be the same name.
    cacert_name = (
        (('organizationName', 'Root CA'),),
        (('organizationalUnitName', 'http://www.cacert.org'),),
        (('commonName', 'CA Cert Signing Authority'),),
        (('emailAddress', 'support@cacert.org'),),
    )
    expected_cert = {
        'issuer': cacert_name,
        'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
        'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
        'serialNumber': '00',
        'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
        'subject': cacert_name,
        'version': 3,
    }
    self.assertEqual(context.get_ca_certs(), [expected_cert])

    # With a true argument the certificates are compared in DER form.
    with open(CAFILE_CACERT) as stream:
        pem_text = stream.read()
    der_bytes = ssl.PEM_cert_to_DER_cert(pem_text)
    self.assertEqual(context.get_ca_certs(True), [der_bytes])
1588
Christian Heimes72d28502013-11-23 13:56:58 +01001589 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001590 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001591 ctx.load_default_certs()
1592
Christian Heimesa170fa12017-09-15 20:27:30 +02001593 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001594 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1595 ctx.load_default_certs()
1596
Christian Heimesa170fa12017-09-15 20:27:30 +02001597 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001598 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1599
Christian Heimesa170fa12017-09-15 20:27:30 +02001600 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001601 self.assertRaises(TypeError, ctx.load_default_certs, None)
1602 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1603
@unittest.skipIf(sys.platform == "win32", "not-Windows specific")
@unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
def test_load_default_certs_env(self):
    # With SSL_CERT_DIR / SSL_CERT_FILE set, load_default_certs()
    # is expected to load exactly one certificate.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    expected_stats = {"crl": 0, "x509": 1, "x509_ca": 0}
    with support.EnvironmentVarGuard() as environ:
        environ["SSL_CERT_DIR"] = CAPATH
        environ["SSL_CERT_FILE"] = CERTFILE
        context.load_default_certs()
        self.assertEqual(context.cert_store_stats(), expected_stats)
1613
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
@unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
def test_load_default_certs_env_windows(self):
    # Baseline: store statistics after loading the default certs
    # without the environment overrides.
    baseline_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    baseline_context.load_default_certs()
    expected_stats = baseline_context.cert_store_stats()

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with support.EnvironmentVarGuard() as environ:
        environ["SSL_CERT_DIR"] = CAPATH
        environ["SSL_CERT_FILE"] = CERTFILE
        context.load_default_certs()
        # One more x509 entry is expected than in the baseline.
        expected_stats["x509"] += 1
        self.assertEqual(context.cert_store_stats(), expected_stats)
1628
def _assert_context_options(self, ctx):
    # OP_NO_SSLv2 must always be set in ctx.options.
    self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
    # The remaining options are only checked when their constant is
    # non-zero.
    optional_flags = (
        OP_NO_COMPRESSION,
        OP_SINGLE_DH_USE,
        OP_SINGLE_ECDH_USE,
        OP_CIPHER_SERVER_PREFERENCE,
    )
    for flag in optional_flags:
        if flag != 0:
            self.assertEqual(ctx.options & flag, flag)
1643
def test_create_default_context(self):
    def assert_protocol_and_mode(context, verify_mode):
        self.assertEqual(context.protocol, ssl.PROTOCOL_TLS)
        self.assertEqual(context.verify_mode, verify_mode)

    # Default purpose: certificates required, hostname checked.
    context = ssl.create_default_context()
    assert_protocol_and_mode(context, ssl.CERT_REQUIRED)
    self.assertTrue(context.check_hostname)
    self._assert_context_options(context)

    # Explicit cafile, capath and cadata together.
    with open(SIGNING_CA) as stream:
        ca_text = stream.read()
    context = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
                                         cadata=ca_text)
    assert_protocol_and_mode(context, ssl.CERT_REQUIRED)
    self._assert_context_options(context)

    # CLIENT_AUTH purpose: no certificate required.
    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    assert_protocol_and_mode(context, ssl.CERT_NONE)
    self._assert_context_options(context)
Christian Heimes4c05b472013-11-23 15:58:30 +01001664
def test__create_stdlib_context(self):
    def assert_protocol_and_mode(context, protocol, verify_mode):
        self.assertEqual(context.protocol, protocol)
        self.assertEqual(context.verify_mode, verify_mode)

    # No arguments
    context = ssl._create_stdlib_context()
    assert_protocol_and_mode(context, ssl.PROTOCOL_TLS, ssl.CERT_NONE)
    self.assertFalse(context.check_hostname)
    self._assert_context_options(context)

    # Explicit protocol
    context = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
    assert_protocol_and_mode(context, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE)
    self._assert_context_options(context)

    # Explicit protocol, verification mode and hostname checking
    context = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
                                         cert_reqs=ssl.CERT_REQUIRED,
                                         check_hostname=True)
    assert_protocol_and_mode(context, ssl.PROTOCOL_TLSv1,
                             ssl.CERT_REQUIRED)
    self.assertTrue(context.check_hostname)
    self._assert_context_options(context)

    # Explicit purpose
    context = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
    assert_protocol_and_mode(context, ssl.PROTOCOL_TLS, ssl.CERT_NONE)
    self._assert_context_options(context)
Christian Heimes4c05b472013-11-23 15:58:30 +01001689
Christian Heimes1aa9a752013-12-02 02:41:19 +01001690 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001691 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001692 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001693 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001694
Christian Heimese82c0342017-09-15 20:29:57 +02001695 # Auto set CERT_REQUIRED
1696 ctx.check_hostname = True
1697 self.assertTrue(ctx.check_hostname)
1698 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1699 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001700 ctx.verify_mode = ssl.CERT_REQUIRED
1701 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001702 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001703
Christian Heimese82c0342017-09-15 20:29:57 +02001704 # Changing verify_mode does not affect check_hostname
1705 ctx.check_hostname = False
1706 ctx.verify_mode = ssl.CERT_NONE
1707 ctx.check_hostname = False
1708 self.assertFalse(ctx.check_hostname)
1709 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1710 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001711 ctx.check_hostname = True
1712 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001713 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1714
1715 ctx.check_hostname = False
1716 ctx.verify_mode = ssl.CERT_OPTIONAL
1717 ctx.check_hostname = False
1718 self.assertFalse(ctx.check_hostname)
1719 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1720 # keep CERT_OPTIONAL
1721 ctx.check_hostname = True
1722 self.assertTrue(ctx.check_hostname)
1723 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001724
1725 # Cannot set CERT_NONE with check_hostname enabled
1726 with self.assertRaises(ValueError):
1727 ctx.verify_mode = ssl.CERT_NONE
1728 ctx.check_hostname = False
1729 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001730 ctx.verify_mode = ssl.CERT_NONE
1731 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001732
Christian Heimes5fe668c2016-09-12 00:01:11 +02001733 def test_context_client_server(self):
1734 # PROTOCOL_TLS_CLIENT has sane defaults
1735 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1736 self.assertTrue(ctx.check_hostname)
1737 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1738
1739 # PROTOCOL_TLS_SERVER has different but also sane defaults
1740 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1741 self.assertFalse(ctx.check_hostname)
1742 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1743
Christian Heimes4df60f12017-09-15 20:26:05 +02001744 def test_context_custom_class(self):
1745 class MySSLSocket(ssl.SSLSocket):
1746 pass
1747
1748 class MySSLObject(ssl.SSLObject):
1749 pass
1750
1751 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1752 ctx.sslsocket_class = MySSLSocket
1753 ctx.sslobject_class = MySSLObject
1754
1755 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1756 self.assertIsInstance(sock, MySSLSocket)
1757 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1758 self.assertIsInstance(obj, MySSLObject)
1759
@unittest.skipUnless(IS_OPENSSL_1_1_1, "Test requires OpenSSL 1.1.1")
def test_num_tickest(self):
    # Server contexts: num_tickets starts at 2 and accepts
    # non-negative integers.
    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    self.assertEqual(server_context.num_tickets, 2)
    for count in (1, 0):
        server_context.num_tickets = count
        self.assertEqual(server_context.num_tickets, count)
    with self.assertRaises(ValueError):
        server_context.num_tickets = -1
    with self.assertRaises(TypeError):
        server_context.num_tickets = None

    # Client contexts report the same default but reject assignment.
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertEqual(client_context.num_tickets, 2)
    with self.assertRaises(ValueError):
        client_context.num_tickets = 1
1777
Antoine Pitrou152efa22010-05-16 18:19:27 +00001778
class SSLErrorTests(unittest.TestCase):

    def test_str(self):
        # The str() of a SSLError doesn't include the errno.
        # Same for a subclass.
        for error_class in (ssl.SSLError, ssl.SSLZeroReturnError):
            error = error_class(1, "foo")
            self.assertEqual(str(error), "foo")
            self.assertEqual(error.errno, 1)

    @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
    def test_lib_reason(self):
        # Test the library and reason attributes
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        with self.assertRaises(ssl.SSLError) as caught:
            context.load_dh_params(CERTFILE)
        error = caught.exception
        self.assertEqual(error.library, 'PEM')
        self.assertEqual(error.reason, 'NO_START_LINE')
        message = str(error)
        self.assertTrue(
            message.startswith("[PEM: NO_START_LINE] no start line"),
            message)

    def test_subclass(self):
        # Check that the appropriate SSLError subclass is raised
        # (this only tests one of them)
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        with socket.create_server(("127.0.0.1", 0)) as listener:
            raw_client = socket.create_connection(listener.getsockname())
            raw_client.setblocking(False)
            with context.wrap_socket(raw_client, False,
                                     do_handshake_on_connect=False) as client:
                # The listener never accepts or answers, so the
                # non-blocking handshake cannot complete.
                with self.assertRaises(ssl.SSLWantReadError) as caught:
                    client.do_handshake()
                message = str(caught.exception)
                self.assertTrue(
                    message.startswith("The operation did not complete (read)"),
                    message)
                # For compatibility
                self.assertEqual(caught.exception.errno,
                                 ssl.SSL_ERROR_WANT_READ)

    def test_bad_server_hostname(self):
        # Invalid server_hostname values and the exception each raises.
        context = ssl.create_default_context()
        rejected = (
            (ValueError, ""),
            (ValueError, ".example.org"),
            (TypeError, "example.org\x00evil.com"),
        )
        for expected_error, hostname in rejected:
            with self.assertRaises(expected_error):
                context.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
                                 server_hostname=hostname)
Christian Heimes61d478c2018-01-27 15:51:38 +01001831
1832
class MemoryBIOTests(unittest.TestCase):

    def test_read_write(self):
        # Data comes back in write order; read(n) returns at most n bytes.
        buffer = ssl.MemoryBIO()
        buffer.write(b'foo')
        self.assertEqual(buffer.read(), b'foo')
        self.assertEqual(buffer.read(), b'')
        for chunk in (b'foo', b'bar'):
            buffer.write(chunk)
        self.assertEqual(buffer.read(), b'foobar')
        self.assertEqual(buffer.read(), b'')
        buffer.write(b'baz')
        for size, expected in ((2, b'ba'), (1, b'z'), (1, b'')):
            self.assertEqual(buffer.read(size), expected)

    def test_eof(self):
        # eof only becomes true once write_eof() was called and all
        # pending data has been read.
        buffer = ssl.MemoryBIO()
        self.assertFalse(buffer.eof)
        self.assertEqual(buffer.read(), b'')
        self.assertFalse(buffer.eof)
        buffer.write(b'foo')
        self.assertFalse(buffer.eof)
        buffer.write_eof()
        self.assertFalse(buffer.eof)
        self.assertEqual(buffer.read(2), b'fo')
        self.assertFalse(buffer.eof)
        self.assertEqual(buffer.read(1), b'o')
        self.assertTrue(buffer.eof)
        self.assertEqual(buffer.read(), b'')
        self.assertTrue(buffer.eof)

    def test_pending(self):
        # pending tracks the number of unread bytes.
        buffer = ssl.MemoryBIO()
        self.assertEqual(buffer.pending, 0)
        buffer.write(b'foo')
        self.assertEqual(buffer.pending, 3)
        for remaining in (2, 1, 0):
            buffer.read(1)
            self.assertEqual(buffer.pending, remaining)
        for written in (1, 2, 3):
            buffer.write(b'x')
            self.assertEqual(buffer.pending, written)
        buffer.read()
        self.assertEqual(buffer.pending, 0)

    def test_buffer_types(self):
        # write() accepts bytes, bytearray and memoryview objects.
        buffer = ssl.MemoryBIO()
        cases = (
            (b'foo', b'foo'),
            (bytearray(b'bar'), b'bar'),
            (memoryview(b'baz'), b'baz'),
        )
        for payload, expected in cases:
            buffer.write(payload)
            self.assertEqual(buffer.read(), expected)

    def test_error_types(self):
        # write() rejects objects that are not bytes-like.
        buffer = ssl.MemoryBIO()
        for invalid in ('foo', None, True, 1):
            with self.assertRaises(TypeError):
                buffer.write(invalid)
1894
1895
class SSLObjectTests(unittest.TestCase):
    def test_private_init(self):
        # SSLObject cannot be instantiated directly.
        memory_bio = ssl.MemoryBIO()
        with self.assertRaisesRegex(TypeError, "public constructor"):
            ssl.SSLObject(memory_bio, memory_bio)

    def test_unwrap(self):
        client_ctx, server_ctx, hostname = testing_context()
        client_in, client_out = ssl.MemoryBIO(), ssl.MemoryBIO()
        server_in, server_out = ssl.MemoryBIO(), ssl.MemoryBIO()
        client = client_ctx.wrap_bio(client_in, client_out,
                                     server_hostname=hostname)
        server = server_ctx.wrap_bio(server_in, server_out, server_side=True)

        def advance(endpoint, outgoing, peer_incoming):
            # One handshake attempt, then hand any produced data to the
            # peer's incoming BIO.
            try:
                endpoint.do_handshake()
            except ssl.SSLWantReadError:
                pass
            if outgoing.pending:
                peer_incoming.write(outgoing.read())

        # Loop on the handshake for a bit to get it settled
        for _ in range(5):
            advance(client, client_out, server_in)
            advance(server, server_out, client_in)
        # Now the handshakes should be complete (don't raise WantReadError)
        client.do_handshake()
        server.do_handshake()

        # Now if we unwrap one side unilaterally, it should send close-notify
        # and raise WantReadError:
        with self.assertRaises(ssl.SSLWantReadError):
            client.unwrap()

        # But server.unwrap() does not raise, because it reads the client's
        # close-notify:
        server_in.write(client_out.read())
        server.unwrap()

        # And now that the client gets the server's close-notify, it doesn't
        # raise either.
        client_in.write(server_out.read())
        client.unwrap()
Christian Heimes9d50ab52018-02-27 10:17:30 +01001943
Martin Panter3840b2a2016-03-27 01:53:46 +00001944class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001945 """Tests that connect to a simple server running in the background"""
1946
1947 def setUp(self):
1948 server = ThreadedEchoServer(SIGNED_CERTFILE)
1949 self.server_addr = (HOST, server.port)
1950 server.__enter__()
1951 self.addCleanup(server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001952
Antoine Pitrou480a1242010-04-28 21:37:09 +00001953 def test_connect(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001954 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001955 cert_reqs=ssl.CERT_NONE) as s:
1956 s.connect(self.server_addr)
1957 self.assertEqual({}, s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001958 self.assertFalse(s.server_side)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001959
Martin Panter3840b2a2016-03-27 01:53:46 +00001960 # this should succeed because we specify the root cert
Christian Heimesd0486372016-09-10 23:23:33 +02001961 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001962 cert_reqs=ssl.CERT_REQUIRED,
1963 ca_certs=SIGNING_CA) as s:
1964 s.connect(self.server_addr)
1965 self.assertTrue(s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001966 self.assertFalse(s.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001967
Martin Panter3840b2a2016-03-27 01:53:46 +00001968 def test_connect_fail(self):
1969 # This should fail because we have no verification certs. Connection
1970 # failure crashes ThreadedEchoServer, so run this in an independent
1971 # test method.
Christian Heimesd0486372016-09-10 23:23:33 +02001972 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001973 cert_reqs=ssl.CERT_REQUIRED)
1974 self.addCleanup(s.close)
1975 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1976 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001977
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001978 def test_connect_ex(self):
1979 # Issue #11326: check connect_ex() implementation
Christian Heimesd0486372016-09-10 23:23:33 +02001980 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001981 cert_reqs=ssl.CERT_REQUIRED,
1982 ca_certs=SIGNING_CA)
1983 self.addCleanup(s.close)
1984 self.assertEqual(0, s.connect_ex(self.server_addr))
1985 self.assertTrue(s.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001986
1987 def test_non_blocking_connect_ex(self):
1988 # Issue #11326: non-blocking connect_ex() should allow handshake
1989 # to proceed after the socket gets ready.
Christian Heimesd0486372016-09-10 23:23:33 +02001990 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001991 cert_reqs=ssl.CERT_REQUIRED,
1992 ca_certs=SIGNING_CA,
1993 do_handshake_on_connect=False)
1994 self.addCleanup(s.close)
1995 s.setblocking(False)
1996 rc = s.connect_ex(self.server_addr)
1997 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1998 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
1999 # Wait for connect to finish
2000 select.select([], [s], [], 5.0)
2001 # Non-blocking handshake
2002 while True:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00002003 try:
Martin Panter3840b2a2016-03-27 01:53:46 +00002004 s.do_handshake()
2005 break
2006 except ssl.SSLWantReadError:
2007 select.select([s], [], [], 5.0)
2008 except ssl.SSLWantWriteError:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00002009 select.select([], [s], [], 5.0)
Martin Panter3840b2a2016-03-27 01:53:46 +00002010 # SSL established
2011 self.assertTrue(s.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01002012
Antoine Pitrou152efa22010-05-16 18:19:27 +00002013 def test_connect_with_context(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00002014 # Same as test_connect, but with a separately created context
Christian Heimesa170fa12017-09-15 20:27:30 +02002015 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002016 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
2017 s.connect(self.server_addr)
2018 self.assertEqual({}, s.getpeercert())
2019 # Same with a server hostname
2020 with ctx.wrap_socket(socket.socket(socket.AF_INET),
2021 server_hostname="dummy") as s:
2022 s.connect(self.server_addr)
2023 ctx.verify_mode = ssl.CERT_REQUIRED
2024 # This should succeed because we specify the root cert
2025 ctx.load_verify_locations(SIGNING_CA)
2026 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
2027 s.connect(self.server_addr)
2028 cert = s.getpeercert()
2029 self.assertTrue(cert)
2030
2031 def test_connect_with_context_fail(self):
2032 # This should fail because we have no verification certs. Connection
2033 # failure crashes ThreadedEchoServer, so run this in an independent
2034 # test method.
Christian Heimesa170fa12017-09-15 20:27:30 +02002035 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002036 ctx.verify_mode = ssl.CERT_REQUIRED
2037 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
2038 self.addCleanup(s.close)
2039 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
2040 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00002041
2042 def test_connect_capath(self):
2043 # Verify server certificates using the `capath` argument
Antoine Pitrou467f28d2010-05-16 19:22:44 +00002044 # NOTE: the subject hashing algorithm has been changed between
2045 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
2046 # contain both versions of each certificate (same content, different
Antoine Pitroud7e4c1c2010-05-17 14:13:10 +00002047 # filename) for this test to be portable across OpenSSL releases.
Christian Heimesa170fa12017-09-15 20:27:30 +02002048 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002049 ctx.verify_mode = ssl.CERT_REQUIRED
2050 ctx.load_verify_locations(capath=CAPATH)
2051 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
2052 s.connect(self.server_addr)
2053 cert = s.getpeercert()
2054 self.assertTrue(cert)
Christian Heimes529525f2018-05-23 22:24:45 +02002055
Martin Panter3840b2a2016-03-27 01:53:46 +00002056 # Same with a bytes `capath` argument
Christian Heimesa170fa12017-09-15 20:27:30 +02002057 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002058 ctx.verify_mode = ssl.CERT_REQUIRED
2059 ctx.load_verify_locations(capath=BYTES_CAPATH)
2060 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
2061 s.connect(self.server_addr)
2062 cert = s.getpeercert()
2063 self.assertTrue(cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002064
Christian Heimesefff7062013-11-21 03:35:02 +01002065 def test_connect_cadata(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00002066 with open(SIGNING_CA) as f:
Christian Heimesefff7062013-11-21 03:35:02 +01002067 pem = f.read()
2068 der = ssl.PEM_cert_to_DER_cert(pem)
Christian Heimesa170fa12017-09-15 20:27:30 +02002069 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002070 ctx.verify_mode = ssl.CERT_REQUIRED
2071 ctx.load_verify_locations(cadata=pem)
2072 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
2073 s.connect(self.server_addr)
2074 cert = s.getpeercert()
2075 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01002076
Martin Panter3840b2a2016-03-27 01:53:46 +00002077 # same with DER
Christian Heimesa170fa12017-09-15 20:27:30 +02002078 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002079 ctx.verify_mode = ssl.CERT_REQUIRED
2080 ctx.load_verify_locations(cadata=der)
2081 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
2082 s.connect(self.server_addr)
2083 cert = s.getpeercert()
2084 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01002085
Antoine Pitroue3220242010-04-24 11:13:53 +00002086 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
2087 def test_makefile_close(self):
2088 # Issue #5238: creating a file-like object with makefile() shouldn't
2089 # delay closing the underlying "real socket" (here tested with its
2090 # file descriptor, hence skipping the test under Windows).
Christian Heimesd0486372016-09-10 23:23:33 +02002091 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Martin Panter3840b2a2016-03-27 01:53:46 +00002092 ss.connect(self.server_addr)
2093 fd = ss.fileno()
2094 f = ss.makefile()
2095 f.close()
2096 # The fd is still open
2097 os.read(fd, 0)
2098 # Closing the SSL socket should close the fd too
2099 ss.close()
2100 gc.collect()
2101 with self.assertRaises(OSError) as e:
Antoine Pitroue3220242010-04-24 11:13:53 +00002102 os.read(fd, 0)
Martin Panter3840b2a2016-03-27 01:53:46 +00002103 self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00002104
Antoine Pitrou480a1242010-04-28 21:37:09 +00002105 def test_non_blocking_handshake(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00002106 s = socket.socket(socket.AF_INET)
2107 s.connect(self.server_addr)
2108 s.setblocking(False)
Christian Heimesd0486372016-09-10 23:23:33 +02002109 s = test_wrap_socket(s,
Martin Panter3840b2a2016-03-27 01:53:46 +00002110 cert_reqs=ssl.CERT_NONE,
2111 do_handshake_on_connect=False)
2112 self.addCleanup(s.close)
2113 count = 0
2114 while True:
2115 try:
2116 count += 1
2117 s.do_handshake()
2118 break
2119 except ssl.SSLWantReadError:
2120 select.select([s], [], [])
2121 except ssl.SSLWantWriteError:
2122 select.select([], [s], [])
2123 if support.verbose:
2124 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002125
Antoine Pitrou480a1242010-04-28 21:37:09 +00002126 def test_get_server_certificate(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00002127 _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02002128
Martin Panter3840b2a2016-03-27 01:53:46 +00002129 def test_get_server_certificate_fail(self):
2130 # Connection failure crashes ThreadedEchoServer, so run this in an
2131 # independent test method
2132 _test_get_server_certificate_fail(self, *self.server_addr)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002133
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00002134 def test_ciphers(self):
Christian Heimesd0486372016-09-10 23:23:33 +02002135 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002136 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
2137 s.connect(self.server_addr)
Christian Heimesd0486372016-09-10 23:23:33 +02002138 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002139 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
2140 s.connect(self.server_addr)
2141 # Error checking can happen at instantiation or when connecting
2142 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
2143 with socket.socket(socket.AF_INET) as sock:
Christian Heimesd0486372016-09-10 23:23:33 +02002144 s = test_wrap_socket(sock,
Martin Panter3840b2a2016-03-27 01:53:46 +00002145 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
2146 s.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00002147
Christian Heimes9a5395a2013-06-17 15:44:12 +02002148 def test_get_ca_certs_capath(self):
2149 # capath certs are loaded on request
Christian Heimesa170fa12017-09-15 20:27:30 +02002150 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Martin Panter3840b2a2016-03-27 01:53:46 +00002151 ctx.load_verify_locations(capath=CAPATH)
2152 self.assertEqual(ctx.get_ca_certs(), [])
Christian Heimesa170fa12017-09-15 20:27:30 +02002153 with ctx.wrap_socket(socket.socket(socket.AF_INET),
2154 server_hostname='localhost') as s:
Martin Panter3840b2a2016-03-27 01:53:46 +00002155 s.connect(self.server_addr)
2156 cert = s.getpeercert()
2157 self.assertTrue(cert)
2158 self.assertEqual(len(ctx.get_ca_certs()), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02002159
Christian Heimes575596e2013-12-15 21:49:17 +01002160 @needs_sni
Christian Heimes8e7f3942013-12-05 07:41:08 +01002161 def test_context_setget(self):
2162 # Check that the context of a connected socket can be replaced.
Christian Heimesa170fa12017-09-15 20:27:30 +02002163 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2164 ctx1.load_verify_locations(capath=CAPATH)
2165 ctx2 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2166 ctx2.load_verify_locations(capath=CAPATH)
Martin Panter3840b2a2016-03-27 01:53:46 +00002167 s = socket.socket(socket.AF_INET)
Christian Heimesa170fa12017-09-15 20:27:30 +02002168 with ctx1.wrap_socket(s, server_hostname='localhost') as ss:
Martin Panter3840b2a2016-03-27 01:53:46 +00002169 ss.connect(self.server_addr)
2170 self.assertIs(ss.context, ctx1)
2171 self.assertIs(ss._sslobj.context, ctx1)
2172 ss.context = ctx2
2173 self.assertIs(ss.context, ctx2)
2174 self.assertIs(ss._sslobj.context, ctx2)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002175
2176 def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
2177 # A simple IO loop. Call func(*args) depending on the error we get
2178 # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
Victor Stinner0d63bac2019-12-11 11:30:03 +01002179 timeout = kwargs.get('timeout', support.SHORT_TIMEOUT)
Victor Stinner50a72af2017-09-11 09:34:24 -07002180 deadline = time.monotonic() + timeout
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002181 count = 0
2182 while True:
Victor Stinner50a72af2017-09-11 09:34:24 -07002183 if time.monotonic() > deadline:
2184 self.fail("timeout")
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002185 errno = None
2186 count += 1
2187 try:
2188 ret = func(*args)
2189 except ssl.SSLError as e:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002190 if e.errno not in (ssl.SSL_ERROR_WANT_READ,
Martin Panter40b97ec2016-01-14 13:05:46 +00002191 ssl.SSL_ERROR_WANT_WRITE):
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002192 raise
2193 errno = e.errno
2194 # Get any data from the outgoing BIO irrespective of any error, and
2195 # send it to the socket.
2196 buf = outgoing.read()
2197 sock.sendall(buf)
2198 # If there's no error, we're done. For WANT_READ, we need to get
2199 # data from the socket and put it in the incoming BIO.
2200 if errno is None:
2201 break
2202 elif errno == ssl.SSL_ERROR_WANT_READ:
2203 buf = sock.recv(32768)
2204 if buf:
2205 incoming.write(buf)
2206 else:
2207 incoming.write_eof()
2208 if support.verbose:
2209 sys.stdout.write("Needed %d calls to complete %s().\n"
2210 % (count, func.__name__))
2211 return ret
2212
    def test_bio_handshake(self):
        # Drive a full client handshake through MemoryBIOs over a plain TCP
        # socket, checking SSLObject state before and after the handshake.
        sock = socket.socket(socket.AF_INET)
        self.addCleanup(sock.close)
        sock.connect(self.server_addr)
        incoming = ssl.MemoryBIO()
        outgoing = ssl.MemoryBIO()
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        self.assertTrue(ctx.check_hostname)
        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
        ctx.load_verify_locations(SIGNING_CA)
        sslobj = ctx.wrap_bio(incoming, outgoing, False,
                              SIGNED_CERTFILE_HOSTNAME)
        # State checks before the handshake has run.
        self.assertIs(sslobj._sslobj.owner, sslobj)
        self.assertIsNone(sslobj.cipher())
        self.assertIsNone(sslobj.version())
        self.assertIsNotNone(sslobj.shared_ciphers())
        self.assertRaises(ValueError, sslobj.getpeercert)
        if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
            self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
        self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
        # State checks after the handshake completed.
        self.assertTrue(sslobj.cipher())
        self.assertIsNotNone(sslobj.shared_ciphers())
        self.assertIsNotNone(sslobj.version())
        self.assertTrue(sslobj.getpeercert())
        if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
            self.assertTrue(sslobj.get_channel_binding('tls-unique'))
        try:
            self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
        except ssl.SSLSyscallError:
            # If the server shuts down the TCP connection without sending a
            # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
            pass
        # Writing after unwrap() must fail.
        self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
2246
2247 def test_bio_read_write_data(self):
2248 sock = socket.socket(socket.AF_INET)
2249 self.addCleanup(sock.close)
2250 sock.connect(self.server_addr)
2251 incoming = ssl.MemoryBIO()
2252 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002253 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002254 ctx.verify_mode = ssl.CERT_NONE
2255 sslobj = ctx.wrap_bio(incoming, outgoing, False)
2256 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2257 req = b'FOO\n'
2258 self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
2259 buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
2260 self.assertEqual(buf, b'foo\n')
2261 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002262
2263
class NetworkedTests(unittest.TestCase):
    # Tests that talk to remote hosts; each one is wrapped in
    # socket_helper.transient_internet().

    def test_timeout_connect_ex(self):
        # Issue #12065: on a timeout, connect_ex() should return the original
        # errno (mimicking the behaviour of non-SSL sockets).
        with socket_helper.transient_internet(REMOTE_HOST):
            s = test_wrap_socket(socket.socket(socket.AF_INET),
                                 cert_reqs=ssl.CERT_REQUIRED,
                                 do_handshake_on_connect=False)
            self.addCleanup(s.close)
            s.settimeout(0.0000001)
            remote_addr = (REMOTE_HOST, 443)
            rc = s.connect_ex(remote_addr)
            if rc == 0:
                self.skipTest("REMOTE_HOST responded too quickly")
            self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))

    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'Needs IPv6')
    def test_get_server_certificate_ipv6(self):
        ipv6_host = 'ipv6.google.com'
        with socket_helper.transient_internet(ipv6_host):
            _test_get_server_certificate(self, ipv6_host, 443)
            _test_get_server_certificate_fail(self, ipv6_host, 443)
2285
Martin Panter3840b2a2016-03-27 01:53:46 +00002286
def _test_get_server_certificate(test, host, port, cert=None):
    # Fetch the certificate of host:port twice -- first with default
    # arguments, then with ca_certs=cert -- failing `test` if either
    # fetch returns nothing.
    address = (host, port)
    for extra in ({}, {'ca_certs': cert}):
        pem = ssl.get_server_certificate(address, **extra)
        if not pem:
            test.fail("No server certificate on %s:%s!" % (host, port))
    if support.verbose:
        sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
2297
def _test_get_server_certificate_fail(test, host, port):
    # Fetching the certificate with CERTFILE as the CA bundle is expected
    # to raise ssl.SSLError; getting a certificate back fails `test`.
    address = (host, port)
    try:
        pem = ssl.get_server_certificate(address, ca_certs=CERTFILE)
    except ssl.SSLError as x:
        #should fail
        if support.verbose:
            sys.stdout.write("%s\n" % x)
        return
    test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2307
2308
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002309from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002310
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002311class ThreadedEchoServer(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002312
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002313 class ConnectionHandler(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002314
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002315 """A mildly complicated class, because we want it to work both
2316 with and without the SSL wrapper around the socket connection, so
2317 that we can test the STARTTLS functionality."""
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002318
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002319 def __init__(self, server, connsock, addr):
2320 self.server = server
2321 self.running = False
2322 self.sock = connsock
2323 self.addr = addr
Serhiy Storchaka5eca7f32019-09-01 12:12:52 +03002324 self.sock.setblocking(True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002325 self.sslconn = None
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002326 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00002327 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002328
        def wrap_conn(self):
            """Wrap self.sock with the server's context in server-side mode.

            Returns True when the wrap succeeded.  On failure the error is
            recorded as a string in self.server.conn_errors, the connection
            is closed and False is returned; the generic SSLError/OSError
            handler additionally stops the server.
            """
            try:
                self.sslconn = self.server.context.wrap_socket(
                    self.sock, server_side=True)
                self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
                self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
            except (ConnectionResetError, BrokenPipeError, ConnectionAbortedError) as e:
                # We treat ConnectionResetError as though it were an
                # SSLError - OpenSSL on Ubuntu abruptly closes the
                # connection when asked to use an unsupported protocol.
                #
                # BrokenPipeError is raised in TLS 1.3 mode, when OpenSSL
                # tries to send session tickets after handshake.
                # https://github.com/openssl/openssl/issues/6342
                #
                # ConnectionAbortedError is raised in TLS 1.3 mode, when OpenSSL
                # tries to send session tickets after handshake when using WinSock.
                self.server.conn_errors.append(str(e))
                if self.server.chatty:
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                # Unlike the handler below, the server keeps running here.
                self.close()
                return False
            except (ssl.SSLError, OSError) as e:
                # OSError may occur with wrong protocols, e.g. both
                # sides use PROTOCOL_TLS_SERVER.
                #
                # XXX Various errors can have happened here, for example
                # a mismatching protocol version, an invalid certificate,
                # or a low-level bug. This should be made more discriminating.
                #
                # bpo-31323: Store the exception as string to prevent
                # a reference leak: server -> conn_errors -> exception
                # -> traceback -> self (ConnectionHandler) -> server
                self.server.conn_errors.append(str(e))
                if self.server.chatty:
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                self.server.stop()
                self.close()
                return False
            else:
                # Handshake succeeded: record the shared ciphers and, when
                # verbose and chatty, report certificate and cipher details.
                self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
                if self.server.context.verify_mode == ssl.CERT_REQUIRED:
                    cert = self.sslconn.getpeercert()
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
                    cert_binary = self.sslconn.getpeercert(True)
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
                cipher = self.sslconn.cipher()
                if support.verbose and self.server.chatty:
                    sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
                    sys.stdout.write(" server: selected protocol is now "
                            + str(self.sslconn.selected_npn_protocol()) + "\n")
                return True
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002385
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002386 def read(self):
2387 if self.sslconn:
2388 return self.sslconn.read()
2389 else:
2390 return self.sock.recv(1024)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002391
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002392 def write(self, bytes):
2393 if self.sslconn:
2394 return self.sslconn.write(bytes)
2395 else:
2396 return self.sock.send(bytes)
2397
2398 def close(self):
2399 if self.sslconn:
2400 self.sslconn.close()
2401 else:
2402 self.sock.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002403
Antoine Pitrou480a1242010-04-28 21:37:09 +00002404 def run(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002405 self.running = True
2406 if not self.server.starttls_server:
2407 if not self.wrap_conn():
2408 return
2409 while self.running:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002410 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002411 msg = self.read()
2412 stripped = msg.strip()
2413 if not stripped:
2414 # eof, so quit this handler
2415 self.running = False
2416 try:
2417 self.sock = self.sslconn.unwrap()
2418 except OSError:
2419 # Many tests shut the TCP connection down
2420 # without an SSL shutdown. This causes
2421 # unwrap() to raise OSError with errno=0!
2422 pass
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002423 else:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002424 self.sslconn = None
2425 self.close()
2426 elif stripped == b'over':
2427 if support.verbose and self.server.connectionchatty:
2428 sys.stdout.write(" server: client closed connection\n")
2429 self.close()
2430 return
2431 elif (self.server.starttls_server and
2432 stripped == b'STARTTLS'):
2433 if support.verbose and self.server.connectionchatty:
2434 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
2435 self.write(b"OK\n")
2436 if not self.wrap_conn():
2437 return
2438 elif (self.server.starttls_server and self.sslconn
2439 and stripped == b'ENDTLS'):
2440 if support.verbose and self.server.connectionchatty:
2441 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
2442 self.write(b"OK\n")
2443 self.sock = self.sslconn.unwrap()
2444 self.sslconn = None
2445 if support.verbose and self.server.connectionchatty:
2446 sys.stdout.write(" server: connection is now unencrypted...\n")
2447 elif stripped == b'CB tls-unique':
2448 if support.verbose and self.server.connectionchatty:
2449 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
2450 data = self.sslconn.get_channel_binding("tls-unique")
2451 self.write(repr(data).encode("us-ascii") + b"\n")
Christian Heimes9fb051f2018-09-23 08:32:31 +02002452 elif stripped == b'PHA':
2453 if support.verbose and self.server.connectionchatty:
2454 sys.stdout.write(" server: initiating post handshake auth\n")
2455 try:
2456 self.sslconn.verify_client_post_handshake()
2457 except ssl.SSLError as e:
2458 self.write(repr(e).encode("us-ascii") + b"\n")
2459 else:
2460 self.write(b"OK\n")
2461 elif stripped == b'HASCERT':
2462 if self.sslconn.getpeercert() is not None:
2463 self.write(b'TRUE\n')
2464 else:
2465 self.write(b'FALSE\n')
2466 elif stripped == b'GETCERT':
2467 cert = self.sslconn.getpeercert()
2468 self.write(repr(cert).encode("us-ascii") + b"\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002469 else:
2470 if (support.verbose and
2471 self.server.connectionchatty):
2472 ctype = (self.sslconn and "encrypted") or "unencrypted"
2473 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
2474 % (msg, ctype, msg.lower(), ctype))
2475 self.write(msg.lower())
Paul Monsonfb7e7502019-05-15 15:38:55 -07002476 except (ConnectionResetError, ConnectionAbortedError):
Christian Heimes529525f2018-05-23 22:24:45 +02002477 # XXX: OpenSSL 1.1.1 sometimes raises ConnectionResetError
2478 # when connection is not shut down gracefully.
2479 if self.server.chatty and support.verbose:
2480 sys.stdout.write(
2481 " Connection reset by peer: {}\n".format(
2482 self.addr)
2483 )
2484 self.close()
2485 self.running = False
Paul Monsonfb7e7502019-05-15 15:38:55 -07002486 except ssl.SSLError as err:
2487 # On Windows sometimes test_pha_required_nocert receives the
2488 # PEER_DID_NOT_RETURN_A_CERTIFICATE exception
2489 # before the 'tlsv13 alert certificate required' exception.
2490 # If the server is stopped when PEER_DID_NOT_RETURN_A_CERTIFICATE
2491 # is received test_pha_required_nocert fails with ConnectionResetError
2492 # because the underlying socket is closed
2493 if 'PEER_DID_NOT_RETURN_A_CERTIFICATE' == err.reason:
2494 if self.server.chatty and support.verbose:
2495 sys.stdout.write(err.args[1])
2496 # test_pha_required_nocert is expecting this exception
2497 raise ssl.SSLError('tlsv13 alert certificate required')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002498 except OSError:
2499 if self.server.chatty:
2500 handle_error("Test server failure:\n")
Bill Janssen2f5799b2008-06-29 00:08:12 +00002501 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002502 self.running = False
Christian Heimes529525f2018-05-23 22:24:45 +02002503
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002504 # normally, we'd just stop here, but for the test
2505 # harness, we want to stop the server
2506 self.server.stop()
Bill Janssen54cc54c2007-12-14 22:08:56 +00002507
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002508 def __init__(self, certificate=None, ssl_version=None,
2509 certreqs=None, cacerts=None,
2510 chatty=True, connectionchatty=False, starttls_server=False,
2511 npn_protocols=None, alpn_protocols=None,
2512 ciphers=None, context=None):
2513 if context:
2514 self.context = context
2515 else:
2516 self.context = ssl.SSLContext(ssl_version
2517 if ssl_version is not None
Christian Heimesa170fa12017-09-15 20:27:30 +02002518 else ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002519 self.context.verify_mode = (certreqs if certreqs is not None
2520 else ssl.CERT_NONE)
2521 if cacerts:
2522 self.context.load_verify_locations(cacerts)
2523 if certificate:
2524 self.context.load_cert_chain(certificate)
2525 if npn_protocols:
2526 self.context.set_npn_protocols(npn_protocols)
2527 if alpn_protocols:
2528 self.context.set_alpn_protocols(alpn_protocols)
2529 if ciphers:
2530 self.context.set_ciphers(ciphers)
2531 self.chatty = chatty
2532 self.connectionchatty = connectionchatty
2533 self.starttls_server = starttls_server
2534 self.sock = socket.socket()
Serhiy Storchaka16994912020-04-25 10:06:29 +03002535 self.port = socket_helper.bind_port(self.sock)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002536 self.flag = None
2537 self.active = False
2538 self.selected_npn_protocols = []
2539 self.selected_alpn_protocols = []
2540 self.shared_ciphers = []
2541 self.conn_errors = []
2542 threading.Thread.__init__(self)
2543 self.daemon = True
2544
2545 def __enter__(self):
2546 self.start(threading.Event())
2547 self.flag.wait()
2548 return self
2549
    def __exit__(self, *args):
        """Ask the accept loop to stop, then wait for the thread to end."""
        self.stop()
        self.join()
2553
    def start(self, flag=None):
        """Start the server thread.

        *flag*, when given, is stored on the instance; run() calls its
        set() method once the socket is listening.
        """
        self.flag = flag
        threading.Thread.start(self)
2557
    def run(self):
        """Accept connections until stop() clears self.active.

        Each accepted connection is served by a ConnectionHandler thread
        that is joined before the next accept(), so connections are handled
        one at a time.  The listening socket is closed when the loop ends.
        """
        # Short accept() timeout so the loop re-checks self.active regularly.
        self.sock.settimeout(0.05)
        self.sock.listen()
        self.active = True
        if self.flag:
            # signal an event
            self.flag.set()
        while self.active:
            try:
                newconn, connaddr = self.sock.accept()
                if support.verbose and self.chatty:
                    sys.stdout.write(' server: new connection from '
                                     + repr(connaddr) + '\n')
                handler = self.ConnectionHandler(self, newconn, connaddr)
                handler.start()
                handler.join()
            except socket.timeout:
                # No connection arrived within the timeout; poll again.
                pass
            except KeyboardInterrupt:
                self.stop()
            except BaseException as e:
                # Anything else is reported (when verbose and chatty) and
                # the loop carries on.
                if support.verbose and self.chatty:
                    sys.stdout.write(
                        ' connection handling failed: ' + repr(e) + '\n')

        self.sock.close()
2584
    def stop(self):
        """Request shutdown by clearing the flag that run()'s loop polls."""
        self.active = False
2587
class AsyncoreEchoServer(threading.Thread):
    """Echo server thread whose I/O is driven by an asyncore loop.

    The thread owns an ``EchoServer`` dispatcher listening on ``self.port``.
    Every accepted connection is wrapped for TLS and sends back whatever it
    receives, lower-cased.  The object can be used as a context manager.
    """

    # this one's based on asyncore.dispatcher

    class EchoServer(asyncore.dispatcher):
        """Listening dispatcher creating one handler per accepted socket."""

        class ConnectionHandler(asyncore.dispatcher_with_send):
            """Server side of a single TLS connection."""

            def __init__(self, conn, certfile):
                # The wrapped socket must exist before the dispatcher is
                # initialised with it; the handshake is driven by hand
                # (do_handshake_on_connect=False).
                self.socket = test_wrap_socket(conn, server_side=True,
                                               certfile=certfile,
                                               do_handshake_on_connect=False)
                asyncore.dispatcher_with_send.__init__(self, self.socket)
                self._ssl_accepting = True
                self._do_ssl_handshake()

            def readable(self):
                # Consume whatever SSLSocket.pending() reports before
                # telling asyncore that this channel wants read events.
                if isinstance(self.socket, ssl.SSLSocket):
                    while self.socket.pending() > 0:
                        self.handle_read_event()
                return True

            def _do_ssl_handshake(self):
                """Advance the handshake; clear _ssl_accepting when done."""
                try:
                    self.socket.do_handshake()
                except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
                    # Not finished yet: handle_read() calls us again while
                    # _ssl_accepting is still true.
                    return
                except ssl.SSLEOFError:
                    return self.handle_close()
                except ssl.SSLError:
                    raise
                except OSError as exc:
                    if exc.args[0] == errno.ECONNABORTED:
                        return self.handle_close()
                else:
                    self._ssl_accepting = False

            def handle_read(self):
                # While the handshake is pending, read events only drive it.
                if self._ssl_accepting:
                    self._do_ssl_handshake()
                    return
                chunk = self.recv(1024)
                if support.verbose:
                    sys.stdout.write(" server: read %s from client\n" % repr(chunk))
                if chunk:
                    self.send(chunk.lower())
                else:
                    # Empty read: the channel is closed.
                    self.close()

            def handle_close(self):
                self.close()
                if support.verbose:
                    sys.stdout.write(" server: closed connection %s\n" % self.socket)

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, certfile):
            self.certfile = certfile
            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = socket_helper.bind_port(listener, '')
            asyncore.dispatcher.__init__(self, listener)
            self.listen(5)

        def handle_accepted(self, sock_obj, addr):
            if support.verbose:
                sys.stdout.write(" server: new connection from %s:%s\n" % addr)
            # The handler is only instantiated here; no reference is kept.
            self.ConnectionHandler(sock_obj, self.certfile)

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, certfile):
        self.flag = None
        self.active = False
        self.server = self.EchoServer(certfile)
        self.port = self.server.port
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def __enter__(self):
        # run() sets the event as soon as the thread is active.
        ready = threading.Event()
        self.start(ready)
        self.flag.wait()
        return self

    def __exit__(self, *args):
        # Each step is announced (in verbose mode) right before it runs.
        steps = (
            (" cleanup: stopping server.\n", self.stop),
            (" cleanup: joining server thread.\n", self.join),
        )
        for note, action in steps:
            if support.verbose:
                sys.stdout.write(note)
            action()
        if support.verbose:
            sys.stdout.write(" cleanup: successfully joined.\n")
        # make sure that ConnectionHandler is removed from socket_map
        asyncore.close_all(ignore_all=True)

    def start(self, flag=None):
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        self.active = True
        if self.flag:
            self.flag.set()
        # Run the asyncore loop until stop() clears self.active; any
        # exception escaping the loop is ignored.
        while self.active:
            try:
                asyncore.loop(1)
            except:
                pass

    def stop(self):
        self.active = False
        self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002705
def server_params_test(client_context, server_context, indata=b"FOO\n",
                       chatty=True, connectionchatty=False, sni_name=None,
                       session=None):
    """
    Launch a server, connect a client to it and try various reads
    and writes.

    *indata* is sent as bytes, bytearray and memoryview; every reply must
    equal ``indata.lower()`` or AssertionError is raised.  Returns a dict
    describing the client connection (cipher, version, session, ...) plus
    the protocols/ciphers recorded on the server object.
    """
    server = ThreadedEchoServer(context=server_context,
                                chatty=chatty,
                                connectionchatty=False)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=sni_name,
                                        session=session) as s:
            s.connect((HOST, server.port))
            for payload in (indata, bytearray(indata), memoryview(indata)):
                if connectionchatty and support.verbose:
                    sys.stdout.write(
                        " client: sending %r...\n" % indata)
                s.write(payload)
                reply = s.read()
                if connectionchatty and support.verbose:
                    sys.stdout.write(" client: read %r\n" % reply)
                if reply != indata.lower():
                    raise AssertionError(
                        "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                        % (reply[:20], len(reply),
                           indata[:20].lower(), len(indata)))
            # Final message sent to the server; no reply is read for it.
            s.write(b"over\n")
            if connectionchatty and support.verbose:
                sys.stdout.write(" client: closing connection.\n")
            stats = {
                'compression': s.compression(),
                'cipher': s.cipher(),
                'peercert': s.getpeercert(),
                'client_alpn_protocol': s.selected_alpn_protocol(),
                'client_npn_protocol': s.selected_npn_protocol(),
                'version': s.version(),
                'session_reused': s.session_reused,
                'session': s.session,
            }
            s.close()
        # Server-side results are read while the server is still running.
        stats['server_alpn_protocols'] = server.selected_alpn_protocols
        stats['server_npn_protocols'] = server.selected_npn_protocols
        stats['server_shared_ciphers'] = server.shared_ciphers
    return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002755
def try_protocol_combo(server_protocol, client_protocol, expect_success,
                       certsreqs=None, server_options=0, client_options=0):
    """
    Try to SSL-connect using *client_protocol* to *server_protocol*.
    If *expect_success* is true, assert that the connection succeeds,
    if it's false, assert that the connection fails.
    Also, if *expect_success* is a string, assert that it is the protocol
    version actually used by the connection.

    *certsreqs* defaults to ssl.CERT_NONE and is applied as verify_mode to
    both contexts; *server_options* / *client_options* are OR-ed into the
    respective context's options.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    # Looked up unconditionally, so an unknown mode raises KeyError.
    mode_names = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }
    certtype = mode_names[certsreqs]
    if support.verbose:
        # Combinations that are expected to fail are shown in braces.
        formatstr = " %s->%s %s\n" if expect_success else " {%s->%s} %s\n"
        sys.stdout.write(formatstr %
                         (ssl.get_protocol_name(client_protocol),
                          ssl.get_protocol_name(server_protocol),
                          certtype))

    client_context = ssl.SSLContext(client_protocol)
    server_context = ssl.SSLContext(server_protocol)
    client_context.options |= client_options
    server_context.options |= server_options

    min_version = PROTOCOL_TO_TLS_VERSION.get(client_protocol, None)
    # SSLContext.minimum_version is only available on recent OpenSSL
    # (setter added in OpenSSL 1.1.0, getter added in OpenSSL 1.1.1)
    may_lower_minimum = (min_version is not None
                         and hasattr(server_context, 'minimum_version')
                         and server_protocol == ssl.PROTOCOL_TLS)
    if may_lower_minimum and server_context.minimum_version > min_version:
        # If OpenSSL configuration is strict and requires more recent TLS
        # version, we have to change the minimum to test old TLS versions.
        server_context.minimum_version = min_version

    # NOTE: we must enable "ALL" ciphers on the client, otherwise an
    # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
    # starting from OpenSSL 1.0.0 (see issue #8322).
    if client_context.protocol == ssl.PROTOCOL_TLS:
        client_context.set_ciphers("ALL")

    for ctx in (client_context, server_context):
        ctx.verify_mode = certsreqs
        ctx.load_cert_chain(SIGNED_CERTFILE)
        ctx.load_verify_locations(SIGNING_CA)

    try:
        stats = server_params_test(client_context, server_context,
                                   chatty=False, connectionchatty=False)
    # Protocol mismatch can result in either an SSLError, or a
    # "Connection reset by peer" error.
    except ssl.SSLError:
        if expect_success:
            raise
    except OSError as exc:
        if expect_success or exc.errno != errno.ECONNRESET:
            raise
    else:
        if not expect_success:
            raise AssertionError(
                "Client protocol %s succeeded with server protocol %s!"
                % (ssl.get_protocol_name(client_protocol),
                   ssl.get_protocol_name(server_protocol)))
        # Any value other than the literal True names the expected version.
        if expect_success is not True and expect_success != stats['version']:
            raise AssertionError("version mismatch: expected %r, got %r"
                                 % (expect_success, stats['version']))
2825
2826
2827class ThreadedTests(unittest.TestCase):
2828
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002829 def test_echo(self):
2830 """Basic test of an SSL client connecting to a server"""
2831 if support.verbose:
2832 sys.stdout.write("\n")
2833 for protocol in PROTOCOLS:
2834 if protocol in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}:
2835 continue
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02002836 if not has_tls_protocol(protocol):
2837 continue
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002838 with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
2839 context = ssl.SSLContext(protocol)
2840 context.load_cert_chain(CERTFILE)
2841 server_params_test(context, context,
2842 chatty=True, connectionchatty=True)
2843
Christian Heimesa170fa12017-09-15 20:27:30 +02002844 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002845
2846 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
2847 server_params_test(client_context=client_context,
2848 server_context=server_context,
2849 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002850 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002851
2852 client_context.check_hostname = False
2853 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_CLIENT):
2854 with self.assertRaises(ssl.SSLError) as e:
2855 server_params_test(client_context=server_context,
2856 server_context=client_context,
2857 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002858 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002859 self.assertIn('called a function you should not call',
2860 str(e.exception))
2861
2862 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_SERVER):
2863 with self.assertRaises(ssl.SSLError) as e:
2864 server_params_test(client_context=server_context,
2865 server_context=server_context,
2866 chatty=True, connectionchatty=True)
2867 self.assertIn('called a function you should not call',
2868 str(e.exception))
2869
2870 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_CLIENT):
2871 with self.assertRaises(ssl.SSLError) as e:
2872 server_params_test(client_context=server_context,
2873 server_context=client_context,
2874 chatty=True, connectionchatty=True)
2875 self.assertIn('called a function you should not call',
2876 str(e.exception))
2877
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002878 def test_getpeercert(self):
2879 if support.verbose:
2880 sys.stdout.write("\n")
Christian Heimesa170fa12017-09-15 20:27:30 +02002881
2882 client_context, server_context, hostname = testing_context()
2883 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002884 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002885 with client_context.wrap_socket(socket.socket(),
2886 do_handshake_on_connect=False,
2887 server_hostname=hostname) as s:
2888 s.connect((HOST, server.port))
2889 # getpeercert() raise ValueError while the handshake isn't
2890 # done.
2891 with self.assertRaises(ValueError):
2892 s.getpeercert()
2893 s.do_handshake()
2894 cert = s.getpeercert()
2895 self.assertTrue(cert, "Can't get peer certificate.")
2896 cipher = s.cipher()
2897 if support.verbose:
2898 sys.stdout.write(pprint.pformat(cert) + '\n')
2899 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2900 if 'subject' not in cert:
2901 self.fail("No subject field in certificate: %s." %
2902 pprint.pformat(cert))
2903 if ((('organizationName', 'Python Software Foundation'),)
2904 not in cert['subject']):
2905 self.fail(
2906 "Missing or invalid 'organizationName' field in certificate subject; "
2907 "should be 'Python Software Foundation'.")
2908 self.assertIn('notBefore', cert)
2909 self.assertIn('notAfter', cert)
2910 before = ssl.cert_time_to_seconds(cert['notBefore'])
2911 after = ssl.cert_time_to_seconds(cert['notAfter'])
2912 self.assertLess(before, after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002913
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002914 @unittest.skipUnless(have_verify_flags(),
2915 "verify_flags need OpenSSL > 0.9.8")
2916 def test_crl_check(self):
2917 if support.verbose:
2918 sys.stdout.write("\n")
2919
Christian Heimesa170fa12017-09-15 20:27:30 +02002920 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002921
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002922 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
Christian Heimesa170fa12017-09-15 20:27:30 +02002923 self.assertEqual(client_context.verify_flags, ssl.VERIFY_DEFAULT | tf)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002924
2925 # VERIFY_DEFAULT should pass
2926 server = ThreadedEchoServer(context=server_context, chatty=True)
2927 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002928 with client_context.wrap_socket(socket.socket(),
2929 server_hostname=hostname) as s:
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002930 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002931 cert = s.getpeercert()
2932 self.assertTrue(cert, "Can't get peer certificate.")
Bill Janssen58afe4c2008-09-08 16:45:19 +00002933
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002934 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
Christian Heimesa170fa12017-09-15 20:27:30 +02002935 client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
Bill Janssen58afe4c2008-09-08 16:45:19 +00002936
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002937 server = ThreadedEchoServer(context=server_context, chatty=True)
2938 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002939 with client_context.wrap_socket(socket.socket(),
2940 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002941 with self.assertRaisesRegex(ssl.SSLError,
2942 "certificate verify failed"):
2943 s.connect((HOST, server.port))
Bill Janssen58afe4c2008-09-08 16:45:19 +00002944
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002945 # now load a CRL file. The CRL file is signed by the CA.
Christian Heimesa170fa12017-09-15 20:27:30 +02002946 client_context.load_verify_locations(CRLFILE)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002947
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002948 server = ThreadedEchoServer(context=server_context, chatty=True)
2949 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002950 with client_context.wrap_socket(socket.socket(),
2951 server_hostname=hostname) as s:
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002952 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002953 cert = s.getpeercert()
2954 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002955
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002956 def test_check_hostname(self):
2957 if support.verbose:
2958 sys.stdout.write("\n")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002959
Christian Heimesa170fa12017-09-15 20:27:30 +02002960 client_context, server_context, hostname = testing_context()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002961
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002962 # correct hostname should verify
2963 server = ThreadedEchoServer(context=server_context, chatty=True)
2964 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002965 with client_context.wrap_socket(socket.socket(),
2966 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002967 s.connect((HOST, server.port))
2968 cert = s.getpeercert()
2969 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002970
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002971 # incorrect hostname should raise an exception
2972 server = ThreadedEchoServer(context=server_context, chatty=True)
2973 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002974 with client_context.wrap_socket(socket.socket(),
2975 server_hostname="invalid") as s:
Christian Heimes61d478c2018-01-27 15:51:38 +01002976 with self.assertRaisesRegex(
2977 ssl.CertificateError,
2978 "Hostname mismatch, certificate is not valid for 'invalid'."):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002979 s.connect((HOST, server.port))
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002980
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002981 # missing server_hostname arg should cause an exception, too
2982 server = ThreadedEchoServer(context=server_context, chatty=True)
2983 with server:
2984 with socket.socket() as s:
2985 with self.assertRaisesRegex(ValueError,
2986 "check_hostname requires server_hostname"):
Christian Heimesa170fa12017-09-15 20:27:30 +02002987 client_context.wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002988
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002989 def test_ecc_cert(self):
2990 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2991 client_context.load_verify_locations(SIGNING_CA)
Christian Heimese8eb6cb2018-05-22 22:50:12 +02002992 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002993 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2994
2995 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2996 # load ECC cert
2997 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2998
2999 # correct hostname should verify
3000 server = ThreadedEchoServer(context=server_context, chatty=True)
3001 with server:
3002 with client_context.wrap_socket(socket.socket(),
3003 server_hostname=hostname) as s:
3004 s.connect((HOST, server.port))
3005 cert = s.getpeercert()
3006 self.assertTrue(cert, "Can't get peer certificate.")
3007 cipher = s.cipher()[0].split('-')
3008 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
3009
3010 def test_dual_rsa_ecc(self):
3011 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3012 client_context.load_verify_locations(SIGNING_CA)
Christian Heimes05d9fe32018-02-27 08:55:39 +01003013 # TODO: fix TLSv1.3 once SSLContext can restrict signature
3014 # algorithms.
3015 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003016 # only ECDSA certs
3017 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
3018 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
3019
3020 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
3021 # load ECC and RSA key/cert pairs
3022 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
3023 server_context.load_cert_chain(SIGNED_CERTFILE)
3024
3025 # correct hostname should verify
3026 server = ThreadedEchoServer(context=server_context, chatty=True)
3027 with server:
3028 with client_context.wrap_socket(socket.socket(),
3029 server_hostname=hostname) as s:
3030 s.connect((HOST, server.port))
3031 cert = s.getpeercert()
3032 self.assertTrue(cert, "Can't get peer certificate.")
3033 cipher = s.cipher()[0].split('-')
3034 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
3035
Christian Heimes66e57422018-01-29 14:25:13 +01003036 def test_check_hostname_idn(self):
3037 if support.verbose:
3038 sys.stdout.write("\n")
3039
Christian Heimes11a14932018-02-24 02:35:08 +01003040 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes66e57422018-01-29 14:25:13 +01003041 server_context.load_cert_chain(IDNSANSFILE)
3042
Christian Heimes11a14932018-02-24 02:35:08 +01003043 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes66e57422018-01-29 14:25:13 +01003044 context.verify_mode = ssl.CERT_REQUIRED
3045 context.check_hostname = True
3046 context.load_verify_locations(SIGNING_CA)
3047
3048 # correct hostname should verify, when specified in several
3049 # different ways
3050 idn_hostnames = [
3051 ('könig.idn.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01003052 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01003053 ('xn--knig-5qa.idn.pythontest.net',
3054 'xn--knig-5qa.idn.pythontest.net'),
3055 (b'xn--knig-5qa.idn.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01003056 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01003057
3058 ('königsgäßchen.idna2003.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01003059 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01003060 ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
3061 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
3062 (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01003063 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
3064
3065 # ('königsgäßchen.idna2008.pythontest.net',
3066 # 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
3067 ('xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
3068 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
3069 (b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
3070 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
3071
Christian Heimes66e57422018-01-29 14:25:13 +01003072 ]
3073 for server_hostname, expected_hostname in idn_hostnames:
3074 server = ThreadedEchoServer(context=server_context, chatty=True)
3075 with server:
3076 with context.wrap_socket(socket.socket(),
3077 server_hostname=server_hostname) as s:
3078 self.assertEqual(s.server_hostname, expected_hostname)
3079 s.connect((HOST, server.port))
3080 cert = s.getpeercert()
3081 self.assertEqual(s.server_hostname, expected_hostname)
3082 self.assertTrue(cert, "Can't get peer certificate.")
3083
Christian Heimes66e57422018-01-29 14:25:13 +01003084 # incorrect hostname should raise an exception
3085 server = ThreadedEchoServer(context=server_context, chatty=True)
3086 with server:
3087 with context.wrap_socket(socket.socket(),
3088 server_hostname="python.example.org") as s:
3089 with self.assertRaises(ssl.CertificateError):
3090 s.connect((HOST, server.port))
3091
Christian Heimes529525f2018-05-23 22:24:45 +02003092 def test_wrong_cert_tls12(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003093 """Connecting when the server rejects the client's certificate
3094
3095 Launch a server with CERT_REQUIRED, and check that trying to
3096 connect to it with a wrong client certificate fails.
3097 """
Christian Heimes05d9fe32018-02-27 08:55:39 +01003098 client_context, server_context, hostname = testing_context()
Christian Heimes88bfd0b2018-08-14 12:54:19 +02003099 # load client cert that is not signed by trusted CA
3100 client_context.load_cert_chain(CERTFILE)
Christian Heimes05d9fe32018-02-27 08:55:39 +01003101 # require TLS client authentication
3102 server_context.verify_mode = ssl.CERT_REQUIRED
Christian Heimes529525f2018-05-23 22:24:45 +02003103 # TLS 1.3 has different handshake
3104 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
Christian Heimes05d9fe32018-02-27 08:55:39 +01003105
3106 server = ThreadedEchoServer(
3107 context=server_context, chatty=True, connectionchatty=True,
3108 )
3109
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003110 with server, \
Christian Heimes05d9fe32018-02-27 08:55:39 +01003111 client_context.wrap_socket(socket.socket(),
3112 server_hostname=hostname) as s:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00003113 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003114 # Expect either an SSL error about the server rejecting
3115 # the connection, or a low-level connection reset (which
3116 # sometimes happens on Windows)
3117 s.connect((HOST, server.port))
3118 except ssl.SSLError as e:
3119 if support.verbose:
3120 sys.stdout.write("\nSSLError is %r\n" % e)
3121 except OSError as e:
3122 if e.errno != errno.ECONNRESET:
3123 raise
3124 if support.verbose:
3125 sys.stdout.write("\nsocket.error is %r\n" % e)
3126 else:
3127 self.fail("Use of invalid cert should have failed!")
3128
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003129 @requires_tls_version('TLSv1_3')
Christian Heimes529525f2018-05-23 22:24:45 +02003130 def test_wrong_cert_tls13(self):
3131 client_context, server_context, hostname = testing_context()
Christian Heimes88bfd0b2018-08-14 12:54:19 +02003132 # load client cert that is not signed by trusted CA
3133 client_context.load_cert_chain(CERTFILE)
Christian Heimes529525f2018-05-23 22:24:45 +02003134 server_context.verify_mode = ssl.CERT_REQUIRED
3135 server_context.minimum_version = ssl.TLSVersion.TLSv1_3
3136 client_context.minimum_version = ssl.TLSVersion.TLSv1_3
3137
3138 server = ThreadedEchoServer(
3139 context=server_context, chatty=True, connectionchatty=True,
3140 )
3141 with server, \
3142 client_context.wrap_socket(socket.socket(),
3143 server_hostname=hostname) as s:
3144 # TLS 1.3 perform client cert exchange after handshake
3145 s.connect((HOST, server.port))
3146 try:
3147 s.write(b'data')
3148 s.read(4)
3149 except ssl.SSLError as e:
3150 if support.verbose:
3151 sys.stdout.write("\nSSLError is %r\n" % e)
3152 except OSError as e:
3153 if e.errno != errno.ECONNRESET:
3154 raise
3155 if support.verbose:
3156 sys.stdout.write("\nsocket.error is %r\n" % e)
3157 else:
3158 self.fail("Use of invalid cert should have failed!")
3159
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003160 def test_rude_shutdown(self):
3161 """A brutal shutdown of an SSL server should raise an OSError
3162 in the client when attempting handshake.
3163 """
3164 listener_ready = threading.Event()
3165 listener_gone = threading.Event()
3166
3167 s = socket.socket()
Serhiy Storchaka16994912020-04-25 10:06:29 +03003168 port = socket_helper.bind_port(s, HOST)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003169
3170 # `listener` runs in a thread. It sits in an accept() until
3171 # the main thread connects. Then it rudely closes the socket,
3172 # and sets Event `listener_gone` to let the main thread know
3173 # the socket is gone.
3174 def listener():
3175 s.listen()
3176 listener_ready.set()
3177 newsock, addr = s.accept()
3178 newsock.close()
3179 s.close()
3180 listener_gone.set()
3181
3182 def connector():
3183 listener_ready.wait()
3184 with socket.socket() as c:
3185 c.connect((HOST, port))
3186 listener_gone.wait()
Antoine Pitrou40f08742010-04-24 22:04:40 +00003187 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003188 ssl_sock = test_wrap_socket(c)
3189 except OSError:
3190 pass
3191 else:
3192 self.fail('connecting to closed SSL socket should have failed')
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00003193
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003194 t = threading.Thread(target=listener)
3195 t.start()
3196 try:
3197 connector()
3198 finally:
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003199 t.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003200
Christian Heimesb3ad0e52017-09-08 12:00:19 -07003201 def test_ssl_cert_verify_error(self):
3202 if support.verbose:
3203 sys.stdout.write("\n")
3204
3205 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
3206 server_context.load_cert_chain(SIGNED_CERTFILE)
3207
3208 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3209
3210 server = ThreadedEchoServer(context=server_context, chatty=True)
3211 with server:
3212 with context.wrap_socket(socket.socket(),
Christian Heimesa170fa12017-09-15 20:27:30 +02003213 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
Christian Heimesb3ad0e52017-09-08 12:00:19 -07003214 try:
3215 s.connect((HOST, server.port))
3216 except ssl.SSLError as e:
3217 msg = 'unable to get local issuer certificate'
3218 self.assertIsInstance(e, ssl.SSLCertVerificationError)
3219 self.assertEqual(e.verify_code, 20)
3220 self.assertEqual(e.verify_message, msg)
3221 self.assertIn(msg, repr(e))
3222 self.assertIn('certificate verify failed', repr(e))
3223
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003224 @requires_tls_version('SSLv2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003225 def test_protocol_sslv2(self):
3226 """Connecting to an SSLv2 server with various client options"""
3227 if support.verbose:
3228 sys.stdout.write("\n")
3229 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
3230 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
3231 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
Christian Heimesa170fa12017-09-15 20:27:30 +02003232 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003233 if has_tls_version('SSLv3'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003234 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
3235 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
3236 # SSLv23 client with specific SSL options
3237 if no_sslv2_implies_sslv3_hello():
3238 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003239 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003240 client_options=ssl.OP_NO_SSLv2)
Christian Heimesa170fa12017-09-15 20:27:30 +02003241 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003242 client_options=ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02003243 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003244 client_options=ssl.OP_NO_TLSv1)
Antoine Pitrou242db722013-05-01 20:52:07 +02003245
def test_PROTOCOL_TLS(self):
    """Connecting to an SSLv23 server with various client options.

    The server always uses ``ssl.PROTOCOL_TLS``; the client protocol,
    certificate requirements and server options are varied.
    """
    if support.verbose:
        sys.stdout.write("\n")
    server_proto = ssl.PROTOCOL_TLS

    if has_tls_version('SSLv2'):
        try:
            try_protocol_combo(server_proto, ssl.PROTOCOL_SSLv2, True)
        except OSError as exc:
            # this fails on some older versions of OpenSSL (0.9.7l, for instance)
            if support.verbose:
                sys.stdout.write(
                    " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
                    % str(exc))

    # Run the same three client protocols once with the default
    # certificate requirements and once per explicit requirement.
    for cert_args in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        if has_tls_version('SSLv3'):
            try_protocol_combo(server_proto, ssl.PROTOCOL_SSLv3, False,
                               *cert_args)
        try_protocol_combo(server_proto, ssl.PROTOCOL_TLS, True, *cert_args)
        if has_tls_version('TLSv1'):
            try_protocol_combo(server_proto, ssl.PROTOCOL_TLSv1, 'TLSv1',
                               *cert_args)

    # Server with specific SSL options
    if has_tls_version('SSLv3'):
        try_protocol_combo(server_proto, ssl.PROTOCOL_SSLv3, False,
                           server_options=ssl.OP_NO_SSLv3)
    # Will choose TLSv1
    no_ssl = ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
    try_protocol_combo(server_proto, ssl.PROTOCOL_TLS, True,
                       server_options=no_ssl)
    if has_tls_version('TLSv1'):
        try_protocol_combo(server_proto, ssl.PROTOCOL_TLSv1, False,
                           server_options=ssl.OP_NO_TLSv1)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003287
@requires_tls_version('SSLv3')
def test_protocol_sslv3(self):
    """Connecting to an SSLv3 server with various client options.

    SSLv3 clients are passed the expectation ``'SSLv3'``; every other
    client combination is passed ``False``.
    """
    if support.verbose:
        sys.stdout.write("\n")
    sslv3 = ssl.PROTOCOL_SSLv3

    # Same-protocol client under each certificate requirement.
    for cert_args in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        try_protocol_combo(sslv3, sslv3, 'SSLv3', *cert_args)

    if has_tls_version('SSLv2'):
        try_protocol_combo(sslv3, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(sslv3, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_SSLv3)
    try_protocol_combo(sslv3, ssl.PROTOCOL_TLSv1, False)
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv3, ssl.PROTOCOL_TLS, False,
                           client_options=ssl.OP_NO_SSLv2)
3305
@requires_tls_version('TLSv1')
def test_protocol_tlsv1(self):
    """Connecting to a TLSv1 server with various client options.

    TLSv1 clients are passed the expectation ``'TLSv1'``; older or
    restricted clients are passed ``False``.
    """
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1 = ssl.PROTOCOL_TLSv1

    # Same-protocol client under each certificate requirement.
    for cert_args in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        try_protocol_combo(tlsv1, tlsv1, 'TLSv1', *cert_args)

    # Older client protocols, looked up lazily because the constant is
    # only touched when the version is reported as available.
    for version, attr in (('SSLv2', 'PROTOCOL_SSLv2'),
                          ('SSLv3', 'PROTOCOL_SSLv3')):
        if has_tls_version(version):
            try_protocol_combo(tlsv1, getattr(ssl, attr), False)

    try_protocol_combo(tlsv1, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1)
3320
@requires_tls_version('TLSv1_1')
def test_protocol_tlsv1_1(self):
    """Connecting to a TLSv1.1 server with various client options.
    Testing against older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1_1 = ssl.PROTOCOL_TLSv1_1

    try_protocol_combo(tlsv1_1, tlsv1_1, 'TLSv1.1')

    # Older client protocols, looked up lazily because the constant is
    # only touched when the version is reported as available.
    for version, attr in (('SSLv2', 'PROTOCOL_SSLv2'),
                          ('SSLv3', 'PROTOCOL_SSLv3')):
        if has_tls_version(version):
            try_protocol_combo(tlsv1_1, getattr(ssl, attr), False)

    try_protocol_combo(tlsv1_1, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1_1)

    # Auto-negotiating server with a TLSv1.1 client, then the two
    # TLSv1.1 / TLSv1.2 mismatches.
    try_protocol_combo(ssl.PROTOCOL_TLS, tlsv1_1, 'TLSv1.1')
    try_protocol_combo(tlsv1_1, ssl.PROTOCOL_TLSv1_2, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1_2, tlsv1_1, False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003338
@requires_tls_version('TLSv1_2')
def test_protocol_tlsv1_2(self):
    """Connecting to a TLSv1.2 server with various client options.
    Testing against older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1_2 = ssl.PROTOCOL_TLSv1_2

    no_ssl = ssl.OP_NO_SSLv3 | ssl.OP_NO_SSLv2
    try_protocol_combo(tlsv1_2, tlsv1_2, 'TLSv1.2',
                       server_options=no_ssl,
                       client_options=no_ssl)

    # Older client protocols, looked up lazily because the constant is
    # only touched when the version is reported as available.
    for version, attr in (('SSLv2', 'PROTOCOL_SSLv2'),
                          ('SSLv3', 'PROTOCOL_SSLv3')):
        if has_tls_version(version):
            try_protocol_combo(tlsv1_2, getattr(ssl, attr), False)

    try_protocol_combo(tlsv1_2, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1_2)

    try_protocol_combo(ssl.PROTOCOL_TLS, tlsv1_2, 'TLSv1.2')
    # Mismatches against each older fixed TLS version, in both directions.
    for older in (ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1):
        try_protocol_combo(tlsv1_2, older, False)
        try_protocol_combo(older, tlsv1_2, False)
3360
def test_starttls(self):
    """Switching from clear text to encrypted and back again.

    Sends a fixed script of messages to a STARTTLS-enabled echo server.
    After an "ok" reply to ``STARTTLS`` the plain socket is wrapped; after
    an "ok" reply to ``ENDTLS`` it is unwrapped again.  Whichever socket
    is current is always closed, even if the conversation raises.
    """
    msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")

    server = ThreadedEchoServer(CERTFILE,
                                starttls_server=True,
                                chatty=True,
                                connectionchatty=True)
    wrapped = False
    with server:
        s = socket.socket()
        try:
            s.setblocking(True)
            s.connect((HOST, server.port))
            if support.verbose:
                sys.stdout.write("\n")
            for indata in msgs:
                if support.verbose:
                    sys.stdout.write(
                        " client: sending %r...\n" % indata)
                if wrapped:
                    conn.write(indata)
                    outdata = conn.read()
                else:
                    s.send(indata)
                    outdata = s.recv(1024)
                msg = outdata.strip().lower()
                if indata == b"STARTTLS" and msg.startswith(b"ok"):
                    # STARTTLS ok, switch to secure mode
                    if support.verbose:
                        sys.stdout.write(
                            " client: read %r from server, starting TLS...\n"
                            % msg)
                    conn = test_wrap_socket(s)
                    wrapped = True
                elif indata == b"ENDTLS" and msg.startswith(b"ok"):
                    # ENDTLS ok, switch back to clear text
                    if support.verbose:
                        sys.stdout.write(
                            " client: read %r from server, ending TLS...\n"
                            % msg)
                    s = conn.unwrap()
                    wrapped = False
                else:
                    if support.verbose:
                        sys.stdout.write(
                            " client: read %r from server\n" % msg)
            if support.verbose:
                sys.stdout.write(" client: closing connection.\n")
            if wrapped:
                conn.write(b"over\n")
            else:
                s.send(b"over\n")
        finally:
            # Close whichever socket currently owns the connection so a
            # failure part-way through does not leak it.
            if wrapped:
                conn.close()
            else:
                s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003417
def test_socketserver(self):
    """Using socketserver to create and manage SSL connections.

    Reads CERTFILE from disk, fetches the file of the same name from the
    local HTTPS server, and checks that both byte strings are equal.
    """
    server = make_https_server(self, certfile=SIGNED_CERTFILE)
    # try to connect
    if support.verbose:
        sys.stdout.write('\n')
    with open(CERTFILE, 'rb') as f:
        d1 = f.read()
    # Default to empty *bytes* so the final comparison is between values
    # of the same type even when the server sends no usable length.
    d2 = b''
    # now fetch the same data from the HTTPS server
    url = 'https://localhost:%d/%s' % (
        server.port, os.path.split(CERTFILE)[1])
    context = ssl.create_default_context(cafile=SIGNING_CA)
    with urllib.request.urlopen(url, context=context) as response:
        dlen = response.info().get("content-length")
        if dlen and (int(dlen) > 0):
            d2 = response.read(int(dlen))
            if support.verbose:
                sys.stdout.write(
                    " client: read %d bytes from remote server '%s'\n"
                    % (len(d2), server))
    self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003443
def test_asyncore_server(self):
    """Check the example asyncore integration.

    Sends one line to the asyncore echo server and fails unless the
    reply equals the lower-cased request.
    """
    def log(text):
        # Progress output is only written on verbose test runs.
        if support.verbose:
            sys.stdout.write(text)

    log("\n")

    indata = b"FOO\n"
    server = AsyncoreEchoServer(CERTFILE)
    with server:
        client = test_wrap_socket(socket.socket())
        client.connect(('127.0.0.1', server.port))
        log(" client: sending %r...\n" % indata)
        client.write(indata)
        outdata = client.read()
        log(" client: read %r\n" % outdata)
        expected = indata.lower()
        if outdata != expected:
            self.fail(
                "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                % (outdata[:20], len(outdata),
                   indata[:20].lower(), len(indata)))
        client.write(b"over\n")
        log(" client: closing connection.\n")
        client.close()
        log(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003472
def test_recv_send(self):
    """Test recv(), send() and friends.

    Each send/recv variant of the wrapped socket is exercised against an
    echo server.  Variants flagged as not expected to succeed must raise
    a ValueError whose message starts with the method name.  The test
    then checks read(-1, buffer), sendall() with a bytes-like object, and
    that dup/sendmsg/recvmsg/recvmsg_into raise NotImplementedError.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = test_wrap_socket(socket.socket(),
                             server_side=False,
                             certfile=CERTFILE,
                             ca_certs=CERTFILE,
                             cert_reqs=ssl.CERT_NONE,
                             ssl_version=ssl.PROTOCOL_TLS_CLIENT)
        s.connect((HOST, server.port))
        # helper methods for standardising recv* method signatures
        def _recv_into():
            b = bytearray(b"\0"*100)
            count = s.recv_into(b)
            return b[:count]

        def _recvfrom_into():
            b = bytearray(b"\0"*100)
            count, addr = s.recvfrom_into(b)
            return b[:count]

        # (name, method, expect success?, *args, return value func)
        send_methods = [
            ('send', s.send, True, [], len),
            ('sendto', s.sendto, False, ["some.address"], len),
            ('sendall', s.sendall, True, [], lambda x: None),
        ]
        # (name, method, whether to expect success, *args)
        recv_methods = [
            ('recv', s.recv, True, []),
            ('recvfrom', s.recvfrom, False, ["some.address"]),
            ('recv_into', _recv_into, True, []),
            ('recvfrom_into', _recvfrom_into, False, []),
        ]
        data_prefix = "PREFIX_"

        # NOTE: the messages below use the ``!r`` / ``!s`` conversions.
        # A ``:r`` or ``:s`` format *spec* is rejected by bytes and by
        # exception objects, which would turn a reported failure into a
        # TypeError raised while building the message.
        for (meth_name, send_meth, expect_success, args,
                ret_val_meth) in send_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                ret = send_meth(indata, *args)
                msg = "sending with {}".format(meth_name)
                self.assertEqual(ret, ret_val_meth(indata), msg=msg)
                outdata = s.read()
                if outdata != indata.lower():
                    self.fail(
                        "While sending with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to send with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )

        for meth_name, recv_meth, expect_success, args in recv_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                s.send(indata)
                outdata = recv_meth(*args)
                if outdata != indata.lower():
                    self.fail(
                        "While receiving with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to receive with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )
                # consume data
                s.read()

        # read(-1, buffer) is supported, even though read(-1) is not
        data = b"data"
        s.send(data)
        buffer = bytearray(len(data))
        self.assertEqual(s.read(-1, buffer), len(data))
        self.assertEqual(buffer, data)

        # sendall accepts bytes-like objects
        if ctypes is not None:
            ubyte = ctypes.c_ubyte * len(data)
            byteslike = ubyte.from_buffer_copy(data)
            s.sendall(byteslike)
            self.assertEqual(s.read(), data)

        # Make sure sendmsg et al are disallowed to avoid
        # inadvertent disclosure of data and/or corruption
        # of the encrypted data stream
        self.assertRaises(NotImplementedError, s.dup)
        self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
        self.assertRaises(NotImplementedError, s.recvmsg, 100)
        self.assertRaises(NotImplementedError,
                          s.recvmsg_into, [bytearray(100)])
        s.write(b"over\n")

        self.assertRaises(ValueError, s.recv, -1)
        self.assertRaises(ValueError, s.read, -1)

        s.close()
3609
def test_recv_zero(self):
    """Zero-length recv()/read() return no data and leave pending data intact.

    Also checks that a zero-length recv()/recv_into() on a non-blocking
    socket returns immediately when the peer has sent nothing.
    """
    server = ThreadedEchoServer(CERTFILE)
    server.__enter__()
    # The context-manager protocol calls __exit__ with three arguments
    # (exc_type, exc_value, traceback); mirror that for the cleanup.
    self.addCleanup(server.__exit__, None, None, None)
    s = socket.create_connection((HOST, server.port))
    self.addCleanup(s.close)
    s = test_wrap_socket(s, suppress_ragged_eofs=False)
    self.addCleanup(s.close)

    # recv/read(0) should return no data
    s.send(b"data")
    self.assertEqual(s.recv(0), b"")
    self.assertEqual(s.read(0), b"")
    self.assertEqual(s.read(), b"data")

    # Should not block if the other end sends no data
    s.setblocking(False)
    self.assertEqual(s.recv(0), b"")
    self.assertEqual(s.recv_into(bytearray()), 0)
3629
def test_nonblocking_send(self):
    """Repeated send() on a non-blocking socket ends in an SSLWant* error."""
    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        client = test_wrap_socket(socket.socket(),
                                  server_side=False,
                                  certfile=CERTFILE,
                                  ca_certs=CERTFILE,
                                  cert_reqs=ssl.CERT_NONE,
                                  ssl_version=ssl.PROTOCOL_TLS_CLIENT)
        client.connect((HOST, server.port))
        client.setblocking(False)

        # If we keep sending data, at some point the buffers
        # will be full and the call will block
        chunk = bytearray(8192)
        would_block = (ssl.SSLWantWriteError, ssl.SSLWantReadError)
        with self.assertRaises(would_block):
            while True:
                client.send(chunk)

        # Restore blocking mode before closing the connection
        client.setblocking(True)
        client.close()
3659
def test_handshake_timeout(self):
    """The SSL handshake must respect the socket timeout (Issue #5103).

    A plain TCP listener accepts connections but never speaks TLS, so
    both wrapping a connected socket and connecting a wrapped socket are
    expected to raise socket.timeout.
    """
    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = socket_helper.bind_port(listener)
    listening = threading.Event()
    stop_requested = False

    def serve():
        listener.listen()
        listening.set()
        accepted = []
        while not stop_requested:
            readable, _, _ = select.select([listener], [], [], 0.1)
            if listener in readable:
                # Let the socket hang around rather than having
                # it closed by garbage collection.
                accepted.append(listener.accept()[0])
        for conn in accepted:
            conn.close()

    worker = threading.Thread(target=serve)
    worker.start()
    listening.wait()

    try:
        # Case 1: wrap an already-connected socket.
        try:
            client = socket.socket(socket.AF_INET)
            client.settimeout(0.2)
            client.connect((host, port))
            # Will attempt handshake and time out
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                test_wrap_socket(client)
        finally:
            client.close()
        # Case 2: connect a socket that was wrapped beforehand.
        try:
            client = socket.socket(socket.AF_INET)
            client = test_wrap_socket(client)
            client.settimeout(0.2)
            # Will attempt handshake and time out
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                client.connect((host, port))
        finally:
            client.close()
    finally:
        # Tell the serving loop to exit, then tear everything down.
        stop_requested = True
        worker.join()
        listener.close()
3708
def test_server_accept(self):
    """accept() on an SSLSocket from SSLContext.wrap_socket() (Issue #16357).

    Checks that the accepted connection is an ssl.SSLSocket and that the
    peer address reported by accept() equals the client's own address.
    """
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    ctx.verify_mode = ssl.CERT_REQUIRED
    ctx.load_verify_locations(SIGNING_CA)
    ctx.load_cert_chain(SIGNED_CERTFILE)
    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = socket_helper.bind_port(listener)
    listener = ctx.wrap_socket(listener, server_side=True)
    self.assertTrue(listener.server_side)

    ready = threading.Event()
    remote = None
    peer = None

    def serve():
        nonlocal remote, peer
        listener.listen()
        # Block on the accept and wait on the connection to close.
        ready.set()
        remote, peer = listener.accept()
        # Echo four bytes back to the client.
        remote.send(remote.recv(4))

    worker = threading.Thread(target=serve)
    worker.start()
    # Client wait until server setup and perform a connect.
    ready.wait()
    client = ctx.wrap_socket(socket.socket())
    client.connect((host, port))
    client.send(b'data')
    client.recv()
    client_addr = client.getsockname()
    client.close()
    worker.join()
    remote.close()
    listener.close()
    # Sanity checks.
    self.assertIsInstance(remote, ssl.SSLSocket)
    self.assertEqual(peer, client_addr)
3749
def test_getpeercert_enotconn(self):
    """getpeercert() on an unconnected socket fails with ENOTCONN."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    unconnected = ctx.wrap_socket(socket.socket())
    with unconnected as sock:
        with self.assertRaises(OSError) as caught:
            sock.getpeercert()
        raised_errno = caught.exception.errno
        self.assertEqual(raised_errno, errno.ENOTCONN)
3756
def test_do_handshake_enotconn(self):
    """do_handshake() on an unconnected socket fails with ENOTCONN."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    unconnected = ctx.wrap_socket(socket.socket())
    with unconnected as sock:
        with self.assertRaises(OSError) as caught:
            sock.do_handshake()
        raised_errno = caught.exception.errno
        self.assertEqual(raised_errno, errno.ENOTCONN)
3763
def test_no_shared_ciphers(self):
    """Connecting fails when client and server have no cipher in common."""
    client_ctx, server_ctx, hostname = testing_context()
    # OpenSSL enables all TLS 1.3 ciphers, enforce TLS 1.2 for test
    client_ctx.options |= ssl.OP_NO_TLSv1_3
    # Force different suites on client and server
    client_ctx.set_ciphers("AES128")
    server_ctx.set_ciphers("AES256")
    with ThreadedEchoServer(context=server_ctx) as server, \
            client_ctx.wrap_socket(socket.socket(),
                                   server_hostname=hostname) as client:
        with self.assertRaises(OSError):
            client.connect((HOST, server.port))
    first_error = server.conn_errors[0]
    self.assertIn("no shared cipher", first_error)
3777
def test_version_basic(self):
    """
    Basic tests for SSLSocket.version().
    More tests are done in the test_protocol_*() methods.
    """
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    with ThreadedEchoServer(CERTFILE,
                            ssl_version=ssl.PROTOCOL_TLS_SERVER,
                            chatty=False) as server:
        with ctx.wrap_socket(socket.socket()) as client:
            # Nothing is negotiated before connecting.
            self.assertIs(client.version(), None)
            self.assertIs(client._sslobj, None)
            client.connect((HOST, server.port))
            # Work out which single version is expected, if any.
            if IS_OPENSSL_1_1_1 and has_tls_version('TLSv1_3'):
                expected = 'TLSv1.3'
            elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
                expected = 'TLSv1.2'
            else:  # 0.9.8 to 1.0.1
                expected = None
            if expected is not None:
                self.assertEqual(client.version(), expected)
            else:
                self.assertIn(client.version(), ('TLSv1', 'TLSv1.2'))
        # After the socket's with-block exits the SSL object is gone again.
        self.assertIs(client._sslobj, None)
        self.assertIs(client.version(), None)
3801
@requires_tls_version('TLSv1_3')
def test_tls1_3(self):
    """With TLS 1.0-1.2 disabled, the connection uses TLSv1.3."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    ctx.load_cert_chain(CERTFILE)
    older_versions_off = (
        ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
    )
    ctx.options |= older_versions_off
    acceptable_ciphers = {
        'TLS_AES_256_GCM_SHA384',
        'TLS_CHACHA20_POLY1305_SHA256',
        'TLS_AES_128_GCM_SHA256',
    }
    with ThreadedEchoServer(context=ctx) as server, \
            ctx.wrap_socket(socket.socket()) as client:
        client.connect((HOST, server.port))
        self.assertIn(client.cipher()[0], acceptable_ciphers)
        self.assertEqual(client.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003818
@requires_minimum_version
@requires_tls_version('TLSv1_2')
def test_min_max_version_tlsv1_2(self):
    """A TLSv1.0-1.2 client and a TLSv1.2-only server settle on TLSv1.2."""
    client_ctx, server_ctx, hostname = testing_context()
    # server only TLSv1.2
    server_ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    server_ctx.maximum_version = ssl.TLSVersion.TLSv1_2
    # client TLSv1.0 to 1.2
    client_ctx.minimum_version = ssl.TLSVersion.TLSv1
    client_ctx.maximum_version = ssl.TLSVersion.TLSv1_2

    with ThreadedEchoServer(context=server_ctx) as server, \
            client_ctx.wrap_socket(socket.socket(),
                                   server_hostname=hostname) as client:
        client.connect((HOST, server.port))
        self.assertEqual(client.version(), 'TLSv1.2')
3835
@requires_minimum_version
@requires_tls_version('TLSv1_1')
def test_min_max_version_tlsv1_1(self):
    """A TLSv1.0-1.2 client and a TLSv1.0-1.1 server settle on TLSv1.1."""
    client_ctx, server_ctx, hostname = testing_context()
    # server 1.0 to 1.1
    server_ctx.minimum_version = ssl.TLSVersion.TLSv1
    server_ctx.maximum_version = ssl.TLSVersion.TLSv1_1
    # client 1.0 to 1.2
    client_ctx.minimum_version = ssl.TLSVersion.TLSv1
    client_ctx.maximum_version = ssl.TLSVersion.TLSv1_2

    with ThreadedEchoServer(context=server_ctx) as server, \
            client_ctx.wrap_socket(socket.socket(),
                                   server_hostname=hostname) as client:
        client.connect((HOST, server.port))
        self.assertEqual(client.version(), 'TLSv1.1')
3851
@requires_minimum_version
@requires_tls_version('TLSv1_2')
def test_min_max_version_mismatch(self):
    """A TLSv1.0-only client cannot connect to a TLSv1.2-only server."""
    client_ctx, server_ctx, hostname = testing_context()
    # client 1.0, server 1.2 (mismatch)
    tls12_only = ssl.TLSVersion.TLSv1_2
    tls10_only = ssl.TLSVersion.TLSv1
    server_ctx.maximum_version = tls12_only
    server_ctx.minimum_version = tls12_only
    client_ctx.maximum_version = tls10_only
    client_ctx.minimum_version = tls10_only
    with ThreadedEchoServer(context=server_ctx) as server, \
            client_ctx.wrap_socket(socket.socket(),
                                   server_hostname=hostname) as client:
        with self.assertRaises(ssl.SSLError) as caught:
            client.connect((HOST, server.port))
        # The error text is expected to mention an alert.
        self.assertIn("alert", str(caught.exception))
3867
@requires_minimum_version
@requires_tls_version('SSLv3')
def test_min_max_version_sslv3(self):
    """An SSLv3-only client connects to a server whose minimum is SSLv3."""
    client_ctx, server_ctx, hostname = testing_context()
    sslv3 = ssl.TLSVersion.SSLv3
    server_ctx.minimum_version = sslv3
    client_ctx.minimum_version = sslv3
    client_ctx.maximum_version = sslv3
    with ThreadedEchoServer(context=server_ctx) as server, \
            client_ctx.wrap_socket(socket.socket(),
                                   server_hostname=hostname) as client:
        client.connect((HOST, server.port))
        self.assertEqual(client.version(), 'SSLv3')
3880
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
def test_default_ecdh_curve(self):
    """ECDH key exchange is enabled by default on SSL contexts (Issue #21015).

    The negotiated cipher name must contain "ECDH".
    """
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    ctx.load_cert_chain(CERTFILE)
    # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
    # cipher name.
    ctx.options |= ssl.OP_NO_TLSv1_3
    # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
    # explicitly using the 'ECCdraft' cipher alias. Otherwise,
    # our default cipher list should prefer ECDH-based ciphers
    # automatically.
    needs_explicit_ecdh = ssl.OPENSSL_VERSION_INFO < (1, 0, 0)
    if needs_explicit_ecdh:
        ctx.set_ciphers("ECCdraft:ECDH")
    with ThreadedEchoServer(context=ctx) as server, \
            ctx.wrap_socket(socket.socket()) as client:
        client.connect((HOST, server.port))
        cipher_name = client.cipher()[0]
        self.assertIn("ECDH", cipher_name)
3900
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """Test tls-unique channel binding."""
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    server = ThreadedEchoServer(context=server_context,
                                chatty=True,
                                connectionchatty=False)

    def fetch_binding(verbose_fmt):
        # Open a fresh connection, sanity-check the client's tls-unique
        # data, compare it with the value the server reports for the same
        # connection, and return it.
        with client_context.wrap_socket(
                socket.socket(),
                server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(verbose_fmt.format(data))

            # check if it is sane
            self.assertIsNotNone(data)
            # 48 bytes are expected under TLSv1.3, 12 bytes otherwise.
            expected_len = 48 if s.version() == 'TLSv1.3' else 12
            self.assertEqual(len(data), expected_len)

            # and compare with the peer's version
            s.write(b"CB tls-unique\n")
            peer_data_repr = s.read().strip()
            self.assertEqual(peer_data_repr,
                             repr(data).encode("us-ascii"))
            return data

    with server:
        cb_data = fetch_binding(" got channel binding data: {0!r}\n")
        # now, again; the sanity checks above run on the *new* binding
        # data rather than re-checking the first connection's value.
        new_cb_data = fetch_binding(
            "got another channel binding data: {0!r}\n")
        # is it really unique
        self.assertNotEqual(cb_data, new_cb_data)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003960
def test_compression(self):
    # The compression reported for a default handshake must be either
    # None or one of the two known method names.
    client_ctx, server_ctx, sni = testing_context()
    result = server_params_test(client_ctx, server_ctx,
                                chatty=True, connectionchatty=True,
                                sni_name=sni)
    negotiated = result['compression']
    if support.verbose:
        sys.stdout.write(" got compression: {!r}\n".format(negotiated))
    self.assertIn(negotiated, {None, 'ZLIB', 'RLE'})
3969
@unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
                     "ssl.OP_NO_COMPRESSION needed for this test")
def test_compression_disabled(self):
    # With OP_NO_COMPRESSION set on both peers, the handshake must report
    # no compression at all.
    client_ctx, server_ctx, sni = testing_context()
    for ctx in (client_ctx, server_ctx):
        ctx.options |= ssl.OP_NO_COMPRESSION
    result = server_params_test(client_ctx, server_ctx,
                                chatty=True, connectionchatty=True,
                                sni_name=sni)
    self.assertIs(result['compression'], None)
3980
@unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
def test_dh_params(self):
    # Check we can get a connection with ephemeral Diffie-Hellman
    client_context, server_context, hostname = testing_context()
    # test scenario needs TLS <= 1.2
    client_context.options |= ssl.OP_NO_TLSv1_3
    server_context.load_dh_params(DHFILE)
    server_context.set_ciphers("kEDH")
    server_context.options |= ssl.OP_NO_TLSv1_3
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    # stats["cipher"] is indexed to obtain the cipher name string.
    cipher = stats["cipher"][0]
    parts = cipher.split("-")
    if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
        # `cipher` is already the name string: report it in full instead
        # of indexing it again (which would show only its first character).
        self.fail("Non-DH cipher: " + cipher)
3997
@unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
@unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
def test_ecdh_curve(self):
    # Exercise set_ecdh_curve() on the server only, on the client only,
    # and finally on both peers with different curves.
    def restrict_server(ctx):
        # ECDHE-only cipher list, TLS 1.0 and 1.1 switched off.
        ctx.set_ciphers("ECDHE:!eNULL:!aNULL")
        ctx.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1

    def handshake(client_ctx, server_ctx, sni):
        return server_params_test(client_ctx, server_ctx,
                                  chatty=True, connectionchatty=True,
                                  sni_name=sni)

    # server secp384r1, client auto
    client_ctx, server_ctx, sni = testing_context()
    server_ctx.set_ecdh_curve("secp384r1")
    restrict_server(server_ctx)
    handshake(client_ctx, server_ctx, sni)

    # server auto, client secp384r1
    client_ctx, server_ctx, sni = testing_context()
    client_ctx.set_ecdh_curve("secp384r1")
    restrict_server(server_ctx)
    handshake(client_ctx, server_ctx, sni)

    # server / client curve mismatch
    client_ctx, server_ctx, sni = testing_context()
    client_ctx.set_ecdh_curve("prime256v1")
    server_ctx.set_ecdh_curve("secp384r1")
    restrict_server(server_ctx)
    try:
        handshake(client_ctx, server_ctx, sni)
    except ssl.SSLError:
        # A failed handshake is the expected outcome.
        return
    # OpenSSL 1.0.2 does not fail although it should.
    if IS_OPENSSL_1_1_0:
        self.fail("mismatch curve did not fail")
4036
def test_selected_alpn_protocol(self):
    # When neither peer configures ALPN, the client must report None
    # as its selected ALPN protocol.
    client_ctx, server_ctx, sni = testing_context()
    result = server_params_test(client_ctx, server_ctx,
                                chatty=True, connectionchatty=True,
                                sni_name=sni)
    self.assertIs(result['client_alpn_protocol'], None)
4044
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
def test_selected_alpn_protocol_if_server_uses_alpn(self):
    # Only the server advertises ALPN protocols here; since the client
    # does not use ALPN, its selected protocol must stay None.
    client_ctx, server_ctx, sni = testing_context()
    server_ctx.set_alpn_protocols(['foo', 'bar'])
    result = server_params_test(client_ctx, server_ctx,
                                chatty=True, connectionchatty=True,
                                sni_name=sni)
    self.assertIs(result['client_alpn_protocol'], None)
4054
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
def test_alpn_protocols(self):
    # For several client offers, check which ALPN protocol both peers
    # report after a handshake against a fixed server list.
    server_protocols = ['foo', 'bar', 'milkshake']
    # (protocols offered by the client, protocol expected to be selected)
    scenarios = (
        (['foo', 'bar'], 'foo'),
        (['bar', 'foo'], 'foo'),
        (['milkshake'], 'milkshake'),
        (['http/3.0', 'http/4.0'], None),
    )
    for client_protocols, expected in scenarios:
        client_ctx, server_ctx, sni = testing_context()
        server_ctx.set_alpn_protocols(server_protocols)
        client_ctx.set_alpn_protocols(client_protocols)

        # A handshake failure is kept as the "result" so that it can be
        # inspected below.
        try:
            outcome = server_params_test(client_ctx,
                                         server_ctx,
                                         chatty=True,
                                         connectionchatty=True,
                                         sni_name=sni)
        except ssl.SSLError as exc:
            outcome = exc

        handshake_error_expected = (
            expected is None
            and IS_OPENSSL_1_1_0
            and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)
        )
        if handshake_error_expected:
            # OpenSSL 1.1.0 to 1.1.0e raises handshake error
            self.assertIsInstance(outcome, ssl.SSLError)
            continue

        # The doubled %% placeholders survive this first formatting pass
        # and are filled in per peer below.
        template = ("failed trying %s (s) and %s (c).\n"
                    "was expecting %s, but got %%s from the %%s"
                    % (str(server_protocols), str(client_protocols),
                       str(expected)))

        client_result = outcome['client_alpn_protocol']
        self.assertEqual(client_result, expected,
                         template % (client_result, "client"))

        if len(outcome['server_alpn_protocols']):
            server_result = outcome['server_alpn_protocols'][-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         template % (server_result, "server"))
4094
def test_selected_npn_protocol(self):
    # When NPN is not configured on either peer, the client must report
    # None as its selected NPN protocol.
    client_ctx, server_ctx, sni = testing_context()
    result = server_params_test(client_ctx, server_ctx,
                                chatty=True, connectionchatty=True,
                                sni_name=sni)
    self.assertIs(result['client_npn_protocol'], None)
4102
@unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
def test_npn_protocols(self):
    # For several client lists, check which NPN protocol both peers
    # report after a handshake against a fixed server list.
    server_protocols = ['http/1.1', 'spdy/2']
    # (protocols configured on the client, protocol expected to be selected)
    scenarios = (
        (['http/1.1', 'spdy/2'], 'http/1.1'),
        (['spdy/2', 'http/1.1'], 'http/1.1'),
        (['spdy/2', 'test'], 'spdy/2'),
        (['abc', 'def'], 'abc'),
    )
    for client_protocols, expected in scenarios:
        client_ctx, server_ctx, sni = testing_context()
        server_ctx.set_npn_protocols(server_protocols)
        client_ctx.set_npn_protocols(client_protocols)
        outcome = server_params_test(client_ctx, server_ctx,
                                     chatty=True, connectionchatty=True,
                                     sni_name=sni)
        # The doubled %% placeholders survive this first formatting pass
        # and are filled in per peer below.
        template = ("failed trying %s (s) and %s (c).\n"
                    "was expecting %s, but got %%s from the %%s"
                    % (str(server_protocols), str(client_protocols),
                       str(expected)))

        client_result = outcome['client_npn_protocol']
        self.assertEqual(client_result, expected,
                         template % (client_result, "client"))

        if len(outcome['server_npn_protocols']):
            server_result = outcome['server_npn_protocols'][-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         template % (server_result, "server"))
4128
def sni_contexts(self):
    # Build the contexts shared by the SNI tests and return them as
    # (server_context, other_context, client_context).  The two server
    # contexts differ only in the certificate chain they load; the client
    # context loads SIGNING_CA as its verify locations.
    def server_context_for(certfile):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ctx.load_cert_chain(certfile)
        return ctx

    primary = server_context_for(SIGNED_CERTFILE)
    alternate = server_context_for(SIGNED_CERTFILE2)
    client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client.load_verify_locations(SIGNING_CA)
    return primary, alternate, client
4137
def check_common_name(self, stats, name):
    # Assert that the subject of stats['peercert'] contains an entry
    # consisting of exactly one ('commonName', name) pair.
    expected_entry = (('commonName', name),)
    subject = stats['peercert']['subject']
    self.assertIn(expected_entry, subject)
4141
@needs_sni
def test_sni_callback(self):
    # The servername callback receives the requested name and the initial
    # context, and may switch the connection over to another context.
    seen = []
    server_ctx, other_ctx, client_ctx = self.sni_contexts()

    client_ctx.check_hostname = False

    def servername_cb(ssl_sock, server_name, initial_context):
        seen.append((server_name, initial_context))
        if server_name is None:
            return
        ssl_sock.context = other_ctx
    server_ctx.set_servername_callback(servername_cb)

    def handshake(sni):
        return server_params_test(client_ctx, server_ctx,
                                  chatty=True,
                                  sni_name=sni)

    result = handshake('supermessage')
    # The hostname was fetched properly, and the certificate was
    # changed for the connection.
    self.assertEqual(seen, [("supermessage", server_ctx)])
    # The certificate served comes from the other context
    # (sni_contexts() loads SIGNED_CERTFILE2 into it).
    self.check_common_name(result, 'fakehostname')

    seen.clear()
    # The callback is called with server_name=None
    result = handshake(None)
    self.assertEqual(seen, [(None, server_ctx)])
    self.check_common_name(result, SIGNED_CERTFILE_HOSTNAME)

    # Check disabling the callback
    seen.clear()
    server_ctx.set_servername_callback(None)

    result = handshake('notfunny')
    # Certificate didn't change
    self.check_common_name(result, SIGNED_CERTFILE_HOSTNAME)
    self.assertEqual(seen, [])
4182
@needs_sni
def test_sni_callback_alert(self):
    # Returning a TLS alert is reflected to the connecting client
    server_ctx, _other_ctx, client_ctx = self.sni_contexts()

    def deny_access(ssl_sock, server_name, initial_context):
        return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
    server_ctx.set_servername_callback(deny_access)

    with self.assertRaises(ssl.SSLError) as caught:
        server_params_test(client_ctx, server_ctx,
                           chatty=False,
                           sni_name='supermessage')
    self.assertEqual(caught.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004196
@needs_sni
def test_sni_callback_raising(self):
    # Raising fails the connection with a TLS handshake failure alert.
    server_ctx, _other_ctx, client_ctx = self.sni_contexts()

    def divide_by_zero(ssl_sock, server_name, initial_context):
        1/0
    server_ctx.set_servername_callback(divide_by_zero)

    # Both assertions stay inside the catch_unraisable_exception() block,
    # where the captured exception is read from `unraisable.unraisable`.
    with support.catch_unraisable_exception() as unraisable:
        with self.assertRaises(ssl.SSLError) as caught:
            server_params_test(client_ctx, server_ctx,
                               chatty=False,
                               sni_name='supermessage')

        self.assertEqual(caught.exception.reason,
                         'SSLV3_ALERT_HANDSHAKE_FAILURE')
        self.assertEqual(unraisable.unraisable.exc_type, ZeroDivisionError)
Antoine Pitrou50b24d02013-04-11 20:48:42 +02004215
@needs_sni
def test_sni_callback_wrong_return_type(self):
    # Returning the wrong return type terminates the TLS connection
    # with an internal error alert.
    server_ctx, _other_ctx, client_ctx = self.sni_contexts()

    def return_a_string(ssl_sock, server_name, initial_context):
        return "foo"
    server_ctx.set_servername_callback(return_a_string)

    # Both assertions stay inside the catch_unraisable_exception() block,
    # where the captured exception is read from `unraisable.unraisable`.
    with support.catch_unraisable_exception() as unraisable:
        with self.assertRaises(ssl.SSLError) as caught:
            server_params_test(client_ctx, server_ctx,
                               chatty=False,
                               sni_name='supermessage')

        self.assertEqual(caught.exception.reason,
                         'TLSV1_ALERT_INTERNAL_ERROR')
        self.assertEqual(unraisable.unraisable.exc_type, TypeError)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004235
def test_shared_ciphers(self):
    # Every cipher the server reports as shared must match one of the
    # expected algorithm fragments.
    client_ctx, server_ctx, sni = testing_context()
    client_ctx.set_ciphers("AES128:AES256")
    server_ctx.set_ciphers("AES256")
    acceptable = (
        "AES256", "AES-256",
        # TLS 1.3 ciphers are always enabled
        "TLS_CHACHA20", "TLS_AES",
    )

    result = server_params_test(client_ctx, server_ctx,
                                sni_name=sni)
    shared = result['server_shared_ciphers'][0]
    self.assertGreater(len(shared), 0)
    for cipher_name, _tls_version, _bits in shared:
        for fragment in acceptable:
            if fragment in cipher_name:
                break
        else:
            # No acceptable fragment occurs in this cipher name.
            self.fail(cipher_name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004253
def test_read_write_after_close_raises_valuerror(self):
    # read() and write() on an SSL socket that has been closed must raise
    # ValueError.
    client_ctx, server_ctx, sni = testing_context()

    with ThreadedEchoServer(context=server_ctx, chatty=False) as server:
        sock = client_ctx.wrap_socket(socket.socket(),
                                      server_hostname=sni)
        sock.connect((HOST, server.port))
        sock.close()

        with self.assertRaises(ValueError):
            sock.read(1024)
        with self.assertRaises(ValueError):
            sock.write(b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004266
def test_sendfile(self):
    # Send a small file with sendfile() over TLS and check that the echo
    # server sends the same bytes back.
    payload = b"x" * 512
    with open(support.TESTFN, 'wb') as out:
        out.write(payload)
    self.addCleanup(support.unlink, support.TESTFN)

    # One context is used for both the server and the client side.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    ctx.verify_mode = ssl.CERT_REQUIRED
    ctx.load_verify_locations(SIGNING_CA)
    ctx.load_cert_chain(SIGNED_CERTFILE)

    with ThreadedEchoServer(context=ctx, chatty=False) as server, \
            ctx.wrap_socket(socket.socket()) as s:
        s.connect((HOST, server.port))
        with open(support.TESTFN, 'rb') as source:
            s.sendfile(source)
            self.assertEqual(s.recv(1024), payload)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004283
def test_session(self):
    # Run fresh and resumed handshakes against one server context and
    # check the session objects and the server's session statistics.
    client_ctx, server_ctx, sni = testing_context()
    # TODO: sessions aren't compatible with TLSv1.3 yet
    client_ctx.options |= ssl.OP_NO_TLSv1_3

    def handshake(**extra):
        return server_params_test(client_ctx, server_ctx,
                                  sni_name=sni, **extra)

    def check_server_counters(accept, hits):
        counters = server_ctx.session_stats()
        self.assertEqual(counters['accept'], accept)
        self.assertEqual(counters['hits'], hits)

    # first connection without session
    first = handshake()
    session = first['session']
    self.assertTrue(session.id)
    self.assertGreater(session.time, 0)
    self.assertGreater(session.timeout, 0)
    self.assertTrue(session.has_ticket)
    if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
        self.assertGreater(session.ticket_lifetime_hint, 0)
    self.assertFalse(first['session_reused'])
    check_server_counters(accept=1, hits=0)

    # reuse session
    second = handshake(session=session)
    check_server_counters(accept=2, hits=1)
    self.assertTrue(second['session_reused'])
    session2 = second['session']
    self.assertEqual(session2.id, session.id)
    # equal to the first session, yet a distinct object
    self.assertEqual(session2, session)
    self.assertIsNot(session2, session)
    self.assertGreaterEqual(session2.time, session.time)
    self.assertGreaterEqual(session2.timeout, session.timeout)

    # another one without session
    third = handshake()
    self.assertFalse(third['session_reused'])
    session3 = third['session']
    self.assertNotEqual(session3.id, session.id)
    self.assertNotEqual(session3, session)
    check_server_counters(accept=3, hits=1)

    # reuse session again
    fourth = handshake(session=session)
    self.assertTrue(fourth['session_reused'])
    session4 = fourth['session']
    self.assertEqual(session4.id, session.id)
    self.assertEqual(session4, session)
    self.assertGreaterEqual(session4.time, session.time)
    self.assertGreaterEqual(session4.timeout, session.timeout)
    check_server_counters(accept=4, hits=2)
Christian Heimes99a65702016-09-10 23:44:53 +02004341
def test_session_handling(self):
    # Check when the session attribute of a client socket may be assigned
    # and which errors invalid assignments produce.
    client_ctx, server_ctx, sni = testing_context()
    second_client_ctx, _, _ = testing_context()

    # TODO: session reuse does not work with TLSv1.3
    for ctx in (client_ctx, second_client_ctx):
        ctx.options |= ssl.OP_NO_TLSv1_3

    def client_socket(ctx):
        # A wrapped socket that has not been connected yet.
        return ctx.wrap_socket(socket.socket(), server_hostname=sni)

    with ThreadedEchoServer(context=server_ctx, chatty=False) as server:
        with client_socket(client_ctx) as s:
            # session is None before handshake
            self.assertEqual(s.session, None)
            self.assertEqual(s.session_reused, None)
            s.connect((HOST, server.port))
            session = s.session
            self.assertTrue(session)
            with self.assertRaises(TypeError) as caught:
                s.session = object
            self.assertEqual(str(caught.exception),
                             'Value is not a SSLSession.')

        with client_socket(client_ctx) as s:
            s.connect((HOST, server.port))
            # cannot set session after handshake
            with self.assertRaises(ValueError) as caught:
                s.session = session
            self.assertEqual(str(caught.exception),
                             'Cannot set session after handshake.')

        with client_socket(client_ctx) as s:
            # can set session before handshake and before the
            # connection was established
            s.session = session
            s.connect((HOST, server.port))
            self.assertEqual(s.session.id, session.id)
            self.assertEqual(s.session, session)
            self.assertEqual(s.session_reused, True)

        with client_socket(second_client_ctx) as s:
            # cannot re-use session with a different SSLContext; the
            # assignment and the connect are both kept under assertRaises
            with self.assertRaises(ValueError) as caught:
                s.session = session
                s.connect((HOST, server.port))
            self.assertEqual(str(caught.exception),
                             'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02004391
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01004392
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02004393@unittest.skipUnless(has_tls_version('TLSv1_3'), "Test needs TLS 1.3")
Christian Heimes9fb051f2018-09-23 08:32:31 +02004394class TestPostHandshakeAuth(unittest.TestCase):
def test_pha_setter(self):
    # post_handshake_auth defaults to False and can be changed without
    # affecting verify_mode, and vice versa, for every TLS protocol
    # constant tried here.
    def check_state(ctx, verify_mode, pha):
        self.assertEqual(ctx.verify_mode, verify_mode)
        self.assertEqual(ctx.post_handshake_auth, pha)

    for protocol in (ssl.PROTOCOL_TLS,
                     ssl.PROTOCOL_TLS_SERVER,
                     ssl.PROTOCOL_TLS_CLIENT):
        ctx = ssl.SSLContext(protocol)
        self.assertEqual(ctx.post_handshake_auth, False)

        ctx.post_handshake_auth = True
        self.assertEqual(ctx.post_handshake_auth, True)

        # changing verify_mode leaves the PHA flag alone
        ctx.verify_mode = ssl.CERT_REQUIRED
        check_state(ctx, ssl.CERT_REQUIRED, True)

        # clearing the PHA flag leaves verify_mode alone
        ctx.post_handshake_auth = False
        check_state(ctx, ssl.CERT_REQUIRED, False)

        ctx.verify_mode = ssl.CERT_OPTIONAL
        ctx.post_handshake_auth = True
        check_state(ctx, ssl.CERT_OPTIONAL, True)
4418
def test_pha_required(self):
    # PHA enabled on both sides, server requires a certificate, and the
    # client has one loaded.
    client_ctx, server_ctx, sni = testing_context()
    server_ctx.post_handshake_auth = True
    server_ctx.verify_mode = ssl.CERT_REQUIRED
    client_ctx.post_handshake_auth = True
    client_ctx.load_cert_chain(SIGNED_CERTFILE)

    def ask(sock, command, bufsize=1024):
        # Send one echo-server command and return the reply.
        sock.write(command)
        return sock.recv(bufsize)

    with ThreadedEchoServer(context=server_ctx, chatty=False) as server, \
            client_ctx.wrap_socket(socket.socket(),
                                   server_hostname=sni) as s:
        s.connect((HOST, server.port))
        self.assertEqual(ask(s, b'HASCERT'), b'FALSE\n')
        self.assertEqual(ask(s, b'PHA'), b'OK\n')
        self.assertEqual(ask(s, b'HASCERT'), b'TRUE\n')
        # PHA method just returns true when cert is already available
        self.assertEqual(ask(s, b'PHA'), b'OK\n')
        cert_text = ask(s, b'GETCERT', 4096).decode('us-ascii')
        self.assertIn('Python Software Foundation CA', cert_text)
4443
def test_pha_required_nocert(self):
    # PHA enabled on both sides and the server requires a certificate,
    # but the client has none loaded.
    client_ctx, server_ctx, sni = testing_context()
    server_ctx.post_handshake_auth = True
    server_ctx.verify_mode = ssl.CERT_REQUIRED
    client_ctx.post_handshake_auth = True

    # Ignore expected SSLError in ConnectionHandler of ThreadedEchoServer
    # (it is only raised sometimes on Windows)
    with threading_helper.catch_threading_exception():
        with ThreadedEchoServer(context=server_ctx,
                                chatty=False) as server, \
                client_ctx.wrap_socket(socket.socket(),
                                       server_hostname=sni) as s:
            s.connect((HOST, server.port))
            s.write(b'PHA')
            # receive CertificateRequest
            self.assertEqual(s.recv(1024), b'OK\n')
            # send empty Certificate + Finish
            s.write(b'HASCERT')
            # receive alert
            with self.assertRaisesRegex(
                    ssl.SSLError,
                    'tlsv13 alert certificate required'):
                s.recv(1024)
Christian Heimes9fb051f2018-09-23 08:32:31 +02004468
def test_pha_optional(self):
    # PHA against a server whose verify_mode ends up as CERT_OPTIONAL,
    # with a client that has a certificate loaded.
    if support.verbose:
        sys.stdout.write("\n")

    client_ctx, server_ctx, sni = testing_context()
    server_ctx.post_handshake_auth = True
    server_ctx.verify_mode = ssl.CERT_REQUIRED
    client_ctx.post_handshake_auth = True
    client_ctx.load_cert_chain(SIGNED_CERTFILE)

    # check CERT_OPTIONAL
    server_ctx.verify_mode = ssl.CERT_OPTIONAL

    def ask(sock, command):
        # Send one echo-server command and return the reply.
        sock.write(command)
        return sock.recv(1024)

    with ThreadedEchoServer(context=server_ctx, chatty=False) as server, \
            client_ctx.wrap_socket(socket.socket(),
                                   server_hostname=sni) as s:
        s.connect((HOST, server.port))
        self.assertEqual(ask(s, b'HASCERT'), b'FALSE\n')
        self.assertEqual(ask(s, b'PHA'), b'OK\n')
        self.assertEqual(ask(s, b'HASCERT'), b'TRUE\n')
4492
def test_pha_optional_nocert(self):
    # PHA against a CERT_OPTIONAL server with a client that has no
    # certificate loaded.
    if support.verbose:
        sys.stdout.write("\n")

    client_ctx, server_ctx, sni = testing_context()
    server_ctx.post_handshake_auth = True
    server_ctx.verify_mode = ssl.CERT_OPTIONAL
    client_ctx.post_handshake_auth = True

    def ask(sock, command):
        # Send one echo-server command and return the reply.
        sock.write(command)
        return sock.recv(1024)

    with ThreadedEchoServer(context=server_ctx, chatty=False) as server, \
            client_ctx.wrap_socket(socket.socket(),
                                   server_hostname=sni) as s:
        s.connect((HOST, server.port))
        self.assertEqual(ask(s, b'HASCERT'), b'FALSE\n')
        self.assertEqual(ask(s, b'PHA'), b'OK\n')
        # optional doesn't fail when client does not have a cert
        self.assertEqual(ask(s, b'HASCERT'), b'FALSE\n')
4514
def test_pha_no_pha_client(self):
    # The server has PHA enabled, but the client context never turns
    # post_handshake_auth on.
    client_ctx, server_ctx, sni = testing_context()
    server_ctx.post_handshake_auth = True
    server_ctx.verify_mode = ssl.CERT_REQUIRED
    client_ctx.load_cert_chain(SIGNED_CERTFILE)

    with ThreadedEchoServer(context=server_ctx, chatty=False) as server, \
            client_ctx.wrap_socket(socket.socket(),
                                   server_hostname=sni) as s:
        s.connect((HOST, server.port))
        # calling verify_client_post_handshake() on the client socket
        # itself is rejected
        with self.assertRaisesRegex(ssl.SSLError, 'not server'):
            s.verify_client_post_handshake()
        s.write(b'PHA')
        reply = s.recv(1024)
        self.assertIn(b'extension not received', reply)
4530
def test_pha_no_pha_server(self):
    # server doesn't have PHA enabled, cert is requested in handshake
    client_ctx, server_ctx, sni = testing_context()
    server_ctx.verify_mode = ssl.CERT_REQUIRED
    client_ctx.post_handshake_auth = True
    client_ctx.load_cert_chain(SIGNED_CERTFILE)

    def ask(sock, command):
        # Send one echo-server command and return the reply.
        sock.write(command)
        return sock.recv(1024)

    with ThreadedEchoServer(context=server_ctx, chatty=False) as server, \
            client_ctx.wrap_socket(socket.socket(),
                                   server_hostname=sni) as s:
        s.connect((HOST, server.port))
        self.assertEqual(ask(s, b'HASCERT'), b'TRUE\n')
        # PHA doesn't fail if there is already a cert
        self.assertEqual(ask(s, b'PHA'), b'OK\n')
        self.assertEqual(ask(s, b'HASCERT'), b'TRUE\n')
4550
def test_pha_not_tls13(self):
    # The client is capped at TLS 1.2, so a PHA request has to be rejected.
    client_ctx, server_ctx, server_name = testing_context()
    server_ctx.verify_mode = ssl.CERT_REQUIRED
    client_ctx.maximum_version = ssl.TLSVersion.TLSv1_2
    client_ctx.post_handshake_auth = True
    client_ctx.load_cert_chain(SIGNED_CERTFILE)

    echo_server = ThreadedEchoServer(context=server_ctx, chatty=False)
    with echo_server, client_ctx.wrap_socket(
            socket.socket(), server_hostname=server_name) as conn:
        conn.connect((HOST, echo_server.port))
        # PHA fails for TLS != 1.3
        conn.write(b'PHA')
        reply = conn.recv(1024)
        self.assertIn(b'WRONG_SSL_VERSION', reply)
4567
def test_bpo37428_pha_cert_none(self):
    # verify that post_handshake_auth does not implicitly enable cert
    # validation.
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_ctx.load_cert_chain(SIGNED_CERTFILE)
    server_ctx.load_verify_locations(SIGNING_CA)
    server_ctx.post_handshake_auth = True
    server_ctx.verify_mode = ssl.CERT_REQUIRED

    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_ctx.post_handshake_auth = True
    client_ctx.load_cert_chain(SIGNED_CERTFILE)
    # no cert validation and CA on client side
    client_ctx.check_hostname = False
    client_ctx.verify_mode = ssl.CERT_NONE

    echo_server = ThreadedEchoServer(context=server_ctx, chatty=False)
    with echo_server, client_ctx.wrap_socket(
            socket.socket(),
            server_hostname=SIGNED_CERTFILE_HOSTNAME) as conn:
        conn.connect((HOST, echo_server.port))

        def ask(command):
            # send one command to the server and hand back its reply
            conn.write(command)
            return conn.recv(1024)

        # the server only reports a client cert once PHA has run
        self.assertEqual(ask(b'HASCERT'), b'FALSE\n')
        self.assertEqual(ask(b'PHA'), b'OK\n')
        self.assertEqual(ask(b'HASCERT'), b'TRUE\n')
        # server cert has not been validated
        self.assertEqual(conn.getpeercert(), {})
4598
Christian Heimes9fb051f2018-09-23 08:32:31 +02004599
# Keylog tests are skipped when SSLContext lacks the keylog_filename
# attribute.
HAS_KEYLOG = hasattr(ssl.SSLContext, 'keylog_filename')
requires_keylog = unittest.skipIf(
    not HAS_KEYLOG, 'test requires OpenSSL 1.1.1 with keylog callback')
4603
class TestSSLDebug(unittest.TestCase):
    """Tests for SSLContext debugging aids: keylog files and _msg_callback."""

    def keylog_lines(self, fname=support.TESTFN):
        """Return how many lines the file *fname* currently contains."""
        with open(fname) as keylog:
            return sum(1 for _line in keylog)

    @requires_keylog
    @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
    def test_keylog_defaults(self):
        # keylog_filename starts as None, creates the file when assigned,
        # can be reset to None and rejects a directory or a non-path value.
        keylog_path = support.TESTFN
        self.addCleanup(support.unlink, keylog_path)
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        self.assertIsNone(ctx.keylog_filename)

        self.assertFalse(os.path.isfile(keylog_path))
        ctx.keylog_filename = keylog_path
        self.assertEqual(ctx.keylog_filename, keylog_path)
        self.assertTrue(os.path.isfile(keylog_path))
        self.assertEqual(self.keylog_lines(), 1)

        ctx.keylog_filename = None
        self.assertIsNone(ctx.keylog_filename)

        parent_dir = os.path.dirname(os.path.abspath(keylog_path))
        # Windows raises PermissionError
        with self.assertRaises((IsADirectoryError, PermissionError)):
            ctx.keylog_filename = parent_dir

        with self.assertRaises(TypeError):
            ctx.keylog_filename = 1

    @requires_keylog
    @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
    def test_keylog_filename(self):
        # Enable key logging on the client, then the server, then both, and
        # check the expected line count of the keylog file after each
        # handshake.
        keylog_path = support.TESTFN
        self.addCleanup(support.unlink, keylog_path)
        client_ctx, server_ctx, server_name = testing_context()

        def handshake():
            # connect once to a fresh server, then close both sides
            echo_server = ThreadedEchoServer(context=server_ctx, chatty=False)
            with echo_server, client_ctx.wrap_socket(
                    socket.socket(), server_hostname=server_name) as conn:
                conn.connect((HOST, echo_server.port))

        # client side only
        client_ctx.keylog_filename = keylog_path
        handshake()
        # header, 5 lines for TLS 1.3
        self.assertEqual(self.keylog_lines(), 6)

        # server side only
        client_ctx.keylog_filename = None
        server_ctx.keylog_filename = keylog_path
        handshake()
        self.assertGreaterEqual(self.keylog_lines(), 11)

        # both sides
        client_ctx.keylog_filename = keylog_path
        server_ctx.keylog_filename = keylog_path
        handshake()
        self.assertGreaterEqual(self.keylog_lines(), 21)

        client_ctx.keylog_filename = None
        server_ctx.keylog_filename = None

    @requires_keylog
    @unittest.skipIf(sys.flags.ignore_environment,
                     "test is not compatible with ignore_environment")
    @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
    def test_keylog_env(self):
        # With SSLKEYLOGFILE set, a bare SSLContext must leave
        # keylog_filename unset while the two context factories pick it up.
        keylog_path = support.TESTFN
        self.addCleanup(support.unlink, keylog_path)
        # patch.dict restores os.environ when the block is left
        with unittest.mock.patch.dict(os.environ):
            os.environ['SSLKEYLOGFILE'] = keylog_path
            self.assertEqual(os.environ['SSLKEYLOGFILE'], keylog_path)

            bare_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            self.assertIsNone(bare_ctx.keylog_filename)

            factories = (ssl.create_default_context,
                         ssl._create_stdlib_context)
            for make_context in factories:
                ctx = make_context()
                self.assertEqual(ctx.keylog_filename, keylog_path)

    def test_msg_callback(self):
        # _msg_callback defaults to None, stores a function and rejects an
        # arbitrary object with TypeError.
        client_ctx = testing_context()[0]

        def msg_cb(conn, direction, version, content_type, msg_type, data):
            pass

        self.assertIs(client_ctx._msg_callback, None)
        client_ctx._msg_callback = msg_cb
        self.assertIs(client_ctx._msg_callback, msg_cb)
        with self.assertRaises(TypeError):
            client_ctx._msg_callback = object()

    def test_msg_callback_tls12(self):
        # With TLS 1.3 disabled on the client, the callback must have
        # recorded the two TLS 1.2 messages checked at the end.
        client_ctx, server_ctx, server_name = testing_context()
        client_ctx.options |= ssl.OP_NO_TLSv1_3

        seen = []

        def msg_cb(conn, direction, version, content_type, msg_type, data):
            self.assertIsInstance(conn, ssl.SSLSocket)
            self.assertIsInstance(data, bytes)
            self.assertIn(direction, {'read', 'write'})
            seen.append((direction, version, content_type, msg_type))

        client_ctx._msg_callback = msg_cb

        echo_server = ThreadedEchoServer(context=server_ctx, chatty=False)
        with echo_server, client_ctx.wrap_socket(
                socket.socket(), server_hostname=server_name) as conn:
            conn.connect((HOST, echo_server.port))

        expected_messages = (
            ("read", TLSVersion.TLSv1_2, _TLSContentType.HANDSHAKE,
             _TLSMessageType.SERVER_KEY_EXCHANGE),
            ("write", TLSVersion.TLSv1_2, _TLSContentType.CHANGE_CIPHER_SPEC,
             _TLSMessageType.CHANGE_CIPHER_SPEC),
        )
        for message in expected_messages:
            self.assertIn(message, seen)
Christian Heimesc7f70692019-05-31 11:44:05 +02004731
4732
def test_main(verbose=False):
    """Run the SSL test classes of this module.

    Prints a banner describing the OpenSSL build and the platform when
    support.verbose is set, checks that every certificate file used by
    the tests exists, and runs the test classes between
    threading_helper.threading_setup() and threading_cleanup().

    The *verbose* argument is accepted but not read by this function.

    Raises support.TestFailed if a certificate file is missing.
    """
    if support.verbose:
        # Use the first OS-specific probe that reports a version, falling
        # back to the generic platform string.
        probes = (
            ('Mac', platform.mac_ver),
            ('Windows', platform.win32_ver),
        )
        description = None
        for label, probe in probes:
            info = probe()
            if info and info[0]:
                description = '%s %r' % (label, info)
                break
        if description is None:
            description = repr(platform.platform())

        print("test_ssl: testing with %r %r" %
              (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
        print(" under %s" % description)
        print(" HAS_SNI = %r" % ssl.HAS_SNI)
        print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
        try:
            no_tls11 = ssl.OP_NO_TLSv1_1
        except AttributeError:
            # the constant is not available in this ssl module
            pass
        else:
            print(" OP_NO_TLSv1_1 = 0x%8x" % no_tls11)

    required_files = (
        CERTFILE, BYTES_CERTFILE,
        ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
        SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
        BADCERT, BADKEY, EMPTYCERT,
    )
    for cert_path in required_files:
        if os.path.exists(cert_path):
            continue
        raise support.TestFailed("Can't read certificate file %r" % cert_path)

    suites = [
        ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
        SSLObjectTests, SimpleBackgroundTests, ThreadedTests,
        TestPostHandshakeAuth, TestSSLDebug,
    ]
    # NetworkedTests only runs when the 'network' resource is enabled
    if support.is_resource_enabled('network'):
        suites += [NetworkedTests]

    saved_threads = threading_helper.threading_setup()
    try:
        support.run_unittest(*suites)
    finally:
        threading_helper.threading_cleanup(*saved_threads)
Thomas Woutersed03b412007-08-28 21:37:11 +00004778
# Run the whole suite through test_main() when this file is executed as a
# script.
if __name__ == "__main__":
    test_main()