blob: 3cd6e927a469e097d49097d435280988fdea3faf [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Christian Heimesc7f70692019-05-31 11:44:05 +02005import unittest.mock
Benjamin Petersonee8712c2008-05-20 21:35:26 +00006from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00007import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00008import select
Thomas Woutersed03b412007-08-28 21:37:11 +00009import time
Christian Heimes9424bb42013-06-17 15:32:57 +020010import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000011import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000012import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000013import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000014import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000015import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020016import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000017import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000018import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000019import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000020import platform
Christian Heimes892d66e2018-01-29 14:10:18 +010021import sysconfig
Christian Heimesdf6ac7e2019-09-26 17:02:59 +020022import functools
Christian Heimes888bbdc2017-09-07 14:18:21 -070023try:
24 import ctypes
25except ImportError:
26 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000027
Antoine Pitrou05d936d2010-10-13 11:38:36 +000028ssl = support.import_module("ssl")
29
Victor Stinner8f4ef3b2019-07-01 18:28:25 +020030from ssl import TLSVersion, _TLSContentType, _TLSMessageType
Martin Panter3840b2a2016-03-27 01:53:46 +000031
Paul Monsonf3550692019-06-19 13:09:54 -070032Py_DEBUG = hasattr(sys, 'gettotalrefcount')
33Py_DEBUG_WIN32 = Py_DEBUG and sys.platform == 'win32'
34
Antoine Pitrou2463e5f2013-03-28 22:24:43 +010035PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
Benjamin Petersonee8712c2008-05-20 21:35:26 +000036HOST = support.HOST
Christian Heimes598894f2016-09-05 23:19:05 +020037IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
Christian Heimes05d9fe32018-02-27 08:55:39 +010038IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
39IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
Christian Heimes892d66e2018-01-29 14:10:18 +010040PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000041
Victor Stinner3ef63442019-02-19 18:06:03 +010042PROTOCOL_TO_TLS_VERSION = {}
43for proto, ver in (
44 ("PROTOCOL_SSLv23", "SSLv3"),
45 ("PROTOCOL_TLSv1", "TLSv1"),
46 ("PROTOCOL_TLSv1_1", "TLSv1_1"),
47):
48 try:
49 proto = getattr(ssl, proto)
50 ver = getattr(ssl.TLSVersion, ver)
51 except AttributeError:
52 continue
53 PROTOCOL_TO_TLS_VERSION[proto] = ver
54
Christian Heimesefff7062013-11-21 03:35:02 +010055def data_file(*name):
56 return os.path.join(os.path.dirname(__file__), *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000057
Antoine Pitrou81564092010-10-08 23:06:24 +000058# The custom key and certificate files used in test_ssl are generated
59# using Lib/test/make_ssl_certs.py.
60# Other certificates are simply fetched from the Internet servers they
61# are meant to authenticate.
62
Antoine Pitrou152efa22010-05-16 18:19:27 +000063CERTFILE = data_file("keycert.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000064BYTES_CERTFILE = os.fsencode(CERTFILE)
Antoine Pitrou152efa22010-05-16 18:19:27 +000065ONLYCERT = data_file("ssl_cert.pem")
66ONLYKEY = data_file("ssl_key.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000067BYTES_ONLYCERT = os.fsencode(ONLYCERT)
68BYTES_ONLYKEY = os.fsencode(ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +020069CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
70ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
71KEY_PASSWORD = "somepass"
Antoine Pitrou152efa22010-05-16 18:19:27 +000072CAPATH = data_file("capath")
Victor Stinner313a1202010-06-11 23:56:51 +000073BYTES_CAPATH = os.fsencode(CAPATH)
Christian Heimesefff7062013-11-21 03:35:02 +010074CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
75CAFILE_CACERT = data_file("capath", "5ed36f99.0")
76
Christian Heimesbd5c7d22018-01-20 15:16:30 +010077CERTFILE_INFO = {
78 'issuer': ((('countryName', 'XY'),),
79 (('localityName', 'Castle Anthrax'),),
80 (('organizationName', 'Python Software Foundation'),),
81 (('commonName', 'localhost'),)),
Christian Heimese6dac002018-08-30 07:25:49 +020082 'notAfter': 'Aug 26 14:23:15 2028 GMT',
83 'notBefore': 'Aug 29 14:23:15 2018 GMT',
84 'serialNumber': '98A7CF88C74A32ED',
Christian Heimesbd5c7d22018-01-20 15:16:30 +010085 'subject': ((('countryName', 'XY'),),
86 (('localityName', 'Castle Anthrax'),),
87 (('organizationName', 'Python Software Foundation'),),
88 (('commonName', 'localhost'),)),
89 'subjectAltName': (('DNS', 'localhost'),),
90 'version': 3
91}
Antoine Pitrou152efa22010-05-16 18:19:27 +000092
Christian Heimes22587792013-11-21 23:56:13 +010093# empty CRL
94CRLFILE = data_file("revocation.crl")
95
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010096# Two keys and certs signed by the same CA (for SNI tests)
97SIGNED_CERTFILE = data_file("keycert3.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +020098SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +010099
100SIGNED_CERTFILE_INFO = {
101 'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
102 'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
103 'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
104 'issuer': ((('countryName', 'XY'),),
105 (('organizationName', 'Python Software Foundation CA'),),
106 (('commonName', 'our-ca-server'),)),
Christian Heimese6dac002018-08-30 07:25:49 +0200107 'notAfter': 'Jul 7 14:23:16 2028 GMT',
108 'notBefore': 'Aug 29 14:23:16 2018 GMT',
109 'serialNumber': 'CB2D80995A69525C',
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100110 'subject': ((('countryName', 'XY'),),
111 (('localityName', 'Castle Anthrax'),),
112 (('organizationName', 'Python Software Foundation'),),
113 (('commonName', 'localhost'),)),
114 'subjectAltName': (('DNS', 'localhost'),),
115 'version': 3
116}
117
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100118SIGNED_CERTFILE2 = data_file("keycert4.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +0200119SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100120SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
121SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'
122
Martin Panter3840b2a2016-03-27 01:53:46 +0000123# Same certificate as pycacert.pem, but without extra text in file
124SIGNING_CA = data_file("capath", "ceff1710.0")
Christian Heimes1c03abd2016-09-06 23:25:35 +0200125# cert with all kinds of subject alt names
126ALLSANFILE = data_file("allsans.pem")
Christian Heimes66e57422018-01-29 14:25:13 +0100127IDNSANSFILE = data_file("idnsans.pem")
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100128
Martin Panter3d81d932016-01-14 09:36:00 +0000129REMOTE_HOST = "self-signed.pythontest.net"
Antoine Pitrou152efa22010-05-16 18:19:27 +0000130
131EMPTYCERT = data_file("nullcert.pem")
132BADCERT = data_file("badcert.pem")
Martin Panter407b62f2016-01-30 03:41:43 +0000133NONEXISTINGCERT = data_file("XXXnonexisting.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000134BADKEY = data_file("badkey.pem")
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200135NOKIACERT = data_file("nokia.pem")
Christian Heimes824f7f32013-08-17 00:54:47 +0200136NULLBYTECERT = data_file("nullbytecert.pem")
Christian Heimesa37f5242019-01-15 23:47:42 +0100137TALOS_INVALID_CRLDP = data_file("talos-2019-0758.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000138
Christian Heimes88bfd0b2018-08-14 12:54:19 +0200139DHFILE = data_file("ffdh3072.pem")
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100140BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000141
Christian Heimes358cfd42016-09-10 22:43:48 +0200142# Not defined in all versions of OpenSSL
143OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
144OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
145OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
146OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
Christian Heimes05d9fe32018-02-27 08:55:39 +0100147OP_ENABLE_MIDDLEBOX_COMPAT = getattr(ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
Christian Heimes358cfd42016-09-10 22:43:48 +0200148
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100149
Christian Heimesdf6ac7e2019-09-26 17:02:59 +0200150def has_tls_protocol(protocol):
151 """Check if a TLS protocol is available and enabled
152
153 :param protocol: enum ssl._SSLMethod member or name
154 :return: bool
155 """
156 if isinstance(protocol, str):
157 assert protocol.startswith('PROTOCOL_')
158 protocol = getattr(ssl, protocol, None)
159 if protocol is None:
160 return False
161 if protocol in {
162 ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS_SERVER,
163 ssl.PROTOCOL_TLS_CLIENT
164 }:
165 # auto-negotiate protocols are always available
166 return True
167 name = protocol.name
168 return has_tls_version(name[len('PROTOCOL_'):])
169
170
171@functools.lru_cache
172def has_tls_version(version):
173 """Check if a TLS/SSL version is enabled
174
175 :param version: TLS version name or ssl.TLSVersion member
176 :return: bool
177 """
178 if version == "SSLv2":
179 # never supported and not even in TLSVersion enum
180 return False
181
182 if isinstance(version, str):
183 version = ssl.TLSVersion.__members__[version]
184
185 # check compile time flags like ssl.HAS_TLSv1_2
186 if not getattr(ssl, f'HAS_{version.name}'):
187 return False
188
189 # check runtime and dynamic crypto policy settings. A TLS version may
190 # be compiled in but disabled by a policy or config option.
191 ctx = ssl.SSLContext()
192 if (
Christian Heimes9f772682019-09-26 18:23:17 +0200193 hasattr(ctx, 'minimum_version') and
Christian Heimesdf6ac7e2019-09-26 17:02:59 +0200194 ctx.minimum_version != ssl.TLSVersion.MINIMUM_SUPPORTED and
195 version < ctx.minimum_version
196 ):
197 return False
198 if (
Christian Heimes9f772682019-09-26 18:23:17 +0200199 hasattr(ctx, 'maximum_version') and
Christian Heimesdf6ac7e2019-09-26 17:02:59 +0200200 ctx.maximum_version != ssl.TLSVersion.MAXIMUM_SUPPORTED and
201 version > ctx.maximum_version
202 ):
203 return False
204
205 return True
206
207
208def requires_tls_version(version):
209 """Decorator to skip tests when a required TLS version is not available
210
211 :param version: TLS version name or ssl.TLSVersion member
212 :return:
213 """
214 def decorator(func):
215 @functools.wraps(func)
216 def wrapper(*args, **kw):
217 if not has_tls_version(version):
218 raise unittest.SkipTest(f"{version} is not available.")
219 else:
220 return func(*args, **kw)
221 return wrapper
222 return decorator
223
224
225requires_minimum_version = unittest.skipUnless(
226 hasattr(ssl.SSLContext, 'minimum_version'),
227 "required OpenSSL >= 1.1.0g"
228)
229
230
Thomas Woutersed03b412007-08-28 21:37:11 +0000231def handle_error(prefix):
232 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000233 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000234 sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +0000235
Antoine Pitroub5218772010-05-21 09:56:06 +0000236def can_clear_options():
237 # 0.9.8m or higher
Antoine Pitroub9ac25d2011-07-08 18:47:06 +0200238 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
Antoine Pitroub5218772010-05-21 09:56:06 +0000239
240def no_sslv2_implies_sslv3_hello():
241 # 0.9.7h or higher
242 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
243
Christian Heimes2427b502013-11-23 11:24:32 +0100244def have_verify_flags():
245 # 0.9.8 or higher
246 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
247
Christian Heimesb7b92252018-02-25 09:49:31 +0100248def _have_secp_curves():
249 if not ssl.HAS_ECDH:
250 return False
251 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
252 try:
253 ctx.set_ecdh_curve("secp384r1")
254 except ValueError:
255 return False
256 else:
257 return True
258
259
260HAVE_SECP_CURVES = _have_secp_curves()
261
262
Antoine Pitrouc695c952014-04-28 20:57:36 +0200263def utc_offset(): #NOTE: ignore issues like #1647654
264 # local time = utc time + utc offset
265 if time.daylight and time.localtime().tm_isdst > 0:
266 return -time.altzone # seconds
267 return -time.timezone
268
Christian Heimes9424bb42013-06-17 15:32:57 +0200269def asn1time(cert_time):
270 # Some versions of OpenSSL ignore seconds, see #18207
271 # 0.9.8.i
272 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
273 fmt = "%b %d %H:%M:%S %Y GMT"
274 dt = datetime.datetime.strptime(cert_time, fmt)
275 dt = dt.replace(second=0)
276 cert_time = dt.strftime(fmt)
277 # %d adds leading zero but ASN1_TIME_print() uses leading space
278 if cert_time[4] == "0":
279 cert_time = cert_time[:4] + " " + cert_time[5:]
280
281 return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000282
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100283needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
284
Antoine Pitrou23df4832010-08-04 17:14:06 +0000285
Christian Heimesd0486372016-09-10 23:23:33 +0200286def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
287 cert_reqs=ssl.CERT_NONE, ca_certs=None,
288 ciphers=None, certfile=None, keyfile=None,
289 **kwargs):
290 context = ssl.SSLContext(ssl_version)
291 if cert_reqs is not None:
Christian Heimesa170fa12017-09-15 20:27:30 +0200292 if cert_reqs == ssl.CERT_NONE:
293 context.check_hostname = False
Christian Heimesd0486372016-09-10 23:23:33 +0200294 context.verify_mode = cert_reqs
295 if ca_certs is not None:
296 context.load_verify_locations(ca_certs)
297 if certfile is not None or keyfile is not None:
298 context.load_cert_chain(certfile, keyfile)
299 if ciphers is not None:
300 context.set_ciphers(ciphers)
301 return context.wrap_socket(sock, **kwargs)
302
Christian Heimesa170fa12017-09-15 20:27:30 +0200303
304def testing_context(server_cert=SIGNED_CERTFILE):
305 """Create context
306
307 client_context, server_context, hostname = testing_context()
308 """
309 if server_cert == SIGNED_CERTFILE:
310 hostname = SIGNED_CERTFILE_HOSTNAME
311 elif server_cert == SIGNED_CERTFILE2:
312 hostname = SIGNED_CERTFILE2_HOSTNAME
313 else:
314 raise ValueError(server_cert)
315
316 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
317 client_context.load_verify_locations(SIGNING_CA)
318
319 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
320 server_context.load_cert_chain(server_cert)
Christian Heimes9fb051f2018-09-23 08:32:31 +0200321 server_context.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +0200322
323 return client_context, server_context, hostname
324
325
Antoine Pitrou152efa22010-05-16 18:19:27 +0000326class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000327
Antoine Pitrou480a1242010-04-28 21:37:09 +0000328 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000329 ssl.CERT_NONE
330 ssl.CERT_OPTIONAL
331 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100332 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100333 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100334 if ssl.HAS_ECDH:
335 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100336 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
337 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000338 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100339 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700340 ssl.OP_NO_SSLv2
341 ssl.OP_NO_SSLv3
342 ssl.OP_NO_TLSv1
343 ssl.OP_NO_TLSv1_3
344 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
345 ssl.OP_NO_TLSv1_1
346 ssl.OP_NO_TLSv1_2
Christian Heimesa170fa12017-09-15 20:27:30 +0200347 self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000348
Christian Heimes9d50ab52018-02-27 10:17:30 +0100349 def test_private_init(self):
350 with self.assertRaisesRegex(TypeError, "public constructor"):
351 with socket.socket() as s:
352 ssl.SSLSocket(s)
353
Antoine Pitrou172f0252014-04-18 20:33:08 +0200354 def test_str_for_enums(self):
355 # Make sure that the PROTOCOL_* constants have enum-like string
356 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200357 proto = ssl.PROTOCOL_TLS
358 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200359 ctx = ssl.SSLContext(proto)
360 self.assertIs(ctx.protocol, proto)
361
Antoine Pitrou480a1242010-04-28 21:37:09 +0000362 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000363 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000364 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000365 sys.stdout.write("\n RAND_status is %d (%s)\n"
366 % (v, (v and "sufficient randomness") or
367 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200368
369 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
370 self.assertEqual(len(data), 16)
371 self.assertEqual(is_cryptographic, v == 1)
372 if v:
373 data = ssl.RAND_bytes(16)
374 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200375 else:
376 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200377
Victor Stinner1e81a392013-12-19 16:47:04 +0100378 # negative num is invalid
379 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
380 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
381
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100382 if hasattr(ssl, 'RAND_egd'):
383 self.assertRaises(TypeError, ssl.RAND_egd, 1)
384 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000385 ssl.RAND_add("this is a random string", 75.0)
Serhiy Storchaka8490f5a2015-03-20 09:00:36 +0200386 ssl.RAND_add(b"this is a random bytes object", 75.0)
387 ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000388
Christian Heimesf77b4b22013-08-21 13:26:05 +0200389 @unittest.skipUnless(os.name == 'posix', 'requires posix')
390 def test_random_fork(self):
391 status = ssl.RAND_status()
392 if not status:
393 self.fail("OpenSSL's PRNG has insufficient randomness")
394
395 rfd, wfd = os.pipe()
396 pid = os.fork()
397 if pid == 0:
398 try:
399 os.close(rfd)
400 child_random = ssl.RAND_pseudo_bytes(16)[0]
401 self.assertEqual(len(child_random), 16)
402 os.write(wfd, child_random)
403 os.close(wfd)
404 except BaseException:
405 os._exit(1)
406 else:
407 os._exit(0)
408 else:
409 os.close(wfd)
410 self.addCleanup(os.close, rfd)
411 _, status = os.waitpid(pid, 0)
412 self.assertEqual(status, 0)
413
414 child_random = os.read(rfd, 16)
415 self.assertEqual(len(child_random), 16)
416 parent_random = ssl.RAND_pseudo_bytes(16)[0]
417 self.assertEqual(len(parent_random), 16)
418
419 self.assertNotEqual(child_random, parent_random)
420
Christian Heimese6dac002018-08-30 07:25:49 +0200421 maxDiff = None
422
Antoine Pitrou480a1242010-04-28 21:37:09 +0000423 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000424 # note that this uses an 'unofficial' function in _ssl.c,
425 # provided solely for this test, to exercise the certificate
426 # parsing code
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100427 self.assertEqual(
428 ssl._ssl._test_decode_cert(CERTFILE),
429 CERTFILE_INFO
430 )
431 self.assertEqual(
432 ssl._ssl._test_decode_cert(SIGNED_CERTFILE),
433 SIGNED_CERTFILE_INFO
434 )
435
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200436 # Issue #13034: the subjectAltName in some certificates
437 # (notably projects.developer.nokia.com:443) wasn't parsed
438 p = ssl._ssl._test_decode_cert(NOKIACERT)
439 if support.verbose:
440 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
441 self.assertEqual(p['subjectAltName'],
442 (('DNS', 'projects.developer.nokia.com'),
443 ('DNS', 'projects.forum.nokia.com'))
444 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100445 # extra OCSP and AIA fields
446 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
447 self.assertEqual(p['caIssuers'],
448 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
449 self.assertEqual(p['crlDistributionPoints'],
450 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000451
Christian Heimesa37f5242019-01-15 23:47:42 +0100452 def test_parse_cert_CVE_2019_5010(self):
453 p = ssl._ssl._test_decode_cert(TALOS_INVALID_CRLDP)
454 if support.verbose:
455 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
456 self.assertEqual(
457 p,
458 {
459 'issuer': (
460 (('countryName', 'UK'),), (('commonName', 'cody-ca'),)),
461 'notAfter': 'Jun 14 18:00:58 2028 GMT',
462 'notBefore': 'Jun 18 18:00:58 2018 GMT',
463 'serialNumber': '02',
464 'subject': ((('countryName', 'UK'),),
465 (('commonName',
466 'codenomicon-vm-2.test.lal.cisco.com'),)),
467 'subjectAltName': (
468 ('DNS', 'codenomicon-vm-2.test.lal.cisco.com'),),
469 'version': 3
470 }
471 )
472
Christian Heimes824f7f32013-08-17 00:54:47 +0200473 def test_parse_cert_CVE_2013_4238(self):
474 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
475 if support.verbose:
476 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
477 subject = ((('countryName', 'US'),),
478 (('stateOrProvinceName', 'Oregon'),),
479 (('localityName', 'Beaverton'),),
480 (('organizationName', 'Python Software Foundation'),),
481 (('organizationalUnitName', 'Python Core Development'),),
482 (('commonName', 'null.python.org\x00example.org'),),
483 (('emailAddress', 'python-dev@python.org'),))
484 self.assertEqual(p['subject'], subject)
485 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200486 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
487 san = (('DNS', 'altnull.python.org\x00example.com'),
488 ('email', 'null@python.org\x00user@example.org'),
489 ('URI', 'http://null.python.org\x00http://example.org'),
490 ('IP Address', '192.0.2.1'),
491 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
492 else:
493 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
494 san = (('DNS', 'altnull.python.org\x00example.com'),
495 ('email', 'null@python.org\x00user@example.org'),
496 ('URI', 'http://null.python.org\x00http://example.org'),
497 ('IP Address', '192.0.2.1'),
498 ('IP Address', '<invalid>'))
499
500 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200501
Christian Heimes1c03abd2016-09-06 23:25:35 +0200502 def test_parse_all_sans(self):
503 p = ssl._ssl._test_decode_cert(ALLSANFILE)
504 self.assertEqual(p['subjectAltName'],
505 (
506 ('DNS', 'allsans'),
507 ('othername', '<unsupported>'),
508 ('othername', '<unsupported>'),
509 ('email', 'user@example.org'),
510 ('DNS', 'www.example.org'),
511 ('DirName',
512 ((('countryName', 'XY'),),
513 (('localityName', 'Castle Anthrax'),),
514 (('organizationName', 'Python Software Foundation'),),
515 (('commonName', 'dirname example'),))),
516 ('URI', 'https://www.python.org/'),
517 ('IP Address', '127.0.0.1'),
518 ('IP Address', '0:0:0:0:0:0:0:1\n'),
519 ('Registered ID', '1.2.3.4.5')
520 )
521 )
522
Antoine Pitrou480a1242010-04-28 21:37:09 +0000523 def test_DER_to_PEM(self):
Martin Panter3d81d932016-01-14 09:36:00 +0000524 with open(CAFILE_CACERT, 'r') as f:
Antoine Pitrou480a1242010-04-28 21:37:09 +0000525 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000526 d1 = ssl.PEM_cert_to_DER_cert(pem)
527 p2 = ssl.DER_cert_to_PEM_cert(d1)
528 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000529 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000530 if not p2.startswith(ssl.PEM_HEADER + '\n'):
531 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
532 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
533 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000534
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000535 def test_openssl_version(self):
536 n = ssl.OPENSSL_VERSION_NUMBER
537 t = ssl.OPENSSL_VERSION_INFO
538 s = ssl.OPENSSL_VERSION
539 self.assertIsInstance(n, int)
540 self.assertIsInstance(t, tuple)
541 self.assertIsInstance(s, str)
542 # Some sanity checks follow
543 # >= 0.9
544 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400545 # < 3.0
546 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000547 major, minor, fix, patch, status = t
548 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400549 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000550 self.assertGreaterEqual(minor, 0)
551 self.assertLess(minor, 256)
552 self.assertGreaterEqual(fix, 0)
553 self.assertLess(fix, 256)
554 self.assertGreaterEqual(patch, 0)
Ned Deily05784a72015-02-05 17:20:13 +1100555 self.assertLessEqual(patch, 63)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000556 self.assertGreaterEqual(status, 0)
557 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400558 # Version string as returned by {Open,Libre}SSL, the format might change
Christian Heimes598894f2016-09-05 23:19:05 +0200559 if IS_LIBRESSL:
560 self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
Victor Stinner789b8052015-01-06 11:51:06 +0100561 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400562 else:
563 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100564 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000565
Antoine Pitrou9d543662010-04-23 23:10:32 +0000566 @support.cpython_only
567 def test_refcycle(self):
568 # Issue #7943: an SSL object doesn't create reference cycles with
569 # itself.
570 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200571 ss = test_wrap_socket(s)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000572 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100573 with support.check_warnings(("", ResourceWarning)):
574 del ss
Victor Stinnere0b75b72016-03-21 17:26:04 +0100575 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000576
Antoine Pitroua468adc2010-09-14 14:43:44 +0000577 def test_wrapped_unconnected(self):
578 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200579 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000580 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200581 with test_wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100582 self.assertRaises(OSError, ss.recv, 1)
583 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
584 self.assertRaises(OSError, ss.recvfrom, 1)
585 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
586 self.assertRaises(OSError, ss.send, b'x')
587 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Serhiy Storchaka42b1d612018-12-06 22:36:55 +0200588 self.assertRaises(NotImplementedError, ss.dup)
Christian Heimes141c5e82018-02-24 21:10:57 +0100589 self.assertRaises(NotImplementedError, ss.sendmsg,
590 [b'x'], (), 0, ('0.0.0.0', 0))
Serhiy Storchaka42b1d612018-12-06 22:36:55 +0200591 self.assertRaises(NotImplementedError, ss.recvmsg, 100)
592 self.assertRaises(NotImplementedError, ss.recvmsg_into,
593 [bytearray(100)])
Antoine Pitroua468adc2010-09-14 14:43:44 +0000594
Antoine Pitrou40f08742010-04-24 22:04:40 +0000595 def test_timeout(self):
596 # Issue #8524: when creating an SSL socket, the timeout of the
597 # original socket should be retained.
598 for timeout in (None, 0.0, 5.0):
599 s = socket.socket(socket.AF_INET)
600 s.settimeout(timeout)
Christian Heimesd0486372016-09-10 23:23:33 +0200601 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100602 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000603
Christian Heimesd0486372016-09-10 23:23:33 +0200604 def test_errors_sslwrap(self):
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000605 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000606 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000607 "certfile must be specified",
608 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000609 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000610 "certfile must be specified for server-side operations",
611 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000612 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000613 "certfile must be specified for server-side operations",
Christian Heimesd0486372016-09-10 23:23:33 +0200614 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100615 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
616 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
Christian Heimesd0486372016-09-10 23:23:33 +0200617 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200618 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000619 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000620 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000621 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200622 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000623 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000624 ssl.wrap_socket(sock,
625 certfile=CERTFILE, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000626 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200627 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000628 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000629 ssl.wrap_socket(sock,
630 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000631 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000632
Martin Panter3464ea22016-02-01 21:58:11 +0000633 def bad_cert_test(self, certfile):
634 """Check that trying to use the given client certificate fails"""
635 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
636 certfile)
637 sock = socket.socket()
638 self.addCleanup(sock.close)
639 with self.assertRaises(ssl.SSLError):
Christian Heimesd0486372016-09-10 23:23:33 +0200640 test_wrap_socket(sock,
Christian Heimesa170fa12017-09-15 20:27:30 +0200641 certfile=certfile)
Martin Panter3464ea22016-02-01 21:58:11 +0000642
643 def test_empty_cert(self):
644 """Wrapping with an empty cert file"""
645 self.bad_cert_test("nullcert.pem")
646
647 def test_malformed_cert(self):
648 """Wrapping with a badly formatted certificate (syntax error)"""
649 self.bad_cert_test("badcert.pem")
650
651 def test_malformed_key(self):
652 """Wrapping with a badly formatted key (syntax error)"""
653 self.bad_cert_test("badkey.pem")
654
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000655 def test_match_hostname(self):
656 def ok(cert, hostname):
657 ssl.match_hostname(cert, hostname)
658 def fail(cert, hostname):
659 self.assertRaises(ssl.CertificateError,
660 ssl.match_hostname, cert, hostname)
661
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100662 # -- Hostname matching --
663
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000664 cert = {'subject': ((('commonName', 'example.com'),),)}
665 ok(cert, 'example.com')
666 ok(cert, 'ExAmple.cOm')
667 fail(cert, 'www.example.com')
668 fail(cert, '.example.com')
669 fail(cert, 'example.org')
670 fail(cert, 'exampleXcom')
671
672 cert = {'subject': ((('commonName', '*.a.com'),),)}
673 ok(cert, 'foo.a.com')
674 fail(cert, 'bar.foo.a.com')
675 fail(cert, 'a.com')
676 fail(cert, 'Xa.com')
677 fail(cert, '.a.com')
678
Mandeep Singhede2ac92017-11-27 04:01:27 +0530679 # only match wildcards when they are the only thing
680 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000681 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530682 fail(cert, 'foo.com')
683 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000684 fail(cert, 'bar.com')
685 fail(cert, 'foo.a.com')
686 fail(cert, 'bar.foo.com')
687
Christian Heimes824f7f32013-08-17 00:54:47 +0200688 # NULL bytes are bad, CVE-2013-4073
689 cert = {'subject': ((('commonName',
690 'null.python.org\x00example.org'),),)}
691 ok(cert, 'null.python.org\x00example.org') # or raise an error?
692 fail(cert, 'example.org')
693 fail(cert, 'null.python.org')
694
Georg Brandl72c98d32013-10-27 07:16:53 +0100695 # error cases with wildcards
696 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
697 fail(cert, 'bar.foo.a.com')
698 fail(cert, 'a.com')
699 fail(cert, 'Xa.com')
700 fail(cert, '.a.com')
701
702 cert = {'subject': ((('commonName', 'a.*.com'),),)}
703 fail(cert, 'a.foo.com')
704 fail(cert, 'a..com')
705 fail(cert, 'a.com')
706
707 # wildcard doesn't match IDNA prefix 'xn--'
708 idna = 'püthon.python.org'.encode("idna").decode("ascii")
709 cert = {'subject': ((('commonName', idna),),)}
710 ok(cert, idna)
711 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
712 fail(cert, idna)
713 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
714 fail(cert, idna)
715
716 # wildcard in first fragment and IDNA A-labels in sequent fragments
717 # are supported.
718 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
719 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530720 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
721 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100722 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
723 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
724
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000725 # Slightly fake real-world example
726 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
727 'subject': ((('commonName', 'linuxfrz.org'),),),
728 'subjectAltName': (('DNS', 'linuxfr.org'),
729 ('DNS', 'linuxfr.com'),
730 ('othername', '<unsupported>'))}
731 ok(cert, 'linuxfr.org')
732 ok(cert, 'linuxfr.com')
733 # Not a "DNS" entry
734 fail(cert, '<unsupported>')
735 # When there is a subjectAltName, commonName isn't used
736 fail(cert, 'linuxfrz.org')
737
738 # A pristine real-world example
739 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
740 'subject': ((('countryName', 'US'),),
741 (('stateOrProvinceName', 'California'),),
742 (('localityName', 'Mountain View'),),
743 (('organizationName', 'Google Inc'),),
744 (('commonName', 'mail.google.com'),))}
745 ok(cert, 'mail.google.com')
746 fail(cert, 'gmail.com')
747 # Only commonName is considered
748 fail(cert, 'California')
749
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100750 # -- IPv4 matching --
751 cert = {'subject': ((('commonName', 'example.com'),),),
752 'subjectAltName': (('DNS', 'example.com'),
753 ('IP Address', '10.11.12.13'),
Christian Heimes477b1b22019-07-02 20:39:42 +0200754 ('IP Address', '14.15.16.17'),
755 ('IP Address', '127.0.0.1'))}
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100756 ok(cert, '10.11.12.13')
757 ok(cert, '14.15.16.17')
Christian Heimes477b1b22019-07-02 20:39:42 +0200758 # socket.inet_ntoa(socket.inet_aton('127.1')) == '127.0.0.1'
759 fail(cert, '127.1')
760 fail(cert, '14.15.16.17 ')
761 fail(cert, '14.15.16.17 extra data')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100762 fail(cert, '14.15.16.18')
763 fail(cert, 'example.net')
764
765 # -- IPv6 matching --
Zackery Spytzc2cda632019-06-30 09:24:43 -0600766 if support.IPV6_ENABLED:
Christian Heimesaef12832018-02-24 14:35:56 +0100767 cert = {'subject': ((('commonName', 'example.com'),),),
768 'subjectAltName': (
769 ('DNS', 'example.com'),
770 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
771 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
772 ok(cert, '2001::cafe')
773 ok(cert, '2003::baba')
Christian Heimes477b1b22019-07-02 20:39:42 +0200774 fail(cert, '2003::baba ')
775 fail(cert, '2003::baba extra data')
Christian Heimesaef12832018-02-24 14:35:56 +0100776 fail(cert, '2003::bebe')
777 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100778
779 # -- Miscellaneous --
780
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000781 # Neither commonName nor subjectAltName
782 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
783 'subject': ((('countryName', 'US'),),
784 (('stateOrProvinceName', 'California'),),
785 (('localityName', 'Mountain View'),),
786 (('organizationName', 'Google Inc'),))}
787 fail(cert, 'mail.google.com')
788
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200789 # No DNS entry in subjectAltName but a commonName
790 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
791 'subject': ((('countryName', 'US'),),
792 (('stateOrProvinceName', 'California'),),
793 (('localityName', 'Mountain View'),),
794 (('commonName', 'mail.google.com'),)),
795 'subjectAltName': (('othername', 'blabla'), )}
796 ok(cert, 'mail.google.com')
797
798 # No DNS entry subjectAltName and no commonName
799 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
800 'subject': ((('countryName', 'US'),),
801 (('stateOrProvinceName', 'California'),),
802 (('localityName', 'Mountain View'),),
803 (('organizationName', 'Google Inc'),)),
804 'subjectAltName': (('othername', 'blabla'),)}
805 fail(cert, 'google.com')
806
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000807 # Empty cert / no cert
808 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
809 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
810
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200811 # Issue #17980: avoid denials of service by refusing more than one
812 # wildcard per fragment.
Christian Heimesaef12832018-02-24 14:35:56 +0100813 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
814 with self.assertRaisesRegex(
815 ssl.CertificateError,
816 "partial wildcards in leftmost label are not supported"):
817 ssl.match_hostname(cert, 'axxb.example.com')
818
819 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
820 with self.assertRaisesRegex(
821 ssl.CertificateError,
822 "wildcard can only be present in the leftmost label"):
823 ssl.match_hostname(cert, 'www.sub.example.com')
824
825 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
826 with self.assertRaisesRegex(
827 ssl.CertificateError,
828 "too many wildcards"):
829 ssl.match_hostname(cert, 'axxbxxc.example.com')
830
831 cert = {'subject': ((('commonName', '*'),),)}
832 with self.assertRaisesRegex(
833 ssl.CertificateError,
834 "sole wildcard without additional labels are not support"):
835 ssl.match_hostname(cert, 'host')
836
837 cert = {'subject': ((('commonName', '*.com'),),)}
838 with self.assertRaisesRegex(
839 ssl.CertificateError,
840 r"hostname 'com' doesn't match '\*.com'"):
841 ssl.match_hostname(cert, 'com')
842
843 # extra checks for _inet_paton()
844 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
845 with self.assertRaises(ValueError):
846 ssl._inet_paton(invalid)
847 for ipaddr in ['127.0.0.1', '192.168.0.1']:
848 self.assertTrue(ssl._inet_paton(ipaddr))
Zackery Spytzc2cda632019-06-30 09:24:43 -0600849 if support.IPV6_ENABLED:
Christian Heimesaef12832018-02-24 14:35:56 +0100850 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
851 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200852
Antoine Pitroud5323212010-10-22 18:19:07 +0000853 def test_server_side(self):
854 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200855 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000856 with socket.socket() as sock:
857 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
858 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000859
Antoine Pitroud6494802011-07-21 01:11:30 +0200860 def test_unknown_channel_binding(self):
861 # should raise ValueError for unknown type
Giampaolo Rodolaeb7e29f2019-04-09 00:34:02 +0200862 s = socket.create_server(('127.0.0.1', 0))
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200863 c = socket.socket(socket.AF_INET)
864 c.connect(s.getsockname())
Christian Heimesd0486372016-09-10 23:23:33 +0200865 with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100866 with self.assertRaises(ValueError):
867 ss.get_channel_binding("unknown-type")
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200868 s.close()
Antoine Pitroud6494802011-07-21 01:11:30 +0200869
870 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
871 "'tls-unique' channel binding not available")
872 def test_tls_unique_channel_binding(self):
873 # unconnected should return None for known type
874 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200875 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100876 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200877 # the same for server-side
878 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200879 with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100880 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200881
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600882 def test_dealloc_warn(self):
Christian Heimesd0486372016-09-10 23:23:33 +0200883 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600884 r = repr(ss)
885 with self.assertWarns(ResourceWarning) as cm:
886 ss = None
887 support.gc_collect()
888 self.assertIn(r, str(cm.warning.args[0]))
889
Christian Heimes6d7ad132013-06-09 18:02:55 +0200890 def test_get_default_verify_paths(self):
891 paths = ssl.get_default_verify_paths()
892 self.assertEqual(len(paths), 6)
893 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
894
895 with support.EnvironmentVarGuard() as env:
896 env["SSL_CERT_DIR"] = CAPATH
897 env["SSL_CERT_FILE"] = CERTFILE
898 paths = ssl.get_default_verify_paths()
899 self.assertEqual(paths.cafile, CERTFILE)
900 self.assertEqual(paths.capath, CAPATH)
901
Christian Heimes44109d72013-11-22 01:51:30 +0100902 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
903 def test_enum_certificates(self):
904 self.assertTrue(ssl.enum_certificates("CA"))
905 self.assertTrue(ssl.enum_certificates("ROOT"))
906
907 self.assertRaises(TypeError, ssl.enum_certificates)
908 self.assertRaises(WindowsError, ssl.enum_certificates, "")
909
Christian Heimesc2d65e12013-11-22 16:13:55 +0100910 trust_oids = set()
911 for storename in ("CA", "ROOT"):
912 store = ssl.enum_certificates(storename)
913 self.assertIsInstance(store, list)
914 for element in store:
915 self.assertIsInstance(element, tuple)
916 self.assertEqual(len(element), 3)
917 cert, enc, trust = element
918 self.assertIsInstance(cert, bytes)
919 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
Christian Heimes915cd3f2019-09-09 18:06:55 +0200920 self.assertIsInstance(trust, (frozenset, set, bool))
921 if isinstance(trust, (frozenset, set)):
Christian Heimesc2d65e12013-11-22 16:13:55 +0100922 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100923
924 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100925 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200926
Christian Heimes46bebee2013-06-09 19:03:31 +0200927 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100928 def test_enum_crls(self):
929 self.assertTrue(ssl.enum_crls("CA"))
930 self.assertRaises(TypeError, ssl.enum_crls)
931 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200932
Christian Heimes44109d72013-11-22 01:51:30 +0100933 crls = ssl.enum_crls("CA")
934 self.assertIsInstance(crls, list)
935 for element in crls:
936 self.assertIsInstance(element, tuple)
937 self.assertEqual(len(element), 2)
938 self.assertIsInstance(element[0], bytes)
939 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200940
Christian Heimes46bebee2013-06-09 19:03:31 +0200941
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100942 def test_asn1object(self):
943 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
944 '1.3.6.1.5.5.7.3.1')
945
946 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
947 self.assertEqual(val, expected)
948 self.assertEqual(val.nid, 129)
949 self.assertEqual(val.shortname, 'serverAuth')
950 self.assertEqual(val.longname, 'TLS Web Server Authentication')
951 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
952 self.assertIsInstance(val, ssl._ASN1Object)
953 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
954
955 val = ssl._ASN1Object.fromnid(129)
956 self.assertEqual(val, expected)
957 self.assertIsInstance(val, ssl._ASN1Object)
958 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100959 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
960 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100961 for i in range(1000):
962 try:
963 obj = ssl._ASN1Object.fromnid(i)
964 except ValueError:
965 pass
966 else:
967 self.assertIsInstance(obj.nid, int)
968 self.assertIsInstance(obj.shortname, str)
969 self.assertIsInstance(obj.longname, str)
970 self.assertIsInstance(obj.oid, (str, type(None)))
971
972 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
973 self.assertEqual(val, expected)
974 self.assertIsInstance(val, ssl._ASN1Object)
975 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
976 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
977 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100978 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
979 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100980
Christian Heimes72d28502013-11-23 13:56:58 +0100981 def test_purpose_enum(self):
982 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
983 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
984 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
985 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
986 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
987 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
988 '1.3.6.1.5.5.7.3.1')
989
990 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
991 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
992 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
993 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
994 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
995 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
996 '1.3.6.1.5.5.7.3.2')
997
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100998 def test_unsupported_dtls(self):
999 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
1000 self.addCleanup(s.close)
1001 with self.assertRaises(NotImplementedError) as cx:
Christian Heimesd0486372016-09-10 23:23:33 +02001002 test_wrap_socket(s, cert_reqs=ssl.CERT_NONE)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +01001003 self.assertEqual(str(cx.exception), "only stream sockets are supported")
Christian Heimesa170fa12017-09-15 20:27:30 +02001004 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +01001005 with self.assertRaises(NotImplementedError) as cx:
1006 ctx.wrap_socket(s)
1007 self.assertEqual(str(cx.exception), "only stream sockets are supported")
1008
Antoine Pitrouc695c952014-04-28 20:57:36 +02001009 def cert_time_ok(self, timestring, timestamp):
1010 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
1011
1012 def cert_time_fail(self, timestring):
1013 with self.assertRaises(ValueError):
1014 ssl.cert_time_to_seconds(timestring)
1015
1016 @unittest.skipUnless(utc_offset(),
1017 'local time needs to be different from UTC')
1018 def test_cert_time_to_seconds_timezone(self):
1019 # Issue #19940: ssl.cert_time_to_seconds() returns wrong
1020 # results if local timezone is not UTC
1021 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
1022 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
1023
1024 def test_cert_time_to_seconds(self):
1025 timestring = "Jan 5 09:34:43 2018 GMT"
1026 ts = 1515144883.0
1027 self.cert_time_ok(timestring, ts)
1028 # accept keyword parameter, assert its name
1029 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
1030 # accept both %e and %d (space or zero generated by strftime)
1031 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
1032 # case-insensitive
1033 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
1034 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
1035 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
1036 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
1037 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
1038 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
1039 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
1040 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
1041
1042 newyear_ts = 1230768000.0
1043 # leap seconds
1044 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
1045 # same timestamp
1046 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
1047
1048 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
1049 # allow 60th second (even if it is not a leap second)
1050 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
1051 # allow 2nd leap second for compatibility with time.strptime()
1052 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
1053 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
1054
Mike53f7a7c2017-12-14 14:04:53 +03001055 # no special treatment for the special value:
Antoine Pitrouc695c952014-04-28 20:57:36 +02001056 # 99991231235959Z (rfc 5280)
1057 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
1058
1059 @support.run_with_locale('LC_ALL', '')
1060 def test_cert_time_to_seconds_locale(self):
1061 # `cert_time_to_seconds()` should be locale independent
1062
1063 def local_february_name():
1064 return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
1065
1066 if local_february_name().lower() == 'feb':
1067 self.skipTest("locale-specific month name needs to be "
1068 "different from C locale")
1069
1070 # locale-independent
1071 self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
1072 self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
1073
Martin Panter3840b2a2016-03-27 01:53:46 +00001074 def test_connect_ex_error(self):
1075 server = socket.socket(socket.AF_INET)
1076 self.addCleanup(server.close)
1077 port = support.bind_port(server) # Reserve port but don't listen
Christian Heimesd0486372016-09-10 23:23:33 +02001078 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001079 cert_reqs=ssl.CERT_REQUIRED)
1080 self.addCleanup(s.close)
1081 rc = s.connect_ex((HOST, port))
1082 # Issue #19919: Windows machines or VMs hosted on Windows
1083 # machines sometimes return EWOULDBLOCK.
1084 errors = (
1085 errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
1086 errno.EWOULDBLOCK,
1087 )
1088 self.assertIn(rc, errors)
1089
Christian Heimesa6bc95a2013-11-17 19:59:14 +01001090
Antoine Pitrou152efa22010-05-16 18:19:27 +00001091class ContextTests(unittest.TestCase):
1092
1093 def test_constructor(self):
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01001094 for protocol in PROTOCOLS:
1095 ssl.SSLContext(protocol)
Christian Heimes598894f2016-09-05 23:19:05 +02001096 ctx = ssl.SSLContext()
1097 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001098 self.assertRaises(ValueError, ssl.SSLContext, -1)
1099 self.assertRaises(ValueError, ssl.SSLContext, 42)
1100
1101 def test_protocol(self):
1102 for proto in PROTOCOLS:
1103 ctx = ssl.SSLContext(proto)
1104 self.assertEqual(ctx.protocol, proto)
1105
1106 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001107 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001108 ctx.set_ciphers("ALL")
1109 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +00001110 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +00001111 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +00001112
Christian Heimes892d66e2018-01-29 14:10:18 +01001113 @unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
1114 "Test applies only to Python default ciphers")
1115 def test_python_ciphers(self):
1116 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1117 ciphers = ctx.get_ciphers()
1118 for suite in ciphers:
1119 name = suite['name']
1120 self.assertNotIn("PSK", name)
1121 self.assertNotIn("SRP", name)
1122 self.assertNotIn("MD5", name)
1123 self.assertNotIn("RC4", name)
1124 self.assertNotIn("3DES", name)
1125
Christian Heimes25bfcd52016-09-06 00:04:45 +02001126 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
1127 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001128 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +02001129 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +02001130 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +02001131 self.assertIn('AES256-GCM-SHA384', names)
1132 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001133
Antoine Pitroub5218772010-05-21 09:56:06 +00001134 def test_options(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001135 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001136 # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
Christian Heimes598894f2016-09-05 23:19:05 +02001137 default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimes358cfd42016-09-10 22:43:48 +02001138 # SSLContext also enables these by default
1139 default |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
Christian Heimes05d9fe32018-02-27 08:55:39 +01001140 OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE |
1141 OP_ENABLE_MIDDLEBOX_COMPAT)
Christian Heimes598894f2016-09-05 23:19:05 +02001142 self.assertEqual(default, ctx.options)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001143 ctx.options |= ssl.OP_NO_TLSv1
Christian Heimes598894f2016-09-05 23:19:05 +02001144 self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001145 if can_clear_options():
Christian Heimes598894f2016-09-05 23:19:05 +02001146 ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
1147 self.assertEqual(default, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001148 ctx.options = 0
Matthias Klosef7c56242016-06-12 23:40:00 -07001149 # Ubuntu has OP_NO_SSLv3 forced on by default
1150 self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
Antoine Pitroub5218772010-05-21 09:56:06 +00001151 else:
1152 with self.assertRaises(ValueError):
1153 ctx.options = 0
1154
Christian Heimesa170fa12017-09-15 20:27:30 +02001155 def test_verify_mode_protocol(self):
1156 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001157 # Default value
1158 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1159 ctx.verify_mode = ssl.CERT_OPTIONAL
1160 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1161 ctx.verify_mode = ssl.CERT_REQUIRED
1162 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1163 ctx.verify_mode = ssl.CERT_NONE
1164 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1165 with self.assertRaises(TypeError):
1166 ctx.verify_mode = None
1167 with self.assertRaises(ValueError):
1168 ctx.verify_mode = 42
1169
Christian Heimesa170fa12017-09-15 20:27:30 +02001170 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1171 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1172 self.assertFalse(ctx.check_hostname)
1173
1174 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1175 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1176 self.assertTrue(ctx.check_hostname)
1177
Christian Heimes61d478c2018-01-27 15:51:38 +01001178 def test_hostname_checks_common_name(self):
1179 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1180 self.assertTrue(ctx.hostname_checks_common_name)
1181 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1182 ctx.hostname_checks_common_name = True
1183 self.assertTrue(ctx.hostname_checks_common_name)
1184 ctx.hostname_checks_common_name = False
1185 self.assertFalse(ctx.hostname_checks_common_name)
1186 ctx.hostname_checks_common_name = True
1187 self.assertTrue(ctx.hostname_checks_common_name)
1188 else:
1189 with self.assertRaises(AttributeError):
1190 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001191
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02001192 @requires_minimum_version
Christian Heimesc9bc49c2019-09-11 19:24:47 +02001193 @unittest.skipIf(IS_LIBRESSL, "see bpo-34001")
Christian Heimes698dde12018-02-27 11:54:43 +01001194 def test_min_max_version(self):
1195 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes34de2d32019-01-18 16:09:30 +01001196 # OpenSSL default is MINIMUM_SUPPORTED, however some vendors like
1197 # Fedora override the setting to TLS 1.0.
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02001198 minimum_range = {
1199 # stock OpenSSL
1200 ssl.TLSVersion.MINIMUM_SUPPORTED,
1201 # Fedora 29 uses TLS 1.0 by default
1202 ssl.TLSVersion.TLSv1,
1203 # RHEL 8 uses TLS 1.2 by default
1204 ssl.TLSVersion.TLSv1_2
1205 }
1206
Christian Heimes34de2d32019-01-18 16:09:30 +01001207 self.assertIn(
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02001208 ctx.minimum_version, minimum_range
Christian Heimes698dde12018-02-27 11:54:43 +01001209 )
1210 self.assertEqual(
1211 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1212 )
1213
1214 ctx.minimum_version = ssl.TLSVersion.TLSv1_1
1215 ctx.maximum_version = ssl.TLSVersion.TLSv1_2
1216 self.assertEqual(
1217 ctx.minimum_version, ssl.TLSVersion.TLSv1_1
1218 )
1219 self.assertEqual(
1220 ctx.maximum_version, ssl.TLSVersion.TLSv1_2
1221 )
1222
1223 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1224 ctx.maximum_version = ssl.TLSVersion.TLSv1
1225 self.assertEqual(
1226 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1227 )
1228 self.assertEqual(
1229 ctx.maximum_version, ssl.TLSVersion.TLSv1
1230 )
1231
1232 ctx.maximum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1233 self.assertEqual(
1234 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1235 )
1236
1237 ctx.maximum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1238 self.assertIn(
1239 ctx.maximum_version,
1240 {ssl.TLSVersion.TLSv1, ssl.TLSVersion.SSLv3}
1241 )
1242
1243 ctx.minimum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1244 self.assertIn(
1245 ctx.minimum_version,
1246 {ssl.TLSVersion.TLSv1_2, ssl.TLSVersion.TLSv1_3}
1247 )
1248
1249 with self.assertRaises(ValueError):
1250 ctx.minimum_version = 42
1251
1252 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)
1253
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02001254 self.assertIn(
1255 ctx.minimum_version, minimum_range
Christian Heimes698dde12018-02-27 11:54:43 +01001256 )
1257 self.assertEqual(
1258 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1259 )
1260 with self.assertRaises(ValueError):
1261 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1262 with self.assertRaises(ValueError):
1263 ctx.maximum_version = ssl.TLSVersion.TLSv1
1264
1265
Christian Heimes2427b502013-11-23 11:24:32 +01001266 @unittest.skipUnless(have_verify_flags(),
1267 "verify_flags need OpenSSL > 0.9.8")
Christian Heimes22587792013-11-21 23:56:13 +01001268 def test_verify_flags(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001269 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Benjamin Peterson990fcaa2015-03-04 22:49:41 -05001270 # default value
1271 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
1272 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf)
Christian Heimes22587792013-11-21 23:56:13 +01001273 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
1274 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
1275 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
1276 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
1277 ctx.verify_flags = ssl.VERIFY_DEFAULT
1278 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
1279 # supports any value
1280 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
1281 self.assertEqual(ctx.verify_flags,
1282 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
1283 with self.assertRaises(TypeError):
1284 ctx.verify_flags = None
1285
Antoine Pitrou152efa22010-05-16 18:19:27 +00001286 def test_load_cert_chain(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001287 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001288 # Combined key and cert in a single file
Benjamin Peterson1ea070e2014-11-03 21:05:01 -05001289 ctx.load_cert_chain(CERTFILE, keyfile=None)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001290 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
1291 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001292 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001293 ctx.load_cert_chain(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001294 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001295 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001296 ctx.load_cert_chain(BADCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001297 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001298 ctx.load_cert_chain(EMPTYCERT)
1299 # Separate key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001300 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001301 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
1302 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
1303 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001304 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001305 ctx.load_cert_chain(ONLYCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001306 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001307 ctx.load_cert_chain(ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001308 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001309 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
1310 # Mismatching key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001311 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001312 with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
Martin Panter3d81d932016-01-14 09:36:00 +00001313 ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +02001314 # Password protected key and cert
1315 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
1316 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
1317 ctx.load_cert_chain(CERTFILE_PROTECTED,
1318 password=bytearray(KEY_PASSWORD.encode()))
1319 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
1320 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
1321 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
1322 bytearray(KEY_PASSWORD.encode()))
1323 with self.assertRaisesRegex(TypeError, "should be a string"):
1324 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
1325 with self.assertRaises(ssl.SSLError):
1326 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
1327 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1328 # openssl has a fixed limit on the password buffer.
1329 # PEM_BUFSIZE is generally set to 1kb.
1330 # Return a string larger than this.
1331 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
1332 # Password callback
1333 def getpass_unicode():
1334 return KEY_PASSWORD
1335 def getpass_bytes():
1336 return KEY_PASSWORD.encode()
1337 def getpass_bytearray():
1338 return bytearray(KEY_PASSWORD.encode())
1339 def getpass_badpass():
1340 return "badpass"
1341 def getpass_huge():
1342 return b'a' * (1024 * 1024)
1343 def getpass_bad_type():
1344 return 9
1345 def getpass_exception():
1346 raise Exception('getpass error')
1347 class GetPassCallable:
1348 def __call__(self):
1349 return KEY_PASSWORD
1350 def getpass(self):
1351 return KEY_PASSWORD
1352 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
1353 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
1354 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
1355 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
1356 ctx.load_cert_chain(CERTFILE_PROTECTED,
1357 password=GetPassCallable().getpass)
1358 with self.assertRaises(ssl.SSLError):
1359 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
1360 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1361 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
1362 with self.assertRaisesRegex(TypeError, "must return a string"):
1363 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
1364 with self.assertRaisesRegex(Exception, "getpass error"):
1365 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
1366 # Make sure the password function isn't called if it isn't needed
1367 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001368
1369 def test_load_verify_locations(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001370 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001371 ctx.load_verify_locations(CERTFILE)
1372 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
1373 ctx.load_verify_locations(BYTES_CERTFILE)
1374 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
1375 self.assertRaises(TypeError, ctx.load_verify_locations)
Christian Heimesefff7062013-11-21 03:35:02 +01001376 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001377 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001378 ctx.load_verify_locations(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001379 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001380 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001381 ctx.load_verify_locations(BADCERT)
1382 ctx.load_verify_locations(CERTFILE, CAPATH)
1383 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
1384
Victor Stinner80f75e62011-01-29 11:31:20 +00001385 # Issue #10989: crash if the second argument type is invalid
1386 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
1387
Christian Heimesefff7062013-11-21 03:35:02 +01001388 def test_load_verify_cadata(self):
1389 # test cadata
1390 with open(CAFILE_CACERT) as f:
1391 cacert_pem = f.read()
1392 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
1393 with open(CAFILE_NEURONIO) as f:
1394 neuronio_pem = f.read()
1395 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
1396
1397 # test PEM
Christian Heimesa170fa12017-09-15 20:27:30 +02001398 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001399 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
1400 ctx.load_verify_locations(cadata=cacert_pem)
1401 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
1402 ctx.load_verify_locations(cadata=neuronio_pem)
1403 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1404 # cert already in hash table
1405 ctx.load_verify_locations(cadata=neuronio_pem)
1406 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1407
1408 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001409 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001410 combined = "\n".join((cacert_pem, neuronio_pem))
1411 ctx.load_verify_locations(cadata=combined)
1412 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1413
1414 # with junk around the certs
Christian Heimesa170fa12017-09-15 20:27:30 +02001415 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001416 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
1417 neuronio_pem, "tail"]
1418 ctx.load_verify_locations(cadata="\n".join(combined))
1419 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1420
1421 # test DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001422 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001423 ctx.load_verify_locations(cadata=cacert_der)
1424 ctx.load_verify_locations(cadata=neuronio_der)
1425 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1426 # cert already in hash table
1427 ctx.load_verify_locations(cadata=cacert_der)
1428 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1429
1430 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001431 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001432 combined = b"".join((cacert_der, neuronio_der))
1433 ctx.load_verify_locations(cadata=combined)
1434 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1435
1436 # error cases
Christian Heimesa170fa12017-09-15 20:27:30 +02001437 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001438 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
1439
1440 with self.assertRaisesRegex(ssl.SSLError, "no start line"):
1441 ctx.load_verify_locations(cadata="broken")
1442 with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
1443 ctx.load_verify_locations(cadata=b"broken")
1444
1445
Paul Monsonf3550692019-06-19 13:09:54 -07001446 @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001447 def test_load_dh_params(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001448 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001449 ctx.load_dh_params(DHFILE)
1450 if os.name != 'nt':
1451 ctx.load_dh_params(BYTES_DHFILE)
1452 self.assertRaises(TypeError, ctx.load_dh_params)
1453 self.assertRaises(TypeError, ctx.load_dh_params, None)
1454 with self.assertRaises(FileNotFoundError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001455 ctx.load_dh_params(NONEXISTINGCERT)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001456 self.assertEqual(cm.exception.errno, errno.ENOENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001457 with self.assertRaises(ssl.SSLError) as cm:
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001458 ctx.load_dh_params(CERTFILE)
1459
Antoine Pitroub0182c82010-10-12 20:09:02 +00001460 def test_session_stats(self):
1461 for proto in PROTOCOLS:
1462 ctx = ssl.SSLContext(proto)
1463 self.assertEqual(ctx.session_stats(), {
1464 'number': 0,
1465 'connect': 0,
1466 'connect_good': 0,
1467 'connect_renegotiate': 0,
1468 'accept': 0,
1469 'accept_good': 0,
1470 'accept_renegotiate': 0,
1471 'hits': 0,
1472 'misses': 0,
1473 'timeouts': 0,
1474 'cache_full': 0,
1475 })
1476
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001477 def test_set_default_verify_paths(self):
1478 # There's not much we can do to test that it acts as expected,
1479 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001480 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001481 ctx.set_default_verify_paths()
1482
Antoine Pitrou501da612011-12-21 09:27:41 +01001483 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001484 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001485 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001486 ctx.set_ecdh_curve("prime256v1")
1487 ctx.set_ecdh_curve(b"prime256v1")
1488 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1489 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1490 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1491 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1492
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001493 @needs_sni
1494 def test_sni_callback(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001495 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001496
1497 # set_servername_callback expects a callable, or None
1498 self.assertRaises(TypeError, ctx.set_servername_callback)
1499 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
1500 self.assertRaises(TypeError, ctx.set_servername_callback, "")
1501 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
1502
1503 def dummycallback(sock, servername, ctx):
1504 pass
1505 ctx.set_servername_callback(None)
1506 ctx.set_servername_callback(dummycallback)
1507
1508 @needs_sni
1509 def test_sni_callback_refcycle(self):
1510 # Reference cycles through the servername callback are detected
1511 # and cleared.
Christian Heimesa170fa12017-09-15 20:27:30 +02001512 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001513 def dummycallback(sock, servername, ctx, cycle=ctx):
1514 pass
1515 ctx.set_servername_callback(dummycallback)
1516 wr = weakref.ref(ctx)
1517 del ctx, dummycallback
1518 gc.collect()
1519 self.assertIs(wr(), None)
1520
Christian Heimes9a5395a2013-06-17 15:44:12 +02001521 def test_cert_store_stats(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001522 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001523 self.assertEqual(ctx.cert_store_stats(),
1524 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1525 ctx.load_cert_chain(CERTFILE)
1526 self.assertEqual(ctx.cert_store_stats(),
1527 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1528 ctx.load_verify_locations(CERTFILE)
1529 self.assertEqual(ctx.cert_store_stats(),
1530 {'x509_ca': 0, 'crl': 0, 'x509': 1})
Martin Panterb55f8b72016-01-14 12:53:56 +00001531 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001532 self.assertEqual(ctx.cert_store_stats(),
1533 {'x509_ca': 1, 'crl': 0, 'x509': 2})
1534
1535 def test_get_ca_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001536 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001537 self.assertEqual(ctx.get_ca_certs(), [])
1538 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1539 ctx.load_verify_locations(CERTFILE)
1540 self.assertEqual(ctx.get_ca_certs(), [])
Martin Panterb55f8b72016-01-14 12:53:56 +00001541 # but CAFILE_CACERT is a CA cert
1542 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001543 self.assertEqual(ctx.get_ca_certs(),
1544 [{'issuer': ((('organizationName', 'Root CA'),),
1545 (('organizationalUnitName', 'http://www.cacert.org'),),
1546 (('commonName', 'CA Cert Signing Authority'),),
1547 (('emailAddress', 'support@cacert.org'),)),
1548 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1549 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1550 'serialNumber': '00',
Christian Heimesbd3a7f92013-11-21 03:40:15 +01001551 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
Christian Heimes9a5395a2013-06-17 15:44:12 +02001552 'subject': ((('organizationName', 'Root CA'),),
1553 (('organizationalUnitName', 'http://www.cacert.org'),),
1554 (('commonName', 'CA Cert Signing Authority'),),
1555 (('emailAddress', 'support@cacert.org'),)),
1556 'version': 3}])
1557
Martin Panterb55f8b72016-01-14 12:53:56 +00001558 with open(CAFILE_CACERT) as f:
Christian Heimes9a5395a2013-06-17 15:44:12 +02001559 pem = f.read()
1560 der = ssl.PEM_cert_to_DER_cert(pem)
1561 self.assertEqual(ctx.get_ca_certs(True), [der])
1562
Christian Heimes72d28502013-11-23 13:56:58 +01001563 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001564 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001565 ctx.load_default_certs()
1566
Christian Heimesa170fa12017-09-15 20:27:30 +02001567 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001568 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1569 ctx.load_default_certs()
1570
Christian Heimesa170fa12017-09-15 20:27:30 +02001571 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001572 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1573
Christian Heimesa170fa12017-09-15 20:27:30 +02001574 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001575 self.assertRaises(TypeError, ctx.load_default_certs, None)
1576 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1577
Benjamin Peterson91244e02014-10-03 18:17:15 -04001578 @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
Christian Heimes598894f2016-09-05 23:19:05 +02001579 @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001580 def test_load_default_certs_env(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001581 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001582 with support.EnvironmentVarGuard() as env:
1583 env["SSL_CERT_DIR"] = CAPATH
1584 env["SSL_CERT_FILE"] = CERTFILE
1585 ctx.load_default_certs()
1586 self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1587
Benjamin Peterson91244e02014-10-03 18:17:15 -04001588 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Steve Dower68d663c2017-07-17 11:15:48 +02001589 @unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
Benjamin Peterson91244e02014-10-03 18:17:15 -04001590 def test_load_default_certs_env_windows(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001591 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001592 ctx.load_default_certs()
1593 stats = ctx.cert_store_stats()
1594
Christian Heimesa170fa12017-09-15 20:27:30 +02001595 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001596 with support.EnvironmentVarGuard() as env:
1597 env["SSL_CERT_DIR"] = CAPATH
1598 env["SSL_CERT_FILE"] = CERTFILE
1599 ctx.load_default_certs()
1600 stats["x509"] += 1
1601 self.assertEqual(ctx.cert_store_stats(), stats)
1602
Christian Heimes358cfd42016-09-10 22:43:48 +02001603 def _assert_context_options(self, ctx):
1604 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1605 if OP_NO_COMPRESSION != 0:
1606 self.assertEqual(ctx.options & OP_NO_COMPRESSION,
1607 OP_NO_COMPRESSION)
1608 if OP_SINGLE_DH_USE != 0:
1609 self.assertEqual(ctx.options & OP_SINGLE_DH_USE,
1610 OP_SINGLE_DH_USE)
1611 if OP_SINGLE_ECDH_USE != 0:
1612 self.assertEqual(ctx.options & OP_SINGLE_ECDH_USE,
1613 OP_SINGLE_ECDH_USE)
1614 if OP_CIPHER_SERVER_PREFERENCE != 0:
1615 self.assertEqual(ctx.options & OP_CIPHER_SERVER_PREFERENCE,
1616 OP_CIPHER_SERVER_PREFERENCE)
1617
Christian Heimes4c05b472013-11-23 15:58:30 +01001618 def test_create_default_context(self):
1619 ctx = ssl.create_default_context()
Christian Heimes358cfd42016-09-10 22:43:48 +02001620
Christian Heimesa170fa12017-09-15 20:27:30 +02001621 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001622 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001623 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001624 self._assert_context_options(ctx)
1625
Christian Heimes4c05b472013-11-23 15:58:30 +01001626 with open(SIGNING_CA) as f:
1627 cadata = f.read()
1628 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1629 cadata=cadata)
Christian Heimesa170fa12017-09-15 20:27:30 +02001630 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001631 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes358cfd42016-09-10 22:43:48 +02001632 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001633
1634 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001635 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001636 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001637 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001638
Christian Heimes67986f92013-11-23 22:43:47 +01001639 def test__create_stdlib_context(self):
1640 ctx = ssl._create_stdlib_context()
Christian Heimesa170fa12017-09-15 20:27:30 +02001641 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001642 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001643 self.assertFalse(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001644 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001645
1646 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1647 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1648 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001649 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001650
1651 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
Christian Heimesa02c69a2013-12-02 20:59:28 +01001652 cert_reqs=ssl.CERT_REQUIRED,
1653 check_hostname=True)
Christian Heimes67986f92013-11-23 22:43:47 +01001654 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1655 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimesa02c69a2013-12-02 20:59:28 +01001656 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001657 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001658
1659 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001660 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001661 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001662 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001663
Christian Heimes1aa9a752013-12-02 02:41:19 +01001664 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001665 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001666 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001667 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001668
Christian Heimese82c0342017-09-15 20:29:57 +02001669 # Auto set CERT_REQUIRED
1670 ctx.check_hostname = True
1671 self.assertTrue(ctx.check_hostname)
1672 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1673 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001674 ctx.verify_mode = ssl.CERT_REQUIRED
1675 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001676 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001677
Christian Heimese82c0342017-09-15 20:29:57 +02001678 # Changing verify_mode does not affect check_hostname
1679 ctx.check_hostname = False
1680 ctx.verify_mode = ssl.CERT_NONE
1681 ctx.check_hostname = False
1682 self.assertFalse(ctx.check_hostname)
1683 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1684 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001685 ctx.check_hostname = True
1686 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001687 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1688
1689 ctx.check_hostname = False
1690 ctx.verify_mode = ssl.CERT_OPTIONAL
1691 ctx.check_hostname = False
1692 self.assertFalse(ctx.check_hostname)
1693 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1694 # keep CERT_OPTIONAL
1695 ctx.check_hostname = True
1696 self.assertTrue(ctx.check_hostname)
1697 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001698
1699 # Cannot set CERT_NONE with check_hostname enabled
1700 with self.assertRaises(ValueError):
1701 ctx.verify_mode = ssl.CERT_NONE
1702 ctx.check_hostname = False
1703 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001704 ctx.verify_mode = ssl.CERT_NONE
1705 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001706
Christian Heimes5fe668c2016-09-12 00:01:11 +02001707 def test_context_client_server(self):
1708 # PROTOCOL_TLS_CLIENT has sane defaults
1709 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1710 self.assertTrue(ctx.check_hostname)
1711 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1712
1713 # PROTOCOL_TLS_SERVER has different but also sane defaults
1714 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1715 self.assertFalse(ctx.check_hostname)
1716 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1717
Christian Heimes4df60f12017-09-15 20:26:05 +02001718 def test_context_custom_class(self):
1719 class MySSLSocket(ssl.SSLSocket):
1720 pass
1721
1722 class MySSLObject(ssl.SSLObject):
1723 pass
1724
1725 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1726 ctx.sslsocket_class = MySSLSocket
1727 ctx.sslobject_class = MySSLObject
1728
1729 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1730 self.assertIsInstance(sock, MySSLSocket)
1731 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1732 self.assertIsInstance(obj, MySSLObject)
1733
Christian Heimes78c7d522019-06-03 21:00:10 +02001734 @unittest.skipUnless(IS_OPENSSL_1_1_1, "Test requires OpenSSL 1.1.1")
1735 def test_num_tickest(self):
1736 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1737 self.assertEqual(ctx.num_tickets, 2)
1738 ctx.num_tickets = 1
1739 self.assertEqual(ctx.num_tickets, 1)
1740 ctx.num_tickets = 0
1741 self.assertEqual(ctx.num_tickets, 0)
1742 with self.assertRaises(ValueError):
1743 ctx.num_tickets = -1
1744 with self.assertRaises(TypeError):
1745 ctx.num_tickets = None
1746
1747 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1748 self.assertEqual(ctx.num_tickets, 2)
1749 with self.assertRaises(ValueError):
1750 ctx.num_tickets = 1
1751
Antoine Pitrou152efa22010-05-16 18:19:27 +00001752
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001753class SSLErrorTests(unittest.TestCase):
1754
1755 def test_str(self):
1756 # The str() of a SSLError doesn't include the errno
1757 e = ssl.SSLError(1, "foo")
1758 self.assertEqual(str(e), "foo")
1759 self.assertEqual(e.errno, 1)
1760 # Same for a subclass
1761 e = ssl.SSLZeroReturnError(1, "foo")
1762 self.assertEqual(str(e), "foo")
1763 self.assertEqual(e.errno, 1)
1764
Paul Monsonf3550692019-06-19 13:09:54 -07001765 @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001766 def test_lib_reason(self):
1767 # Test the library and reason attributes
Christian Heimesa170fa12017-09-15 20:27:30 +02001768 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001769 with self.assertRaises(ssl.SSLError) as cm:
1770 ctx.load_dh_params(CERTFILE)
1771 self.assertEqual(cm.exception.library, 'PEM')
1772 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1773 s = str(cm.exception)
1774 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1775
1776 def test_subclass(self):
1777 # Check that the appropriate SSLError subclass is raised
1778 # (this only tests one of them)
Christian Heimesa170fa12017-09-15 20:27:30 +02001779 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1780 ctx.check_hostname = False
1781 ctx.verify_mode = ssl.CERT_NONE
Giampaolo Rodolaeb7e29f2019-04-09 00:34:02 +02001782 with socket.create_server(("127.0.0.1", 0)) as s:
1783 c = socket.create_connection(s.getsockname())
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001784 c.setblocking(False)
1785 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001786 with self.assertRaises(ssl.SSLWantReadError) as cm:
1787 c.do_handshake()
1788 s = str(cm.exception)
1789 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1790 # For compatibility
1791 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1792
1793
Christian Heimes61d478c2018-01-27 15:51:38 +01001794 def test_bad_server_hostname(self):
1795 ctx = ssl.create_default_context()
1796 with self.assertRaises(ValueError):
1797 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1798 server_hostname="")
1799 with self.assertRaises(ValueError):
1800 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1801 server_hostname=".example.org")
Christian Heimesd02ac252018-03-25 12:36:13 +02001802 with self.assertRaises(TypeError):
1803 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1804 server_hostname="example.org\x00evil.com")
Christian Heimes61d478c2018-01-27 15:51:38 +01001805
1806
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001807class MemoryBIOTests(unittest.TestCase):
1808
1809 def test_read_write(self):
1810 bio = ssl.MemoryBIO()
1811 bio.write(b'foo')
1812 self.assertEqual(bio.read(), b'foo')
1813 self.assertEqual(bio.read(), b'')
1814 bio.write(b'foo')
1815 bio.write(b'bar')
1816 self.assertEqual(bio.read(), b'foobar')
1817 self.assertEqual(bio.read(), b'')
1818 bio.write(b'baz')
1819 self.assertEqual(bio.read(2), b'ba')
1820 self.assertEqual(bio.read(1), b'z')
1821 self.assertEqual(bio.read(1), b'')
1822
1823 def test_eof(self):
1824 bio = ssl.MemoryBIO()
1825 self.assertFalse(bio.eof)
1826 self.assertEqual(bio.read(), b'')
1827 self.assertFalse(bio.eof)
1828 bio.write(b'foo')
1829 self.assertFalse(bio.eof)
1830 bio.write_eof()
1831 self.assertFalse(bio.eof)
1832 self.assertEqual(bio.read(2), b'fo')
1833 self.assertFalse(bio.eof)
1834 self.assertEqual(bio.read(1), b'o')
1835 self.assertTrue(bio.eof)
1836 self.assertEqual(bio.read(), b'')
1837 self.assertTrue(bio.eof)
1838
1839 def test_pending(self):
1840 bio = ssl.MemoryBIO()
1841 self.assertEqual(bio.pending, 0)
1842 bio.write(b'foo')
1843 self.assertEqual(bio.pending, 3)
1844 for i in range(3):
1845 bio.read(1)
1846 self.assertEqual(bio.pending, 3-i-1)
1847 for i in range(3):
1848 bio.write(b'x')
1849 self.assertEqual(bio.pending, i+1)
1850 bio.read()
1851 self.assertEqual(bio.pending, 0)
1852
1853 def test_buffer_types(self):
1854 bio = ssl.MemoryBIO()
1855 bio.write(b'foo')
1856 self.assertEqual(bio.read(), b'foo')
1857 bio.write(bytearray(b'bar'))
1858 self.assertEqual(bio.read(), b'bar')
1859 bio.write(memoryview(b'baz'))
1860 self.assertEqual(bio.read(), b'baz')
1861
1862 def test_error_types(self):
1863 bio = ssl.MemoryBIO()
1864 self.assertRaises(TypeError, bio.write, 'foo')
1865 self.assertRaises(TypeError, bio.write, None)
1866 self.assertRaises(TypeError, bio.write, True)
1867 self.assertRaises(TypeError, bio.write, 1)
1868
1869
Christian Heimes9d50ab52018-02-27 10:17:30 +01001870class SSLObjectTests(unittest.TestCase):
1871 def test_private_init(self):
1872 bio = ssl.MemoryBIO()
1873 with self.assertRaisesRegex(TypeError, "public constructor"):
1874 ssl.SSLObject(bio, bio)
1875
Nathaniel J. Smithc0da5822018-09-21 21:44:12 -07001876 def test_unwrap(self):
1877 client_ctx, server_ctx, hostname = testing_context()
1878 c_in = ssl.MemoryBIO()
1879 c_out = ssl.MemoryBIO()
1880 s_in = ssl.MemoryBIO()
1881 s_out = ssl.MemoryBIO()
1882 client = client_ctx.wrap_bio(c_in, c_out, server_hostname=hostname)
1883 server = server_ctx.wrap_bio(s_in, s_out, server_side=True)
1884
1885 # Loop on the handshake for a bit to get it settled
1886 for _ in range(5):
1887 try:
1888 client.do_handshake()
1889 except ssl.SSLWantReadError:
1890 pass
1891 if c_out.pending:
1892 s_in.write(c_out.read())
1893 try:
1894 server.do_handshake()
1895 except ssl.SSLWantReadError:
1896 pass
1897 if s_out.pending:
1898 c_in.write(s_out.read())
1899 # Now the handshakes should be complete (don't raise WantReadError)
1900 client.do_handshake()
1901 server.do_handshake()
1902
1903 # Now if we unwrap one side unilaterally, it should send close-notify
1904 # and raise WantReadError:
1905 with self.assertRaises(ssl.SSLWantReadError):
1906 client.unwrap()
1907
1908 # But server.unwrap() does not raise, because it reads the client's
1909 # close-notify:
1910 s_in.write(c_out.read())
1911 server.unwrap()
1912
1913 # And now that the client gets the server's close-notify, it doesn't
1914 # raise either.
1915 c_in.write(s_out.read())
1916 client.unwrap()
Christian Heimes9d50ab52018-02-27 10:17:30 +01001917
Martin Panter3840b2a2016-03-27 01:53:46 +00001918class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001919 """Tests that connect to a simple server running in the background"""
1920
1921 def setUp(self):
1922 server = ThreadedEchoServer(SIGNED_CERTFILE)
1923 self.server_addr = (HOST, server.port)
1924 server.__enter__()
1925 self.addCleanup(server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001926
Antoine Pitrou480a1242010-04-28 21:37:09 +00001927 def test_connect(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001928 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001929 cert_reqs=ssl.CERT_NONE) as s:
1930 s.connect(self.server_addr)
1931 self.assertEqual({}, s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001932 self.assertFalse(s.server_side)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001933
Martin Panter3840b2a2016-03-27 01:53:46 +00001934 # this should succeed because we specify the root cert
Christian Heimesd0486372016-09-10 23:23:33 +02001935 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001936 cert_reqs=ssl.CERT_REQUIRED,
1937 ca_certs=SIGNING_CA) as s:
1938 s.connect(self.server_addr)
1939 self.assertTrue(s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001940 self.assertFalse(s.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001941
Martin Panter3840b2a2016-03-27 01:53:46 +00001942 def test_connect_fail(self):
1943 # This should fail because we have no verification certs. Connection
1944 # failure crashes ThreadedEchoServer, so run this in an independent
1945 # test method.
Christian Heimesd0486372016-09-10 23:23:33 +02001946 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001947 cert_reqs=ssl.CERT_REQUIRED)
1948 self.addCleanup(s.close)
1949 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1950 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001951
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001952 def test_connect_ex(self):
1953 # Issue #11326: check connect_ex() implementation
Christian Heimesd0486372016-09-10 23:23:33 +02001954 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001955 cert_reqs=ssl.CERT_REQUIRED,
1956 ca_certs=SIGNING_CA)
1957 self.addCleanup(s.close)
1958 self.assertEqual(0, s.connect_ex(self.server_addr))
1959 self.assertTrue(s.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001960
1961 def test_non_blocking_connect_ex(self):
1962 # Issue #11326: non-blocking connect_ex() should allow handshake
1963 # to proceed after the socket gets ready.
Christian Heimesd0486372016-09-10 23:23:33 +02001964 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001965 cert_reqs=ssl.CERT_REQUIRED,
1966 ca_certs=SIGNING_CA,
1967 do_handshake_on_connect=False)
1968 self.addCleanup(s.close)
1969 s.setblocking(False)
1970 rc = s.connect_ex(self.server_addr)
1971 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1972 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
1973 # Wait for connect to finish
1974 select.select([], [s], [], 5.0)
1975 # Non-blocking handshake
1976 while True:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001977 try:
Martin Panter3840b2a2016-03-27 01:53:46 +00001978 s.do_handshake()
1979 break
1980 except ssl.SSLWantReadError:
1981 select.select([s], [], [], 5.0)
1982 except ssl.SSLWantWriteError:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001983 select.select([], [s], [], 5.0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001984 # SSL established
1985 self.assertTrue(s.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001986
Antoine Pitrou152efa22010-05-16 18:19:27 +00001987 def test_connect_with_context(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001988 # Same as test_connect, but with a separately created context
Christian Heimesa170fa12017-09-15 20:27:30 +02001989 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001990 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1991 s.connect(self.server_addr)
1992 self.assertEqual({}, s.getpeercert())
1993 # Same with a server hostname
1994 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1995 server_hostname="dummy") as s:
1996 s.connect(self.server_addr)
1997 ctx.verify_mode = ssl.CERT_REQUIRED
1998 # This should succeed because we specify the root cert
1999 ctx.load_verify_locations(SIGNING_CA)
2000 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
2001 s.connect(self.server_addr)
2002 cert = s.getpeercert()
2003 self.assertTrue(cert)
2004
2005 def test_connect_with_context_fail(self):
2006 # This should fail because we have no verification certs. Connection
2007 # failure crashes ThreadedEchoServer, so run this in an independent
2008 # test method.
Christian Heimesa170fa12017-09-15 20:27:30 +02002009 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002010 ctx.verify_mode = ssl.CERT_REQUIRED
2011 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
2012 self.addCleanup(s.close)
2013 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
2014 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00002015
2016 def test_connect_capath(self):
2017 # Verify server certificates using the `capath` argument
Antoine Pitrou467f28d2010-05-16 19:22:44 +00002018 # NOTE: the subject hashing algorithm has been changed between
2019 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
2020 # contain both versions of each certificate (same content, different
Antoine Pitroud7e4c1c2010-05-17 14:13:10 +00002021 # filename) for this test to be portable across OpenSSL releases.
Christian Heimesa170fa12017-09-15 20:27:30 +02002022 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002023 ctx.verify_mode = ssl.CERT_REQUIRED
2024 ctx.load_verify_locations(capath=CAPATH)
2025 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
2026 s.connect(self.server_addr)
2027 cert = s.getpeercert()
2028 self.assertTrue(cert)
Christian Heimes529525f2018-05-23 22:24:45 +02002029
Martin Panter3840b2a2016-03-27 01:53:46 +00002030 # Same with a bytes `capath` argument
Christian Heimesa170fa12017-09-15 20:27:30 +02002031 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002032 ctx.verify_mode = ssl.CERT_REQUIRED
2033 ctx.load_verify_locations(capath=BYTES_CAPATH)
2034 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
2035 s.connect(self.server_addr)
2036 cert = s.getpeercert()
2037 self.assertTrue(cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002038
Christian Heimesefff7062013-11-21 03:35:02 +01002039 def test_connect_cadata(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00002040 with open(SIGNING_CA) as f:
Christian Heimesefff7062013-11-21 03:35:02 +01002041 pem = f.read()
2042 der = ssl.PEM_cert_to_DER_cert(pem)
Christian Heimesa170fa12017-09-15 20:27:30 +02002043 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002044 ctx.verify_mode = ssl.CERT_REQUIRED
2045 ctx.load_verify_locations(cadata=pem)
2046 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
2047 s.connect(self.server_addr)
2048 cert = s.getpeercert()
2049 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01002050
Martin Panter3840b2a2016-03-27 01:53:46 +00002051 # same with DER
Christian Heimesa170fa12017-09-15 20:27:30 +02002052 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002053 ctx.verify_mode = ssl.CERT_REQUIRED
2054 ctx.load_verify_locations(cadata=der)
2055 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
2056 s.connect(self.server_addr)
2057 cert = s.getpeercert()
2058 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01002059
Antoine Pitroue3220242010-04-24 11:13:53 +00002060 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
2061 def test_makefile_close(self):
2062 # Issue #5238: creating a file-like object with makefile() shouldn't
2063 # delay closing the underlying "real socket" (here tested with its
2064 # file descriptor, hence skipping the test under Windows).
Christian Heimesd0486372016-09-10 23:23:33 +02002065 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Martin Panter3840b2a2016-03-27 01:53:46 +00002066 ss.connect(self.server_addr)
2067 fd = ss.fileno()
2068 f = ss.makefile()
2069 f.close()
2070 # The fd is still open
2071 os.read(fd, 0)
2072 # Closing the SSL socket should close the fd too
2073 ss.close()
2074 gc.collect()
2075 with self.assertRaises(OSError) as e:
Antoine Pitroue3220242010-04-24 11:13:53 +00002076 os.read(fd, 0)
Martin Panter3840b2a2016-03-27 01:53:46 +00002077 self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00002078
Antoine Pitrou480a1242010-04-28 21:37:09 +00002079 def test_non_blocking_handshake(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00002080 s = socket.socket(socket.AF_INET)
2081 s.connect(self.server_addr)
2082 s.setblocking(False)
Christian Heimesd0486372016-09-10 23:23:33 +02002083 s = test_wrap_socket(s,
Martin Panter3840b2a2016-03-27 01:53:46 +00002084 cert_reqs=ssl.CERT_NONE,
2085 do_handshake_on_connect=False)
2086 self.addCleanup(s.close)
2087 count = 0
2088 while True:
2089 try:
2090 count += 1
2091 s.do_handshake()
2092 break
2093 except ssl.SSLWantReadError:
2094 select.select([s], [], [])
2095 except ssl.SSLWantWriteError:
2096 select.select([], [s], [])
2097 if support.verbose:
2098 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002099
Antoine Pitrou480a1242010-04-28 21:37:09 +00002100 def test_get_server_certificate(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00002101 _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02002102
Martin Panter3840b2a2016-03-27 01:53:46 +00002103 def test_get_server_certificate_fail(self):
2104 # Connection failure crashes ThreadedEchoServer, so run this in an
2105 # independent test method
2106 _test_get_server_certificate_fail(self, *self.server_addr)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002107
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00002108 def test_ciphers(self):
Christian Heimesd0486372016-09-10 23:23:33 +02002109 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002110 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
2111 s.connect(self.server_addr)
Christian Heimesd0486372016-09-10 23:23:33 +02002112 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002113 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
2114 s.connect(self.server_addr)
2115 # Error checking can happen at instantiation or when connecting
2116 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
2117 with socket.socket(socket.AF_INET) as sock:
Christian Heimesd0486372016-09-10 23:23:33 +02002118 s = test_wrap_socket(sock,
Martin Panter3840b2a2016-03-27 01:53:46 +00002119 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
2120 s.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00002121
Christian Heimes9a5395a2013-06-17 15:44:12 +02002122 def test_get_ca_certs_capath(self):
2123 # capath certs are loaded on request
Christian Heimesa170fa12017-09-15 20:27:30 +02002124 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Martin Panter3840b2a2016-03-27 01:53:46 +00002125 ctx.load_verify_locations(capath=CAPATH)
2126 self.assertEqual(ctx.get_ca_certs(), [])
Christian Heimesa170fa12017-09-15 20:27:30 +02002127 with ctx.wrap_socket(socket.socket(socket.AF_INET),
2128 server_hostname='localhost') as s:
Martin Panter3840b2a2016-03-27 01:53:46 +00002129 s.connect(self.server_addr)
2130 cert = s.getpeercert()
2131 self.assertTrue(cert)
2132 self.assertEqual(len(ctx.get_ca_certs()), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02002133
Christian Heimes575596e2013-12-15 21:49:17 +01002134 @needs_sni
Christian Heimes8e7f3942013-12-05 07:41:08 +01002135 def test_context_setget(self):
2136 # Check that the context of a connected socket can be replaced.
Christian Heimesa170fa12017-09-15 20:27:30 +02002137 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2138 ctx1.load_verify_locations(capath=CAPATH)
2139 ctx2 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2140 ctx2.load_verify_locations(capath=CAPATH)
Martin Panter3840b2a2016-03-27 01:53:46 +00002141 s = socket.socket(socket.AF_INET)
Christian Heimesa170fa12017-09-15 20:27:30 +02002142 with ctx1.wrap_socket(s, server_hostname='localhost') as ss:
Martin Panter3840b2a2016-03-27 01:53:46 +00002143 ss.connect(self.server_addr)
2144 self.assertIs(ss.context, ctx1)
2145 self.assertIs(ss._sslobj.context, ctx1)
2146 ss.context = ctx2
2147 self.assertIs(ss.context, ctx2)
2148 self.assertIs(ss._sslobj.context, ctx2)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002149
2150 def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
2151 # A simple IO loop. Call func(*args) depending on the error we get
2152 # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
2153 timeout = kwargs.get('timeout', 10)
Victor Stinner50a72af2017-09-11 09:34:24 -07002154 deadline = time.monotonic() + timeout
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002155 count = 0
2156 while True:
Victor Stinner50a72af2017-09-11 09:34:24 -07002157 if time.monotonic() > deadline:
2158 self.fail("timeout")
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002159 errno = None
2160 count += 1
2161 try:
2162 ret = func(*args)
2163 except ssl.SSLError as e:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002164 if e.errno not in (ssl.SSL_ERROR_WANT_READ,
Martin Panter40b97ec2016-01-14 13:05:46 +00002165 ssl.SSL_ERROR_WANT_WRITE):
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002166 raise
2167 errno = e.errno
2168 # Get any data from the outgoing BIO irrespective of any error, and
2169 # send it to the socket.
2170 buf = outgoing.read()
2171 sock.sendall(buf)
2172 # If there's no error, we're done. For WANT_READ, we need to get
2173 # data from the socket and put it in the incoming BIO.
2174 if errno is None:
2175 break
2176 elif errno == ssl.SSL_ERROR_WANT_READ:
2177 buf = sock.recv(32768)
2178 if buf:
2179 incoming.write(buf)
2180 else:
2181 incoming.write_eof()
2182 if support.verbose:
2183 sys.stdout.write("Needed %d calls to complete %s().\n"
2184 % (count, func.__name__))
2185 return ret
2186
Martin Panter3840b2a2016-03-27 01:53:46 +00002187 def test_bio_handshake(self):
2188 sock = socket.socket(socket.AF_INET)
2189 self.addCleanup(sock.close)
2190 sock.connect(self.server_addr)
2191 incoming = ssl.MemoryBIO()
2192 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002193 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2194 self.assertTrue(ctx.check_hostname)
2195 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Martin Panter3840b2a2016-03-27 01:53:46 +00002196 ctx.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +02002197 sslobj = ctx.wrap_bio(incoming, outgoing, False,
2198 SIGNED_CERTFILE_HOSTNAME)
Martin Panter3840b2a2016-03-27 01:53:46 +00002199 self.assertIs(sslobj._sslobj.owner, sslobj)
2200 self.assertIsNone(sslobj.cipher())
Christian Heimes68771112017-09-05 21:55:40 -07002201 self.assertIsNone(sslobj.version())
Christian Heimes01113fa2016-09-05 23:23:24 +02002202 self.assertIsNotNone(sslobj.shared_ciphers())
Martin Panter3840b2a2016-03-27 01:53:46 +00002203 self.assertRaises(ValueError, sslobj.getpeercert)
2204 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2205 self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
2206 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2207 self.assertTrue(sslobj.cipher())
Christian Heimes01113fa2016-09-05 23:23:24 +02002208 self.assertIsNotNone(sslobj.shared_ciphers())
Christian Heimes68771112017-09-05 21:55:40 -07002209 self.assertIsNotNone(sslobj.version())
Martin Panter3840b2a2016-03-27 01:53:46 +00002210 self.assertTrue(sslobj.getpeercert())
2211 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2212 self.assertTrue(sslobj.get_channel_binding('tls-unique'))
2213 try:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002214 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Martin Panter3840b2a2016-03-27 01:53:46 +00002215 except ssl.SSLSyscallError:
2216 # If the server shuts down the TCP connection without sending a
2217 # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
2218 pass
2219 self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
2220
2221 def test_bio_read_write_data(self):
2222 sock = socket.socket(socket.AF_INET)
2223 self.addCleanup(sock.close)
2224 sock.connect(self.server_addr)
2225 incoming = ssl.MemoryBIO()
2226 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002227 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002228 ctx.verify_mode = ssl.CERT_NONE
2229 sslobj = ctx.wrap_bio(incoming, outgoing, False)
2230 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2231 req = b'FOO\n'
2232 self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
2233 buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
2234 self.assertEqual(buf, b'foo\n')
2235 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002236
2237
Martin Panter3840b2a2016-03-27 01:53:46 +00002238class NetworkedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002239
Martin Panter3840b2a2016-03-27 01:53:46 +00002240 def test_timeout_connect_ex(self):
2241 # Issue #12065: on a timeout, connect_ex() should return the original
2242 # errno (mimicking the behaviour of non-SSL sockets).
2243 with support.transient_internet(REMOTE_HOST):
Christian Heimesd0486372016-09-10 23:23:33 +02002244 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002245 cert_reqs=ssl.CERT_REQUIRED,
2246 do_handshake_on_connect=False)
2247 self.addCleanup(s.close)
2248 s.settimeout(0.0000001)
2249 rc = s.connect_ex((REMOTE_HOST, 443))
2250 if rc == 0:
2251 self.skipTest("REMOTE_HOST responded too quickly")
2252 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
2253
2254 @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
2255 def test_get_server_certificate_ipv6(self):
2256 with support.transient_internet('ipv6.google.com'):
2257 _test_get_server_certificate(self, 'ipv6.google.com', 443)
2258 _test_get_server_certificate_fail(self, 'ipv6.google.com', 443)
2259
Martin Panter3840b2a2016-03-27 01:53:46 +00002260
2261def _test_get_server_certificate(test, host, port, cert=None):
2262 pem = ssl.get_server_certificate((host, port))
2263 if not pem:
2264 test.fail("No server certificate on %s:%s!" % (host, port))
2265
2266 pem = ssl.get_server_certificate((host, port), ca_certs=cert)
2267 if not pem:
2268 test.fail("No server certificate on %s:%s!" % (host, port))
2269 if support.verbose:
2270 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
2271
2272def _test_get_server_certificate_fail(test, host, port):
2273 try:
2274 pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
2275 except ssl.SSLError as x:
2276 #should fail
2277 if support.verbose:
2278 sys.stdout.write("%s\n" % x)
2279 else:
2280 test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2281
2282
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002283from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002284
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002285class ThreadedEchoServer(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002286
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002287 class ConnectionHandler(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002288
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002289 """A mildly complicated class, because we want it to work both
2290 with and without the SSL wrapper around the socket connection, so
2291 that we can test the STARTTLS functionality."""
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002292
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002293 def __init__(self, server, connsock, addr):
2294 self.server = server
2295 self.running = False
2296 self.sock = connsock
2297 self.addr = addr
Serhiy Storchaka5eca7f32019-09-01 12:12:52 +03002298 self.sock.setblocking(True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002299 self.sslconn = None
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002300 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00002301 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002302
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002303 def wrap_conn(self):
2304 try:
2305 self.sslconn = self.server.context.wrap_socket(
2306 self.sock, server_side=True)
2307 self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
2308 self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
Paul Monsonfb7e7502019-05-15 15:38:55 -07002309 except (ConnectionResetError, BrokenPipeError, ConnectionAbortedError) as e:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002310 # We treat ConnectionResetError as though it were an
2311 # SSLError - OpenSSL on Ubuntu abruptly closes the
2312 # connection when asked to use an unsupported protocol.
2313 #
Christian Heimes529525f2018-05-23 22:24:45 +02002314 # BrokenPipeError is raised in TLS 1.3 mode, when OpenSSL
2315 # tries to send session tickets after handshake.
2316 # https://github.com/openssl/openssl/issues/6342
Paul Monsonfb7e7502019-05-15 15:38:55 -07002317 #
2318 # ConnectionAbortedError is raised in TLS 1.3 mode, when OpenSSL
2319 # tries to send session tickets after handshake when using WinSock.
Christian Heimes529525f2018-05-23 22:24:45 +02002320 self.server.conn_errors.append(str(e))
2321 if self.server.chatty:
2322 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2323 self.running = False
2324 self.close()
2325 return False
2326 except (ssl.SSLError, OSError) as e:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002327 # OSError may occur with wrong protocols, e.g. both
2328 # sides use PROTOCOL_TLS_SERVER.
2329 #
2330 # XXX Various errors can have happened here, for example
2331 # a mismatching protocol version, an invalid certificate,
2332 # or a low-level bug. This should be made more discriminating.
2333 #
2334 # bpo-31323: Store the exception as string to prevent
2335 # a reference leak: server -> conn_errors -> exception
2336 # -> traceback -> self (ConnectionHandler) -> server
2337 self.server.conn_errors.append(str(e))
2338 if self.server.chatty:
2339 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2340 self.running = False
2341 self.server.stop()
2342 self.close()
2343 return False
2344 else:
2345 self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
2346 if self.server.context.verify_mode == ssl.CERT_REQUIRED:
2347 cert = self.sslconn.getpeercert()
2348 if support.verbose and self.server.chatty:
2349 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
2350 cert_binary = self.sslconn.getpeercert(True)
2351 if support.verbose and self.server.chatty:
2352 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
2353 cipher = self.sslconn.cipher()
2354 if support.verbose and self.server.chatty:
2355 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
2356 sys.stdout.write(" server: selected protocol is now "
2357 + str(self.sslconn.selected_npn_protocol()) + "\n")
2358 return True
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002359
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002360 def read(self):
2361 if self.sslconn:
2362 return self.sslconn.read()
2363 else:
2364 return self.sock.recv(1024)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002365
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002366 def write(self, bytes):
2367 if self.sslconn:
2368 return self.sslconn.write(bytes)
2369 else:
2370 return self.sock.send(bytes)
2371
2372 def close(self):
2373 if self.sslconn:
2374 self.sslconn.close()
2375 else:
2376 self.sock.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002377
Antoine Pitrou480a1242010-04-28 21:37:09 +00002378 def run(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002379 self.running = True
2380 if not self.server.starttls_server:
2381 if not self.wrap_conn():
2382 return
2383 while self.running:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002384 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002385 msg = self.read()
2386 stripped = msg.strip()
2387 if not stripped:
2388 # eof, so quit this handler
2389 self.running = False
2390 try:
2391 self.sock = self.sslconn.unwrap()
2392 except OSError:
2393 # Many tests shut the TCP connection down
2394 # without an SSL shutdown. This causes
2395 # unwrap() to raise OSError with errno=0!
2396 pass
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002397 else:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002398 self.sslconn = None
2399 self.close()
2400 elif stripped == b'over':
2401 if support.verbose and self.server.connectionchatty:
2402 sys.stdout.write(" server: client closed connection\n")
2403 self.close()
2404 return
2405 elif (self.server.starttls_server and
2406 stripped == b'STARTTLS'):
2407 if support.verbose and self.server.connectionchatty:
2408 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
2409 self.write(b"OK\n")
2410 if not self.wrap_conn():
2411 return
2412 elif (self.server.starttls_server and self.sslconn
2413 and stripped == b'ENDTLS'):
2414 if support.verbose and self.server.connectionchatty:
2415 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
2416 self.write(b"OK\n")
2417 self.sock = self.sslconn.unwrap()
2418 self.sslconn = None
2419 if support.verbose and self.server.connectionchatty:
2420 sys.stdout.write(" server: connection is now unencrypted...\n")
2421 elif stripped == b'CB tls-unique':
2422 if support.verbose and self.server.connectionchatty:
2423 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
2424 data = self.sslconn.get_channel_binding("tls-unique")
2425 self.write(repr(data).encode("us-ascii") + b"\n")
Christian Heimes9fb051f2018-09-23 08:32:31 +02002426 elif stripped == b'PHA':
2427 if support.verbose and self.server.connectionchatty:
2428 sys.stdout.write(" server: initiating post handshake auth\n")
2429 try:
2430 self.sslconn.verify_client_post_handshake()
2431 except ssl.SSLError as e:
2432 self.write(repr(e).encode("us-ascii") + b"\n")
2433 else:
2434 self.write(b"OK\n")
2435 elif stripped == b'HASCERT':
2436 if self.sslconn.getpeercert() is not None:
2437 self.write(b'TRUE\n')
2438 else:
2439 self.write(b'FALSE\n')
2440 elif stripped == b'GETCERT':
2441 cert = self.sslconn.getpeercert()
2442 self.write(repr(cert).encode("us-ascii") + b"\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002443 else:
2444 if (support.verbose and
2445 self.server.connectionchatty):
2446 ctype = (self.sslconn and "encrypted") or "unencrypted"
2447 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
2448 % (msg, ctype, msg.lower(), ctype))
2449 self.write(msg.lower())
Paul Monsonfb7e7502019-05-15 15:38:55 -07002450 except (ConnectionResetError, ConnectionAbortedError):
Christian Heimes529525f2018-05-23 22:24:45 +02002451 # XXX: OpenSSL 1.1.1 sometimes raises ConnectionResetError
2452 # when connection is not shut down gracefully.
2453 if self.server.chatty and support.verbose:
2454 sys.stdout.write(
2455 " Connection reset by peer: {}\n".format(
2456 self.addr)
2457 )
2458 self.close()
2459 self.running = False
Paul Monsonfb7e7502019-05-15 15:38:55 -07002460 except ssl.SSLError as err:
2461 # On Windows sometimes test_pha_required_nocert receives the
2462 # PEER_DID_NOT_RETURN_A_CERTIFICATE exception
2463 # before the 'tlsv13 alert certificate required' exception.
2464 # If the server is stopped when PEER_DID_NOT_RETURN_A_CERTIFICATE
2465 # is received test_pha_required_nocert fails with ConnectionResetError
2466 # because the underlying socket is closed
2467 if 'PEER_DID_NOT_RETURN_A_CERTIFICATE' == err.reason:
2468 if self.server.chatty and support.verbose:
2469 sys.stdout.write(err.args[1])
2470 # test_pha_required_nocert is expecting this exception
2471 raise ssl.SSLError('tlsv13 alert certificate required')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002472 except OSError:
2473 if self.server.chatty:
2474 handle_error("Test server failure:\n")
Bill Janssen2f5799b2008-06-29 00:08:12 +00002475 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002476 self.running = False
Christian Heimes529525f2018-05-23 22:24:45 +02002477
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002478 # normally, we'd just stop here, but for the test
2479 # harness, we want to stop the server
2480 self.server.stop()
Bill Janssen54cc54c2007-12-14 22:08:56 +00002481
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002482 def __init__(self, certificate=None, ssl_version=None,
2483 certreqs=None, cacerts=None,
2484 chatty=True, connectionchatty=False, starttls_server=False,
2485 npn_protocols=None, alpn_protocols=None,
2486 ciphers=None, context=None):
2487 if context:
2488 self.context = context
2489 else:
2490 self.context = ssl.SSLContext(ssl_version
2491 if ssl_version is not None
Christian Heimesa170fa12017-09-15 20:27:30 +02002492 else ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002493 self.context.verify_mode = (certreqs if certreqs is not None
2494 else ssl.CERT_NONE)
2495 if cacerts:
2496 self.context.load_verify_locations(cacerts)
2497 if certificate:
2498 self.context.load_cert_chain(certificate)
2499 if npn_protocols:
2500 self.context.set_npn_protocols(npn_protocols)
2501 if alpn_protocols:
2502 self.context.set_alpn_protocols(alpn_protocols)
2503 if ciphers:
2504 self.context.set_ciphers(ciphers)
2505 self.chatty = chatty
2506 self.connectionchatty = connectionchatty
2507 self.starttls_server = starttls_server
2508 self.sock = socket.socket()
2509 self.port = support.bind_port(self.sock)
2510 self.flag = None
2511 self.active = False
2512 self.selected_npn_protocols = []
2513 self.selected_alpn_protocols = []
2514 self.shared_ciphers = []
2515 self.conn_errors = []
2516 threading.Thread.__init__(self)
2517 self.daemon = True
2518
2519 def __enter__(self):
2520 self.start(threading.Event())
2521 self.flag.wait()
2522 return self
2523
2524 def __exit__(self, *args):
2525 self.stop()
2526 self.join()
2527
2528 def start(self, flag=None):
2529 self.flag = flag
2530 threading.Thread.start(self)
2531
2532 def run(self):
2533 self.sock.settimeout(0.05)
2534 self.sock.listen()
2535 self.active = True
2536 if self.flag:
2537 # signal an event
2538 self.flag.set()
2539 while self.active:
2540 try:
2541 newconn, connaddr = self.sock.accept()
2542 if support.verbose and self.chatty:
2543 sys.stdout.write(' server: new connection from '
2544 + repr(connaddr) + '\n')
2545 handler = self.ConnectionHandler(self, newconn, connaddr)
2546 handler.start()
2547 handler.join()
2548 except socket.timeout:
2549 pass
2550 except KeyboardInterrupt:
2551 self.stop()
Christian Heimes529525f2018-05-23 22:24:45 +02002552 except BaseException as e:
2553 if support.verbose and self.chatty:
2554 sys.stdout.write(
2555 ' connection handling failed: ' + repr(e) + '\n')
2556
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002557 self.sock.close()
2558
2559 def stop(self):
2560 self.active = False
2561
2562class AsyncoreEchoServer(threading.Thread):
2563
2564 # this one's based on asyncore.dispatcher
2565
2566 class EchoServer (asyncore.dispatcher):
2567
2568 class ConnectionHandler(asyncore.dispatcher_with_send):
2569
2570 def __init__(self, conn, certfile):
2571 self.socket = test_wrap_socket(conn, server_side=True,
2572 certfile=certfile,
2573 do_handshake_on_connect=False)
2574 asyncore.dispatcher_with_send.__init__(self, self.socket)
2575 self._ssl_accepting = True
2576 self._do_ssl_handshake()
2577
2578 def readable(self):
2579 if isinstance(self.socket, ssl.SSLSocket):
2580 while self.socket.pending() > 0:
2581 self.handle_read_event()
2582 return True
2583
2584 def _do_ssl_handshake(self):
2585 try:
2586 self.socket.do_handshake()
2587 except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
2588 return
2589 except ssl.SSLEOFError:
2590 return self.handle_close()
2591 except ssl.SSLError:
Bill Janssen54cc54c2007-12-14 22:08:56 +00002592 raise
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002593 except OSError as err:
2594 if err.args[0] == errno.ECONNABORTED:
2595 return self.handle_close()
2596 else:
2597 self._ssl_accepting = False
Bill Janssen54cc54c2007-12-14 22:08:56 +00002598
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002599 def handle_read(self):
2600 if self._ssl_accepting:
2601 self._do_ssl_handshake()
2602 else:
2603 data = self.recv(1024)
2604 if support.verbose:
2605 sys.stdout.write(" server: read %s from client\n" % repr(data))
2606 if not data:
2607 self.close()
2608 else:
2609 self.send(data.lower())
Bill Janssen54cc54c2007-12-14 22:08:56 +00002610
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002611 def handle_close(self):
2612 self.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002613 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002614 sys.stdout.write(" server: closed connection %s\n" % self.socket)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002615
2616 def handle_error(self):
2617 raise
2618
Trent Nelson78520002008-04-10 20:54:35 +00002619 def __init__(self, certfile):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002620 self.certfile = certfile
2621 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2622 self.port = support.bind_port(sock, '')
2623 asyncore.dispatcher.__init__(self, sock)
2624 self.listen(5)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002625
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002626 def handle_accepted(self, sock_obj, addr):
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002627 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002628 sys.stdout.write(" server: new connection from %s:%s\n" %addr)
2629 self.ConnectionHandler(sock_obj, self.certfile)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002630
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002631 def handle_error(self):
2632 raise
Bill Janssen54cc54c2007-12-14 22:08:56 +00002633
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002634 def __init__(self, certfile):
2635 self.flag = None
2636 self.active = False
2637 self.server = self.EchoServer(certfile)
2638 self.port = self.server.port
2639 threading.Thread.__init__(self)
2640 self.daemon = True
Bill Janssen54cc54c2007-12-14 22:08:56 +00002641
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002642 def __str__(self):
2643 return "<%s %s>" % (self.__class__.__name__, self.server)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002644
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002645 def __enter__(self):
2646 self.start(threading.Event())
2647 self.flag.wait()
2648 return self
Thomas Woutersed03b412007-08-28 21:37:11 +00002649
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002650 def __exit__(self, *args):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002651 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002652 sys.stdout.write(" cleanup: stopping server.\n")
2653 self.stop()
2654 if support.verbose:
2655 sys.stdout.write(" cleanup: joining server thread.\n")
2656 self.join()
2657 if support.verbose:
2658 sys.stdout.write(" cleanup: successfully joined.\n")
2659 # make sure that ConnectionHandler is removed from socket_map
2660 asyncore.close_all(ignore_all=True)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002661
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002662 def start (self, flag=None):
2663 self.flag = flag
2664 threading.Thread.start(self)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002665
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002666 def run(self):
2667 self.active = True
2668 if self.flag:
2669 self.flag.set()
2670 while self.active:
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002671 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002672 asyncore.loop(1)
2673 except:
2674 pass
Trent Nelson6b240cd2008-04-10 20:12:06 +00002675
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002676 def stop(self):
2677 self.active = False
2678 self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002679
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002680def server_params_test(client_context, server_context, indata=b"FOO\n",
2681 chatty=True, connectionchatty=False, sni_name=None,
2682 session=None):
2683 """
2684 Launch a server, connect a client to it and try various reads
2685 and writes.
2686 """
2687 stats = {}
2688 server = ThreadedEchoServer(context=server_context,
2689 chatty=chatty,
2690 connectionchatty=False)
2691 with server:
2692 with client_context.wrap_socket(socket.socket(),
2693 server_hostname=sni_name, session=session) as s:
2694 s.connect((HOST, server.port))
2695 for arg in [indata, bytearray(indata), memoryview(indata)]:
2696 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002697 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002698 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002699 " client: sending %r...\n" % indata)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002700 s.write(arg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002701 outdata = s.read()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002702 if connectionchatty:
2703 if support.verbose:
2704 sys.stdout.write(" client: read %r\n" % outdata)
Trent Nelson6b240cd2008-04-10 20:12:06 +00002705 if outdata != indata.lower():
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002706 raise AssertionError(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002707 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2708 % (outdata[:20], len(outdata),
2709 indata[:20].lower(), len(indata)))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002710 s.write(b"over\n")
2711 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002712 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002713 sys.stdout.write(" client: closing connection.\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002714 stats.update({
2715 'compression': s.compression(),
2716 'cipher': s.cipher(),
2717 'peercert': s.getpeercert(),
2718 'client_alpn_protocol': s.selected_alpn_protocol(),
2719 'client_npn_protocol': s.selected_npn_protocol(),
2720 'version': s.version(),
2721 'session_reused': s.session_reused,
2722 'session': s.session,
2723 })
2724 s.close()
2725 stats['server_alpn_protocols'] = server.selected_alpn_protocols
2726 stats['server_npn_protocols'] = server.selected_npn_protocols
2727 stats['server_shared_ciphers'] = server.shared_ciphers
2728 return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002729
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002730def try_protocol_combo(server_protocol, client_protocol, expect_success,
2731 certsreqs=None, server_options=0, client_options=0):
2732 """
2733 Try to SSL-connect using *client_protocol* to *server_protocol*.
2734 If *expect_success* is true, assert that the connection succeeds,
2735 if it's false, assert that the connection fails.
2736 Also, if *expect_success* is a string, assert that it is the protocol
2737 version actually used by the connection.
2738 """
2739 if certsreqs is None:
2740 certsreqs = ssl.CERT_NONE
2741 certtype = {
2742 ssl.CERT_NONE: "CERT_NONE",
2743 ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
2744 ssl.CERT_REQUIRED: "CERT_REQUIRED",
2745 }[certsreqs]
2746 if support.verbose:
2747 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
2748 sys.stdout.write(formatstr %
2749 (ssl.get_protocol_name(client_protocol),
2750 ssl.get_protocol_name(server_protocol),
2751 certtype))
2752 client_context = ssl.SSLContext(client_protocol)
2753 client_context.options |= client_options
2754 server_context = ssl.SSLContext(server_protocol)
2755 server_context.options |= server_options
2756
Victor Stinner3ef63442019-02-19 18:06:03 +01002757 min_version = PROTOCOL_TO_TLS_VERSION.get(client_protocol, None)
2758 if (min_version is not None
2759 # SSLContext.minimum_version is only available on recent OpenSSL
2760 # (setter added in OpenSSL 1.1.0, getter added in OpenSSL 1.1.1)
2761 and hasattr(server_context, 'minimum_version')
2762 and server_protocol == ssl.PROTOCOL_TLS
2763 and server_context.minimum_version > min_version):
2764 # If OpenSSL configuration is strict and requires more recent TLS
2765 # version, we have to change the minimum to test old TLS versions.
2766 server_context.minimum_version = min_version
2767
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002768 # NOTE: we must enable "ALL" ciphers on the client, otherwise an
2769 # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
2770 # starting from OpenSSL 1.0.0 (see issue #8322).
Christian Heimesa170fa12017-09-15 20:27:30 +02002771 if client_context.protocol == ssl.PROTOCOL_TLS:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002772 client_context.set_ciphers("ALL")
2773
2774 for ctx in (client_context, server_context):
2775 ctx.verify_mode = certsreqs
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002776 ctx.load_cert_chain(SIGNED_CERTFILE)
2777 ctx.load_verify_locations(SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002778 try:
2779 stats = server_params_test(client_context, server_context,
2780 chatty=False, connectionchatty=False)
2781 # Protocol mismatch can result in either an SSLError, or a
2782 # "Connection reset by peer" error.
2783 except ssl.SSLError:
2784 if expect_success:
2785 raise
2786 except OSError as e:
2787 if expect_success or e.errno != errno.ECONNRESET:
2788 raise
2789 else:
2790 if not expect_success:
2791 raise AssertionError(
2792 "Client protocol %s succeeded with server protocol %s!"
2793 % (ssl.get_protocol_name(client_protocol),
2794 ssl.get_protocol_name(server_protocol)))
2795 elif (expect_success is not True
2796 and expect_success != stats['version']):
2797 raise AssertionError("version mismatch: expected %r, got %r"
2798 % (expect_success, stats['version']))
2799
2800
2801class ThreadedTests(unittest.TestCase):
2802
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002803 def test_echo(self):
2804 """Basic test of an SSL client connecting to a server"""
2805 if support.verbose:
2806 sys.stdout.write("\n")
2807 for protocol in PROTOCOLS:
2808 if protocol in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}:
2809 continue
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02002810 if not has_tls_protocol(protocol):
2811 continue
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002812 with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
2813 context = ssl.SSLContext(protocol)
2814 context.load_cert_chain(CERTFILE)
2815 server_params_test(context, context,
2816 chatty=True, connectionchatty=True)
2817
Christian Heimesa170fa12017-09-15 20:27:30 +02002818 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002819
2820 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
2821 server_params_test(client_context=client_context,
2822 server_context=server_context,
2823 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002824 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002825
2826 client_context.check_hostname = False
2827 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_CLIENT):
2828 with self.assertRaises(ssl.SSLError) as e:
2829 server_params_test(client_context=server_context,
2830 server_context=client_context,
2831 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002832 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002833 self.assertIn('called a function you should not call',
2834 str(e.exception))
2835
2836 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_SERVER):
2837 with self.assertRaises(ssl.SSLError) as e:
2838 server_params_test(client_context=server_context,
2839 server_context=server_context,
2840 chatty=True, connectionchatty=True)
2841 self.assertIn('called a function you should not call',
2842 str(e.exception))
2843
2844 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_CLIENT):
2845 with self.assertRaises(ssl.SSLError) as e:
2846 server_params_test(client_context=server_context,
2847 server_context=client_context,
2848 chatty=True, connectionchatty=True)
2849 self.assertIn('called a function you should not call',
2850 str(e.exception))
2851
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002852 def test_getpeercert(self):
2853 if support.verbose:
2854 sys.stdout.write("\n")
Christian Heimesa170fa12017-09-15 20:27:30 +02002855
2856 client_context, server_context, hostname = testing_context()
2857 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002858 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002859 with client_context.wrap_socket(socket.socket(),
2860 do_handshake_on_connect=False,
2861 server_hostname=hostname) as s:
2862 s.connect((HOST, server.port))
2863 # getpeercert() raise ValueError while the handshake isn't
2864 # done.
2865 with self.assertRaises(ValueError):
2866 s.getpeercert()
2867 s.do_handshake()
2868 cert = s.getpeercert()
2869 self.assertTrue(cert, "Can't get peer certificate.")
2870 cipher = s.cipher()
2871 if support.verbose:
2872 sys.stdout.write(pprint.pformat(cert) + '\n')
2873 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2874 if 'subject' not in cert:
2875 self.fail("No subject field in certificate: %s." %
2876 pprint.pformat(cert))
2877 if ((('organizationName', 'Python Software Foundation'),)
2878 not in cert['subject']):
2879 self.fail(
2880 "Missing or invalid 'organizationName' field in certificate subject; "
2881 "should be 'Python Software Foundation'.")
2882 self.assertIn('notBefore', cert)
2883 self.assertIn('notAfter', cert)
2884 before = ssl.cert_time_to_seconds(cert['notBefore'])
2885 after = ssl.cert_time_to_seconds(cert['notAfter'])
2886 self.assertLess(before, after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002887
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002888 @unittest.skipUnless(have_verify_flags(),
2889 "verify_flags need OpenSSL > 0.9.8")
2890 def test_crl_check(self):
2891 if support.verbose:
2892 sys.stdout.write("\n")
2893
Christian Heimesa170fa12017-09-15 20:27:30 +02002894 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002895
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002896 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
Christian Heimesa170fa12017-09-15 20:27:30 +02002897 self.assertEqual(client_context.verify_flags, ssl.VERIFY_DEFAULT | tf)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002898
2899 # VERIFY_DEFAULT should pass
2900 server = ThreadedEchoServer(context=server_context, chatty=True)
2901 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002902 with client_context.wrap_socket(socket.socket(),
2903 server_hostname=hostname) as s:
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002904 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002905 cert = s.getpeercert()
2906 self.assertTrue(cert, "Can't get peer certificate.")
Bill Janssen58afe4c2008-09-08 16:45:19 +00002907
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002908 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
Christian Heimesa170fa12017-09-15 20:27:30 +02002909 client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
Bill Janssen58afe4c2008-09-08 16:45:19 +00002910
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002911 server = ThreadedEchoServer(context=server_context, chatty=True)
2912 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002913 with client_context.wrap_socket(socket.socket(),
2914 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002915 with self.assertRaisesRegex(ssl.SSLError,
2916 "certificate verify failed"):
2917 s.connect((HOST, server.port))
Bill Janssen58afe4c2008-09-08 16:45:19 +00002918
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002919 # now load a CRL file. The CRL file is signed by the CA.
Christian Heimesa170fa12017-09-15 20:27:30 +02002920 client_context.load_verify_locations(CRLFILE)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002921
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002922 server = ThreadedEchoServer(context=server_context, chatty=True)
2923 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002924 with client_context.wrap_socket(socket.socket(),
2925 server_hostname=hostname) as s:
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002926 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002927 cert = s.getpeercert()
2928 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002929
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002930 def test_check_hostname(self):
2931 if support.verbose:
2932 sys.stdout.write("\n")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002933
Christian Heimesa170fa12017-09-15 20:27:30 +02002934 client_context, server_context, hostname = testing_context()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002935
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002936 # correct hostname should verify
2937 server = ThreadedEchoServer(context=server_context, chatty=True)
2938 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002939 with client_context.wrap_socket(socket.socket(),
2940 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002941 s.connect((HOST, server.port))
2942 cert = s.getpeercert()
2943 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002944
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002945 # incorrect hostname should raise an exception
2946 server = ThreadedEchoServer(context=server_context, chatty=True)
2947 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002948 with client_context.wrap_socket(socket.socket(),
2949 server_hostname="invalid") as s:
Christian Heimes61d478c2018-01-27 15:51:38 +01002950 with self.assertRaisesRegex(
2951 ssl.CertificateError,
2952 "Hostname mismatch, certificate is not valid for 'invalid'."):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002953 s.connect((HOST, server.port))
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002954
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002955 # missing server_hostname arg should cause an exception, too
2956 server = ThreadedEchoServer(context=server_context, chatty=True)
2957 with server:
2958 with socket.socket() as s:
2959 with self.assertRaisesRegex(ValueError,
2960 "check_hostname requires server_hostname"):
Christian Heimesa170fa12017-09-15 20:27:30 +02002961 client_context.wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002962
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002963 def test_ecc_cert(self):
2964 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2965 client_context.load_verify_locations(SIGNING_CA)
Christian Heimese8eb6cb2018-05-22 22:50:12 +02002966 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002967 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2968
2969 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2970 # load ECC cert
2971 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2972
2973 # correct hostname should verify
2974 server = ThreadedEchoServer(context=server_context, chatty=True)
2975 with server:
2976 with client_context.wrap_socket(socket.socket(),
2977 server_hostname=hostname) as s:
2978 s.connect((HOST, server.port))
2979 cert = s.getpeercert()
2980 self.assertTrue(cert, "Can't get peer certificate.")
2981 cipher = s.cipher()[0].split('-')
2982 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2983
2984 def test_dual_rsa_ecc(self):
2985 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2986 client_context.load_verify_locations(SIGNING_CA)
Christian Heimes05d9fe32018-02-27 08:55:39 +01002987 # TODO: fix TLSv1.3 once SSLContext can restrict signature
2988 # algorithms.
2989 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002990 # only ECDSA certs
2991 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
2992 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2993
2994 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2995 # load ECC and RSA key/cert pairs
2996 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2997 server_context.load_cert_chain(SIGNED_CERTFILE)
2998
2999 # correct hostname should verify
3000 server = ThreadedEchoServer(context=server_context, chatty=True)
3001 with server:
3002 with client_context.wrap_socket(socket.socket(),
3003 server_hostname=hostname) as s:
3004 s.connect((HOST, server.port))
3005 cert = s.getpeercert()
3006 self.assertTrue(cert, "Can't get peer certificate.")
3007 cipher = s.cipher()[0].split('-')
3008 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
3009
Christian Heimes66e57422018-01-29 14:25:13 +01003010 def test_check_hostname_idn(self):
3011 if support.verbose:
3012 sys.stdout.write("\n")
3013
Christian Heimes11a14932018-02-24 02:35:08 +01003014 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes66e57422018-01-29 14:25:13 +01003015 server_context.load_cert_chain(IDNSANSFILE)
3016
Christian Heimes11a14932018-02-24 02:35:08 +01003017 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes66e57422018-01-29 14:25:13 +01003018 context.verify_mode = ssl.CERT_REQUIRED
3019 context.check_hostname = True
3020 context.load_verify_locations(SIGNING_CA)
3021
3022 # correct hostname should verify, when specified in several
3023 # different ways
3024 idn_hostnames = [
3025 ('könig.idn.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01003026 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01003027 ('xn--knig-5qa.idn.pythontest.net',
3028 'xn--knig-5qa.idn.pythontest.net'),
3029 (b'xn--knig-5qa.idn.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01003030 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01003031
3032 ('königsgäßchen.idna2003.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01003033 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01003034 ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
3035 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
3036 (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01003037 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
3038
3039 # ('königsgäßchen.idna2008.pythontest.net',
3040 # 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
3041 ('xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
3042 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
3043 (b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
3044 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
3045
Christian Heimes66e57422018-01-29 14:25:13 +01003046 ]
3047 for server_hostname, expected_hostname in idn_hostnames:
3048 server = ThreadedEchoServer(context=server_context, chatty=True)
3049 with server:
3050 with context.wrap_socket(socket.socket(),
3051 server_hostname=server_hostname) as s:
3052 self.assertEqual(s.server_hostname, expected_hostname)
3053 s.connect((HOST, server.port))
3054 cert = s.getpeercert()
3055 self.assertEqual(s.server_hostname, expected_hostname)
3056 self.assertTrue(cert, "Can't get peer certificate.")
3057
Christian Heimes66e57422018-01-29 14:25:13 +01003058 # incorrect hostname should raise an exception
3059 server = ThreadedEchoServer(context=server_context, chatty=True)
3060 with server:
3061 with context.wrap_socket(socket.socket(),
3062 server_hostname="python.example.org") as s:
3063 with self.assertRaises(ssl.CertificateError):
3064 s.connect((HOST, server.port))
3065
Christian Heimes529525f2018-05-23 22:24:45 +02003066 def test_wrong_cert_tls12(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003067 """Connecting when the server rejects the client's certificate
3068
3069 Launch a server with CERT_REQUIRED, and check that trying to
3070 connect to it with a wrong client certificate fails.
3071 """
Christian Heimes05d9fe32018-02-27 08:55:39 +01003072 client_context, server_context, hostname = testing_context()
Christian Heimes88bfd0b2018-08-14 12:54:19 +02003073 # load client cert that is not signed by trusted CA
3074 client_context.load_cert_chain(CERTFILE)
Christian Heimes05d9fe32018-02-27 08:55:39 +01003075 # require TLS client authentication
3076 server_context.verify_mode = ssl.CERT_REQUIRED
Christian Heimes529525f2018-05-23 22:24:45 +02003077 # TLS 1.3 has different handshake
3078 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
Christian Heimes05d9fe32018-02-27 08:55:39 +01003079
3080 server = ThreadedEchoServer(
3081 context=server_context, chatty=True, connectionchatty=True,
3082 )
3083
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003084 with server, \
Christian Heimes05d9fe32018-02-27 08:55:39 +01003085 client_context.wrap_socket(socket.socket(),
3086 server_hostname=hostname) as s:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00003087 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003088 # Expect either an SSL error about the server rejecting
3089 # the connection, or a low-level connection reset (which
3090 # sometimes happens on Windows)
3091 s.connect((HOST, server.port))
3092 except ssl.SSLError as e:
3093 if support.verbose:
3094 sys.stdout.write("\nSSLError is %r\n" % e)
3095 except OSError as e:
3096 if e.errno != errno.ECONNRESET:
3097 raise
3098 if support.verbose:
3099 sys.stdout.write("\nsocket.error is %r\n" % e)
3100 else:
3101 self.fail("Use of invalid cert should have failed!")
3102
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003103 @requires_tls_version('TLSv1_3')
Christian Heimes529525f2018-05-23 22:24:45 +02003104 def test_wrong_cert_tls13(self):
3105 client_context, server_context, hostname = testing_context()
Christian Heimes88bfd0b2018-08-14 12:54:19 +02003106 # load client cert that is not signed by trusted CA
3107 client_context.load_cert_chain(CERTFILE)
Christian Heimes529525f2018-05-23 22:24:45 +02003108 server_context.verify_mode = ssl.CERT_REQUIRED
3109 server_context.minimum_version = ssl.TLSVersion.TLSv1_3
3110 client_context.minimum_version = ssl.TLSVersion.TLSv1_3
3111
3112 server = ThreadedEchoServer(
3113 context=server_context, chatty=True, connectionchatty=True,
3114 )
3115 with server, \
3116 client_context.wrap_socket(socket.socket(),
3117 server_hostname=hostname) as s:
3118 # TLS 1.3 perform client cert exchange after handshake
3119 s.connect((HOST, server.port))
3120 try:
3121 s.write(b'data')
3122 s.read(4)
3123 except ssl.SSLError as e:
3124 if support.verbose:
3125 sys.stdout.write("\nSSLError is %r\n" % e)
3126 except OSError as e:
3127 if e.errno != errno.ECONNRESET:
3128 raise
3129 if support.verbose:
3130 sys.stdout.write("\nsocket.error is %r\n" % e)
3131 else:
3132 self.fail("Use of invalid cert should have failed!")
3133
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003134 def test_rude_shutdown(self):
3135 """A brutal shutdown of an SSL server should raise an OSError
3136 in the client when attempting handshake.
3137 """
3138 listener_ready = threading.Event()
3139 listener_gone = threading.Event()
3140
3141 s = socket.socket()
3142 port = support.bind_port(s, HOST)
3143
3144 # `listener` runs in a thread. It sits in an accept() until
3145 # the main thread connects. Then it rudely closes the socket,
3146 # and sets Event `listener_gone` to let the main thread know
3147 # the socket is gone.
3148 def listener():
3149 s.listen()
3150 listener_ready.set()
3151 newsock, addr = s.accept()
3152 newsock.close()
3153 s.close()
3154 listener_gone.set()
3155
3156 def connector():
3157 listener_ready.wait()
3158 with socket.socket() as c:
3159 c.connect((HOST, port))
3160 listener_gone.wait()
Antoine Pitrou40f08742010-04-24 22:04:40 +00003161 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003162 ssl_sock = test_wrap_socket(c)
3163 except OSError:
3164 pass
3165 else:
3166 self.fail('connecting to closed SSL socket should have failed')
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00003167
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003168 t = threading.Thread(target=listener)
3169 t.start()
3170 try:
3171 connector()
3172 finally:
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003173 t.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003174
Christian Heimesb3ad0e52017-09-08 12:00:19 -07003175 def test_ssl_cert_verify_error(self):
3176 if support.verbose:
3177 sys.stdout.write("\n")
3178
3179 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
3180 server_context.load_cert_chain(SIGNED_CERTFILE)
3181
3182 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3183
3184 server = ThreadedEchoServer(context=server_context, chatty=True)
3185 with server:
3186 with context.wrap_socket(socket.socket(),
Christian Heimesa170fa12017-09-15 20:27:30 +02003187 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
Christian Heimesb3ad0e52017-09-08 12:00:19 -07003188 try:
3189 s.connect((HOST, server.port))
3190 except ssl.SSLError as e:
3191 msg = 'unable to get local issuer certificate'
3192 self.assertIsInstance(e, ssl.SSLCertVerificationError)
3193 self.assertEqual(e.verify_code, 20)
3194 self.assertEqual(e.verify_message, msg)
3195 self.assertIn(msg, repr(e))
3196 self.assertIn('certificate verify failed', repr(e))
3197
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003198 @requires_tls_version('SSLv2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003199 def test_protocol_sslv2(self):
3200 """Connecting to an SSLv2 server with various client options"""
3201 if support.verbose:
3202 sys.stdout.write("\n")
3203 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
3204 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
3205 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
Christian Heimesa170fa12017-09-15 20:27:30 +02003206 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003207 if has_tls_version('SSLv3'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003208 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
3209 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
3210 # SSLv23 client with specific SSL options
3211 if no_sslv2_implies_sslv3_hello():
3212 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003213 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003214 client_options=ssl.OP_NO_SSLv2)
Christian Heimesa170fa12017-09-15 20:27:30 +02003215 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003216 client_options=ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02003217 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003218 client_options=ssl.OP_NO_TLSv1)
Antoine Pitrou242db722013-05-01 20:52:07 +02003219
Christian Heimesa170fa12017-09-15 20:27:30 +02003220 def test_PROTOCOL_TLS(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003221 """Connecting to an SSLv23 server with various client options"""
3222 if support.verbose:
3223 sys.stdout.write("\n")
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003224 if has_tls_version('SSLv2'):
Antoine Pitrou8f85f902012-01-03 22:46:48 +01003225 try:
Christian Heimesa170fa12017-09-15 20:27:30 +02003226 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv2, True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003227 except OSError as x:
3228 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
3229 if support.verbose:
3230 sys.stdout.write(
3231 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
3232 % str(x))
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003233 if has_tls_version('SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003234 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False)
3235 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003236 if has_tls_version('TLSv1'):
3237 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1')
Antoine Pitrou8f85f902012-01-03 22:46:48 +01003238
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003239 if has_tls_version('SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003240 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL)
3241 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_OPTIONAL)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003242 if has_tls_version('TLSv1'):
3243 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003244
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003245 if has_tls_version('SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003246 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED)
3247 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_REQUIRED)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003248 if has_tls_version('TLSv1'):
3249 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003250
3251 # Server with specific SSL options
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003252 if has_tls_version('SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003253 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003254 server_options=ssl.OP_NO_SSLv3)
3255 # Will choose TLSv1
Christian Heimesa170fa12017-09-15 20:27:30 +02003256 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003257 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003258 if has_tls_version('TLSv1'):
3259 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, False,
3260 server_options=ssl.OP_NO_TLSv1)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003261
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003262 @requires_tls_version('SSLv3')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003263 def test_protocol_sslv3(self):
3264 """Connecting to an SSLv3 server with various client options"""
3265 if support.verbose:
3266 sys.stdout.write("\n")
3267 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
3268 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
3269 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003270 if has_tls_version('SSLv2'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003271 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003272 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003273 client_options=ssl.OP_NO_SSLv3)
3274 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
3275 if no_sslv2_implies_sslv3_hello():
3276 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003277 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003278 False, client_options=ssl.OP_NO_SSLv2)
3279
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003280 @requires_tls_version('TLSv1')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003281 def test_protocol_tlsv1(self):
3282 """Connecting to a TLSv1 server with various client options"""
3283 if support.verbose:
3284 sys.stdout.write("\n")
3285 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
3286 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
3287 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003288 if has_tls_version('SSLv2'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003289 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003290 if has_tls_version('SSLv3'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003291 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003292 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003293 client_options=ssl.OP_NO_TLSv1)
3294
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003295 @requires_tls_version('TLSv1_1')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003296 def test_protocol_tlsv1_1(self):
3297 """Connecting to a TLSv1.1 server with various client options.
3298 Testing against older TLS versions."""
3299 if support.verbose:
3300 sys.stdout.write("\n")
3301 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003302 if has_tls_version('SSLv2'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003303 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003304 if has_tls_version('SSLv3'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003305 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003306 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003307 client_options=ssl.OP_NO_TLSv1_1)
3308
Christian Heimesa170fa12017-09-15 20:27:30 +02003309 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003310 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
3311 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003312
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003313 @requires_tls_version('TLSv1_2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003314 def test_protocol_tlsv1_2(self):
3315 """Connecting to a TLSv1.2 server with various client options.
3316 Testing against older TLS versions."""
3317 if support.verbose:
3318 sys.stdout.write("\n")
3319 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
3320 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
3321 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003322 if has_tls_version('SSLv2'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003323 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003324 if has_tls_version('SSLv3'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003325 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003326 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003327 client_options=ssl.OP_NO_TLSv1_2)
3328
Christian Heimesa170fa12017-09-15 20:27:30 +02003329 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003330 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
3331 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
3332 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
3333 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
3334
3335 def test_starttls(self):
3336 """Switching from clear text to encrypted and back again."""
3337 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
3338
3339 server = ThreadedEchoServer(CERTFILE,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003340 starttls_server=True,
3341 chatty=True,
3342 connectionchatty=True)
3343 wrapped = False
3344 with server:
3345 s = socket.socket()
Serhiy Storchaka5eca7f32019-09-01 12:12:52 +03003346 s.setblocking(True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003347 s.connect((HOST, server.port))
Antoine Pitroud6494802011-07-21 01:11:30 +02003348 if support.verbose:
3349 sys.stdout.write("\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003350 for indata in msgs:
Antoine Pitroud6494802011-07-21 01:11:30 +02003351 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003352 sys.stdout.write(
3353 " client: sending %r...\n" % indata)
3354 if wrapped:
3355 conn.write(indata)
3356 outdata = conn.read()
3357 else:
3358 s.send(indata)
3359 outdata = s.recv(1024)
3360 msg = outdata.strip().lower()
3361 if indata == b"STARTTLS" and msg.startswith(b"ok"):
3362 # STARTTLS ok, switch to secure mode
3363 if support.verbose:
3364 sys.stdout.write(
3365 " client: read %r from server, starting TLS...\n"
3366 % msg)
Christian Heimesa170fa12017-09-15 20:27:30 +02003367 conn = test_wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003368 wrapped = True
3369 elif indata == b"ENDTLS" and msg.startswith(b"ok"):
3370 # ENDTLS ok, switch back to clear text
3371 if support.verbose:
3372 sys.stdout.write(
3373 " client: read %r from server, ending TLS...\n"
3374 % msg)
3375 s = conn.unwrap()
3376 wrapped = False
3377 else:
3378 if support.verbose:
3379 sys.stdout.write(
3380 " client: read %r from server\n" % msg)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003381 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003382 sys.stdout.write(" client: closing connection.\n")
3383 if wrapped:
3384 conn.write(b"over\n")
3385 else:
3386 s.send(b"over\n")
3387 if wrapped:
3388 conn.close()
3389 else:
3390 s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003391
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003392 def test_socketserver(self):
3393 """Using socketserver to create and manage SSL connections."""
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003394 server = make_https_server(self, certfile=SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003395 # try to connect
3396 if support.verbose:
3397 sys.stdout.write('\n')
3398 with open(CERTFILE, 'rb') as f:
3399 d1 = f.read()
3400 d2 = ''
3401 # now fetch the same data from the HTTPS server
3402 url = 'https://localhost:%d/%s' % (
3403 server.port, os.path.split(CERTFILE)[1])
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003404 context = ssl.create_default_context(cafile=SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003405 f = urllib.request.urlopen(url, context=context)
3406 try:
3407 dlen = f.info().get("content-length")
3408 if dlen and (int(dlen) > 0):
3409 d2 = f.read(int(dlen))
3410 if support.verbose:
3411 sys.stdout.write(
3412 " client: read %d bytes from remote server '%s'\n"
3413 % (len(d2), server))
3414 finally:
3415 f.close()
3416 self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003417
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003418 def test_asyncore_server(self):
3419 """Check the example asyncore integration."""
3420 if support.verbose:
3421 sys.stdout.write("\n")
Antoine Pitrou0e576f12011-12-22 10:03:38 +01003422
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003423 indata = b"FOO\n"
3424 server = AsyncoreEchoServer(CERTFILE)
3425 with server:
3426 s = test_wrap_socket(socket.socket())
3427 s.connect(('127.0.0.1', server.port))
3428 if support.verbose:
3429 sys.stdout.write(
3430 " client: sending %r...\n" % indata)
3431 s.write(indata)
3432 outdata = s.read()
3433 if support.verbose:
3434 sys.stdout.write(" client: read %r\n" % outdata)
3435 if outdata != indata.lower():
3436 self.fail(
3437 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
3438 % (outdata[:20], len(outdata),
3439 indata[:20].lower(), len(indata)))
3440 s.write(b"over\n")
3441 if support.verbose:
3442 sys.stdout.write(" client: closing connection.\n")
3443 s.close()
3444 if support.verbose:
3445 sys.stdout.write(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003446
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003447 def test_recv_send(self):
3448 """Test recv(), send() and friends."""
3449 if support.verbose:
3450 sys.stdout.write("\n")
3451
3452 server = ThreadedEchoServer(CERTFILE,
3453 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003454 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003455 cacerts=CERTFILE,
3456 chatty=True,
3457 connectionchatty=False)
3458 with server:
3459 s = test_wrap_socket(socket.socket(),
3460 server_side=False,
3461 certfile=CERTFILE,
3462 ca_certs=CERTFILE,
3463 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003464 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003465 s.connect((HOST, server.port))
3466 # helper methods for standardising recv* method signatures
3467 def _recv_into():
3468 b = bytearray(b"\0"*100)
3469 count = s.recv_into(b)
3470 return b[:count]
3471
3472 def _recvfrom_into():
3473 b = bytearray(b"\0"*100)
3474 count, addr = s.recvfrom_into(b)
3475 return b[:count]
3476
3477 # (name, method, expect success?, *args, return value func)
3478 send_methods = [
3479 ('send', s.send, True, [], len),
3480 ('sendto', s.sendto, False, ["some.address"], len),
3481 ('sendall', s.sendall, True, [], lambda x: None),
3482 ]
3483 # (name, method, whether to expect success, *args)
3484 recv_methods = [
3485 ('recv', s.recv, True, []),
3486 ('recvfrom', s.recvfrom, False, ["some.address"]),
3487 ('recv_into', _recv_into, True, []),
3488 ('recvfrom_into', _recvfrom_into, False, []),
3489 ]
3490 data_prefix = "PREFIX_"
3491
3492 for (meth_name, send_meth, expect_success, args,
3493 ret_val_meth) in send_methods:
3494 indata = (data_prefix + meth_name).encode('ascii')
3495 try:
3496 ret = send_meth(indata, *args)
3497 msg = "sending with {}".format(meth_name)
3498 self.assertEqual(ret, ret_val_meth(indata), msg=msg)
3499 outdata = s.read()
3500 if outdata != indata.lower():
3501 self.fail(
3502 "While sending with <<{name:s}>> bad data "
3503 "<<{outdata:r}>> ({nout:d}) received; "
3504 "expected <<{indata:r}>> ({nin:d})\n".format(
3505 name=meth_name, outdata=outdata[:20],
3506 nout=len(outdata),
3507 indata=indata[:20], nin=len(indata)
3508 )
3509 )
3510 except ValueError as e:
3511 if expect_success:
3512 self.fail(
3513 "Failed to send with method <<{name:s}>>; "
3514 "expected to succeed.\n".format(name=meth_name)
3515 )
3516 if not str(e).startswith(meth_name):
3517 self.fail(
3518 "Method <<{name:s}>> failed with unexpected "
3519 "exception message: {exp:s}\n".format(
3520 name=meth_name, exp=e
3521 )
3522 )
3523
3524 for meth_name, recv_meth, expect_success, args in recv_methods:
3525 indata = (data_prefix + meth_name).encode('ascii')
3526 try:
3527 s.send(indata)
3528 outdata = recv_meth(*args)
3529 if outdata != indata.lower():
3530 self.fail(
3531 "While receiving with <<{name:s}>> bad data "
3532 "<<{outdata:r}>> ({nout:d}) received; "
3533 "expected <<{indata:r}>> ({nin:d})\n".format(
3534 name=meth_name, outdata=outdata[:20],
3535 nout=len(outdata),
3536 indata=indata[:20], nin=len(indata)
3537 )
3538 )
3539 except ValueError as e:
3540 if expect_success:
3541 self.fail(
3542 "Failed to receive with method <<{name:s}>>; "
3543 "expected to succeed.\n".format(name=meth_name)
3544 )
3545 if not str(e).startswith(meth_name):
3546 self.fail(
3547 "Method <<{name:s}>> failed with unexpected "
3548 "exception message: {exp:s}\n".format(
3549 name=meth_name, exp=e
3550 )
3551 )
3552 # consume data
3553 s.read()
3554
3555 # read(-1, buffer) is supported, even though read(-1) is not
3556 data = b"data"
3557 s.send(data)
3558 buffer = bytearray(len(data))
3559 self.assertEqual(s.read(-1, buffer), len(data))
3560 self.assertEqual(buffer, data)
3561
Christian Heimes888bbdc2017-09-07 14:18:21 -07003562 # sendall accepts bytes-like objects
3563 if ctypes is not None:
3564 ubyte = ctypes.c_ubyte * len(data)
3565 byteslike = ubyte.from_buffer_copy(data)
3566 s.sendall(byteslike)
3567 self.assertEqual(s.read(), data)
3568
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003569 # Make sure sendmsg et al are disallowed to avoid
3570 # inadvertent disclosure of data and/or corruption
3571 # of the encrypted data stream
Serhiy Storchaka42b1d612018-12-06 22:36:55 +02003572 self.assertRaises(NotImplementedError, s.dup)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003573 self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
3574 self.assertRaises(NotImplementedError, s.recvmsg, 100)
3575 self.assertRaises(NotImplementedError,
Serhiy Storchaka42b1d612018-12-06 22:36:55 +02003576 s.recvmsg_into, [bytearray(100)])
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003577 s.write(b"over\n")
3578
3579 self.assertRaises(ValueError, s.recv, -1)
3580 self.assertRaises(ValueError, s.read, -1)
3581
3582 s.close()
3583
3584 def test_recv_zero(self):
3585 server = ThreadedEchoServer(CERTFILE)
3586 server.__enter__()
3587 self.addCleanup(server.__exit__, None, None)
3588 s = socket.create_connection((HOST, server.port))
3589 self.addCleanup(s.close)
3590 s = test_wrap_socket(s, suppress_ragged_eofs=False)
3591 self.addCleanup(s.close)
3592
3593 # recv/read(0) should return no data
3594 s.send(b"data")
3595 self.assertEqual(s.recv(0), b"")
3596 self.assertEqual(s.read(0), b"")
3597 self.assertEqual(s.read(), b"data")
3598
3599 # Should not block if the other end sends no data
3600 s.setblocking(False)
3601 self.assertEqual(s.recv(0), b"")
3602 self.assertEqual(s.recv_into(bytearray()), 0)
3603
3604 def test_nonblocking_send(self):
3605 server = ThreadedEchoServer(CERTFILE,
3606 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003607 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003608 cacerts=CERTFILE,
3609 chatty=True,
3610 connectionchatty=False)
3611 with server:
3612 s = test_wrap_socket(socket.socket(),
3613 server_side=False,
3614 certfile=CERTFILE,
3615 ca_certs=CERTFILE,
3616 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003617 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003618 s.connect((HOST, server.port))
3619 s.setblocking(False)
3620
3621 # If we keep sending data, at some point the buffers
3622 # will be full and the call will block
3623 buf = bytearray(8192)
3624 def fill_buffer():
3625 while True:
3626 s.send(buf)
3627 self.assertRaises((ssl.SSLWantWriteError,
3628 ssl.SSLWantReadError), fill_buffer)
3629
3630 # Now read all the output and discard it
3631 s.setblocking(True)
3632 s.close()
3633
3634 def test_handshake_timeout(self):
3635 # Issue #5103: SSL handshake must respect the socket timeout
3636 server = socket.socket(socket.AF_INET)
3637 host = "127.0.0.1"
3638 port = support.bind_port(server)
3639 started = threading.Event()
3640 finish = False
3641
3642 def serve():
3643 server.listen()
3644 started.set()
3645 conns = []
3646 while not finish:
3647 r, w, e = select.select([server], [], [], 0.1)
3648 if server in r:
3649 # Let the socket hang around rather than having
3650 # it closed by garbage collection.
3651 conns.append(server.accept()[0])
3652 for sock in conns:
3653 sock.close()
3654
3655 t = threading.Thread(target=serve)
3656 t.start()
3657 started.wait()
3658
3659 try:
3660 try:
3661 c = socket.socket(socket.AF_INET)
3662 c.settimeout(0.2)
3663 c.connect((host, port))
3664 # Will attempt handshake and time out
3665 self.assertRaisesRegex(socket.timeout, "timed out",
3666 test_wrap_socket, c)
3667 finally:
3668 c.close()
3669 try:
3670 c = socket.socket(socket.AF_INET)
3671 c = test_wrap_socket(c)
3672 c.settimeout(0.2)
3673 # Will attempt handshake and time out
3674 self.assertRaisesRegex(socket.timeout, "timed out",
3675 c.connect, (host, port))
3676 finally:
3677 c.close()
3678 finally:
3679 finish = True
3680 t.join()
3681 server.close()
3682
3683 def test_server_accept(self):
3684 # Issue #16357: accept() on a SSLSocket created through
3685 # SSLContext.wrap_socket().
Christian Heimesa170fa12017-09-15 20:27:30 +02003686 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003687 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003688 context.load_verify_locations(SIGNING_CA)
3689 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003690 server = socket.socket(socket.AF_INET)
3691 host = "127.0.0.1"
3692 port = support.bind_port(server)
3693 server = context.wrap_socket(server, server_side=True)
3694 self.assertTrue(server.server_side)
3695
3696 evt = threading.Event()
3697 remote = None
3698 peer = None
3699 def serve():
3700 nonlocal remote, peer
3701 server.listen()
3702 # Block on the accept and wait on the connection to close.
3703 evt.set()
3704 remote, peer = server.accept()
Christian Heimes529525f2018-05-23 22:24:45 +02003705 remote.send(remote.recv(4))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003706
3707 t = threading.Thread(target=serve)
3708 t.start()
3709 # Client wait until server setup and perform a connect.
3710 evt.wait()
3711 client = context.wrap_socket(socket.socket())
3712 client.connect((host, port))
Christian Heimes529525f2018-05-23 22:24:45 +02003713 client.send(b'data')
3714 client.recv()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003715 client_addr = client.getsockname()
3716 client.close()
3717 t.join()
3718 remote.close()
3719 server.close()
3720 # Sanity checks.
3721 self.assertIsInstance(remote, ssl.SSLSocket)
3722 self.assertEqual(peer, client_addr)
3723
3724 def test_getpeercert_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003725 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003726 with context.wrap_socket(socket.socket()) as sock:
3727 with self.assertRaises(OSError) as cm:
3728 sock.getpeercert()
3729 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3730
3731 def test_do_handshake_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003732 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003733 with context.wrap_socket(socket.socket()) as sock:
3734 with self.assertRaises(OSError) as cm:
3735 sock.do_handshake()
3736 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3737
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003738 def test_no_shared_ciphers(self):
3739 client_context, server_context, hostname = testing_context()
3740 # OpenSSL enables all TLS 1.3 ciphers, enforce TLS 1.2 for test
3741 client_context.options |= ssl.OP_NO_TLSv1_3
Victor Stinner5e922652018-09-07 17:30:33 +02003742 # Force different suites on client and server
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003743 client_context.set_ciphers("AES128")
3744 server_context.set_ciphers("AES256")
3745 with ThreadedEchoServer(context=server_context) as server:
3746 with client_context.wrap_socket(socket.socket(),
3747 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003748 with self.assertRaises(OSError):
3749 s.connect((HOST, server.port))
3750 self.assertIn("no shared cipher", server.conn_errors[0])
3751
3752 def test_version_basic(self):
3753 """
3754 Basic tests for SSLSocket.version().
3755 More tests are done in the test_protocol_*() methods.
3756 """
Christian Heimesa170fa12017-09-15 20:27:30 +02003757 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3758 context.check_hostname = False
3759 context.verify_mode = ssl.CERT_NONE
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003760 with ThreadedEchoServer(CERTFILE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003761 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003762 chatty=False) as server:
3763 with context.wrap_socket(socket.socket()) as s:
3764 self.assertIs(s.version(), None)
Christian Heimes141c5e82018-02-24 21:10:57 +01003765 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003766 s.connect((HOST, server.port))
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003767 if IS_OPENSSL_1_1_1 and has_tls_version('TLSv1_3'):
Christian Heimes05d9fe32018-02-27 08:55:39 +01003768 self.assertEqual(s.version(), 'TLSv1.3')
3769 elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
Christian Heimesa170fa12017-09-15 20:27:30 +02003770 self.assertEqual(s.version(), 'TLSv1.2')
3771 else: # 0.9.8 to 1.0.1
3772 self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
Christian Heimes141c5e82018-02-24 21:10:57 +01003773 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003774 self.assertIs(s.version(), None)
3775
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003776 @requires_tls_version('TLSv1_3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003777 def test_tls1_3(self):
3778 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
3779 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003780 context.options |= (
3781 ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
3782 )
3783 with ThreadedEchoServer(context=context) as server:
3784 with context.wrap_socket(socket.socket()) as s:
3785 s.connect((HOST, server.port))
Christian Heimes05d9fe32018-02-27 08:55:39 +01003786 self.assertIn(s.cipher()[0], {
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003787 'TLS_AES_256_GCM_SHA384',
3788 'TLS_CHACHA20_POLY1305_SHA256',
3789 'TLS_AES_128_GCM_SHA256',
Christian Heimes05d9fe32018-02-27 08:55:39 +01003790 })
3791 self.assertEqual(s.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003792
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003793 @requires_minimum_version
3794 @requires_tls_version('TLSv1_2')
3795 def test_min_max_version_tlsv1_2(self):
Christian Heimes698dde12018-02-27 11:54:43 +01003796 client_context, server_context, hostname = testing_context()
3797 # client TLSv1.0 to 1.2
3798 client_context.minimum_version = ssl.TLSVersion.TLSv1
3799 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
3800 # server only TLSv1.2
3801 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
3802 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
3803
3804 with ThreadedEchoServer(context=server_context) as server:
3805 with client_context.wrap_socket(socket.socket(),
3806 server_hostname=hostname) as s:
3807 s.connect((HOST, server.port))
3808 self.assertEqual(s.version(), 'TLSv1.2')
3809
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003810 @requires_minimum_version
3811 @requires_tls_version('TLSv1_1')
3812 def test_min_max_version_tlsv1_1(self):
3813 client_context, server_context, hostname = testing_context()
Christian Heimes698dde12018-02-27 11:54:43 +01003814 # client 1.0 to 1.2, server 1.0 to 1.1
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003815 client_context.minimum_version = ssl.TLSVersion.TLSv1
3816 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
Christian Heimes698dde12018-02-27 11:54:43 +01003817 server_context.minimum_version = ssl.TLSVersion.TLSv1
3818 server_context.maximum_version = ssl.TLSVersion.TLSv1_1
3819
3820 with ThreadedEchoServer(context=server_context) as server:
3821 with client_context.wrap_socket(socket.socket(),
3822 server_hostname=hostname) as s:
3823 s.connect((HOST, server.port))
3824 self.assertEqual(s.version(), 'TLSv1.1')
3825
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003826 @requires_minimum_version
3827 @requires_tls_version('TLSv1_2')
3828 def test_min_max_version_mismatch(self):
3829 client_context, server_context, hostname = testing_context()
Christian Heimes698dde12018-02-27 11:54:43 +01003830 # client 1.0, server 1.2 (mismatch)
Christian Heimes698dde12018-02-27 11:54:43 +01003831 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
Christian Heimesc9bc49c2019-09-11 19:24:47 +02003832 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
Christian Heimese35d1ba2019-06-03 20:40:15 +02003833 client_context.maximum_version = ssl.TLSVersion.TLSv1
Christian Heimesde606ea2019-09-11 19:48:58 +02003834 client_context.minimum_version = ssl.TLSVersion.TLSv1
Christian Heimes698dde12018-02-27 11:54:43 +01003835 with ThreadedEchoServer(context=server_context) as server:
3836 with client_context.wrap_socket(socket.socket(),
3837 server_hostname=hostname) as s:
3838 with self.assertRaises(ssl.SSLError) as e:
3839 s.connect((HOST, server.port))
3840 self.assertIn("alert", str(e.exception))
3841
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003842 @requires_minimum_version
3843 @requires_tls_version('SSLv3')
Christian Heimes698dde12018-02-27 11:54:43 +01003844 def test_min_max_version_sslv3(self):
3845 client_context, server_context, hostname = testing_context()
3846 server_context.minimum_version = ssl.TLSVersion.SSLv3
3847 client_context.minimum_version = ssl.TLSVersion.SSLv3
3848 client_context.maximum_version = ssl.TLSVersion.SSLv3
3849 with ThreadedEchoServer(context=server_context) as server:
3850 with client_context.wrap_socket(socket.socket(),
3851 server_hostname=hostname) as s:
3852 s.connect((HOST, server.port))
3853 self.assertEqual(s.version(), 'SSLv3')
3854
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003855 @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
3856 def test_default_ecdh_curve(self):
3857 # Issue #21015: elliptic curve-based Diffie Hellman key exchange
3858 # should be enabled by default on SSL contexts.
Christian Heimesa170fa12017-09-15 20:27:30 +02003859 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003860 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003861 # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
3862 # cipher name.
3863 context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003864 # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
3865 # explicitly using the 'ECCdraft' cipher alias. Otherwise,
3866 # our default cipher list should prefer ECDH-based ciphers
3867 # automatically.
3868 if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
3869 context.set_ciphers("ECCdraft:ECDH")
3870 with ThreadedEchoServer(context=context) as server:
3871 with context.wrap_socket(socket.socket()) as s:
3872 s.connect((HOST, server.port))
3873 self.assertIn("ECDH", s.cipher()[0])
3874
3875 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
3876 "'tls-unique' channel binding not available")
3877 def test_tls_unique_channel_binding(self):
3878 """Test tls-unique channel binding."""
3879 if support.verbose:
3880 sys.stdout.write("\n")
3881
Christian Heimes05d9fe32018-02-27 08:55:39 +01003882 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003883
3884 server = ThreadedEchoServer(context=server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003885 chatty=True,
3886 connectionchatty=False)
Christian Heimes05d9fe32018-02-27 08:55:39 +01003887
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003888 with server:
Christian Heimes05d9fe32018-02-27 08:55:39 +01003889 with client_context.wrap_socket(
3890 socket.socket(),
3891 server_hostname=hostname) as s:
3892 s.connect((HOST, server.port))
3893 # get the data
3894 cb_data = s.get_channel_binding("tls-unique")
3895 if support.verbose:
3896 sys.stdout.write(
3897 " got channel binding data: {0!r}\n".format(cb_data))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003898
Christian Heimes05d9fe32018-02-27 08:55:39 +01003899 # check if it is sane
3900 self.assertIsNotNone(cb_data)
Christian Heimes529525f2018-05-23 22:24:45 +02003901 if s.version() == 'TLSv1.3':
3902 self.assertEqual(len(cb_data), 48)
3903 else:
3904 self.assertEqual(len(cb_data), 12) # True for TLSv1
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003905
Christian Heimes05d9fe32018-02-27 08:55:39 +01003906 # and compare with the peers version
3907 s.write(b"CB tls-unique\n")
3908 peer_data_repr = s.read().strip()
3909 self.assertEqual(peer_data_repr,
3910 repr(cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003911
3912 # now, again
Christian Heimes05d9fe32018-02-27 08:55:39 +01003913 with client_context.wrap_socket(
3914 socket.socket(),
3915 server_hostname=hostname) as s:
3916 s.connect((HOST, server.port))
3917 new_cb_data = s.get_channel_binding("tls-unique")
3918 if support.verbose:
3919 sys.stdout.write(
3920 "got another channel binding data: {0!r}\n".format(
3921 new_cb_data)
3922 )
3923 # is it really unique
3924 self.assertNotEqual(cb_data, new_cb_data)
3925 self.assertIsNotNone(cb_data)
Christian Heimes529525f2018-05-23 22:24:45 +02003926 if s.version() == 'TLSv1.3':
3927 self.assertEqual(len(cb_data), 48)
3928 else:
3929 self.assertEqual(len(cb_data), 12) # True for TLSv1
Christian Heimes05d9fe32018-02-27 08:55:39 +01003930 s.write(b"CB tls-unique\n")
3931 peer_data_repr = s.read().strip()
3932 self.assertEqual(peer_data_repr,
3933 repr(new_cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003934
3935 def test_compression(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003936 client_context, server_context, hostname = testing_context()
3937 stats = server_params_test(client_context, server_context,
3938 chatty=True, connectionchatty=True,
3939 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003940 if support.verbose:
3941 sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
3942 self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
3943
3944 @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
3945 "ssl.OP_NO_COMPRESSION needed for this test")
3946 def test_compression_disabled(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003947 client_context, server_context, hostname = testing_context()
3948 client_context.options |= ssl.OP_NO_COMPRESSION
3949 server_context.options |= ssl.OP_NO_COMPRESSION
3950 stats = server_params_test(client_context, server_context,
3951 chatty=True, connectionchatty=True,
3952 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003953 self.assertIs(stats['compression'], None)
3954
Paul Monsonf3550692019-06-19 13:09:54 -07003955 @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003956 def test_dh_params(self):
3957 # Check we can get a connection with ephemeral Diffie-Hellman
Christian Heimesa170fa12017-09-15 20:27:30 +02003958 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003959 # test scenario needs TLS <= 1.2
3960 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003961 server_context.load_dh_params(DHFILE)
3962 server_context.set_ciphers("kEDH")
Christian Heimes05d9fe32018-02-27 08:55:39 +01003963 server_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003964 stats = server_params_test(client_context, server_context,
3965 chatty=True, connectionchatty=True,
3966 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003967 cipher = stats["cipher"][0]
3968 parts = cipher.split("-")
3969 if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
3970 self.fail("Non-DH cipher: " + cipher[0])
3971
Christian Heimesb7b92252018-02-25 09:49:31 +01003972 @unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
Christian Heimes05d9fe32018-02-27 08:55:39 +01003973 @unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
Christian Heimesb7b92252018-02-25 09:49:31 +01003974 def test_ecdh_curve(self):
3975 # server secp384r1, client auto
3976 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003977
Christian Heimesb7b92252018-02-25 09:49:31 +01003978 server_context.set_ecdh_curve("secp384r1")
3979 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3980 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3981 stats = server_params_test(client_context, server_context,
3982 chatty=True, connectionchatty=True,
3983 sni_name=hostname)
3984
3985 # server auto, client secp384r1
3986 client_context, server_context, hostname = testing_context()
3987 client_context.set_ecdh_curve("secp384r1")
3988 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3989 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3990 stats = server_params_test(client_context, server_context,
3991 chatty=True, connectionchatty=True,
3992 sni_name=hostname)
3993
3994 # server / client curve mismatch
3995 client_context, server_context, hostname = testing_context()
3996 client_context.set_ecdh_curve("prime256v1")
3997 server_context.set_ecdh_curve("secp384r1")
3998 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3999 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
4000 try:
4001 stats = server_params_test(client_context, server_context,
4002 chatty=True, connectionchatty=True,
4003 sni_name=hostname)
4004 except ssl.SSLError:
4005 pass
4006 else:
4007 # OpenSSL 1.0.2 does not fail although it should.
Christian Heimes05d9fe32018-02-27 08:55:39 +01004008 if IS_OPENSSL_1_1_0:
Christian Heimesb7b92252018-02-25 09:49:31 +01004009 self.fail("mismatch curve did not fail")
4010
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004011 def test_selected_alpn_protocol(self):
4012 # selected_alpn_protocol() is None unless ALPN is used.
Christian Heimesa170fa12017-09-15 20:27:30 +02004013 client_context, server_context, hostname = testing_context()
4014 stats = server_params_test(client_context, server_context,
4015 chatty=True, connectionchatty=True,
4016 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004017 self.assertIs(stats['client_alpn_protocol'], None)
4018
4019 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
4020 def test_selected_alpn_protocol_if_server_uses_alpn(self):
4021 # selected_alpn_protocol() is None unless ALPN is used by the client.
Christian Heimesa170fa12017-09-15 20:27:30 +02004022 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004023 server_context.set_alpn_protocols(['foo', 'bar'])
4024 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02004025 chatty=True, connectionchatty=True,
4026 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004027 self.assertIs(stats['client_alpn_protocol'], None)
4028
4029 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
4030 def test_alpn_protocols(self):
4031 server_protocols = ['foo', 'bar', 'milkshake']
4032 protocol_tests = [
4033 (['foo', 'bar'], 'foo'),
4034 (['bar', 'foo'], 'foo'),
4035 (['milkshake'], 'milkshake'),
4036 (['http/3.0', 'http/4.0'], None)
4037 ]
4038 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02004039 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004040 server_context.set_alpn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004041 client_context.set_alpn_protocols(client_protocols)
4042
4043 try:
4044 stats = server_params_test(client_context,
4045 server_context,
4046 chatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02004047 connectionchatty=True,
4048 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004049 except ssl.SSLError as e:
4050 stats = e
4051
Christian Heimes05d9fe32018-02-27 08:55:39 +01004052 if (expected is None and IS_OPENSSL_1_1_0
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004053 and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)):
4054 # OpenSSL 1.1.0 to 1.1.0e raises handshake error
4055 self.assertIsInstance(stats, ssl.SSLError)
4056 else:
4057 msg = "failed trying %s (s) and %s (c).\n" \
4058 "was expecting %s, but got %%s from the %%s" \
4059 % (str(server_protocols), str(client_protocols),
4060 str(expected))
4061 client_result = stats['client_alpn_protocol']
4062 self.assertEqual(client_result, expected,
4063 msg % (client_result, "client"))
4064 server_result = stats['server_alpn_protocols'][-1] \
4065 if len(stats['server_alpn_protocols']) else 'nothing'
4066 self.assertEqual(server_result, expected,
4067 msg % (server_result, "server"))
4068
4069 def test_selected_npn_protocol(self):
4070 # selected_npn_protocol() is None unless NPN is used
Christian Heimesa170fa12017-09-15 20:27:30 +02004071 client_context, server_context, hostname = testing_context()
4072 stats = server_params_test(client_context, server_context,
4073 chatty=True, connectionchatty=True,
4074 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004075 self.assertIs(stats['client_npn_protocol'], None)
4076
4077 @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
4078 def test_npn_protocols(self):
4079 server_protocols = ['http/1.1', 'spdy/2']
4080 protocol_tests = [
4081 (['http/1.1', 'spdy/2'], 'http/1.1'),
4082 (['spdy/2', 'http/1.1'], 'http/1.1'),
4083 (['spdy/2', 'test'], 'spdy/2'),
4084 (['abc', 'def'], 'abc')
4085 ]
4086 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02004087 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004088 server_context.set_npn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004089 client_context.set_npn_protocols(client_protocols)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004090 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02004091 chatty=True, connectionchatty=True,
4092 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004093 msg = "failed trying %s (s) and %s (c).\n" \
4094 "was expecting %s, but got %%s from the %%s" \
4095 % (str(server_protocols), str(client_protocols),
4096 str(expected))
4097 client_result = stats['client_npn_protocol']
4098 self.assertEqual(client_result, expected, msg % (client_result, "client"))
4099 server_result = stats['server_npn_protocols'][-1] \
4100 if len(stats['server_npn_protocols']) else 'nothing'
4101 self.assertEqual(server_result, expected, msg % (server_result, "server"))
4102
4103 def sni_contexts(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004104 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004105 server_context.load_cert_chain(SIGNED_CERTFILE)
Christian Heimesa170fa12017-09-15 20:27:30 +02004106 other_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004107 other_context.load_cert_chain(SIGNED_CERTFILE2)
Christian Heimesa170fa12017-09-15 20:27:30 +02004108 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004109 client_context.load_verify_locations(SIGNING_CA)
4110 return server_context, other_context, client_context
4111
4112 def check_common_name(self, stats, name):
4113 cert = stats['peercert']
4114 self.assertIn((('commonName', name),), cert['subject'])
4115
4116 @needs_sni
4117 def test_sni_callback(self):
4118 calls = []
4119 server_context, other_context, client_context = self.sni_contexts()
4120
Christian Heimesa170fa12017-09-15 20:27:30 +02004121 client_context.check_hostname = False
4122
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004123 def servername_cb(ssl_sock, server_name, initial_context):
4124 calls.append((server_name, initial_context))
4125 if server_name is not None:
4126 ssl_sock.context = other_context
4127 server_context.set_servername_callback(servername_cb)
4128
4129 stats = server_params_test(client_context, server_context,
4130 chatty=True,
4131 sni_name='supermessage')
4132 # The hostname was fetched properly, and the certificate was
4133 # changed for the connection.
4134 self.assertEqual(calls, [("supermessage", server_context)])
4135 # CERTFILE4 was selected
4136 self.check_common_name(stats, 'fakehostname')
4137
4138 calls = []
4139 # The callback is called with server_name=None
4140 stats = server_params_test(client_context, server_context,
4141 chatty=True,
4142 sni_name=None)
4143 self.assertEqual(calls, [(None, server_context)])
Christian Heimesa170fa12017-09-15 20:27:30 +02004144 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004145
4146 # Check disabling the callback
4147 calls = []
4148 server_context.set_servername_callback(None)
4149
4150 stats = server_params_test(client_context, server_context,
4151 chatty=True,
4152 sni_name='notfunny')
4153 # Certificate didn't change
Christian Heimesa170fa12017-09-15 20:27:30 +02004154 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004155 self.assertEqual(calls, [])
4156
4157 @needs_sni
4158 def test_sni_callback_alert(self):
4159 # Returning a TLS alert is reflected to the connecting client
4160 server_context, other_context, client_context = self.sni_contexts()
4161
4162 def cb_returning_alert(ssl_sock, server_name, initial_context):
4163 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
4164 server_context.set_servername_callback(cb_returning_alert)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004165 with self.assertRaises(ssl.SSLError) as cm:
4166 stats = server_params_test(client_context, server_context,
4167 chatty=False,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004168 sni_name='supermessage')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004169 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004170
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004171 @needs_sni
4172 def test_sni_callback_raising(self):
4173 # Raising fails the connection with a TLS handshake failure alert.
4174 server_context, other_context, client_context = self.sni_contexts()
4175
4176 def cb_raising(ssl_sock, server_name, initial_context):
4177 1/0
4178 server_context.set_servername_callback(cb_raising)
4179
Victor Stinner00253502019-06-03 03:51:43 +02004180 with support.catch_unraisable_exception() as catch:
4181 with self.assertRaises(ssl.SSLError) as cm:
4182 stats = server_params_test(client_context, server_context,
4183 chatty=False,
4184 sni_name='supermessage')
4185
4186 self.assertEqual(cm.exception.reason,
4187 'SSLV3_ALERT_HANDSHAKE_FAILURE')
4188 self.assertEqual(catch.unraisable.exc_type, ZeroDivisionError)
Antoine Pitrou50b24d02013-04-11 20:48:42 +02004189
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004190 @needs_sni
4191 def test_sni_callback_wrong_return_type(self):
4192 # Returning the wrong return type terminates the TLS connection
4193 # with an internal error alert.
4194 server_context, other_context, client_context = self.sni_contexts()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004195
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004196 def cb_wrong_return_type(ssl_sock, server_name, initial_context):
4197 return "foo"
4198 server_context.set_servername_callback(cb_wrong_return_type)
4199
Victor Stinner00253502019-06-03 03:51:43 +02004200 with support.catch_unraisable_exception() as catch:
4201 with self.assertRaises(ssl.SSLError) as cm:
4202 stats = server_params_test(client_context, server_context,
4203 chatty=False,
4204 sni_name='supermessage')
4205
4206
4207 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
4208 self.assertEqual(catch.unraisable.exc_type, TypeError)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004209
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004210 def test_shared_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004211 client_context, server_context, hostname = testing_context()
Christian Heimese8eb6cb2018-05-22 22:50:12 +02004212 client_context.set_ciphers("AES128:AES256")
4213 server_context.set_ciphers("AES256")
4214 expected_algs = [
4215 "AES256", "AES-256",
4216 # TLS 1.3 ciphers are always enabled
4217 "TLS_CHACHA20", "TLS_AES",
4218 ]
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004219
Christian Heimesa170fa12017-09-15 20:27:30 +02004220 stats = server_params_test(client_context, server_context,
4221 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004222 ciphers = stats['server_shared_ciphers'][0]
4223 self.assertGreater(len(ciphers), 0)
4224 for name, tls_version, bits in ciphers:
Christian Heimese8eb6cb2018-05-22 22:50:12 +02004225 if not any(alg in name for alg in expected_algs):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004226 self.fail(name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004227
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004228 def test_read_write_after_close_raises_valuerror(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004229 client_context, server_context, hostname = testing_context()
4230 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004231
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004232 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02004233 s = client_context.wrap_socket(socket.socket(),
4234 server_hostname=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004235 s.connect((HOST, server.port))
4236 s.close()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004237
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004238 self.assertRaises(ValueError, s.read, 1024)
4239 self.assertRaises(ValueError, s.write, b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004240
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004241 def test_sendfile(self):
4242 TEST_DATA = b"x" * 512
4243 with open(support.TESTFN, 'wb') as f:
4244 f.write(TEST_DATA)
4245 self.addCleanup(support.unlink, support.TESTFN)
Christian Heimesa170fa12017-09-15 20:27:30 +02004246 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004247 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01004248 context.load_verify_locations(SIGNING_CA)
4249 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004250 server = ThreadedEchoServer(context=context, chatty=False)
4251 with server:
4252 with context.wrap_socket(socket.socket()) as s:
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004253 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004254 with open(support.TESTFN, 'rb') as file:
4255 s.sendfile(file)
4256 self.assertEqual(s.recv(1024), TEST_DATA)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004257
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004258 def test_session(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004259 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01004260 # TODO: sessions aren't compatible with TLSv1.3 yet
4261 client_context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004262
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004263 # first connection without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004264 stats = server_params_test(client_context, server_context,
4265 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004266 session = stats['session']
4267 self.assertTrue(session.id)
4268 self.assertGreater(session.time, 0)
4269 self.assertGreater(session.timeout, 0)
4270 self.assertTrue(session.has_ticket)
4271 if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
4272 self.assertGreater(session.ticket_lifetime_hint, 0)
4273 self.assertFalse(stats['session_reused'])
4274 sess_stat = server_context.session_stats()
4275 self.assertEqual(sess_stat['accept'], 1)
4276 self.assertEqual(sess_stat['hits'], 0)
Giampaolo Rodola'915d1412014-06-11 03:54:30 +02004277
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004278 # reuse session
Christian Heimesa170fa12017-09-15 20:27:30 +02004279 stats = server_params_test(client_context, server_context,
4280 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004281 sess_stat = server_context.session_stats()
4282 self.assertEqual(sess_stat['accept'], 2)
4283 self.assertEqual(sess_stat['hits'], 1)
4284 self.assertTrue(stats['session_reused'])
4285 session2 = stats['session']
4286 self.assertEqual(session2.id, session.id)
4287 self.assertEqual(session2, session)
4288 self.assertIsNot(session2, session)
4289 self.assertGreaterEqual(session2.time, session.time)
4290 self.assertGreaterEqual(session2.timeout, session.timeout)
Christian Heimes99a65702016-09-10 23:44:53 +02004291
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004292 # another one without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004293 stats = server_params_test(client_context, server_context,
4294 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004295 self.assertFalse(stats['session_reused'])
4296 session3 = stats['session']
4297 self.assertNotEqual(session3.id, session.id)
4298 self.assertNotEqual(session3, session)
4299 sess_stat = server_context.session_stats()
4300 self.assertEqual(sess_stat['accept'], 3)
4301 self.assertEqual(sess_stat['hits'], 1)
Christian Heimes99a65702016-09-10 23:44:53 +02004302
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004303 # reuse session again
Christian Heimesa170fa12017-09-15 20:27:30 +02004304 stats = server_params_test(client_context, server_context,
4305 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004306 self.assertTrue(stats['session_reused'])
4307 session4 = stats['session']
4308 self.assertEqual(session4.id, session.id)
4309 self.assertEqual(session4, session)
4310 self.assertGreaterEqual(session4.time, session.time)
4311 self.assertGreaterEqual(session4.timeout, session.timeout)
4312 sess_stat = server_context.session_stats()
4313 self.assertEqual(sess_stat['accept'], 4)
4314 self.assertEqual(sess_stat['hits'], 2)
Christian Heimes99a65702016-09-10 23:44:53 +02004315
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004316 def test_session_handling(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004317 client_context, server_context, hostname = testing_context()
4318 client_context2, _, _ = testing_context()
Christian Heimes99a65702016-09-10 23:44:53 +02004319
Christian Heimes05d9fe32018-02-27 08:55:39 +01004320 # TODO: session reuse does not work with TLSv1.3
Christian Heimesa170fa12017-09-15 20:27:30 +02004321 client_context.options |= ssl.OP_NO_TLSv1_3
4322 client_context2.options |= ssl.OP_NO_TLSv1_3
Christian Heimescb5b68a2017-09-07 18:07:00 -07004323
Christian Heimesa170fa12017-09-15 20:27:30 +02004324 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004325 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02004326 with client_context.wrap_socket(socket.socket(),
4327 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004328 # session is None before handshake
4329 self.assertEqual(s.session, None)
4330 self.assertEqual(s.session_reused, None)
4331 s.connect((HOST, server.port))
4332 session = s.session
4333 self.assertTrue(session)
4334 with self.assertRaises(TypeError) as e:
4335 s.session = object
Ned Deily4531ec72018-06-11 20:26:28 -04004336 self.assertEqual(str(e.exception), 'Value is not a SSLSession.')
Christian Heimes99a65702016-09-10 23:44:53 +02004337
Christian Heimesa170fa12017-09-15 20:27:30 +02004338 with client_context.wrap_socket(socket.socket(),
4339 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004340 s.connect((HOST, server.port))
4341 # cannot set session after handshake
4342 with self.assertRaises(ValueError) as e:
4343 s.session = session
4344 self.assertEqual(str(e.exception),
4345 'Cannot set session after handshake.')
Christian Heimes99a65702016-09-10 23:44:53 +02004346
Christian Heimesa170fa12017-09-15 20:27:30 +02004347 with client_context.wrap_socket(socket.socket(),
4348 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004349 # can set session before handshake and before the
4350 # connection was established
4351 s.session = session
4352 s.connect((HOST, server.port))
4353 self.assertEqual(s.session.id, session.id)
4354 self.assertEqual(s.session, session)
4355 self.assertEqual(s.session_reused, True)
Christian Heimes99a65702016-09-10 23:44:53 +02004356
Christian Heimesa170fa12017-09-15 20:27:30 +02004357 with client_context2.wrap_socket(socket.socket(),
4358 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004359 # cannot re-use session with a different SSLContext
4360 with self.assertRaises(ValueError) as e:
Christian Heimes99a65702016-09-10 23:44:53 +02004361 s.session = session
4362 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004363 self.assertEqual(str(e.exception),
4364 'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02004365
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01004366
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02004367@unittest.skipUnless(has_tls_version('TLSv1_3'), "Test needs TLS 1.3")
Christian Heimes9fb051f2018-09-23 08:32:31 +02004368class TestPostHandshakeAuth(unittest.TestCase):
4369 def test_pha_setter(self):
4370 protocols = [
4371 ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT
4372 ]
4373 for protocol in protocols:
4374 ctx = ssl.SSLContext(protocol)
4375 self.assertEqual(ctx.post_handshake_auth, False)
4376
4377 ctx.post_handshake_auth = True
4378 self.assertEqual(ctx.post_handshake_auth, True)
4379
4380 ctx.verify_mode = ssl.CERT_REQUIRED
4381 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
4382 self.assertEqual(ctx.post_handshake_auth, True)
4383
4384 ctx.post_handshake_auth = False
4385 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
4386 self.assertEqual(ctx.post_handshake_auth, False)
4387
4388 ctx.verify_mode = ssl.CERT_OPTIONAL
4389 ctx.post_handshake_auth = True
4390 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
4391 self.assertEqual(ctx.post_handshake_auth, True)
4392
4393 def test_pha_required(self):
4394 client_context, server_context, hostname = testing_context()
4395 server_context.post_handshake_auth = True
4396 server_context.verify_mode = ssl.CERT_REQUIRED
4397 client_context.post_handshake_auth = True
4398 client_context.load_cert_chain(SIGNED_CERTFILE)
4399
4400 server = ThreadedEchoServer(context=server_context, chatty=False)
4401 with server:
4402 with client_context.wrap_socket(socket.socket(),
4403 server_hostname=hostname) as s:
4404 s.connect((HOST, server.port))
4405 s.write(b'HASCERT')
4406 self.assertEqual(s.recv(1024), b'FALSE\n')
4407 s.write(b'PHA')
4408 self.assertEqual(s.recv(1024), b'OK\n')
4409 s.write(b'HASCERT')
4410 self.assertEqual(s.recv(1024), b'TRUE\n')
4411 # PHA method just returns true when cert is already available
4412 s.write(b'PHA')
4413 self.assertEqual(s.recv(1024), b'OK\n')
4414 s.write(b'GETCERT')
4415 cert_text = s.recv(4096).decode('us-ascii')
4416 self.assertIn('Python Software Foundation CA', cert_text)
4417
4418 def test_pha_required_nocert(self):
4419 client_context, server_context, hostname = testing_context()
4420 server_context.post_handshake_auth = True
4421 server_context.verify_mode = ssl.CERT_REQUIRED
4422 client_context.post_handshake_auth = True
4423
Victor Stinner73ea5462019-07-09 14:33:49 +02004424 # Ignore expected SSLError in ConnectionHandler of ThreadedEchoServer
4425 # (it is only raised sometimes on Windows)
4426 with support.catch_threading_exception() as cm:
4427 server = ThreadedEchoServer(context=server_context, chatty=False)
4428 with server:
4429 with client_context.wrap_socket(socket.socket(),
4430 server_hostname=hostname) as s:
4431 s.connect((HOST, server.port))
4432 s.write(b'PHA')
4433 # receive CertificateRequest
4434 self.assertEqual(s.recv(1024), b'OK\n')
4435 # send empty Certificate + Finish
4436 s.write(b'HASCERT')
4437 # receive alert
4438 with self.assertRaisesRegex(
4439 ssl.SSLError,
4440 'tlsv13 alert certificate required'):
4441 s.recv(1024)
Christian Heimes9fb051f2018-09-23 08:32:31 +02004442
4443 def test_pha_optional(self):
4444 if support.verbose:
4445 sys.stdout.write("\n")
4446
4447 client_context, server_context, hostname = testing_context()
4448 server_context.post_handshake_auth = True
4449 server_context.verify_mode = ssl.CERT_REQUIRED
4450 client_context.post_handshake_auth = True
4451 client_context.load_cert_chain(SIGNED_CERTFILE)
4452
4453 # check CERT_OPTIONAL
4454 server_context.verify_mode = ssl.CERT_OPTIONAL
4455 server = ThreadedEchoServer(context=server_context, chatty=False)
4456 with server:
4457 with client_context.wrap_socket(socket.socket(),
4458 server_hostname=hostname) as s:
4459 s.connect((HOST, server.port))
4460 s.write(b'HASCERT')
4461 self.assertEqual(s.recv(1024), b'FALSE\n')
4462 s.write(b'PHA')
4463 self.assertEqual(s.recv(1024), b'OK\n')
4464 s.write(b'HASCERT')
4465 self.assertEqual(s.recv(1024), b'TRUE\n')
4466
4467 def test_pha_optional_nocert(self):
4468 if support.verbose:
4469 sys.stdout.write("\n")
4470
4471 client_context, server_context, hostname = testing_context()
4472 server_context.post_handshake_auth = True
4473 server_context.verify_mode = ssl.CERT_OPTIONAL
4474 client_context.post_handshake_auth = True
4475
4476 server = ThreadedEchoServer(context=server_context, chatty=False)
4477 with server:
4478 with client_context.wrap_socket(socket.socket(),
4479 server_hostname=hostname) as s:
4480 s.connect((HOST, server.port))
4481 s.write(b'HASCERT')
4482 self.assertEqual(s.recv(1024), b'FALSE\n')
4483 s.write(b'PHA')
4484 self.assertEqual(s.recv(1024), b'OK\n')
penguindustin96466302019-05-06 14:57:17 -04004485 # optional doesn't fail when client does not have a cert
Christian Heimes9fb051f2018-09-23 08:32:31 +02004486 s.write(b'HASCERT')
4487 self.assertEqual(s.recv(1024), b'FALSE\n')
4488
4489 def test_pha_no_pha_client(self):
4490 client_context, server_context, hostname = testing_context()
4491 server_context.post_handshake_auth = True
4492 server_context.verify_mode = ssl.CERT_REQUIRED
4493 client_context.load_cert_chain(SIGNED_CERTFILE)
4494
4495 server = ThreadedEchoServer(context=server_context, chatty=False)
4496 with server:
4497 with client_context.wrap_socket(socket.socket(),
4498 server_hostname=hostname) as s:
4499 s.connect((HOST, server.port))
4500 with self.assertRaisesRegex(ssl.SSLError, 'not server'):
4501 s.verify_client_post_handshake()
4502 s.write(b'PHA')
4503 self.assertIn(b'extension not received', s.recv(1024))
4504
4505 def test_pha_no_pha_server(self):
4506 # server doesn't have PHA enabled, cert is requested in handshake
4507 client_context, server_context, hostname = testing_context()
4508 server_context.verify_mode = ssl.CERT_REQUIRED
4509 client_context.post_handshake_auth = True
4510 client_context.load_cert_chain(SIGNED_CERTFILE)
4511
4512 server = ThreadedEchoServer(context=server_context, chatty=False)
4513 with server:
4514 with client_context.wrap_socket(socket.socket(),
4515 server_hostname=hostname) as s:
4516 s.connect((HOST, server.port))
4517 s.write(b'HASCERT')
4518 self.assertEqual(s.recv(1024), b'TRUE\n')
4519 # PHA doesn't fail if there is already a cert
4520 s.write(b'PHA')
4521 self.assertEqual(s.recv(1024), b'OK\n')
4522 s.write(b'HASCERT')
4523 self.assertEqual(s.recv(1024), b'TRUE\n')
4524
4525 def test_pha_not_tls13(self):
4526 # TLS 1.2
4527 client_context, server_context, hostname = testing_context()
4528 server_context.verify_mode = ssl.CERT_REQUIRED
4529 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
4530 client_context.post_handshake_auth = True
4531 client_context.load_cert_chain(SIGNED_CERTFILE)
4532
4533 server = ThreadedEchoServer(context=server_context, chatty=False)
4534 with server:
4535 with client_context.wrap_socket(socket.socket(),
4536 server_hostname=hostname) as s:
4537 s.connect((HOST, server.port))
4538 # PHA fails for TLS != 1.3
4539 s.write(b'PHA')
4540 self.assertIn(b'WRONG_SSL_VERSION', s.recv(1024))
4541
Christian Heimesf0f59302019-07-01 08:29:17 +02004542 def test_bpo37428_pha_cert_none(self):
4543 # verify that post_handshake_auth does not implicitly enable cert
4544 # validation.
4545 hostname = SIGNED_CERTFILE_HOSTNAME
4546 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
4547 client_context.post_handshake_auth = True
4548 client_context.load_cert_chain(SIGNED_CERTFILE)
4549 # no cert validation and CA on client side
4550 client_context.check_hostname = False
4551 client_context.verify_mode = ssl.CERT_NONE
4552
4553 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
4554 server_context.load_cert_chain(SIGNED_CERTFILE)
4555 server_context.load_verify_locations(SIGNING_CA)
4556 server_context.post_handshake_auth = True
4557 server_context.verify_mode = ssl.CERT_REQUIRED
4558
4559 server = ThreadedEchoServer(context=server_context, chatty=False)
4560 with server:
4561 with client_context.wrap_socket(socket.socket(),
4562 server_hostname=hostname) as s:
4563 s.connect((HOST, server.port))
4564 s.write(b'HASCERT')
4565 self.assertEqual(s.recv(1024), b'FALSE\n')
4566 s.write(b'PHA')
4567 self.assertEqual(s.recv(1024), b'OK\n')
4568 s.write(b'HASCERT')
4569 self.assertEqual(s.recv(1024), b'TRUE\n')
4570 # server cert has not been validated
4571 self.assertEqual(s.getpeercert(), {})
4572
Christian Heimes9fb051f2018-09-23 08:32:31 +02004573
Christian Heimesc7f70692019-05-31 11:44:05 +02004574HAS_KEYLOG = hasattr(ssl.SSLContext, 'keylog_filename')
4575requires_keylog = unittest.skipUnless(
4576 HAS_KEYLOG, 'test requires OpenSSL 1.1.1 with keylog callback')
4577
4578class TestSSLDebug(unittest.TestCase):
4579
4580 def keylog_lines(self, fname=support.TESTFN):
4581 with open(fname) as f:
4582 return len(list(f))
4583
4584 @requires_keylog
Paul Monsonf3550692019-06-19 13:09:54 -07004585 @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
Christian Heimesc7f70692019-05-31 11:44:05 +02004586 def test_keylog_defaults(self):
4587 self.addCleanup(support.unlink, support.TESTFN)
4588 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
4589 self.assertEqual(ctx.keylog_filename, None)
4590
4591 self.assertFalse(os.path.isfile(support.TESTFN))
4592 ctx.keylog_filename = support.TESTFN
4593 self.assertEqual(ctx.keylog_filename, support.TESTFN)
4594 self.assertTrue(os.path.isfile(support.TESTFN))
4595 self.assertEqual(self.keylog_lines(), 1)
4596
4597 ctx.keylog_filename = None
4598 self.assertEqual(ctx.keylog_filename, None)
4599
4600 with self.assertRaises((IsADirectoryError, PermissionError)):
4601 # Windows raises PermissionError
4602 ctx.keylog_filename = os.path.dirname(
4603 os.path.abspath(support.TESTFN))
4604
4605 with self.assertRaises(TypeError):
4606 ctx.keylog_filename = 1
4607
4608 @requires_keylog
Paul Monsonf3550692019-06-19 13:09:54 -07004609 @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
Christian Heimesc7f70692019-05-31 11:44:05 +02004610 def test_keylog_filename(self):
4611 self.addCleanup(support.unlink, support.TESTFN)
4612 client_context, server_context, hostname = testing_context()
4613
4614 client_context.keylog_filename = support.TESTFN
4615 server = ThreadedEchoServer(context=server_context, chatty=False)
4616 with server:
4617 with client_context.wrap_socket(socket.socket(),
4618 server_hostname=hostname) as s:
4619 s.connect((HOST, server.port))
4620 # header, 5 lines for TLS 1.3
4621 self.assertEqual(self.keylog_lines(), 6)
4622
4623 client_context.keylog_filename = None
4624 server_context.keylog_filename = support.TESTFN
4625 server = ThreadedEchoServer(context=server_context, chatty=False)
4626 with server:
4627 with client_context.wrap_socket(socket.socket(),
4628 server_hostname=hostname) as s:
4629 s.connect((HOST, server.port))
4630 self.assertGreaterEqual(self.keylog_lines(), 11)
4631
4632 client_context.keylog_filename = support.TESTFN
4633 server_context.keylog_filename = support.TESTFN
4634 server = ThreadedEchoServer(context=server_context, chatty=False)
4635 with server:
4636 with client_context.wrap_socket(socket.socket(),
4637 server_hostname=hostname) as s:
4638 s.connect((HOST, server.port))
4639 self.assertGreaterEqual(self.keylog_lines(), 21)
4640
4641 client_context.keylog_filename = None
4642 server_context.keylog_filename = None
4643
4644 @requires_keylog
4645 @unittest.skipIf(sys.flags.ignore_environment,
4646 "test is not compatible with ignore_environment")
Paul Monsonf3550692019-06-19 13:09:54 -07004647 @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
Christian Heimesc7f70692019-05-31 11:44:05 +02004648 def test_keylog_env(self):
4649 self.addCleanup(support.unlink, support.TESTFN)
4650 with unittest.mock.patch.dict(os.environ):
4651 os.environ['SSLKEYLOGFILE'] = support.TESTFN
4652 self.assertEqual(os.environ['SSLKEYLOGFILE'], support.TESTFN)
4653
4654 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
4655 self.assertEqual(ctx.keylog_filename, None)
4656
4657 ctx = ssl.create_default_context()
4658 self.assertEqual(ctx.keylog_filename, support.TESTFN)
4659
4660 ctx = ssl._create_stdlib_context()
4661 self.assertEqual(ctx.keylog_filename, support.TESTFN)
4662
4663 def test_msg_callback(self):
4664 client_context, server_context, hostname = testing_context()
4665
4666 def msg_cb(conn, direction, version, content_type, msg_type, data):
4667 pass
4668
4669 self.assertIs(client_context._msg_callback, None)
4670 client_context._msg_callback = msg_cb
4671 self.assertIs(client_context._msg_callback, msg_cb)
4672 with self.assertRaises(TypeError):
4673 client_context._msg_callback = object()
4674
4675 def test_msg_callback_tls12(self):
4676 client_context, server_context, hostname = testing_context()
4677 client_context.options |= ssl.OP_NO_TLSv1_3
4678
4679 msg = []
4680
4681 def msg_cb(conn, direction, version, content_type, msg_type, data):
4682 self.assertIsInstance(conn, ssl.SSLSocket)
4683 self.assertIsInstance(data, bytes)
4684 self.assertIn(direction, {'read', 'write'})
4685 msg.append((direction, version, content_type, msg_type))
4686
4687 client_context._msg_callback = msg_cb
4688
4689 server = ThreadedEchoServer(context=server_context, chatty=False)
4690 with server:
4691 with client_context.wrap_socket(socket.socket(),
4692 server_hostname=hostname) as s:
4693 s.connect((HOST, server.port))
4694
Christian Heimese35d1ba2019-06-03 20:40:15 +02004695 self.assertIn(
Christian Heimesc7f70692019-05-31 11:44:05 +02004696 ("read", TLSVersion.TLSv1_2, _TLSContentType.HANDSHAKE,
4697 _TLSMessageType.SERVER_KEY_EXCHANGE),
Christian Heimese35d1ba2019-06-03 20:40:15 +02004698 msg
4699 )
4700 self.assertIn(
Christian Heimesc7f70692019-05-31 11:44:05 +02004701 ("write", TLSVersion.TLSv1_2, _TLSContentType.CHANGE_CIPHER_SPEC,
4702 _TLSMessageType.CHANGE_CIPHER_SPEC),
Christian Heimese35d1ba2019-06-03 20:40:15 +02004703 msg
4704 )
Christian Heimesc7f70692019-05-31 11:44:05 +02004705
4706
Thomas Woutersed03b412007-08-28 21:37:11 +00004707def test_main(verbose=False):
Antoine Pitrou15cee622010-08-04 16:45:21 +00004708 if support.verbose:
4709 plats = {
Antoine Pitrou15cee622010-08-04 16:45:21 +00004710 'Mac': platform.mac_ver,
4711 'Windows': platform.win32_ver,
4712 }
Petr Viktorin8b94b412018-05-16 11:51:18 -04004713 for name, func in plats.items():
4714 plat = func()
4715 if plat and plat[0]:
4716 plat = '%s %r' % (name, plat)
4717 break
4718 else:
4719 plat = repr(platform.platform())
Antoine Pitrou15cee622010-08-04 16:45:21 +00004720 print("test_ssl: testing with %r %r" %
4721 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
4722 print(" under %s" % plat)
Antoine Pitroud5323212010-10-22 18:19:07 +00004723 print(" HAS_SNI = %r" % ssl.HAS_SNI)
Antoine Pitrou609ef012013-03-29 18:09:06 +01004724 print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
4725 try:
4726 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
4727 except AttributeError:
4728 pass
Antoine Pitrou15cee622010-08-04 16:45:21 +00004729
Antoine Pitrou152efa22010-05-16 18:19:27 +00004730 for filename in [
Martin Panter3840b2a2016-03-27 01:53:46 +00004731 CERTFILE, BYTES_CERTFILE,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004732 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004733 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004734 BADCERT, BADKEY, EMPTYCERT]:
4735 if not os.path.exists(filename):
4736 raise support.TestFailed("Can't read certificate file %r" % filename)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00004737
Martin Panter3840b2a2016-03-27 01:53:46 +00004738 tests = [
4739 ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
Christian Heimes9d50ab52018-02-27 10:17:30 +01004740 SSLObjectTests, SimpleBackgroundTests, ThreadedTests,
Christian Heimesc7f70692019-05-31 11:44:05 +02004741 TestPostHandshakeAuth, TestSSLDebug
Martin Panter3840b2a2016-03-27 01:53:46 +00004742 ]
Thomas Woutersed03b412007-08-28 21:37:11 +00004743
Benjamin Petersonee8712c2008-05-20 21:35:26 +00004744 if support.is_resource_enabled('network'):
Bill Janssen6e027db2007-11-15 22:23:56 +00004745 tests.append(NetworkedTests)
Thomas Woutersed03b412007-08-28 21:37:11 +00004746
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004747 thread_info = support.threading_setup()
Antoine Pitrou480a1242010-04-28 21:37:09 +00004748 try:
4749 support.run_unittest(*tests)
4750 finally:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004751 support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00004752
4753if __name__ == "__main__":
4754 test_main()