blob: 4184665b2b15810cac5a0c5a56bc588db3a21d6b [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Christian Heimesc7f70692019-05-31 11:44:05 +02005import unittest.mock
Benjamin Petersonee8712c2008-05-20 21:35:26 +00006from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00007import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00008import select
Thomas Woutersed03b412007-08-28 21:37:11 +00009import time
Christian Heimes9424bb42013-06-17 15:32:57 +020010import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000011import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000012import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000013import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000014import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000015import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020016import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000017import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000018import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000019import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000020import platform
Christian Heimes892d66e2018-01-29 14:10:18 +010021import sysconfig
Christian Heimesdf6ac7e2019-09-26 17:02:59 +020022import functools
Christian Heimes888bbdc2017-09-07 14:18:21 -070023try:
24 import ctypes
25except ImportError:
26 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000027
Antoine Pitrou05d936d2010-10-13 11:38:36 +000028ssl = support.import_module("ssl")
29
Victor Stinner8f4ef3b2019-07-01 18:28:25 +020030from ssl import TLSVersion, _TLSContentType, _TLSMessageType
Martin Panter3840b2a2016-03-27 01:53:46 +000031
Paul Monsonf3550692019-06-19 13:09:54 -070032Py_DEBUG = hasattr(sys, 'gettotalrefcount')
33Py_DEBUG_WIN32 = Py_DEBUG and sys.platform == 'win32'
34
Antoine Pitrou2463e5f2013-03-28 22:24:43 +010035PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
Benjamin Petersonee8712c2008-05-20 21:35:26 +000036HOST = support.HOST
Christian Heimes598894f2016-09-05 23:19:05 +020037IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
Christian Heimes05d9fe32018-02-27 08:55:39 +010038IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
39IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
Christian Heimes892d66e2018-01-29 14:10:18 +010040PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000041
Victor Stinner3ef63442019-02-19 18:06:03 +010042PROTOCOL_TO_TLS_VERSION = {}
43for proto, ver in (
44 ("PROTOCOL_SSLv23", "SSLv3"),
45 ("PROTOCOL_TLSv1", "TLSv1"),
46 ("PROTOCOL_TLSv1_1", "TLSv1_1"),
47):
48 try:
49 proto = getattr(ssl, proto)
50 ver = getattr(ssl.TLSVersion, ver)
51 except AttributeError:
52 continue
53 PROTOCOL_TO_TLS_VERSION[proto] = ver
54
Christian Heimesefff7062013-11-21 03:35:02 +010055def data_file(*name):
56 return os.path.join(os.path.dirname(__file__), *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000057
Antoine Pitrou81564092010-10-08 23:06:24 +000058# The custom key and certificate files used in test_ssl are generated
59# using Lib/test/make_ssl_certs.py.
60# Other certificates are simply fetched from the Internet servers they
61# are meant to authenticate.
62
Antoine Pitrou152efa22010-05-16 18:19:27 +000063CERTFILE = data_file("keycert.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000064BYTES_CERTFILE = os.fsencode(CERTFILE)
Antoine Pitrou152efa22010-05-16 18:19:27 +000065ONLYCERT = data_file("ssl_cert.pem")
66ONLYKEY = data_file("ssl_key.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000067BYTES_ONLYCERT = os.fsencode(ONLYCERT)
68BYTES_ONLYKEY = os.fsencode(ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +020069CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
70ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
71KEY_PASSWORD = "somepass"
Antoine Pitrou152efa22010-05-16 18:19:27 +000072CAPATH = data_file("capath")
Victor Stinner313a1202010-06-11 23:56:51 +000073BYTES_CAPATH = os.fsencode(CAPATH)
Christian Heimesefff7062013-11-21 03:35:02 +010074CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
75CAFILE_CACERT = data_file("capath", "5ed36f99.0")
76
Christian Heimesbd5c7d22018-01-20 15:16:30 +010077CERTFILE_INFO = {
78 'issuer': ((('countryName', 'XY'),),
79 (('localityName', 'Castle Anthrax'),),
80 (('organizationName', 'Python Software Foundation'),),
81 (('commonName', 'localhost'),)),
Christian Heimese6dac002018-08-30 07:25:49 +020082 'notAfter': 'Aug 26 14:23:15 2028 GMT',
83 'notBefore': 'Aug 29 14:23:15 2018 GMT',
84 'serialNumber': '98A7CF88C74A32ED',
Christian Heimesbd5c7d22018-01-20 15:16:30 +010085 'subject': ((('countryName', 'XY'),),
86 (('localityName', 'Castle Anthrax'),),
87 (('organizationName', 'Python Software Foundation'),),
88 (('commonName', 'localhost'),)),
89 'subjectAltName': (('DNS', 'localhost'),),
90 'version': 3
91}
Antoine Pitrou152efa22010-05-16 18:19:27 +000092
Christian Heimes22587792013-11-21 23:56:13 +010093# empty CRL
94CRLFILE = data_file("revocation.crl")
95
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010096# Two keys and certs signed by the same CA (for SNI tests)
97SIGNED_CERTFILE = data_file("keycert3.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +020098SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +010099
100SIGNED_CERTFILE_INFO = {
101 'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
102 'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
103 'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
104 'issuer': ((('countryName', 'XY'),),
105 (('organizationName', 'Python Software Foundation CA'),),
106 (('commonName', 'our-ca-server'),)),
Christian Heimese6dac002018-08-30 07:25:49 +0200107 'notAfter': 'Jul 7 14:23:16 2028 GMT',
108 'notBefore': 'Aug 29 14:23:16 2018 GMT',
109 'serialNumber': 'CB2D80995A69525C',
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100110 'subject': ((('countryName', 'XY'),),
111 (('localityName', 'Castle Anthrax'),),
112 (('organizationName', 'Python Software Foundation'),),
113 (('commonName', 'localhost'),)),
114 'subjectAltName': (('DNS', 'localhost'),),
115 'version': 3
116}
117
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100118SIGNED_CERTFILE2 = data_file("keycert4.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +0200119SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100120SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
121SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'
122
Martin Panter3840b2a2016-03-27 01:53:46 +0000123# Same certificate as pycacert.pem, but without extra text in file
124SIGNING_CA = data_file("capath", "ceff1710.0")
Christian Heimes1c03abd2016-09-06 23:25:35 +0200125# cert with all kinds of subject alt names
126ALLSANFILE = data_file("allsans.pem")
Christian Heimes66e57422018-01-29 14:25:13 +0100127IDNSANSFILE = data_file("idnsans.pem")
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100128
Martin Panter3d81d932016-01-14 09:36:00 +0000129REMOTE_HOST = "self-signed.pythontest.net"
Antoine Pitrou152efa22010-05-16 18:19:27 +0000130
131EMPTYCERT = data_file("nullcert.pem")
132BADCERT = data_file("badcert.pem")
Martin Panter407b62f2016-01-30 03:41:43 +0000133NONEXISTINGCERT = data_file("XXXnonexisting.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000134BADKEY = data_file("badkey.pem")
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200135NOKIACERT = data_file("nokia.pem")
Christian Heimes824f7f32013-08-17 00:54:47 +0200136NULLBYTECERT = data_file("nullbytecert.pem")
Christian Heimesa37f5242019-01-15 23:47:42 +0100137TALOS_INVALID_CRLDP = data_file("talos-2019-0758.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000138
Christian Heimes88bfd0b2018-08-14 12:54:19 +0200139DHFILE = data_file("ffdh3072.pem")
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100140BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000141
Christian Heimes358cfd42016-09-10 22:43:48 +0200142# Not defined in all versions of OpenSSL
143OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
144OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
145OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
146OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
Christian Heimes05d9fe32018-02-27 08:55:39 +0100147OP_ENABLE_MIDDLEBOX_COMPAT = getattr(ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
Christian Heimes358cfd42016-09-10 22:43:48 +0200148
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100149
Christian Heimesdf6ac7e2019-09-26 17:02:59 +0200150def has_tls_protocol(protocol):
151 """Check if a TLS protocol is available and enabled
152
153 :param protocol: enum ssl._SSLMethod member or name
154 :return: bool
155 """
156 if isinstance(protocol, str):
157 assert protocol.startswith('PROTOCOL_')
158 protocol = getattr(ssl, protocol, None)
159 if protocol is None:
160 return False
161 if protocol in {
162 ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS_SERVER,
163 ssl.PROTOCOL_TLS_CLIENT
164 }:
165 # auto-negotiate protocols are always available
166 return True
167 name = protocol.name
168 return has_tls_version(name[len('PROTOCOL_'):])
169
170
171@functools.lru_cache
172def has_tls_version(version):
173 """Check if a TLS/SSL version is enabled
174
175 :param version: TLS version name or ssl.TLSVersion member
176 :return: bool
177 """
178 if version == "SSLv2":
179 # never supported and not even in TLSVersion enum
180 return False
181
182 if isinstance(version, str):
183 version = ssl.TLSVersion.__members__[version]
184
185 # check compile time flags like ssl.HAS_TLSv1_2
186 if not getattr(ssl, f'HAS_{version.name}'):
187 return False
188
189 # check runtime and dynamic crypto policy settings. A TLS version may
190 # be compiled in but disabled by a policy or config option.
191 ctx = ssl.SSLContext()
192 if (
Christian Heimes9f772682019-09-26 18:23:17 +0200193 hasattr(ctx, 'minimum_version') and
Christian Heimesdf6ac7e2019-09-26 17:02:59 +0200194 ctx.minimum_version != ssl.TLSVersion.MINIMUM_SUPPORTED and
195 version < ctx.minimum_version
196 ):
197 return False
198 if (
Christian Heimes9f772682019-09-26 18:23:17 +0200199 hasattr(ctx, 'maximum_version') and
Christian Heimesdf6ac7e2019-09-26 17:02:59 +0200200 ctx.maximum_version != ssl.TLSVersion.MAXIMUM_SUPPORTED and
201 version > ctx.maximum_version
202 ):
203 return False
204
205 return True
206
207
208def requires_tls_version(version):
209 """Decorator to skip tests when a required TLS version is not available
210
211 :param version: TLS version name or ssl.TLSVersion member
212 :return:
213 """
214 def decorator(func):
215 @functools.wraps(func)
216 def wrapper(*args, **kw):
217 if not has_tls_version(version):
218 raise unittest.SkipTest(f"{version} is not available.")
219 else:
220 return func(*args, **kw)
221 return wrapper
222 return decorator
223
224
225requires_minimum_version = unittest.skipUnless(
226 hasattr(ssl.SSLContext, 'minimum_version'),
227 "required OpenSSL >= 1.1.0g"
228)
229
230
Thomas Woutersed03b412007-08-28 21:37:11 +0000231def handle_error(prefix):
232 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000233 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000234 sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +0000235
Antoine Pitroub5218772010-05-21 09:56:06 +0000236def can_clear_options():
237 # 0.9.8m or higher
Antoine Pitroub9ac25d2011-07-08 18:47:06 +0200238 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
Antoine Pitroub5218772010-05-21 09:56:06 +0000239
240def no_sslv2_implies_sslv3_hello():
241 # 0.9.7h or higher
242 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
243
Christian Heimes2427b502013-11-23 11:24:32 +0100244def have_verify_flags():
245 # 0.9.8 or higher
246 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
247
Christian Heimesb7b92252018-02-25 09:49:31 +0100248def _have_secp_curves():
249 if not ssl.HAS_ECDH:
250 return False
251 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
252 try:
253 ctx.set_ecdh_curve("secp384r1")
254 except ValueError:
255 return False
256 else:
257 return True
258
259
260HAVE_SECP_CURVES = _have_secp_curves()
261
262
Antoine Pitrouc695c952014-04-28 20:57:36 +0200263def utc_offset(): #NOTE: ignore issues like #1647654
264 # local time = utc time + utc offset
265 if time.daylight and time.localtime().tm_isdst > 0:
266 return -time.altzone # seconds
267 return -time.timezone
268
Christian Heimes9424bb42013-06-17 15:32:57 +0200269def asn1time(cert_time):
270 # Some versions of OpenSSL ignore seconds, see #18207
271 # 0.9.8.i
272 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
273 fmt = "%b %d %H:%M:%S %Y GMT"
274 dt = datetime.datetime.strptime(cert_time, fmt)
275 dt = dt.replace(second=0)
276 cert_time = dt.strftime(fmt)
277 # %d adds leading zero but ASN1_TIME_print() uses leading space
278 if cert_time[4] == "0":
279 cert_time = cert_time[:4] + " " + cert_time[5:]
280
281 return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000282
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100283needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
284
Antoine Pitrou23df4832010-08-04 17:14:06 +0000285
Christian Heimesd0486372016-09-10 23:23:33 +0200286def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
287 cert_reqs=ssl.CERT_NONE, ca_certs=None,
288 ciphers=None, certfile=None, keyfile=None,
289 **kwargs):
290 context = ssl.SSLContext(ssl_version)
291 if cert_reqs is not None:
Christian Heimesa170fa12017-09-15 20:27:30 +0200292 if cert_reqs == ssl.CERT_NONE:
293 context.check_hostname = False
Christian Heimesd0486372016-09-10 23:23:33 +0200294 context.verify_mode = cert_reqs
295 if ca_certs is not None:
296 context.load_verify_locations(ca_certs)
297 if certfile is not None or keyfile is not None:
298 context.load_cert_chain(certfile, keyfile)
299 if ciphers is not None:
300 context.set_ciphers(ciphers)
301 return context.wrap_socket(sock, **kwargs)
302
Christian Heimesa170fa12017-09-15 20:27:30 +0200303
304def testing_context(server_cert=SIGNED_CERTFILE):
305 """Create context
306
307 client_context, server_context, hostname = testing_context()
308 """
309 if server_cert == SIGNED_CERTFILE:
310 hostname = SIGNED_CERTFILE_HOSTNAME
311 elif server_cert == SIGNED_CERTFILE2:
312 hostname = SIGNED_CERTFILE2_HOSTNAME
313 else:
314 raise ValueError(server_cert)
315
316 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
317 client_context.load_verify_locations(SIGNING_CA)
318
319 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
320 server_context.load_cert_chain(server_cert)
Christian Heimes9fb051f2018-09-23 08:32:31 +0200321 server_context.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +0200322
323 return client_context, server_context, hostname
324
325
Antoine Pitrou152efa22010-05-16 18:19:27 +0000326class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000327
Antoine Pitrou480a1242010-04-28 21:37:09 +0000328 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000329 ssl.CERT_NONE
330 ssl.CERT_OPTIONAL
331 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100332 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100333 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100334 if ssl.HAS_ECDH:
335 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100336 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
337 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000338 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100339 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700340 ssl.OP_NO_SSLv2
341 ssl.OP_NO_SSLv3
342 ssl.OP_NO_TLSv1
343 ssl.OP_NO_TLSv1_3
344 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
345 ssl.OP_NO_TLSv1_1
346 ssl.OP_NO_TLSv1_2
Christian Heimesa170fa12017-09-15 20:27:30 +0200347 self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000348
Christian Heimes9d50ab52018-02-27 10:17:30 +0100349 def test_private_init(self):
350 with self.assertRaisesRegex(TypeError, "public constructor"):
351 with socket.socket() as s:
352 ssl.SSLSocket(s)
353
Antoine Pitrou172f0252014-04-18 20:33:08 +0200354 def test_str_for_enums(self):
355 # Make sure that the PROTOCOL_* constants have enum-like string
356 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200357 proto = ssl.PROTOCOL_TLS
358 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200359 ctx = ssl.SSLContext(proto)
360 self.assertIs(ctx.protocol, proto)
361
Antoine Pitrou480a1242010-04-28 21:37:09 +0000362 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000363 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000364 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000365 sys.stdout.write("\n RAND_status is %d (%s)\n"
366 % (v, (v and "sufficient randomness") or
367 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200368
369 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
370 self.assertEqual(len(data), 16)
371 self.assertEqual(is_cryptographic, v == 1)
372 if v:
373 data = ssl.RAND_bytes(16)
374 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200375 else:
376 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200377
Victor Stinner1e81a392013-12-19 16:47:04 +0100378 # negative num is invalid
379 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
380 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
381
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100382 if hasattr(ssl, 'RAND_egd'):
383 self.assertRaises(TypeError, ssl.RAND_egd, 1)
384 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000385 ssl.RAND_add("this is a random string", 75.0)
Serhiy Storchaka8490f5a2015-03-20 09:00:36 +0200386 ssl.RAND_add(b"this is a random bytes object", 75.0)
387 ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000388
Christian Heimesf77b4b22013-08-21 13:26:05 +0200389 @unittest.skipUnless(os.name == 'posix', 'requires posix')
390 def test_random_fork(self):
391 status = ssl.RAND_status()
392 if not status:
393 self.fail("OpenSSL's PRNG has insufficient randomness")
394
395 rfd, wfd = os.pipe()
396 pid = os.fork()
397 if pid == 0:
398 try:
399 os.close(rfd)
400 child_random = ssl.RAND_pseudo_bytes(16)[0]
401 self.assertEqual(len(child_random), 16)
402 os.write(wfd, child_random)
403 os.close(wfd)
404 except BaseException:
405 os._exit(1)
406 else:
407 os._exit(0)
408 else:
409 os.close(wfd)
410 self.addCleanup(os.close, rfd)
Victor Stinner278c1e12020-03-31 20:08:12 +0200411 support.wait_process(pid, exitcode=0)
Christian Heimesf77b4b22013-08-21 13:26:05 +0200412
413 child_random = os.read(rfd, 16)
414 self.assertEqual(len(child_random), 16)
415 parent_random = ssl.RAND_pseudo_bytes(16)[0]
416 self.assertEqual(len(parent_random), 16)
417
418 self.assertNotEqual(child_random, parent_random)
419
Christian Heimese6dac002018-08-30 07:25:49 +0200420 maxDiff = None
421
Antoine Pitrou480a1242010-04-28 21:37:09 +0000422 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000423 # note that this uses an 'unofficial' function in _ssl.c,
424 # provided solely for this test, to exercise the certificate
425 # parsing code
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100426 self.assertEqual(
427 ssl._ssl._test_decode_cert(CERTFILE),
428 CERTFILE_INFO
429 )
430 self.assertEqual(
431 ssl._ssl._test_decode_cert(SIGNED_CERTFILE),
432 SIGNED_CERTFILE_INFO
433 )
434
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200435 # Issue #13034: the subjectAltName in some certificates
436 # (notably projects.developer.nokia.com:443) wasn't parsed
437 p = ssl._ssl._test_decode_cert(NOKIACERT)
438 if support.verbose:
439 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
440 self.assertEqual(p['subjectAltName'],
441 (('DNS', 'projects.developer.nokia.com'),
442 ('DNS', 'projects.forum.nokia.com'))
443 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100444 # extra OCSP and AIA fields
445 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
446 self.assertEqual(p['caIssuers'],
447 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
448 self.assertEqual(p['crlDistributionPoints'],
449 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000450
Christian Heimesa37f5242019-01-15 23:47:42 +0100451 def test_parse_cert_CVE_2019_5010(self):
452 p = ssl._ssl._test_decode_cert(TALOS_INVALID_CRLDP)
453 if support.verbose:
454 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
455 self.assertEqual(
456 p,
457 {
458 'issuer': (
459 (('countryName', 'UK'),), (('commonName', 'cody-ca'),)),
460 'notAfter': 'Jun 14 18:00:58 2028 GMT',
461 'notBefore': 'Jun 18 18:00:58 2018 GMT',
462 'serialNumber': '02',
463 'subject': ((('countryName', 'UK'),),
464 (('commonName',
465 'codenomicon-vm-2.test.lal.cisco.com'),)),
466 'subjectAltName': (
467 ('DNS', 'codenomicon-vm-2.test.lal.cisco.com'),),
468 'version': 3
469 }
470 )
471
Christian Heimes824f7f32013-08-17 00:54:47 +0200472 def test_parse_cert_CVE_2013_4238(self):
473 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
474 if support.verbose:
475 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
476 subject = ((('countryName', 'US'),),
477 (('stateOrProvinceName', 'Oregon'),),
478 (('localityName', 'Beaverton'),),
479 (('organizationName', 'Python Software Foundation'),),
480 (('organizationalUnitName', 'Python Core Development'),),
481 (('commonName', 'null.python.org\x00example.org'),),
482 (('emailAddress', 'python-dev@python.org'),))
483 self.assertEqual(p['subject'], subject)
484 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200485 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
486 san = (('DNS', 'altnull.python.org\x00example.com'),
487 ('email', 'null@python.org\x00user@example.org'),
488 ('URI', 'http://null.python.org\x00http://example.org'),
489 ('IP Address', '192.0.2.1'),
Christian Heimes2b7de662019-12-07 17:59:36 +0100490 ('IP Address', '2001:DB8:0:0:0:0:0:1'))
Christian Heimes157c9832013-08-25 14:12:41 +0200491 else:
492 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
493 san = (('DNS', 'altnull.python.org\x00example.com'),
494 ('email', 'null@python.org\x00user@example.org'),
495 ('URI', 'http://null.python.org\x00http://example.org'),
496 ('IP Address', '192.0.2.1'),
497 ('IP Address', '<invalid>'))
498
499 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200500
Christian Heimes1c03abd2016-09-06 23:25:35 +0200501 def test_parse_all_sans(self):
502 p = ssl._ssl._test_decode_cert(ALLSANFILE)
503 self.assertEqual(p['subjectAltName'],
504 (
505 ('DNS', 'allsans'),
506 ('othername', '<unsupported>'),
507 ('othername', '<unsupported>'),
508 ('email', 'user@example.org'),
509 ('DNS', 'www.example.org'),
510 ('DirName',
511 ((('countryName', 'XY'),),
512 (('localityName', 'Castle Anthrax'),),
513 (('organizationName', 'Python Software Foundation'),),
514 (('commonName', 'dirname example'),))),
515 ('URI', 'https://www.python.org/'),
516 ('IP Address', '127.0.0.1'),
Christian Heimes2b7de662019-12-07 17:59:36 +0100517 ('IP Address', '0:0:0:0:0:0:0:1'),
Christian Heimes1c03abd2016-09-06 23:25:35 +0200518 ('Registered ID', '1.2.3.4.5')
519 )
520 )
521
Antoine Pitrou480a1242010-04-28 21:37:09 +0000522 def test_DER_to_PEM(self):
Martin Panter3d81d932016-01-14 09:36:00 +0000523 with open(CAFILE_CACERT, 'r') as f:
Antoine Pitrou480a1242010-04-28 21:37:09 +0000524 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000525 d1 = ssl.PEM_cert_to_DER_cert(pem)
526 p2 = ssl.DER_cert_to_PEM_cert(d1)
527 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000528 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000529 if not p2.startswith(ssl.PEM_HEADER + '\n'):
530 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
531 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
532 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000533
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000534 def test_openssl_version(self):
535 n = ssl.OPENSSL_VERSION_NUMBER
536 t = ssl.OPENSSL_VERSION_INFO
537 s = ssl.OPENSSL_VERSION
538 self.assertIsInstance(n, int)
539 self.assertIsInstance(t, tuple)
540 self.assertIsInstance(s, str)
541 # Some sanity checks follow
542 # >= 0.9
543 self.assertGreaterEqual(n, 0x900000)
Christian Heimes2b7de662019-12-07 17:59:36 +0100544 # < 4.0
545 self.assertLess(n, 0x40000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000546 major, minor, fix, patch, status = t
Christian Heimes2b7de662019-12-07 17:59:36 +0100547 self.assertGreaterEqual(major, 1)
548 self.assertLess(major, 4)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000549 self.assertGreaterEqual(minor, 0)
550 self.assertLess(minor, 256)
551 self.assertGreaterEqual(fix, 0)
552 self.assertLess(fix, 256)
553 self.assertGreaterEqual(patch, 0)
Ned Deily05784a72015-02-05 17:20:13 +1100554 self.assertLessEqual(patch, 63)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000555 self.assertGreaterEqual(status, 0)
556 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400557 # Version string as returned by {Open,Libre}SSL, the format might change
Christian Heimes598894f2016-09-05 23:19:05 +0200558 if IS_LIBRESSL:
559 self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
Victor Stinner789b8052015-01-06 11:51:06 +0100560 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400561 else:
562 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100563 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000564
Antoine Pitrou9d543662010-04-23 23:10:32 +0000565 @support.cpython_only
566 def test_refcycle(self):
567 # Issue #7943: an SSL object doesn't create reference cycles with
568 # itself.
569 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200570 ss = test_wrap_socket(s)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000571 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100572 with support.check_warnings(("", ResourceWarning)):
573 del ss
Victor Stinnere0b75b72016-03-21 17:26:04 +0100574 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000575
Antoine Pitroua468adc2010-09-14 14:43:44 +0000576 def test_wrapped_unconnected(self):
577 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200578 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000579 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200580 with test_wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100581 self.assertRaises(OSError, ss.recv, 1)
582 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
583 self.assertRaises(OSError, ss.recvfrom, 1)
584 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
585 self.assertRaises(OSError, ss.send, b'x')
586 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Serhiy Storchaka42b1d612018-12-06 22:36:55 +0200587 self.assertRaises(NotImplementedError, ss.dup)
Christian Heimes141c5e82018-02-24 21:10:57 +0100588 self.assertRaises(NotImplementedError, ss.sendmsg,
589 [b'x'], (), 0, ('0.0.0.0', 0))
Serhiy Storchaka42b1d612018-12-06 22:36:55 +0200590 self.assertRaises(NotImplementedError, ss.recvmsg, 100)
591 self.assertRaises(NotImplementedError, ss.recvmsg_into,
592 [bytearray(100)])
Antoine Pitroua468adc2010-09-14 14:43:44 +0000593
Antoine Pitrou40f08742010-04-24 22:04:40 +0000594 def test_timeout(self):
595 # Issue #8524: when creating an SSL socket, the timeout of the
596 # original socket should be retained.
597 for timeout in (None, 0.0, 5.0):
598 s = socket.socket(socket.AF_INET)
599 s.settimeout(timeout)
Christian Heimesd0486372016-09-10 23:23:33 +0200600 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100601 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000602
Christian Heimesd0486372016-09-10 23:23:33 +0200603 def test_errors_sslwrap(self):
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000604 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000605 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000606 "certfile must be specified",
607 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000608 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000609 "certfile must be specified for server-side operations",
610 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000611 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000612 "certfile must be specified for server-side operations",
Christian Heimesd0486372016-09-10 23:23:33 +0200613 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100614 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
615 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
Christian Heimesd0486372016-09-10 23:23:33 +0200616 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200617 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000618 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000619 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000620 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200621 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000622 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000623 ssl.wrap_socket(sock,
624 certfile=CERTFILE, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000625 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200626 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000627 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000628 ssl.wrap_socket(sock,
629 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000630 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000631
Martin Panter3464ea22016-02-01 21:58:11 +0000632 def bad_cert_test(self, certfile):
633 """Check that trying to use the given client certificate fails"""
634 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
635 certfile)
636 sock = socket.socket()
637 self.addCleanup(sock.close)
638 with self.assertRaises(ssl.SSLError):
Christian Heimesd0486372016-09-10 23:23:33 +0200639 test_wrap_socket(sock,
Christian Heimesa170fa12017-09-15 20:27:30 +0200640 certfile=certfile)
Martin Panter3464ea22016-02-01 21:58:11 +0000641
642 def test_empty_cert(self):
643 """Wrapping with an empty cert file"""
644 self.bad_cert_test("nullcert.pem")
645
646 def test_malformed_cert(self):
647 """Wrapping with a badly formatted certificate (syntax error)"""
648 self.bad_cert_test("badcert.pem")
649
650 def test_malformed_key(self):
651 """Wrapping with a badly formatted key (syntax error)"""
652 self.bad_cert_test("badkey.pem")
653
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000654 def test_match_hostname(self):
655 def ok(cert, hostname):
656 ssl.match_hostname(cert, hostname)
657 def fail(cert, hostname):
658 self.assertRaises(ssl.CertificateError,
659 ssl.match_hostname, cert, hostname)
660
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100661 # -- Hostname matching --
662
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000663 cert = {'subject': ((('commonName', 'example.com'),),)}
664 ok(cert, 'example.com')
665 ok(cert, 'ExAmple.cOm')
666 fail(cert, 'www.example.com')
667 fail(cert, '.example.com')
668 fail(cert, 'example.org')
669 fail(cert, 'exampleXcom')
670
671 cert = {'subject': ((('commonName', '*.a.com'),),)}
672 ok(cert, 'foo.a.com')
673 fail(cert, 'bar.foo.a.com')
674 fail(cert, 'a.com')
675 fail(cert, 'Xa.com')
676 fail(cert, '.a.com')
677
Mandeep Singhede2ac92017-11-27 04:01:27 +0530678 # only match wildcards when they are the only thing
679 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000680 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530681 fail(cert, 'foo.com')
682 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000683 fail(cert, 'bar.com')
684 fail(cert, 'foo.a.com')
685 fail(cert, 'bar.foo.com')
686
Christian Heimes824f7f32013-08-17 00:54:47 +0200687 # NULL bytes are bad, CVE-2013-4073
688 cert = {'subject': ((('commonName',
689 'null.python.org\x00example.org'),),)}
690 ok(cert, 'null.python.org\x00example.org') # or raise an error?
691 fail(cert, 'example.org')
692 fail(cert, 'null.python.org')
693
Georg Brandl72c98d32013-10-27 07:16:53 +0100694 # error cases with wildcards
695 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
696 fail(cert, 'bar.foo.a.com')
697 fail(cert, 'a.com')
698 fail(cert, 'Xa.com')
699 fail(cert, '.a.com')
700
701 cert = {'subject': ((('commonName', 'a.*.com'),),)}
702 fail(cert, 'a.foo.com')
703 fail(cert, 'a..com')
704 fail(cert, 'a.com')
705
706 # wildcard doesn't match IDNA prefix 'xn--'
707 idna = 'püthon.python.org'.encode("idna").decode("ascii")
708 cert = {'subject': ((('commonName', idna),),)}
709 ok(cert, idna)
710 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
711 fail(cert, idna)
712 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
713 fail(cert, idna)
714
715 # wildcard in first fragment and IDNA A-labels in sequent fragments
716 # are supported.
717 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
718 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530719 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
720 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100721 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
722 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
723
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000724 # Slightly fake real-world example
725 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
726 'subject': ((('commonName', 'linuxfrz.org'),),),
727 'subjectAltName': (('DNS', 'linuxfr.org'),
728 ('DNS', 'linuxfr.com'),
729 ('othername', '<unsupported>'))}
730 ok(cert, 'linuxfr.org')
731 ok(cert, 'linuxfr.com')
732 # Not a "DNS" entry
733 fail(cert, '<unsupported>')
734 # When there is a subjectAltName, commonName isn't used
735 fail(cert, 'linuxfrz.org')
736
737 # A pristine real-world example
738 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
739 'subject': ((('countryName', 'US'),),
740 (('stateOrProvinceName', 'California'),),
741 (('localityName', 'Mountain View'),),
742 (('organizationName', 'Google Inc'),),
743 (('commonName', 'mail.google.com'),))}
744 ok(cert, 'mail.google.com')
745 fail(cert, 'gmail.com')
746 # Only commonName is considered
747 fail(cert, 'California')
748
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100749 # -- IPv4 matching --
750 cert = {'subject': ((('commonName', 'example.com'),),),
751 'subjectAltName': (('DNS', 'example.com'),
752 ('IP Address', '10.11.12.13'),
Christian Heimes477b1b22019-07-02 20:39:42 +0200753 ('IP Address', '14.15.16.17'),
754 ('IP Address', '127.0.0.1'))}
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100755 ok(cert, '10.11.12.13')
756 ok(cert, '14.15.16.17')
Christian Heimes477b1b22019-07-02 20:39:42 +0200757 # socket.inet_ntoa(socket.inet_aton('127.1')) == '127.0.0.1'
758 fail(cert, '127.1')
759 fail(cert, '14.15.16.17 ')
760 fail(cert, '14.15.16.17 extra data')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100761 fail(cert, '14.15.16.18')
762 fail(cert, 'example.net')
763
764 # -- IPv6 matching --
Zackery Spytzc2cda632019-06-30 09:24:43 -0600765 if support.IPV6_ENABLED:
Christian Heimesaef12832018-02-24 14:35:56 +0100766 cert = {'subject': ((('commonName', 'example.com'),),),
767 'subjectAltName': (
768 ('DNS', 'example.com'),
769 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
770 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
771 ok(cert, '2001::cafe')
772 ok(cert, '2003::baba')
Christian Heimes477b1b22019-07-02 20:39:42 +0200773 fail(cert, '2003::baba ')
774 fail(cert, '2003::baba extra data')
Christian Heimesaef12832018-02-24 14:35:56 +0100775 fail(cert, '2003::bebe')
776 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100777
778 # -- Miscellaneous --
779
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000780 # Neither commonName nor subjectAltName
781 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
782 'subject': ((('countryName', 'US'),),
783 (('stateOrProvinceName', 'California'),),
784 (('localityName', 'Mountain View'),),
785 (('organizationName', 'Google Inc'),))}
786 fail(cert, 'mail.google.com')
787
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200788 # No DNS entry in subjectAltName but a commonName
789 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
790 'subject': ((('countryName', 'US'),),
791 (('stateOrProvinceName', 'California'),),
792 (('localityName', 'Mountain View'),),
793 (('commonName', 'mail.google.com'),)),
794 'subjectAltName': (('othername', 'blabla'), )}
795 ok(cert, 'mail.google.com')
796
797 # No DNS entry subjectAltName and no commonName
798 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
799 'subject': ((('countryName', 'US'),),
800 (('stateOrProvinceName', 'California'),),
801 (('localityName', 'Mountain View'),),
802 (('organizationName', 'Google Inc'),)),
803 'subjectAltName': (('othername', 'blabla'),)}
804 fail(cert, 'google.com')
805
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000806 # Empty cert / no cert
807 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
808 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
809
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200810 # Issue #17980: avoid denials of service by refusing more than one
811 # wildcard per fragment.
Christian Heimesaef12832018-02-24 14:35:56 +0100812 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
813 with self.assertRaisesRegex(
814 ssl.CertificateError,
815 "partial wildcards in leftmost label are not supported"):
816 ssl.match_hostname(cert, 'axxb.example.com')
817
818 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
819 with self.assertRaisesRegex(
820 ssl.CertificateError,
821 "wildcard can only be present in the leftmost label"):
822 ssl.match_hostname(cert, 'www.sub.example.com')
823
824 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
825 with self.assertRaisesRegex(
826 ssl.CertificateError,
827 "too many wildcards"):
828 ssl.match_hostname(cert, 'axxbxxc.example.com')
829
830 cert = {'subject': ((('commonName', '*'),),)}
831 with self.assertRaisesRegex(
832 ssl.CertificateError,
833 "sole wildcard without additional labels are not support"):
834 ssl.match_hostname(cert, 'host')
835
836 cert = {'subject': ((('commonName', '*.com'),),)}
837 with self.assertRaisesRegex(
838 ssl.CertificateError,
839 r"hostname 'com' doesn't match '\*.com'"):
840 ssl.match_hostname(cert, 'com')
841
842 # extra checks for _inet_paton()
843 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
844 with self.assertRaises(ValueError):
845 ssl._inet_paton(invalid)
846 for ipaddr in ['127.0.0.1', '192.168.0.1']:
847 self.assertTrue(ssl._inet_paton(ipaddr))
Zackery Spytzc2cda632019-06-30 09:24:43 -0600848 if support.IPV6_ENABLED:
Christian Heimesaef12832018-02-24 14:35:56 +0100849 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
850 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200851
Antoine Pitroud5323212010-10-22 18:19:07 +0000852 def test_server_side(self):
853 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200854 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000855 with socket.socket() as sock:
856 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
857 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000858
Antoine Pitroud6494802011-07-21 01:11:30 +0200859 def test_unknown_channel_binding(self):
860 # should raise ValueError for unknown type
Giampaolo Rodolaeb7e29f2019-04-09 00:34:02 +0200861 s = socket.create_server(('127.0.0.1', 0))
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200862 c = socket.socket(socket.AF_INET)
863 c.connect(s.getsockname())
Christian Heimesd0486372016-09-10 23:23:33 +0200864 with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100865 with self.assertRaises(ValueError):
866 ss.get_channel_binding("unknown-type")
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200867 s.close()
Antoine Pitroud6494802011-07-21 01:11:30 +0200868
869 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
870 "'tls-unique' channel binding not available")
871 def test_tls_unique_channel_binding(self):
872 # unconnected should return None for known type
873 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200874 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100875 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200876 # the same for server-side
877 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200878 with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100879 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200880
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600881 def test_dealloc_warn(self):
Christian Heimesd0486372016-09-10 23:23:33 +0200882 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600883 r = repr(ss)
884 with self.assertWarns(ResourceWarning) as cm:
885 ss = None
886 support.gc_collect()
887 self.assertIn(r, str(cm.warning.args[0]))
888
Christian Heimes6d7ad132013-06-09 18:02:55 +0200889 def test_get_default_verify_paths(self):
890 paths = ssl.get_default_verify_paths()
891 self.assertEqual(len(paths), 6)
892 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
893
894 with support.EnvironmentVarGuard() as env:
895 env["SSL_CERT_DIR"] = CAPATH
896 env["SSL_CERT_FILE"] = CERTFILE
897 paths = ssl.get_default_verify_paths()
898 self.assertEqual(paths.cafile, CERTFILE)
899 self.assertEqual(paths.capath, CAPATH)
900
Christian Heimes44109d72013-11-22 01:51:30 +0100901 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
902 def test_enum_certificates(self):
903 self.assertTrue(ssl.enum_certificates("CA"))
904 self.assertTrue(ssl.enum_certificates("ROOT"))
905
906 self.assertRaises(TypeError, ssl.enum_certificates)
907 self.assertRaises(WindowsError, ssl.enum_certificates, "")
908
Christian Heimesc2d65e12013-11-22 16:13:55 +0100909 trust_oids = set()
910 for storename in ("CA", "ROOT"):
911 store = ssl.enum_certificates(storename)
912 self.assertIsInstance(store, list)
913 for element in store:
914 self.assertIsInstance(element, tuple)
915 self.assertEqual(len(element), 3)
916 cert, enc, trust = element
917 self.assertIsInstance(cert, bytes)
918 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
Christian Heimes915cd3f2019-09-09 18:06:55 +0200919 self.assertIsInstance(trust, (frozenset, set, bool))
920 if isinstance(trust, (frozenset, set)):
Christian Heimesc2d65e12013-11-22 16:13:55 +0100921 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100922
923 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100924 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200925
Christian Heimes46bebee2013-06-09 19:03:31 +0200926 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100927 def test_enum_crls(self):
928 self.assertTrue(ssl.enum_crls("CA"))
929 self.assertRaises(TypeError, ssl.enum_crls)
930 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200931
Christian Heimes44109d72013-11-22 01:51:30 +0100932 crls = ssl.enum_crls("CA")
933 self.assertIsInstance(crls, list)
934 for element in crls:
935 self.assertIsInstance(element, tuple)
936 self.assertEqual(len(element), 2)
937 self.assertIsInstance(element[0], bytes)
938 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200939
Christian Heimes46bebee2013-06-09 19:03:31 +0200940
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100941 def test_asn1object(self):
942 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
943 '1.3.6.1.5.5.7.3.1')
944
945 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
946 self.assertEqual(val, expected)
947 self.assertEqual(val.nid, 129)
948 self.assertEqual(val.shortname, 'serverAuth')
949 self.assertEqual(val.longname, 'TLS Web Server Authentication')
950 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
951 self.assertIsInstance(val, ssl._ASN1Object)
952 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
953
954 val = ssl._ASN1Object.fromnid(129)
955 self.assertEqual(val, expected)
956 self.assertIsInstance(val, ssl._ASN1Object)
957 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100958 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
959 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100960 for i in range(1000):
961 try:
962 obj = ssl._ASN1Object.fromnid(i)
963 except ValueError:
964 pass
965 else:
966 self.assertIsInstance(obj.nid, int)
967 self.assertIsInstance(obj.shortname, str)
968 self.assertIsInstance(obj.longname, str)
969 self.assertIsInstance(obj.oid, (str, type(None)))
970
971 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
972 self.assertEqual(val, expected)
973 self.assertIsInstance(val, ssl._ASN1Object)
974 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
975 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
976 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100977 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
978 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100979
Christian Heimes72d28502013-11-23 13:56:58 +0100980 def test_purpose_enum(self):
981 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
982 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
983 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
984 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
985 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
986 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
987 '1.3.6.1.5.5.7.3.1')
988
989 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
990 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
991 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
992 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
993 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
994 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
995 '1.3.6.1.5.5.7.3.2')
996
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100997 def test_unsupported_dtls(self):
998 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
999 self.addCleanup(s.close)
1000 with self.assertRaises(NotImplementedError) as cx:
Christian Heimesd0486372016-09-10 23:23:33 +02001001 test_wrap_socket(s, cert_reqs=ssl.CERT_NONE)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +01001002 self.assertEqual(str(cx.exception), "only stream sockets are supported")
Christian Heimesa170fa12017-09-15 20:27:30 +02001003 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +01001004 with self.assertRaises(NotImplementedError) as cx:
1005 ctx.wrap_socket(s)
1006 self.assertEqual(str(cx.exception), "only stream sockets are supported")
1007
Antoine Pitrouc695c952014-04-28 20:57:36 +02001008 def cert_time_ok(self, timestring, timestamp):
1009 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
1010
1011 def cert_time_fail(self, timestring):
1012 with self.assertRaises(ValueError):
1013 ssl.cert_time_to_seconds(timestring)
1014
1015 @unittest.skipUnless(utc_offset(),
1016 'local time needs to be different from UTC')
1017 def test_cert_time_to_seconds_timezone(self):
1018 # Issue #19940: ssl.cert_time_to_seconds() returns wrong
1019 # results if local timezone is not UTC
1020 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
1021 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
1022
1023 def test_cert_time_to_seconds(self):
1024 timestring = "Jan 5 09:34:43 2018 GMT"
1025 ts = 1515144883.0
1026 self.cert_time_ok(timestring, ts)
1027 # accept keyword parameter, assert its name
1028 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
1029 # accept both %e and %d (space or zero generated by strftime)
1030 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
1031 # case-insensitive
1032 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
1033 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
1034 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
1035 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
1036 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
1037 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
1038 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
1039 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
1040
1041 newyear_ts = 1230768000.0
1042 # leap seconds
1043 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
1044 # same timestamp
1045 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
1046
1047 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
1048 # allow 60th second (even if it is not a leap second)
1049 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
1050 # allow 2nd leap second for compatibility with time.strptime()
1051 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
1052 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
1053
Mike53f7a7c2017-12-14 14:04:53 +03001054 # no special treatment for the special value:
Antoine Pitrouc695c952014-04-28 20:57:36 +02001055 # 99991231235959Z (rfc 5280)
1056 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
1057
1058 @support.run_with_locale('LC_ALL', '')
1059 def test_cert_time_to_seconds_locale(self):
1060 # `cert_time_to_seconds()` should be locale independent
1061
1062 def local_february_name():
1063 return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
1064
1065 if local_february_name().lower() == 'feb':
1066 self.skipTest("locale-specific month name needs to be "
1067 "different from C locale")
1068
1069 # locale-independent
1070 self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
1071 self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
1072
Martin Panter3840b2a2016-03-27 01:53:46 +00001073 def test_connect_ex_error(self):
1074 server = socket.socket(socket.AF_INET)
1075 self.addCleanup(server.close)
1076 port = support.bind_port(server) # Reserve port but don't listen
Christian Heimesd0486372016-09-10 23:23:33 +02001077 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001078 cert_reqs=ssl.CERT_REQUIRED)
1079 self.addCleanup(s.close)
1080 rc = s.connect_ex((HOST, port))
1081 # Issue #19919: Windows machines or VMs hosted on Windows
1082 # machines sometimes return EWOULDBLOCK.
1083 errors = (
1084 errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
1085 errno.EWOULDBLOCK,
1086 )
1087 self.assertIn(rc, errors)
1088
Christian Heimesa6bc95a2013-11-17 19:59:14 +01001089
Antoine Pitrou152efa22010-05-16 18:19:27 +00001090class ContextTests(unittest.TestCase):
1091
1092 def test_constructor(self):
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01001093 for protocol in PROTOCOLS:
1094 ssl.SSLContext(protocol)
Christian Heimes598894f2016-09-05 23:19:05 +02001095 ctx = ssl.SSLContext()
1096 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001097 self.assertRaises(ValueError, ssl.SSLContext, -1)
1098 self.assertRaises(ValueError, ssl.SSLContext, 42)
1099
1100 def test_protocol(self):
1101 for proto in PROTOCOLS:
1102 ctx = ssl.SSLContext(proto)
1103 self.assertEqual(ctx.protocol, proto)
1104
1105 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001106 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001107 ctx.set_ciphers("ALL")
1108 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +00001109 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +00001110 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +00001111
Christian Heimes892d66e2018-01-29 14:10:18 +01001112 @unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
1113 "Test applies only to Python default ciphers")
1114 def test_python_ciphers(self):
1115 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1116 ciphers = ctx.get_ciphers()
1117 for suite in ciphers:
1118 name = suite['name']
1119 self.assertNotIn("PSK", name)
1120 self.assertNotIn("SRP", name)
1121 self.assertNotIn("MD5", name)
1122 self.assertNotIn("RC4", name)
1123 self.assertNotIn("3DES", name)
1124
Christian Heimes25bfcd52016-09-06 00:04:45 +02001125 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
1126 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001127 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +02001128 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +02001129 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +02001130 self.assertIn('AES256-GCM-SHA384', names)
1131 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001132
Antoine Pitroub5218772010-05-21 09:56:06 +00001133 def test_options(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001134 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001135 # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
Christian Heimes598894f2016-09-05 23:19:05 +02001136 default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimes358cfd42016-09-10 22:43:48 +02001137 # SSLContext also enables these by default
1138 default |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
Christian Heimes05d9fe32018-02-27 08:55:39 +01001139 OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE |
1140 OP_ENABLE_MIDDLEBOX_COMPAT)
Christian Heimes598894f2016-09-05 23:19:05 +02001141 self.assertEqual(default, ctx.options)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001142 ctx.options |= ssl.OP_NO_TLSv1
Christian Heimes598894f2016-09-05 23:19:05 +02001143 self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001144 if can_clear_options():
Christian Heimes598894f2016-09-05 23:19:05 +02001145 ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
1146 self.assertEqual(default, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001147 ctx.options = 0
Matthias Klosef7c56242016-06-12 23:40:00 -07001148 # Ubuntu has OP_NO_SSLv3 forced on by default
1149 self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
Antoine Pitroub5218772010-05-21 09:56:06 +00001150 else:
1151 with self.assertRaises(ValueError):
1152 ctx.options = 0
1153
Christian Heimesa170fa12017-09-15 20:27:30 +02001154 def test_verify_mode_protocol(self):
1155 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001156 # Default value
1157 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1158 ctx.verify_mode = ssl.CERT_OPTIONAL
1159 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1160 ctx.verify_mode = ssl.CERT_REQUIRED
1161 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1162 ctx.verify_mode = ssl.CERT_NONE
1163 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1164 with self.assertRaises(TypeError):
1165 ctx.verify_mode = None
1166 with self.assertRaises(ValueError):
1167 ctx.verify_mode = 42
1168
Christian Heimesa170fa12017-09-15 20:27:30 +02001169 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1170 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1171 self.assertFalse(ctx.check_hostname)
1172
1173 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1174 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1175 self.assertTrue(ctx.check_hostname)
1176
Christian Heimes61d478c2018-01-27 15:51:38 +01001177 def test_hostname_checks_common_name(self):
1178 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1179 self.assertTrue(ctx.hostname_checks_common_name)
1180 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1181 ctx.hostname_checks_common_name = True
1182 self.assertTrue(ctx.hostname_checks_common_name)
1183 ctx.hostname_checks_common_name = False
1184 self.assertFalse(ctx.hostname_checks_common_name)
1185 ctx.hostname_checks_common_name = True
1186 self.assertTrue(ctx.hostname_checks_common_name)
1187 else:
1188 with self.assertRaises(AttributeError):
1189 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001190
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02001191 @requires_minimum_version
Christian Heimesc9bc49c2019-09-11 19:24:47 +02001192 @unittest.skipIf(IS_LIBRESSL, "see bpo-34001")
Christian Heimes698dde12018-02-27 11:54:43 +01001193 def test_min_max_version(self):
1194 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes34de2d32019-01-18 16:09:30 +01001195 # OpenSSL default is MINIMUM_SUPPORTED, however some vendors like
1196 # Fedora override the setting to TLS 1.0.
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02001197 minimum_range = {
1198 # stock OpenSSL
1199 ssl.TLSVersion.MINIMUM_SUPPORTED,
1200 # Fedora 29 uses TLS 1.0 by default
1201 ssl.TLSVersion.TLSv1,
1202 # RHEL 8 uses TLS 1.2 by default
1203 ssl.TLSVersion.TLSv1_2
1204 }
torsava34864d12019-12-02 17:15:42 +01001205 maximum_range = {
1206 # stock OpenSSL
1207 ssl.TLSVersion.MAXIMUM_SUPPORTED,
1208 # Fedora 32 uses TLS 1.3 by default
1209 ssl.TLSVersion.TLSv1_3
1210 }
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02001211
Christian Heimes34de2d32019-01-18 16:09:30 +01001212 self.assertIn(
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02001213 ctx.minimum_version, minimum_range
Christian Heimes698dde12018-02-27 11:54:43 +01001214 )
torsava34864d12019-12-02 17:15:42 +01001215 self.assertIn(
1216 ctx.maximum_version, maximum_range
Christian Heimes698dde12018-02-27 11:54:43 +01001217 )
1218
1219 ctx.minimum_version = ssl.TLSVersion.TLSv1_1
1220 ctx.maximum_version = ssl.TLSVersion.TLSv1_2
1221 self.assertEqual(
1222 ctx.minimum_version, ssl.TLSVersion.TLSv1_1
1223 )
1224 self.assertEqual(
1225 ctx.maximum_version, ssl.TLSVersion.TLSv1_2
1226 )
1227
1228 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1229 ctx.maximum_version = ssl.TLSVersion.TLSv1
1230 self.assertEqual(
1231 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1232 )
1233 self.assertEqual(
1234 ctx.maximum_version, ssl.TLSVersion.TLSv1
1235 )
1236
1237 ctx.maximum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1238 self.assertEqual(
1239 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1240 )
1241
1242 ctx.maximum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1243 self.assertIn(
1244 ctx.maximum_version,
1245 {ssl.TLSVersion.TLSv1, ssl.TLSVersion.SSLv3}
1246 )
1247
1248 ctx.minimum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1249 self.assertIn(
1250 ctx.minimum_version,
1251 {ssl.TLSVersion.TLSv1_2, ssl.TLSVersion.TLSv1_3}
1252 )
1253
1254 with self.assertRaises(ValueError):
1255 ctx.minimum_version = 42
1256
1257 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)
1258
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02001259 self.assertIn(
1260 ctx.minimum_version, minimum_range
Christian Heimes698dde12018-02-27 11:54:43 +01001261 )
1262 self.assertEqual(
1263 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1264 )
1265 with self.assertRaises(ValueError):
1266 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1267 with self.assertRaises(ValueError):
1268 ctx.maximum_version = ssl.TLSVersion.TLSv1
1269
1270
Christian Heimes2427b502013-11-23 11:24:32 +01001271 @unittest.skipUnless(have_verify_flags(),
1272 "verify_flags need OpenSSL > 0.9.8")
Christian Heimes22587792013-11-21 23:56:13 +01001273 def test_verify_flags(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001274 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Benjamin Peterson990fcaa2015-03-04 22:49:41 -05001275 # default value
1276 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
1277 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf)
Christian Heimes22587792013-11-21 23:56:13 +01001278 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
1279 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
1280 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
1281 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
1282 ctx.verify_flags = ssl.VERIFY_DEFAULT
1283 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
1284 # supports any value
1285 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
1286 self.assertEqual(ctx.verify_flags,
1287 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
1288 with self.assertRaises(TypeError):
1289 ctx.verify_flags = None
1290
Antoine Pitrou152efa22010-05-16 18:19:27 +00001291 def test_load_cert_chain(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001292 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001293 # Combined key and cert in a single file
Benjamin Peterson1ea070e2014-11-03 21:05:01 -05001294 ctx.load_cert_chain(CERTFILE, keyfile=None)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001295 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
1296 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001297 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001298 ctx.load_cert_chain(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001299 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001300 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001301 ctx.load_cert_chain(BADCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001302 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001303 ctx.load_cert_chain(EMPTYCERT)
1304 # Separate key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001305 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001306 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
1307 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
1308 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001309 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001310 ctx.load_cert_chain(ONLYCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001311 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001312 ctx.load_cert_chain(ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001313 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001314 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
1315 # Mismatching key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001316 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001317 with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
Martin Panter3d81d932016-01-14 09:36:00 +00001318 ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +02001319 # Password protected key and cert
1320 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
1321 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
1322 ctx.load_cert_chain(CERTFILE_PROTECTED,
1323 password=bytearray(KEY_PASSWORD.encode()))
1324 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
1325 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
1326 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
1327 bytearray(KEY_PASSWORD.encode()))
1328 with self.assertRaisesRegex(TypeError, "should be a string"):
1329 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
1330 with self.assertRaises(ssl.SSLError):
1331 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
1332 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1333 # openssl has a fixed limit on the password buffer.
1334 # PEM_BUFSIZE is generally set to 1kb.
1335 # Return a string larger than this.
1336 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
1337 # Password callback
1338 def getpass_unicode():
1339 return KEY_PASSWORD
1340 def getpass_bytes():
1341 return KEY_PASSWORD.encode()
1342 def getpass_bytearray():
1343 return bytearray(KEY_PASSWORD.encode())
1344 def getpass_badpass():
1345 return "badpass"
1346 def getpass_huge():
1347 return b'a' * (1024 * 1024)
1348 def getpass_bad_type():
1349 return 9
1350 def getpass_exception():
1351 raise Exception('getpass error')
1352 class GetPassCallable:
1353 def __call__(self):
1354 return KEY_PASSWORD
1355 def getpass(self):
1356 return KEY_PASSWORD
1357 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
1358 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
1359 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
1360 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
1361 ctx.load_cert_chain(CERTFILE_PROTECTED,
1362 password=GetPassCallable().getpass)
1363 with self.assertRaises(ssl.SSLError):
1364 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
1365 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1366 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
1367 with self.assertRaisesRegex(TypeError, "must return a string"):
1368 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
1369 with self.assertRaisesRegex(Exception, "getpass error"):
1370 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
1371 # Make sure the password function isn't called if it isn't needed
1372 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001373
1374 def test_load_verify_locations(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001375 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001376 ctx.load_verify_locations(CERTFILE)
1377 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
1378 ctx.load_verify_locations(BYTES_CERTFILE)
1379 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
1380 self.assertRaises(TypeError, ctx.load_verify_locations)
Christian Heimesefff7062013-11-21 03:35:02 +01001381 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001382 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001383 ctx.load_verify_locations(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001384 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001385 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001386 ctx.load_verify_locations(BADCERT)
1387 ctx.load_verify_locations(CERTFILE, CAPATH)
1388 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
1389
Victor Stinner80f75e62011-01-29 11:31:20 +00001390 # Issue #10989: crash if the second argument type is invalid
1391 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
1392
Christian Heimesefff7062013-11-21 03:35:02 +01001393 def test_load_verify_cadata(self):
1394 # test cadata
1395 with open(CAFILE_CACERT) as f:
1396 cacert_pem = f.read()
1397 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
1398 with open(CAFILE_NEURONIO) as f:
1399 neuronio_pem = f.read()
1400 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
1401
1402 # test PEM
Christian Heimesa170fa12017-09-15 20:27:30 +02001403 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001404 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
1405 ctx.load_verify_locations(cadata=cacert_pem)
1406 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
1407 ctx.load_verify_locations(cadata=neuronio_pem)
1408 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1409 # cert already in hash table
1410 ctx.load_verify_locations(cadata=neuronio_pem)
1411 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1412
1413 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001414 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001415 combined = "\n".join((cacert_pem, neuronio_pem))
1416 ctx.load_verify_locations(cadata=combined)
1417 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1418
1419 # with junk around the certs
Christian Heimesa170fa12017-09-15 20:27:30 +02001420 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001421 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
1422 neuronio_pem, "tail"]
1423 ctx.load_verify_locations(cadata="\n".join(combined))
1424 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1425
1426 # test DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001427 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001428 ctx.load_verify_locations(cadata=cacert_der)
1429 ctx.load_verify_locations(cadata=neuronio_der)
1430 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1431 # cert already in hash table
1432 ctx.load_verify_locations(cadata=cacert_der)
1433 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1434
1435 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001436 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001437 combined = b"".join((cacert_der, neuronio_der))
1438 ctx.load_verify_locations(cadata=combined)
1439 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1440
1441 # error cases
Christian Heimesa170fa12017-09-15 20:27:30 +02001442 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001443 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
1444
1445 with self.assertRaisesRegex(ssl.SSLError, "no start line"):
1446 ctx.load_verify_locations(cadata="broken")
1447 with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
1448 ctx.load_verify_locations(cadata=b"broken")
1449
1450
Paul Monsonf3550692019-06-19 13:09:54 -07001451 @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001452 def test_load_dh_params(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001453 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001454 ctx.load_dh_params(DHFILE)
1455 if os.name != 'nt':
1456 ctx.load_dh_params(BYTES_DHFILE)
1457 self.assertRaises(TypeError, ctx.load_dh_params)
1458 self.assertRaises(TypeError, ctx.load_dh_params, None)
1459 with self.assertRaises(FileNotFoundError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001460 ctx.load_dh_params(NONEXISTINGCERT)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001461 self.assertEqual(cm.exception.errno, errno.ENOENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001462 with self.assertRaises(ssl.SSLError) as cm:
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001463 ctx.load_dh_params(CERTFILE)
1464
Antoine Pitroub0182c82010-10-12 20:09:02 +00001465 def test_session_stats(self):
1466 for proto in PROTOCOLS:
1467 ctx = ssl.SSLContext(proto)
1468 self.assertEqual(ctx.session_stats(), {
1469 'number': 0,
1470 'connect': 0,
1471 'connect_good': 0,
1472 'connect_renegotiate': 0,
1473 'accept': 0,
1474 'accept_good': 0,
1475 'accept_renegotiate': 0,
1476 'hits': 0,
1477 'misses': 0,
1478 'timeouts': 0,
1479 'cache_full': 0,
1480 })
1481
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001482 def test_set_default_verify_paths(self):
1483 # There's not much we can do to test that it acts as expected,
1484 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001485 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001486 ctx.set_default_verify_paths()
1487
Antoine Pitrou501da612011-12-21 09:27:41 +01001488 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001489 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001490 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001491 ctx.set_ecdh_curve("prime256v1")
1492 ctx.set_ecdh_curve(b"prime256v1")
1493 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1494 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1495 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1496 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1497
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001498 @needs_sni
1499 def test_sni_callback(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001500 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001501
1502 # set_servername_callback expects a callable, or None
1503 self.assertRaises(TypeError, ctx.set_servername_callback)
1504 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
1505 self.assertRaises(TypeError, ctx.set_servername_callback, "")
1506 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
1507
1508 def dummycallback(sock, servername, ctx):
1509 pass
1510 ctx.set_servername_callback(None)
1511 ctx.set_servername_callback(dummycallback)
1512
1513 @needs_sni
1514 def test_sni_callback_refcycle(self):
1515 # Reference cycles through the servername callback are detected
1516 # and cleared.
Christian Heimesa170fa12017-09-15 20:27:30 +02001517 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001518 def dummycallback(sock, servername, ctx, cycle=ctx):
1519 pass
1520 ctx.set_servername_callback(dummycallback)
1521 wr = weakref.ref(ctx)
1522 del ctx, dummycallback
1523 gc.collect()
1524 self.assertIs(wr(), None)
1525
Christian Heimes9a5395a2013-06-17 15:44:12 +02001526 def test_cert_store_stats(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001527 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001528 self.assertEqual(ctx.cert_store_stats(),
1529 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1530 ctx.load_cert_chain(CERTFILE)
1531 self.assertEqual(ctx.cert_store_stats(),
1532 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1533 ctx.load_verify_locations(CERTFILE)
1534 self.assertEqual(ctx.cert_store_stats(),
1535 {'x509_ca': 0, 'crl': 0, 'x509': 1})
Martin Panterb55f8b72016-01-14 12:53:56 +00001536 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001537 self.assertEqual(ctx.cert_store_stats(),
1538 {'x509_ca': 1, 'crl': 0, 'x509': 2})
1539
1540 def test_get_ca_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001541 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001542 self.assertEqual(ctx.get_ca_certs(), [])
1543 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1544 ctx.load_verify_locations(CERTFILE)
1545 self.assertEqual(ctx.get_ca_certs(), [])
Martin Panterb55f8b72016-01-14 12:53:56 +00001546 # but CAFILE_CACERT is a CA cert
1547 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001548 self.assertEqual(ctx.get_ca_certs(),
1549 [{'issuer': ((('organizationName', 'Root CA'),),
1550 (('organizationalUnitName', 'http://www.cacert.org'),),
1551 (('commonName', 'CA Cert Signing Authority'),),
1552 (('emailAddress', 'support@cacert.org'),)),
1553 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1554 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1555 'serialNumber': '00',
Christian Heimesbd3a7f92013-11-21 03:40:15 +01001556 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
Christian Heimes9a5395a2013-06-17 15:44:12 +02001557 'subject': ((('organizationName', 'Root CA'),),
1558 (('organizationalUnitName', 'http://www.cacert.org'),),
1559 (('commonName', 'CA Cert Signing Authority'),),
1560 (('emailAddress', 'support@cacert.org'),)),
1561 'version': 3}])
1562
Martin Panterb55f8b72016-01-14 12:53:56 +00001563 with open(CAFILE_CACERT) as f:
Christian Heimes9a5395a2013-06-17 15:44:12 +02001564 pem = f.read()
1565 der = ssl.PEM_cert_to_DER_cert(pem)
1566 self.assertEqual(ctx.get_ca_certs(True), [der])
1567
Christian Heimes72d28502013-11-23 13:56:58 +01001568 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001569 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001570 ctx.load_default_certs()
1571
Christian Heimesa170fa12017-09-15 20:27:30 +02001572 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001573 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1574 ctx.load_default_certs()
1575
Christian Heimesa170fa12017-09-15 20:27:30 +02001576 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001577 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1578
Christian Heimesa170fa12017-09-15 20:27:30 +02001579 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001580 self.assertRaises(TypeError, ctx.load_default_certs, None)
1581 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1582
Benjamin Peterson91244e02014-10-03 18:17:15 -04001583 @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
Christian Heimes598894f2016-09-05 23:19:05 +02001584 @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001585 def test_load_default_certs_env(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001586 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001587 with support.EnvironmentVarGuard() as env:
1588 env["SSL_CERT_DIR"] = CAPATH
1589 env["SSL_CERT_FILE"] = CERTFILE
1590 ctx.load_default_certs()
1591 self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1592
Benjamin Peterson91244e02014-10-03 18:17:15 -04001593 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Steve Dower68d663c2017-07-17 11:15:48 +02001594 @unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
Benjamin Peterson91244e02014-10-03 18:17:15 -04001595 def test_load_default_certs_env_windows(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001596 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001597 ctx.load_default_certs()
1598 stats = ctx.cert_store_stats()
1599
Christian Heimesa170fa12017-09-15 20:27:30 +02001600 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001601 with support.EnvironmentVarGuard() as env:
1602 env["SSL_CERT_DIR"] = CAPATH
1603 env["SSL_CERT_FILE"] = CERTFILE
1604 ctx.load_default_certs()
1605 stats["x509"] += 1
1606 self.assertEqual(ctx.cert_store_stats(), stats)
1607
Christian Heimes358cfd42016-09-10 22:43:48 +02001608 def _assert_context_options(self, ctx):
1609 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1610 if OP_NO_COMPRESSION != 0:
1611 self.assertEqual(ctx.options & OP_NO_COMPRESSION,
1612 OP_NO_COMPRESSION)
1613 if OP_SINGLE_DH_USE != 0:
1614 self.assertEqual(ctx.options & OP_SINGLE_DH_USE,
1615 OP_SINGLE_DH_USE)
1616 if OP_SINGLE_ECDH_USE != 0:
1617 self.assertEqual(ctx.options & OP_SINGLE_ECDH_USE,
1618 OP_SINGLE_ECDH_USE)
1619 if OP_CIPHER_SERVER_PREFERENCE != 0:
1620 self.assertEqual(ctx.options & OP_CIPHER_SERVER_PREFERENCE,
1621 OP_CIPHER_SERVER_PREFERENCE)
1622
Christian Heimes4c05b472013-11-23 15:58:30 +01001623 def test_create_default_context(self):
1624 ctx = ssl.create_default_context()
Christian Heimes358cfd42016-09-10 22:43:48 +02001625
Christian Heimesa170fa12017-09-15 20:27:30 +02001626 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001627 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001628 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001629 self._assert_context_options(ctx)
1630
Christian Heimes4c05b472013-11-23 15:58:30 +01001631 with open(SIGNING_CA) as f:
1632 cadata = f.read()
1633 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1634 cadata=cadata)
Christian Heimesa170fa12017-09-15 20:27:30 +02001635 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001636 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes358cfd42016-09-10 22:43:48 +02001637 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001638
1639 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001640 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001641 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001642 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001643
Christian Heimes67986f92013-11-23 22:43:47 +01001644 def test__create_stdlib_context(self):
1645 ctx = ssl._create_stdlib_context()
Christian Heimesa170fa12017-09-15 20:27:30 +02001646 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001647 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001648 self.assertFalse(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001649 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001650
1651 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1652 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1653 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001654 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001655
1656 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
Christian Heimesa02c69a2013-12-02 20:59:28 +01001657 cert_reqs=ssl.CERT_REQUIRED,
1658 check_hostname=True)
Christian Heimes67986f92013-11-23 22:43:47 +01001659 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1660 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimesa02c69a2013-12-02 20:59:28 +01001661 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001662 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001663
1664 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001665 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001666 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001667 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001668
Christian Heimes1aa9a752013-12-02 02:41:19 +01001669 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001670 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001671 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001672 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001673
Christian Heimese82c0342017-09-15 20:29:57 +02001674 # Auto set CERT_REQUIRED
1675 ctx.check_hostname = True
1676 self.assertTrue(ctx.check_hostname)
1677 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1678 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001679 ctx.verify_mode = ssl.CERT_REQUIRED
1680 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001681 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001682
Christian Heimese82c0342017-09-15 20:29:57 +02001683 # Changing verify_mode does not affect check_hostname
1684 ctx.check_hostname = False
1685 ctx.verify_mode = ssl.CERT_NONE
1686 ctx.check_hostname = False
1687 self.assertFalse(ctx.check_hostname)
1688 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1689 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001690 ctx.check_hostname = True
1691 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001692 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1693
1694 ctx.check_hostname = False
1695 ctx.verify_mode = ssl.CERT_OPTIONAL
1696 ctx.check_hostname = False
1697 self.assertFalse(ctx.check_hostname)
1698 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1699 # keep CERT_OPTIONAL
1700 ctx.check_hostname = True
1701 self.assertTrue(ctx.check_hostname)
1702 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001703
1704 # Cannot set CERT_NONE with check_hostname enabled
1705 with self.assertRaises(ValueError):
1706 ctx.verify_mode = ssl.CERT_NONE
1707 ctx.check_hostname = False
1708 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001709 ctx.verify_mode = ssl.CERT_NONE
1710 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001711
Christian Heimes5fe668c2016-09-12 00:01:11 +02001712 def test_context_client_server(self):
1713 # PROTOCOL_TLS_CLIENT has sane defaults
1714 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1715 self.assertTrue(ctx.check_hostname)
1716 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1717
1718 # PROTOCOL_TLS_SERVER has different but also sane defaults
1719 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1720 self.assertFalse(ctx.check_hostname)
1721 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1722
Christian Heimes4df60f12017-09-15 20:26:05 +02001723 def test_context_custom_class(self):
1724 class MySSLSocket(ssl.SSLSocket):
1725 pass
1726
1727 class MySSLObject(ssl.SSLObject):
1728 pass
1729
1730 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1731 ctx.sslsocket_class = MySSLSocket
1732 ctx.sslobject_class = MySSLObject
1733
1734 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1735 self.assertIsInstance(sock, MySSLSocket)
1736 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1737 self.assertIsInstance(obj, MySSLObject)
1738
Christian Heimes78c7d522019-06-03 21:00:10 +02001739 @unittest.skipUnless(IS_OPENSSL_1_1_1, "Test requires OpenSSL 1.1.1")
1740 def test_num_tickest(self):
1741 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1742 self.assertEqual(ctx.num_tickets, 2)
1743 ctx.num_tickets = 1
1744 self.assertEqual(ctx.num_tickets, 1)
1745 ctx.num_tickets = 0
1746 self.assertEqual(ctx.num_tickets, 0)
1747 with self.assertRaises(ValueError):
1748 ctx.num_tickets = -1
1749 with self.assertRaises(TypeError):
1750 ctx.num_tickets = None
1751
1752 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1753 self.assertEqual(ctx.num_tickets, 2)
1754 with self.assertRaises(ValueError):
1755 ctx.num_tickets = 1
1756
Antoine Pitrou152efa22010-05-16 18:19:27 +00001757
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001758class SSLErrorTests(unittest.TestCase):
1759
1760 def test_str(self):
1761 # The str() of a SSLError doesn't include the errno
1762 e = ssl.SSLError(1, "foo")
1763 self.assertEqual(str(e), "foo")
1764 self.assertEqual(e.errno, 1)
1765 # Same for a subclass
1766 e = ssl.SSLZeroReturnError(1, "foo")
1767 self.assertEqual(str(e), "foo")
1768 self.assertEqual(e.errno, 1)
1769
Paul Monsonf3550692019-06-19 13:09:54 -07001770 @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001771 def test_lib_reason(self):
1772 # Test the library and reason attributes
Christian Heimesa170fa12017-09-15 20:27:30 +02001773 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001774 with self.assertRaises(ssl.SSLError) as cm:
1775 ctx.load_dh_params(CERTFILE)
1776 self.assertEqual(cm.exception.library, 'PEM')
1777 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1778 s = str(cm.exception)
1779 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1780
1781 def test_subclass(self):
1782 # Check that the appropriate SSLError subclass is raised
1783 # (this only tests one of them)
Christian Heimesa170fa12017-09-15 20:27:30 +02001784 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1785 ctx.check_hostname = False
1786 ctx.verify_mode = ssl.CERT_NONE
Giampaolo Rodolaeb7e29f2019-04-09 00:34:02 +02001787 with socket.create_server(("127.0.0.1", 0)) as s:
1788 c = socket.create_connection(s.getsockname())
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001789 c.setblocking(False)
1790 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001791 with self.assertRaises(ssl.SSLWantReadError) as cm:
1792 c.do_handshake()
1793 s = str(cm.exception)
1794 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1795 # For compatibility
1796 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1797
1798
Christian Heimes61d478c2018-01-27 15:51:38 +01001799 def test_bad_server_hostname(self):
1800 ctx = ssl.create_default_context()
1801 with self.assertRaises(ValueError):
1802 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1803 server_hostname="")
1804 with self.assertRaises(ValueError):
1805 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1806 server_hostname=".example.org")
Christian Heimesd02ac252018-03-25 12:36:13 +02001807 with self.assertRaises(TypeError):
1808 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1809 server_hostname="example.org\x00evil.com")
Christian Heimes61d478c2018-01-27 15:51:38 +01001810
1811
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001812class MemoryBIOTests(unittest.TestCase):
1813
1814 def test_read_write(self):
1815 bio = ssl.MemoryBIO()
1816 bio.write(b'foo')
1817 self.assertEqual(bio.read(), b'foo')
1818 self.assertEqual(bio.read(), b'')
1819 bio.write(b'foo')
1820 bio.write(b'bar')
1821 self.assertEqual(bio.read(), b'foobar')
1822 self.assertEqual(bio.read(), b'')
1823 bio.write(b'baz')
1824 self.assertEqual(bio.read(2), b'ba')
1825 self.assertEqual(bio.read(1), b'z')
1826 self.assertEqual(bio.read(1), b'')
1827
1828 def test_eof(self):
1829 bio = ssl.MemoryBIO()
1830 self.assertFalse(bio.eof)
1831 self.assertEqual(bio.read(), b'')
1832 self.assertFalse(bio.eof)
1833 bio.write(b'foo')
1834 self.assertFalse(bio.eof)
1835 bio.write_eof()
1836 self.assertFalse(bio.eof)
1837 self.assertEqual(bio.read(2), b'fo')
1838 self.assertFalse(bio.eof)
1839 self.assertEqual(bio.read(1), b'o')
1840 self.assertTrue(bio.eof)
1841 self.assertEqual(bio.read(), b'')
1842 self.assertTrue(bio.eof)
1843
1844 def test_pending(self):
1845 bio = ssl.MemoryBIO()
1846 self.assertEqual(bio.pending, 0)
1847 bio.write(b'foo')
1848 self.assertEqual(bio.pending, 3)
1849 for i in range(3):
1850 bio.read(1)
1851 self.assertEqual(bio.pending, 3-i-1)
1852 for i in range(3):
1853 bio.write(b'x')
1854 self.assertEqual(bio.pending, i+1)
1855 bio.read()
1856 self.assertEqual(bio.pending, 0)
1857
1858 def test_buffer_types(self):
1859 bio = ssl.MemoryBIO()
1860 bio.write(b'foo')
1861 self.assertEqual(bio.read(), b'foo')
1862 bio.write(bytearray(b'bar'))
1863 self.assertEqual(bio.read(), b'bar')
1864 bio.write(memoryview(b'baz'))
1865 self.assertEqual(bio.read(), b'baz')
1866
1867 def test_error_types(self):
1868 bio = ssl.MemoryBIO()
1869 self.assertRaises(TypeError, bio.write, 'foo')
1870 self.assertRaises(TypeError, bio.write, None)
1871 self.assertRaises(TypeError, bio.write, True)
1872 self.assertRaises(TypeError, bio.write, 1)
1873
1874
Christian Heimes9d50ab52018-02-27 10:17:30 +01001875class SSLObjectTests(unittest.TestCase):
1876 def test_private_init(self):
1877 bio = ssl.MemoryBIO()
1878 with self.assertRaisesRegex(TypeError, "public constructor"):
1879 ssl.SSLObject(bio, bio)
1880
Nathaniel J. Smithc0da5822018-09-21 21:44:12 -07001881 def test_unwrap(self):
1882 client_ctx, server_ctx, hostname = testing_context()
1883 c_in = ssl.MemoryBIO()
1884 c_out = ssl.MemoryBIO()
1885 s_in = ssl.MemoryBIO()
1886 s_out = ssl.MemoryBIO()
1887 client = client_ctx.wrap_bio(c_in, c_out, server_hostname=hostname)
1888 server = server_ctx.wrap_bio(s_in, s_out, server_side=True)
1889
1890 # Loop on the handshake for a bit to get it settled
1891 for _ in range(5):
1892 try:
1893 client.do_handshake()
1894 except ssl.SSLWantReadError:
1895 pass
1896 if c_out.pending:
1897 s_in.write(c_out.read())
1898 try:
1899 server.do_handshake()
1900 except ssl.SSLWantReadError:
1901 pass
1902 if s_out.pending:
1903 c_in.write(s_out.read())
1904 # Now the handshakes should be complete (don't raise WantReadError)
1905 client.do_handshake()
1906 server.do_handshake()
1907
1908 # Now if we unwrap one side unilaterally, it should send close-notify
1909 # and raise WantReadError:
1910 with self.assertRaises(ssl.SSLWantReadError):
1911 client.unwrap()
1912
1913 # But server.unwrap() does not raise, because it reads the client's
1914 # close-notify:
1915 s_in.write(c_out.read())
1916 server.unwrap()
1917
1918 # And now that the client gets the server's close-notify, it doesn't
1919 # raise either.
1920 c_in.write(s_out.read())
1921 client.unwrap()
Christian Heimes9d50ab52018-02-27 10:17:30 +01001922
Martin Panter3840b2a2016-03-27 01:53:46 +00001923class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001924 """Tests that connect to a simple server running in the background"""
1925
1926 def setUp(self):
1927 server = ThreadedEchoServer(SIGNED_CERTFILE)
1928 self.server_addr = (HOST, server.port)
1929 server.__enter__()
1930 self.addCleanup(server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001931
Antoine Pitrou480a1242010-04-28 21:37:09 +00001932 def test_connect(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001933 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001934 cert_reqs=ssl.CERT_NONE) as s:
1935 s.connect(self.server_addr)
1936 self.assertEqual({}, s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001937 self.assertFalse(s.server_side)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001938
Martin Panter3840b2a2016-03-27 01:53:46 +00001939 # this should succeed because we specify the root cert
Christian Heimesd0486372016-09-10 23:23:33 +02001940 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001941 cert_reqs=ssl.CERT_REQUIRED,
1942 ca_certs=SIGNING_CA) as s:
1943 s.connect(self.server_addr)
1944 self.assertTrue(s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001945 self.assertFalse(s.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001946
Martin Panter3840b2a2016-03-27 01:53:46 +00001947 def test_connect_fail(self):
1948 # This should fail because we have no verification certs. Connection
1949 # failure crashes ThreadedEchoServer, so run this in an independent
1950 # test method.
Christian Heimesd0486372016-09-10 23:23:33 +02001951 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001952 cert_reqs=ssl.CERT_REQUIRED)
1953 self.addCleanup(s.close)
1954 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1955 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001956
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001957 def test_connect_ex(self):
1958 # Issue #11326: check connect_ex() implementation
Christian Heimesd0486372016-09-10 23:23:33 +02001959 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001960 cert_reqs=ssl.CERT_REQUIRED,
1961 ca_certs=SIGNING_CA)
1962 self.addCleanup(s.close)
1963 self.assertEqual(0, s.connect_ex(self.server_addr))
1964 self.assertTrue(s.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001965
1966 def test_non_blocking_connect_ex(self):
1967 # Issue #11326: non-blocking connect_ex() should allow handshake
1968 # to proceed after the socket gets ready.
Christian Heimesd0486372016-09-10 23:23:33 +02001969 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001970 cert_reqs=ssl.CERT_REQUIRED,
1971 ca_certs=SIGNING_CA,
1972 do_handshake_on_connect=False)
1973 self.addCleanup(s.close)
1974 s.setblocking(False)
1975 rc = s.connect_ex(self.server_addr)
1976 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1977 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
1978 # Wait for connect to finish
1979 select.select([], [s], [], 5.0)
1980 # Non-blocking handshake
1981 while True:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001982 try:
Martin Panter3840b2a2016-03-27 01:53:46 +00001983 s.do_handshake()
1984 break
1985 except ssl.SSLWantReadError:
1986 select.select([s], [], [], 5.0)
1987 except ssl.SSLWantWriteError:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001988 select.select([], [s], [], 5.0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001989 # SSL established
1990 self.assertTrue(s.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001991
Antoine Pitrou152efa22010-05-16 18:19:27 +00001992 def test_connect_with_context(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001993 # Same as test_connect, but with a separately created context
Christian Heimesa170fa12017-09-15 20:27:30 +02001994 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001995 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1996 s.connect(self.server_addr)
1997 self.assertEqual({}, s.getpeercert())
1998 # Same with a server hostname
1999 with ctx.wrap_socket(socket.socket(socket.AF_INET),
2000 server_hostname="dummy") as s:
2001 s.connect(self.server_addr)
2002 ctx.verify_mode = ssl.CERT_REQUIRED
2003 # This should succeed because we specify the root cert
2004 ctx.load_verify_locations(SIGNING_CA)
2005 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
2006 s.connect(self.server_addr)
2007 cert = s.getpeercert()
2008 self.assertTrue(cert)
2009
2010 def test_connect_with_context_fail(self):
2011 # This should fail because we have no verification certs. Connection
2012 # failure crashes ThreadedEchoServer, so run this in an independent
2013 # test method.
Christian Heimesa170fa12017-09-15 20:27:30 +02002014 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002015 ctx.verify_mode = ssl.CERT_REQUIRED
2016 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
2017 self.addCleanup(s.close)
2018 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
2019 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00002020
2021 def test_connect_capath(self):
2022 # Verify server certificates using the `capath` argument
Antoine Pitrou467f28d2010-05-16 19:22:44 +00002023 # NOTE: the subject hashing algorithm has been changed between
2024 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
2025 # contain both versions of each certificate (same content, different
Antoine Pitroud7e4c1c2010-05-17 14:13:10 +00002026 # filename) for this test to be portable across OpenSSL releases.
Christian Heimesa170fa12017-09-15 20:27:30 +02002027 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002028 ctx.verify_mode = ssl.CERT_REQUIRED
2029 ctx.load_verify_locations(capath=CAPATH)
2030 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
2031 s.connect(self.server_addr)
2032 cert = s.getpeercert()
2033 self.assertTrue(cert)
Christian Heimes529525f2018-05-23 22:24:45 +02002034
Martin Panter3840b2a2016-03-27 01:53:46 +00002035 # Same with a bytes `capath` argument
Christian Heimesa170fa12017-09-15 20:27:30 +02002036 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002037 ctx.verify_mode = ssl.CERT_REQUIRED
2038 ctx.load_verify_locations(capath=BYTES_CAPATH)
2039 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
2040 s.connect(self.server_addr)
2041 cert = s.getpeercert()
2042 self.assertTrue(cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002043
Christian Heimesefff7062013-11-21 03:35:02 +01002044 def test_connect_cadata(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00002045 with open(SIGNING_CA) as f:
Christian Heimesefff7062013-11-21 03:35:02 +01002046 pem = f.read()
2047 der = ssl.PEM_cert_to_DER_cert(pem)
Christian Heimesa170fa12017-09-15 20:27:30 +02002048 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002049 ctx.verify_mode = ssl.CERT_REQUIRED
2050 ctx.load_verify_locations(cadata=pem)
2051 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
2052 s.connect(self.server_addr)
2053 cert = s.getpeercert()
2054 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01002055
Martin Panter3840b2a2016-03-27 01:53:46 +00002056 # same with DER
Christian Heimesa170fa12017-09-15 20:27:30 +02002057 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002058 ctx.verify_mode = ssl.CERT_REQUIRED
2059 ctx.load_verify_locations(cadata=der)
2060 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
2061 s.connect(self.server_addr)
2062 cert = s.getpeercert()
2063 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01002064
Antoine Pitroue3220242010-04-24 11:13:53 +00002065 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
2066 def test_makefile_close(self):
2067 # Issue #5238: creating a file-like object with makefile() shouldn't
2068 # delay closing the underlying "real socket" (here tested with its
2069 # file descriptor, hence skipping the test under Windows).
Christian Heimesd0486372016-09-10 23:23:33 +02002070 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Martin Panter3840b2a2016-03-27 01:53:46 +00002071 ss.connect(self.server_addr)
2072 fd = ss.fileno()
2073 f = ss.makefile()
2074 f.close()
2075 # The fd is still open
2076 os.read(fd, 0)
2077 # Closing the SSL socket should close the fd too
2078 ss.close()
2079 gc.collect()
2080 with self.assertRaises(OSError) as e:
Antoine Pitroue3220242010-04-24 11:13:53 +00002081 os.read(fd, 0)
Martin Panter3840b2a2016-03-27 01:53:46 +00002082 self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00002083
Antoine Pitrou480a1242010-04-28 21:37:09 +00002084 def test_non_blocking_handshake(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00002085 s = socket.socket(socket.AF_INET)
2086 s.connect(self.server_addr)
2087 s.setblocking(False)
Christian Heimesd0486372016-09-10 23:23:33 +02002088 s = test_wrap_socket(s,
Martin Panter3840b2a2016-03-27 01:53:46 +00002089 cert_reqs=ssl.CERT_NONE,
2090 do_handshake_on_connect=False)
2091 self.addCleanup(s.close)
2092 count = 0
2093 while True:
2094 try:
2095 count += 1
2096 s.do_handshake()
2097 break
2098 except ssl.SSLWantReadError:
2099 select.select([s], [], [])
2100 except ssl.SSLWantWriteError:
2101 select.select([], [s], [])
2102 if support.verbose:
2103 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002104
Antoine Pitrou480a1242010-04-28 21:37:09 +00002105 def test_get_server_certificate(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00002106 _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02002107
Martin Panter3840b2a2016-03-27 01:53:46 +00002108 def test_get_server_certificate_fail(self):
2109 # Connection failure crashes ThreadedEchoServer, so run this in an
2110 # independent test method
2111 _test_get_server_certificate_fail(self, *self.server_addr)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002112
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00002113 def test_ciphers(self):
Christian Heimesd0486372016-09-10 23:23:33 +02002114 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002115 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
2116 s.connect(self.server_addr)
Christian Heimesd0486372016-09-10 23:23:33 +02002117 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002118 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
2119 s.connect(self.server_addr)
2120 # Error checking can happen at instantiation or when connecting
2121 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
2122 with socket.socket(socket.AF_INET) as sock:
Christian Heimesd0486372016-09-10 23:23:33 +02002123 s = test_wrap_socket(sock,
Martin Panter3840b2a2016-03-27 01:53:46 +00002124 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
2125 s.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00002126
Christian Heimes9a5395a2013-06-17 15:44:12 +02002127 def test_get_ca_certs_capath(self):
2128 # capath certs are loaded on request
Christian Heimesa170fa12017-09-15 20:27:30 +02002129 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Martin Panter3840b2a2016-03-27 01:53:46 +00002130 ctx.load_verify_locations(capath=CAPATH)
2131 self.assertEqual(ctx.get_ca_certs(), [])
Christian Heimesa170fa12017-09-15 20:27:30 +02002132 with ctx.wrap_socket(socket.socket(socket.AF_INET),
2133 server_hostname='localhost') as s:
Martin Panter3840b2a2016-03-27 01:53:46 +00002134 s.connect(self.server_addr)
2135 cert = s.getpeercert()
2136 self.assertTrue(cert)
2137 self.assertEqual(len(ctx.get_ca_certs()), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02002138
Christian Heimes575596e2013-12-15 21:49:17 +01002139 @needs_sni
Christian Heimes8e7f3942013-12-05 07:41:08 +01002140 def test_context_setget(self):
2141 # Check that the context of a connected socket can be replaced.
Christian Heimesa170fa12017-09-15 20:27:30 +02002142 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2143 ctx1.load_verify_locations(capath=CAPATH)
2144 ctx2 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2145 ctx2.load_verify_locations(capath=CAPATH)
Martin Panter3840b2a2016-03-27 01:53:46 +00002146 s = socket.socket(socket.AF_INET)
Christian Heimesa170fa12017-09-15 20:27:30 +02002147 with ctx1.wrap_socket(s, server_hostname='localhost') as ss:
Martin Panter3840b2a2016-03-27 01:53:46 +00002148 ss.connect(self.server_addr)
2149 self.assertIs(ss.context, ctx1)
2150 self.assertIs(ss._sslobj.context, ctx1)
2151 ss.context = ctx2
2152 self.assertIs(ss.context, ctx2)
2153 self.assertIs(ss._sslobj.context, ctx2)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002154
2155 def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
2156 # A simple IO loop. Call func(*args) depending on the error we get
2157 # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
Victor Stinner0d63bac2019-12-11 11:30:03 +01002158 timeout = kwargs.get('timeout', support.SHORT_TIMEOUT)
Victor Stinner50a72af2017-09-11 09:34:24 -07002159 deadline = time.monotonic() + timeout
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002160 count = 0
2161 while True:
Victor Stinner50a72af2017-09-11 09:34:24 -07002162 if time.monotonic() > deadline:
2163 self.fail("timeout")
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002164 errno = None
2165 count += 1
2166 try:
2167 ret = func(*args)
2168 except ssl.SSLError as e:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002169 if e.errno not in (ssl.SSL_ERROR_WANT_READ,
Martin Panter40b97ec2016-01-14 13:05:46 +00002170 ssl.SSL_ERROR_WANT_WRITE):
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002171 raise
2172 errno = e.errno
2173 # Get any data from the outgoing BIO irrespective of any error, and
2174 # send it to the socket.
2175 buf = outgoing.read()
2176 sock.sendall(buf)
2177 # If there's no error, we're done. For WANT_READ, we need to get
2178 # data from the socket and put it in the incoming BIO.
2179 if errno is None:
2180 break
2181 elif errno == ssl.SSL_ERROR_WANT_READ:
2182 buf = sock.recv(32768)
2183 if buf:
2184 incoming.write(buf)
2185 else:
2186 incoming.write_eof()
2187 if support.verbose:
2188 sys.stdout.write("Needed %d calls to complete %s().\n"
2189 % (count, func.__name__))
2190 return ret
2191
Martin Panter3840b2a2016-03-27 01:53:46 +00002192 def test_bio_handshake(self):
2193 sock = socket.socket(socket.AF_INET)
2194 self.addCleanup(sock.close)
2195 sock.connect(self.server_addr)
2196 incoming = ssl.MemoryBIO()
2197 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002198 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2199 self.assertTrue(ctx.check_hostname)
2200 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Martin Panter3840b2a2016-03-27 01:53:46 +00002201 ctx.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +02002202 sslobj = ctx.wrap_bio(incoming, outgoing, False,
2203 SIGNED_CERTFILE_HOSTNAME)
Martin Panter3840b2a2016-03-27 01:53:46 +00002204 self.assertIs(sslobj._sslobj.owner, sslobj)
2205 self.assertIsNone(sslobj.cipher())
Christian Heimes68771112017-09-05 21:55:40 -07002206 self.assertIsNone(sslobj.version())
Christian Heimes01113fa2016-09-05 23:23:24 +02002207 self.assertIsNotNone(sslobj.shared_ciphers())
Martin Panter3840b2a2016-03-27 01:53:46 +00002208 self.assertRaises(ValueError, sslobj.getpeercert)
2209 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2210 self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
2211 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2212 self.assertTrue(sslobj.cipher())
Christian Heimes01113fa2016-09-05 23:23:24 +02002213 self.assertIsNotNone(sslobj.shared_ciphers())
Christian Heimes68771112017-09-05 21:55:40 -07002214 self.assertIsNotNone(sslobj.version())
Martin Panter3840b2a2016-03-27 01:53:46 +00002215 self.assertTrue(sslobj.getpeercert())
2216 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2217 self.assertTrue(sslobj.get_channel_binding('tls-unique'))
2218 try:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002219 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Martin Panter3840b2a2016-03-27 01:53:46 +00002220 except ssl.SSLSyscallError:
2221 # If the server shuts down the TCP connection without sending a
2222 # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
2223 pass
2224 self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
2225
2226 def test_bio_read_write_data(self):
2227 sock = socket.socket(socket.AF_INET)
2228 self.addCleanup(sock.close)
2229 sock.connect(self.server_addr)
2230 incoming = ssl.MemoryBIO()
2231 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002232 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002233 ctx.verify_mode = ssl.CERT_NONE
2234 sslobj = ctx.wrap_bio(incoming, outgoing, False)
2235 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2236 req = b'FOO\n'
2237 self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
2238 buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
2239 self.assertEqual(buf, b'foo\n')
2240 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002241
2242
Martin Panter3840b2a2016-03-27 01:53:46 +00002243class NetworkedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002244
Martin Panter3840b2a2016-03-27 01:53:46 +00002245 def test_timeout_connect_ex(self):
2246 # Issue #12065: on a timeout, connect_ex() should return the original
2247 # errno (mimicking the behaviour of non-SSL sockets).
2248 with support.transient_internet(REMOTE_HOST):
Christian Heimesd0486372016-09-10 23:23:33 +02002249 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002250 cert_reqs=ssl.CERT_REQUIRED,
2251 do_handshake_on_connect=False)
2252 self.addCleanup(s.close)
2253 s.settimeout(0.0000001)
2254 rc = s.connect_ex((REMOTE_HOST, 443))
2255 if rc == 0:
2256 self.skipTest("REMOTE_HOST responded too quickly")
2257 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
2258
2259 @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
2260 def test_get_server_certificate_ipv6(self):
2261 with support.transient_internet('ipv6.google.com'):
2262 _test_get_server_certificate(self, 'ipv6.google.com', 443)
2263 _test_get_server_certificate_fail(self, 'ipv6.google.com', 443)
2264
Martin Panter3840b2a2016-03-27 01:53:46 +00002265
2266def _test_get_server_certificate(test, host, port, cert=None):
2267 pem = ssl.get_server_certificate((host, port))
2268 if not pem:
2269 test.fail("No server certificate on %s:%s!" % (host, port))
2270
2271 pem = ssl.get_server_certificate((host, port), ca_certs=cert)
2272 if not pem:
2273 test.fail("No server certificate on %s:%s!" % (host, port))
2274 if support.verbose:
2275 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
2276
2277def _test_get_server_certificate_fail(test, host, port):
2278 try:
2279 pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
2280 except ssl.SSLError as x:
2281 #should fail
2282 if support.verbose:
2283 sys.stdout.write("%s\n" % x)
2284 else:
2285 test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2286
2287
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002288from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002289
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002290class ThreadedEchoServer(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002291
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002292 class ConnectionHandler(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002293
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002294 """A mildly complicated class, because we want it to work both
2295 with and without the SSL wrapper around the socket connection, so
2296 that we can test the STARTTLS functionality."""
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002297
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002298 def __init__(self, server, connsock, addr):
2299 self.server = server
2300 self.running = False
2301 self.sock = connsock
2302 self.addr = addr
Serhiy Storchaka5eca7f32019-09-01 12:12:52 +03002303 self.sock.setblocking(True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002304 self.sslconn = None
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002305 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00002306 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002307
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002308 def wrap_conn(self):
2309 try:
2310 self.sslconn = self.server.context.wrap_socket(
2311 self.sock, server_side=True)
2312 self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
2313 self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
Paul Monsonfb7e7502019-05-15 15:38:55 -07002314 except (ConnectionResetError, BrokenPipeError, ConnectionAbortedError) as e:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002315 # We treat ConnectionResetError as though it were an
2316 # SSLError - OpenSSL on Ubuntu abruptly closes the
2317 # connection when asked to use an unsupported protocol.
2318 #
Christian Heimes529525f2018-05-23 22:24:45 +02002319 # BrokenPipeError is raised in TLS 1.3 mode, when OpenSSL
2320 # tries to send session tickets after handshake.
2321 # https://github.com/openssl/openssl/issues/6342
Paul Monsonfb7e7502019-05-15 15:38:55 -07002322 #
2323 # ConnectionAbortedError is raised in TLS 1.3 mode, when OpenSSL
2324 # tries to send session tickets after handshake when using WinSock.
Christian Heimes529525f2018-05-23 22:24:45 +02002325 self.server.conn_errors.append(str(e))
2326 if self.server.chatty:
2327 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2328 self.running = False
2329 self.close()
2330 return False
2331 except (ssl.SSLError, OSError) as e:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002332 # OSError may occur with wrong protocols, e.g. both
2333 # sides use PROTOCOL_TLS_SERVER.
2334 #
2335 # XXX Various errors can have happened here, for example
2336 # a mismatching protocol version, an invalid certificate,
2337 # or a low-level bug. This should be made more discriminating.
2338 #
2339 # bpo-31323: Store the exception as string to prevent
2340 # a reference leak: server -> conn_errors -> exception
2341 # -> traceback -> self (ConnectionHandler) -> server
2342 self.server.conn_errors.append(str(e))
2343 if self.server.chatty:
2344 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2345 self.running = False
2346 self.server.stop()
2347 self.close()
2348 return False
2349 else:
2350 self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
2351 if self.server.context.verify_mode == ssl.CERT_REQUIRED:
2352 cert = self.sslconn.getpeercert()
2353 if support.verbose and self.server.chatty:
2354 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
2355 cert_binary = self.sslconn.getpeercert(True)
2356 if support.verbose and self.server.chatty:
2357 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
2358 cipher = self.sslconn.cipher()
2359 if support.verbose and self.server.chatty:
2360 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
2361 sys.stdout.write(" server: selected protocol is now "
2362 + str(self.sslconn.selected_npn_protocol()) + "\n")
2363 return True
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002364
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002365 def read(self):
2366 if self.sslconn:
2367 return self.sslconn.read()
2368 else:
2369 return self.sock.recv(1024)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002370
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002371 def write(self, bytes):
2372 if self.sslconn:
2373 return self.sslconn.write(bytes)
2374 else:
2375 return self.sock.send(bytes)
2376
2377 def close(self):
2378 if self.sslconn:
2379 self.sslconn.close()
2380 else:
2381 self.sock.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002382
Antoine Pitrou480a1242010-04-28 21:37:09 +00002383 def run(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002384 self.running = True
2385 if not self.server.starttls_server:
2386 if not self.wrap_conn():
2387 return
2388 while self.running:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002389 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002390 msg = self.read()
2391 stripped = msg.strip()
2392 if not stripped:
2393 # eof, so quit this handler
2394 self.running = False
2395 try:
2396 self.sock = self.sslconn.unwrap()
2397 except OSError:
2398 # Many tests shut the TCP connection down
2399 # without an SSL shutdown. This causes
2400 # unwrap() to raise OSError with errno=0!
2401 pass
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002402 else:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002403 self.sslconn = None
2404 self.close()
2405 elif stripped == b'over':
2406 if support.verbose and self.server.connectionchatty:
2407 sys.stdout.write(" server: client closed connection\n")
2408 self.close()
2409 return
2410 elif (self.server.starttls_server and
2411 stripped == b'STARTTLS'):
2412 if support.verbose and self.server.connectionchatty:
2413 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
2414 self.write(b"OK\n")
2415 if not self.wrap_conn():
2416 return
2417 elif (self.server.starttls_server and self.sslconn
2418 and stripped == b'ENDTLS'):
2419 if support.verbose and self.server.connectionchatty:
2420 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
2421 self.write(b"OK\n")
2422 self.sock = self.sslconn.unwrap()
2423 self.sslconn = None
2424 if support.verbose and self.server.connectionchatty:
2425 sys.stdout.write(" server: connection is now unencrypted...\n")
2426 elif stripped == b'CB tls-unique':
2427 if support.verbose and self.server.connectionchatty:
2428 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
2429 data = self.sslconn.get_channel_binding("tls-unique")
2430 self.write(repr(data).encode("us-ascii") + b"\n")
Christian Heimes9fb051f2018-09-23 08:32:31 +02002431 elif stripped == b'PHA':
2432 if support.verbose and self.server.connectionchatty:
2433 sys.stdout.write(" server: initiating post handshake auth\n")
2434 try:
2435 self.sslconn.verify_client_post_handshake()
2436 except ssl.SSLError as e:
2437 self.write(repr(e).encode("us-ascii") + b"\n")
2438 else:
2439 self.write(b"OK\n")
2440 elif stripped == b'HASCERT':
2441 if self.sslconn.getpeercert() is not None:
2442 self.write(b'TRUE\n')
2443 else:
2444 self.write(b'FALSE\n')
2445 elif stripped == b'GETCERT':
2446 cert = self.sslconn.getpeercert()
2447 self.write(repr(cert).encode("us-ascii") + b"\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002448 else:
2449 if (support.verbose and
2450 self.server.connectionchatty):
2451 ctype = (self.sslconn and "encrypted") or "unencrypted"
2452 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
2453 % (msg, ctype, msg.lower(), ctype))
2454 self.write(msg.lower())
Paul Monsonfb7e7502019-05-15 15:38:55 -07002455 except (ConnectionResetError, ConnectionAbortedError):
Christian Heimes529525f2018-05-23 22:24:45 +02002456 # XXX: OpenSSL 1.1.1 sometimes raises ConnectionResetError
2457 # when connection is not shut down gracefully.
2458 if self.server.chatty and support.verbose:
2459 sys.stdout.write(
2460 " Connection reset by peer: {}\n".format(
2461 self.addr)
2462 )
2463 self.close()
2464 self.running = False
Paul Monsonfb7e7502019-05-15 15:38:55 -07002465 except ssl.SSLError as err:
2466 # On Windows sometimes test_pha_required_nocert receives the
2467 # PEER_DID_NOT_RETURN_A_CERTIFICATE exception
2468 # before the 'tlsv13 alert certificate required' exception.
2469 # If the server is stopped when PEER_DID_NOT_RETURN_A_CERTIFICATE
2470 # is received test_pha_required_nocert fails with ConnectionResetError
2471 # because the underlying socket is closed
2472 if 'PEER_DID_NOT_RETURN_A_CERTIFICATE' == err.reason:
2473 if self.server.chatty and support.verbose:
2474 sys.stdout.write(err.args[1])
2475 # test_pha_required_nocert is expecting this exception
2476 raise ssl.SSLError('tlsv13 alert certificate required')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002477 except OSError:
2478 if self.server.chatty:
2479 handle_error("Test server failure:\n")
Bill Janssen2f5799b2008-06-29 00:08:12 +00002480 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002481 self.running = False
Christian Heimes529525f2018-05-23 22:24:45 +02002482
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002483 # normally, we'd just stop here, but for the test
2484 # harness, we want to stop the server
2485 self.server.stop()
Bill Janssen54cc54c2007-12-14 22:08:56 +00002486
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002487 def __init__(self, certificate=None, ssl_version=None,
2488 certreqs=None, cacerts=None,
2489 chatty=True, connectionchatty=False, starttls_server=False,
2490 npn_protocols=None, alpn_protocols=None,
2491 ciphers=None, context=None):
2492 if context:
2493 self.context = context
2494 else:
2495 self.context = ssl.SSLContext(ssl_version
2496 if ssl_version is not None
Christian Heimesa170fa12017-09-15 20:27:30 +02002497 else ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002498 self.context.verify_mode = (certreqs if certreqs is not None
2499 else ssl.CERT_NONE)
2500 if cacerts:
2501 self.context.load_verify_locations(cacerts)
2502 if certificate:
2503 self.context.load_cert_chain(certificate)
2504 if npn_protocols:
2505 self.context.set_npn_protocols(npn_protocols)
2506 if alpn_protocols:
2507 self.context.set_alpn_protocols(alpn_protocols)
2508 if ciphers:
2509 self.context.set_ciphers(ciphers)
2510 self.chatty = chatty
2511 self.connectionchatty = connectionchatty
2512 self.starttls_server = starttls_server
2513 self.sock = socket.socket()
2514 self.port = support.bind_port(self.sock)
2515 self.flag = None
2516 self.active = False
2517 self.selected_npn_protocols = []
2518 self.selected_alpn_protocols = []
2519 self.shared_ciphers = []
2520 self.conn_errors = []
2521 threading.Thread.__init__(self)
2522 self.daemon = True
2523
2524 def __enter__(self):
2525 self.start(threading.Event())
2526 self.flag.wait()
2527 return self
2528
2529 def __exit__(self, *args):
2530 self.stop()
2531 self.join()
2532
2533 def start(self, flag=None):
2534 self.flag = flag
2535 threading.Thread.start(self)
2536
2537 def run(self):
2538 self.sock.settimeout(0.05)
2539 self.sock.listen()
2540 self.active = True
2541 if self.flag:
2542 # signal an event
2543 self.flag.set()
2544 while self.active:
2545 try:
2546 newconn, connaddr = self.sock.accept()
2547 if support.verbose and self.chatty:
2548 sys.stdout.write(' server: new connection from '
2549 + repr(connaddr) + '\n')
2550 handler = self.ConnectionHandler(self, newconn, connaddr)
2551 handler.start()
2552 handler.join()
2553 except socket.timeout:
2554 pass
2555 except KeyboardInterrupt:
2556 self.stop()
Christian Heimes529525f2018-05-23 22:24:45 +02002557 except BaseException as e:
2558 if support.verbose and self.chatty:
2559 sys.stdout.write(
2560 ' connection handling failed: ' + repr(e) + '\n')
2561
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002562 self.sock.close()
2563
2564 def stop(self):
2565 self.active = False
2566
2567class AsyncoreEchoServer(threading.Thread):
2568
2569 # this one's based on asyncore.dispatcher
2570
2571 class EchoServer (asyncore.dispatcher):
2572
2573 class ConnectionHandler(asyncore.dispatcher_with_send):
2574
2575 def __init__(self, conn, certfile):
2576 self.socket = test_wrap_socket(conn, server_side=True,
2577 certfile=certfile,
2578 do_handshake_on_connect=False)
2579 asyncore.dispatcher_with_send.__init__(self, self.socket)
2580 self._ssl_accepting = True
2581 self._do_ssl_handshake()
2582
2583 def readable(self):
2584 if isinstance(self.socket, ssl.SSLSocket):
2585 while self.socket.pending() > 0:
2586 self.handle_read_event()
2587 return True
2588
2589 def _do_ssl_handshake(self):
2590 try:
2591 self.socket.do_handshake()
2592 except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
2593 return
2594 except ssl.SSLEOFError:
2595 return self.handle_close()
2596 except ssl.SSLError:
Bill Janssen54cc54c2007-12-14 22:08:56 +00002597 raise
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002598 except OSError as err:
2599 if err.args[0] == errno.ECONNABORTED:
2600 return self.handle_close()
2601 else:
2602 self._ssl_accepting = False
Bill Janssen54cc54c2007-12-14 22:08:56 +00002603
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002604 def handle_read(self):
2605 if self._ssl_accepting:
2606 self._do_ssl_handshake()
2607 else:
2608 data = self.recv(1024)
2609 if support.verbose:
2610 sys.stdout.write(" server: read %s from client\n" % repr(data))
2611 if not data:
2612 self.close()
2613 else:
2614 self.send(data.lower())
Bill Janssen54cc54c2007-12-14 22:08:56 +00002615
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002616 def handle_close(self):
2617 self.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002618 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002619 sys.stdout.write(" server: closed connection %s\n" % self.socket)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002620
2621 def handle_error(self):
2622 raise
2623
Trent Nelson78520002008-04-10 20:54:35 +00002624 def __init__(self, certfile):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002625 self.certfile = certfile
2626 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2627 self.port = support.bind_port(sock, '')
2628 asyncore.dispatcher.__init__(self, sock)
2629 self.listen(5)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002630
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002631 def handle_accepted(self, sock_obj, addr):
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002632 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002633 sys.stdout.write(" server: new connection from %s:%s\n" %addr)
2634 self.ConnectionHandler(sock_obj, self.certfile)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002635
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002636 def handle_error(self):
2637 raise
Bill Janssen54cc54c2007-12-14 22:08:56 +00002638
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002639 def __init__(self, certfile):
2640 self.flag = None
2641 self.active = False
2642 self.server = self.EchoServer(certfile)
2643 self.port = self.server.port
2644 threading.Thread.__init__(self)
2645 self.daemon = True
Bill Janssen54cc54c2007-12-14 22:08:56 +00002646
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002647 def __str__(self):
2648 return "<%s %s>" % (self.__class__.__name__, self.server)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002649
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002650 def __enter__(self):
2651 self.start(threading.Event())
2652 self.flag.wait()
2653 return self
Thomas Woutersed03b412007-08-28 21:37:11 +00002654
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002655 def __exit__(self, *args):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002656 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002657 sys.stdout.write(" cleanup: stopping server.\n")
2658 self.stop()
2659 if support.verbose:
2660 sys.stdout.write(" cleanup: joining server thread.\n")
2661 self.join()
2662 if support.verbose:
2663 sys.stdout.write(" cleanup: successfully joined.\n")
2664 # make sure that ConnectionHandler is removed from socket_map
2665 asyncore.close_all(ignore_all=True)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002666
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002667 def start (self, flag=None):
2668 self.flag = flag
2669 threading.Thread.start(self)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002670
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002671 def run(self):
2672 self.active = True
2673 if self.flag:
2674 self.flag.set()
2675 while self.active:
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002676 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002677 asyncore.loop(1)
2678 except:
2679 pass
Trent Nelson6b240cd2008-04-10 20:12:06 +00002680
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002681 def stop(self):
2682 self.active = False
2683 self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002684
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002685def server_params_test(client_context, server_context, indata=b"FOO\n",
2686 chatty=True, connectionchatty=False, sni_name=None,
2687 session=None):
2688 """
2689 Launch a server, connect a client to it and try various reads
2690 and writes.
2691 """
2692 stats = {}
2693 server = ThreadedEchoServer(context=server_context,
2694 chatty=chatty,
2695 connectionchatty=False)
2696 with server:
2697 with client_context.wrap_socket(socket.socket(),
2698 server_hostname=sni_name, session=session) as s:
2699 s.connect((HOST, server.port))
2700 for arg in [indata, bytearray(indata), memoryview(indata)]:
2701 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002702 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002703 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002704 " client: sending %r...\n" % indata)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002705 s.write(arg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002706 outdata = s.read()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002707 if connectionchatty:
2708 if support.verbose:
2709 sys.stdout.write(" client: read %r\n" % outdata)
Trent Nelson6b240cd2008-04-10 20:12:06 +00002710 if outdata != indata.lower():
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002711 raise AssertionError(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002712 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2713 % (outdata[:20], len(outdata),
2714 indata[:20].lower(), len(indata)))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002715 s.write(b"over\n")
2716 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002717 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002718 sys.stdout.write(" client: closing connection.\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002719 stats.update({
2720 'compression': s.compression(),
2721 'cipher': s.cipher(),
2722 'peercert': s.getpeercert(),
2723 'client_alpn_protocol': s.selected_alpn_protocol(),
2724 'client_npn_protocol': s.selected_npn_protocol(),
2725 'version': s.version(),
2726 'session_reused': s.session_reused,
2727 'session': s.session,
2728 })
2729 s.close()
2730 stats['server_alpn_protocols'] = server.selected_alpn_protocols
2731 stats['server_npn_protocols'] = server.selected_npn_protocols
2732 stats['server_shared_ciphers'] = server.shared_ciphers
2733 return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002734
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002735def try_protocol_combo(server_protocol, client_protocol, expect_success,
2736 certsreqs=None, server_options=0, client_options=0):
2737 """
2738 Try to SSL-connect using *client_protocol* to *server_protocol*.
2739 If *expect_success* is true, assert that the connection succeeds,
2740 if it's false, assert that the connection fails.
2741 Also, if *expect_success* is a string, assert that it is the protocol
2742 version actually used by the connection.
2743 """
2744 if certsreqs is None:
2745 certsreqs = ssl.CERT_NONE
2746 certtype = {
2747 ssl.CERT_NONE: "CERT_NONE",
2748 ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
2749 ssl.CERT_REQUIRED: "CERT_REQUIRED",
2750 }[certsreqs]
2751 if support.verbose:
2752 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
2753 sys.stdout.write(formatstr %
2754 (ssl.get_protocol_name(client_protocol),
2755 ssl.get_protocol_name(server_protocol),
2756 certtype))
2757 client_context = ssl.SSLContext(client_protocol)
2758 client_context.options |= client_options
2759 server_context = ssl.SSLContext(server_protocol)
2760 server_context.options |= server_options
2761
Victor Stinner3ef63442019-02-19 18:06:03 +01002762 min_version = PROTOCOL_TO_TLS_VERSION.get(client_protocol, None)
2763 if (min_version is not None
2764 # SSLContext.minimum_version is only available on recent OpenSSL
2765 # (setter added in OpenSSL 1.1.0, getter added in OpenSSL 1.1.1)
2766 and hasattr(server_context, 'minimum_version')
2767 and server_protocol == ssl.PROTOCOL_TLS
2768 and server_context.minimum_version > min_version):
2769 # If OpenSSL configuration is strict and requires more recent TLS
2770 # version, we have to change the minimum to test old TLS versions.
2771 server_context.minimum_version = min_version
2772
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002773 # NOTE: we must enable "ALL" ciphers on the client, otherwise an
2774 # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
2775 # starting from OpenSSL 1.0.0 (see issue #8322).
Christian Heimesa170fa12017-09-15 20:27:30 +02002776 if client_context.protocol == ssl.PROTOCOL_TLS:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002777 client_context.set_ciphers("ALL")
2778
2779 for ctx in (client_context, server_context):
2780 ctx.verify_mode = certsreqs
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002781 ctx.load_cert_chain(SIGNED_CERTFILE)
2782 ctx.load_verify_locations(SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002783 try:
2784 stats = server_params_test(client_context, server_context,
2785 chatty=False, connectionchatty=False)
2786 # Protocol mismatch can result in either an SSLError, or a
2787 # "Connection reset by peer" error.
2788 except ssl.SSLError:
2789 if expect_success:
2790 raise
2791 except OSError as e:
2792 if expect_success or e.errno != errno.ECONNRESET:
2793 raise
2794 else:
2795 if not expect_success:
2796 raise AssertionError(
2797 "Client protocol %s succeeded with server protocol %s!"
2798 % (ssl.get_protocol_name(client_protocol),
2799 ssl.get_protocol_name(server_protocol)))
2800 elif (expect_success is not True
2801 and expect_success != stats['version']):
2802 raise AssertionError("version mismatch: expected %r, got %r"
2803 % (expect_success, stats['version']))
2804
2805
2806class ThreadedTests(unittest.TestCase):
2807
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002808 def test_echo(self):
2809 """Basic test of an SSL client connecting to a server"""
2810 if support.verbose:
2811 sys.stdout.write("\n")
2812 for protocol in PROTOCOLS:
2813 if protocol in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}:
2814 continue
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02002815 if not has_tls_protocol(protocol):
2816 continue
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002817 with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
2818 context = ssl.SSLContext(protocol)
2819 context.load_cert_chain(CERTFILE)
2820 server_params_test(context, context,
2821 chatty=True, connectionchatty=True)
2822
Christian Heimesa170fa12017-09-15 20:27:30 +02002823 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002824
2825 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
2826 server_params_test(client_context=client_context,
2827 server_context=server_context,
2828 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002829 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002830
2831 client_context.check_hostname = False
2832 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_CLIENT):
2833 with self.assertRaises(ssl.SSLError) as e:
2834 server_params_test(client_context=server_context,
2835 server_context=client_context,
2836 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002837 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002838 self.assertIn('called a function you should not call',
2839 str(e.exception))
2840
2841 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_SERVER):
2842 with self.assertRaises(ssl.SSLError) as e:
2843 server_params_test(client_context=server_context,
2844 server_context=server_context,
2845 chatty=True, connectionchatty=True)
2846 self.assertIn('called a function you should not call',
2847 str(e.exception))
2848
2849 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_CLIENT):
2850 with self.assertRaises(ssl.SSLError) as e:
2851 server_params_test(client_context=server_context,
2852 server_context=client_context,
2853 chatty=True, connectionchatty=True)
2854 self.assertIn('called a function you should not call',
2855 str(e.exception))
2856
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002857 def test_getpeercert(self):
2858 if support.verbose:
2859 sys.stdout.write("\n")
Christian Heimesa170fa12017-09-15 20:27:30 +02002860
2861 client_context, server_context, hostname = testing_context()
2862 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002863 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002864 with client_context.wrap_socket(socket.socket(),
2865 do_handshake_on_connect=False,
2866 server_hostname=hostname) as s:
2867 s.connect((HOST, server.port))
2868 # getpeercert() raise ValueError while the handshake isn't
2869 # done.
2870 with self.assertRaises(ValueError):
2871 s.getpeercert()
2872 s.do_handshake()
2873 cert = s.getpeercert()
2874 self.assertTrue(cert, "Can't get peer certificate.")
2875 cipher = s.cipher()
2876 if support.verbose:
2877 sys.stdout.write(pprint.pformat(cert) + '\n')
2878 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2879 if 'subject' not in cert:
2880 self.fail("No subject field in certificate: %s." %
2881 pprint.pformat(cert))
2882 if ((('organizationName', 'Python Software Foundation'),)
2883 not in cert['subject']):
2884 self.fail(
2885 "Missing or invalid 'organizationName' field in certificate subject; "
2886 "should be 'Python Software Foundation'.")
2887 self.assertIn('notBefore', cert)
2888 self.assertIn('notAfter', cert)
2889 before = ssl.cert_time_to_seconds(cert['notBefore'])
2890 after = ssl.cert_time_to_seconds(cert['notAfter'])
2891 self.assertLess(before, after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002892
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002893 @unittest.skipUnless(have_verify_flags(),
2894 "verify_flags need OpenSSL > 0.9.8")
2895 def test_crl_check(self):
2896 if support.verbose:
2897 sys.stdout.write("\n")
2898
Christian Heimesa170fa12017-09-15 20:27:30 +02002899 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002900
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002901 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
Christian Heimesa170fa12017-09-15 20:27:30 +02002902 self.assertEqual(client_context.verify_flags, ssl.VERIFY_DEFAULT | tf)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002903
2904 # VERIFY_DEFAULT should pass
2905 server = ThreadedEchoServer(context=server_context, chatty=True)
2906 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002907 with client_context.wrap_socket(socket.socket(),
2908 server_hostname=hostname) as s:
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002909 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002910 cert = s.getpeercert()
2911 self.assertTrue(cert, "Can't get peer certificate.")
Bill Janssen58afe4c2008-09-08 16:45:19 +00002912
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002913 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
Christian Heimesa170fa12017-09-15 20:27:30 +02002914 client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
Bill Janssen58afe4c2008-09-08 16:45:19 +00002915
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002916 server = ThreadedEchoServer(context=server_context, chatty=True)
2917 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002918 with client_context.wrap_socket(socket.socket(),
2919 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002920 with self.assertRaisesRegex(ssl.SSLError,
2921 "certificate verify failed"):
2922 s.connect((HOST, server.port))
Bill Janssen58afe4c2008-09-08 16:45:19 +00002923
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002924 # now load a CRL file. The CRL file is signed by the CA.
Christian Heimesa170fa12017-09-15 20:27:30 +02002925 client_context.load_verify_locations(CRLFILE)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002926
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002927 server = ThreadedEchoServer(context=server_context, chatty=True)
2928 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002929 with client_context.wrap_socket(socket.socket(),
2930 server_hostname=hostname) as s:
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002931 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002932 cert = s.getpeercert()
2933 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002934
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002935 def test_check_hostname(self):
2936 if support.verbose:
2937 sys.stdout.write("\n")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002938
Christian Heimesa170fa12017-09-15 20:27:30 +02002939 client_context, server_context, hostname = testing_context()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002940
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002941 # correct hostname should verify
2942 server = ThreadedEchoServer(context=server_context, chatty=True)
2943 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002944 with client_context.wrap_socket(socket.socket(),
2945 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002946 s.connect((HOST, server.port))
2947 cert = s.getpeercert()
2948 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002949
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002950 # incorrect hostname should raise an exception
2951 server = ThreadedEchoServer(context=server_context, chatty=True)
2952 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002953 with client_context.wrap_socket(socket.socket(),
2954 server_hostname="invalid") as s:
Christian Heimes61d478c2018-01-27 15:51:38 +01002955 with self.assertRaisesRegex(
2956 ssl.CertificateError,
2957 "Hostname mismatch, certificate is not valid for 'invalid'."):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002958 s.connect((HOST, server.port))
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002959
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002960 # missing server_hostname arg should cause an exception, too
2961 server = ThreadedEchoServer(context=server_context, chatty=True)
2962 with server:
2963 with socket.socket() as s:
2964 with self.assertRaisesRegex(ValueError,
2965 "check_hostname requires server_hostname"):
Christian Heimesa170fa12017-09-15 20:27:30 +02002966 client_context.wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002967
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002968 def test_ecc_cert(self):
2969 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2970 client_context.load_verify_locations(SIGNING_CA)
Christian Heimese8eb6cb2018-05-22 22:50:12 +02002971 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002972 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2973
2974 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2975 # load ECC cert
2976 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2977
2978 # correct hostname should verify
2979 server = ThreadedEchoServer(context=server_context, chatty=True)
2980 with server:
2981 with client_context.wrap_socket(socket.socket(),
2982 server_hostname=hostname) as s:
2983 s.connect((HOST, server.port))
2984 cert = s.getpeercert()
2985 self.assertTrue(cert, "Can't get peer certificate.")
2986 cipher = s.cipher()[0].split('-')
2987 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2988
2989 def test_dual_rsa_ecc(self):
2990 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2991 client_context.load_verify_locations(SIGNING_CA)
Christian Heimes05d9fe32018-02-27 08:55:39 +01002992 # TODO: fix TLSv1.3 once SSLContext can restrict signature
2993 # algorithms.
2994 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002995 # only ECDSA certs
2996 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
2997 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2998
2999 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
3000 # load ECC and RSA key/cert pairs
3001 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
3002 server_context.load_cert_chain(SIGNED_CERTFILE)
3003
3004 # correct hostname should verify
3005 server = ThreadedEchoServer(context=server_context, chatty=True)
3006 with server:
3007 with client_context.wrap_socket(socket.socket(),
3008 server_hostname=hostname) as s:
3009 s.connect((HOST, server.port))
3010 cert = s.getpeercert()
3011 self.assertTrue(cert, "Can't get peer certificate.")
3012 cipher = s.cipher()[0].split('-')
3013 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
3014
Christian Heimes66e57422018-01-29 14:25:13 +01003015 def test_check_hostname_idn(self):
3016 if support.verbose:
3017 sys.stdout.write("\n")
3018
Christian Heimes11a14932018-02-24 02:35:08 +01003019 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes66e57422018-01-29 14:25:13 +01003020 server_context.load_cert_chain(IDNSANSFILE)
3021
Christian Heimes11a14932018-02-24 02:35:08 +01003022 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes66e57422018-01-29 14:25:13 +01003023 context.verify_mode = ssl.CERT_REQUIRED
3024 context.check_hostname = True
3025 context.load_verify_locations(SIGNING_CA)
3026
3027 # correct hostname should verify, when specified in several
3028 # different ways
3029 idn_hostnames = [
3030 ('könig.idn.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01003031 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01003032 ('xn--knig-5qa.idn.pythontest.net',
3033 'xn--knig-5qa.idn.pythontest.net'),
3034 (b'xn--knig-5qa.idn.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01003035 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01003036
3037 ('königsgäßchen.idna2003.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01003038 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01003039 ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
3040 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
3041 (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01003042 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
3043
3044 # ('königsgäßchen.idna2008.pythontest.net',
3045 # 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
3046 ('xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
3047 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
3048 (b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
3049 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
3050
Christian Heimes66e57422018-01-29 14:25:13 +01003051 ]
3052 for server_hostname, expected_hostname in idn_hostnames:
3053 server = ThreadedEchoServer(context=server_context, chatty=True)
3054 with server:
3055 with context.wrap_socket(socket.socket(),
3056 server_hostname=server_hostname) as s:
3057 self.assertEqual(s.server_hostname, expected_hostname)
3058 s.connect((HOST, server.port))
3059 cert = s.getpeercert()
3060 self.assertEqual(s.server_hostname, expected_hostname)
3061 self.assertTrue(cert, "Can't get peer certificate.")
3062
Christian Heimes66e57422018-01-29 14:25:13 +01003063 # incorrect hostname should raise an exception
3064 server = ThreadedEchoServer(context=server_context, chatty=True)
3065 with server:
3066 with context.wrap_socket(socket.socket(),
3067 server_hostname="python.example.org") as s:
3068 with self.assertRaises(ssl.CertificateError):
3069 s.connect((HOST, server.port))
3070
Christian Heimes529525f2018-05-23 22:24:45 +02003071 def test_wrong_cert_tls12(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003072 """Connecting when the server rejects the client's certificate
3073
3074 Launch a server with CERT_REQUIRED, and check that trying to
3075 connect to it with a wrong client certificate fails.
3076 """
Christian Heimes05d9fe32018-02-27 08:55:39 +01003077 client_context, server_context, hostname = testing_context()
Christian Heimes88bfd0b2018-08-14 12:54:19 +02003078 # load client cert that is not signed by trusted CA
3079 client_context.load_cert_chain(CERTFILE)
Christian Heimes05d9fe32018-02-27 08:55:39 +01003080 # require TLS client authentication
3081 server_context.verify_mode = ssl.CERT_REQUIRED
Christian Heimes529525f2018-05-23 22:24:45 +02003082 # TLS 1.3 has different handshake
3083 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
Christian Heimes05d9fe32018-02-27 08:55:39 +01003084
3085 server = ThreadedEchoServer(
3086 context=server_context, chatty=True, connectionchatty=True,
3087 )
3088
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003089 with server, \
Christian Heimes05d9fe32018-02-27 08:55:39 +01003090 client_context.wrap_socket(socket.socket(),
3091 server_hostname=hostname) as s:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00003092 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003093 # Expect either an SSL error about the server rejecting
3094 # the connection, or a low-level connection reset (which
3095 # sometimes happens on Windows)
3096 s.connect((HOST, server.port))
3097 except ssl.SSLError as e:
3098 if support.verbose:
3099 sys.stdout.write("\nSSLError is %r\n" % e)
3100 except OSError as e:
3101 if e.errno != errno.ECONNRESET:
3102 raise
3103 if support.verbose:
3104 sys.stdout.write("\nsocket.error is %r\n" % e)
3105 else:
3106 self.fail("Use of invalid cert should have failed!")
3107
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003108 @requires_tls_version('TLSv1_3')
Christian Heimes529525f2018-05-23 22:24:45 +02003109 def test_wrong_cert_tls13(self):
3110 client_context, server_context, hostname = testing_context()
Christian Heimes88bfd0b2018-08-14 12:54:19 +02003111 # load client cert that is not signed by trusted CA
3112 client_context.load_cert_chain(CERTFILE)
Christian Heimes529525f2018-05-23 22:24:45 +02003113 server_context.verify_mode = ssl.CERT_REQUIRED
3114 server_context.minimum_version = ssl.TLSVersion.TLSv1_3
3115 client_context.minimum_version = ssl.TLSVersion.TLSv1_3
3116
3117 server = ThreadedEchoServer(
3118 context=server_context, chatty=True, connectionchatty=True,
3119 )
3120 with server, \
3121 client_context.wrap_socket(socket.socket(),
3122 server_hostname=hostname) as s:
3123 # TLS 1.3 perform client cert exchange after handshake
3124 s.connect((HOST, server.port))
3125 try:
3126 s.write(b'data')
3127 s.read(4)
3128 except ssl.SSLError as e:
3129 if support.verbose:
3130 sys.stdout.write("\nSSLError is %r\n" % e)
3131 except OSError as e:
3132 if e.errno != errno.ECONNRESET:
3133 raise
3134 if support.verbose:
3135 sys.stdout.write("\nsocket.error is %r\n" % e)
3136 else:
3137 self.fail("Use of invalid cert should have failed!")
3138
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003139 def test_rude_shutdown(self):
3140 """A brutal shutdown of an SSL server should raise an OSError
3141 in the client when attempting handshake.
3142 """
3143 listener_ready = threading.Event()
3144 listener_gone = threading.Event()
3145
3146 s = socket.socket()
3147 port = support.bind_port(s, HOST)
3148
3149 # `listener` runs in a thread. It sits in an accept() until
3150 # the main thread connects. Then it rudely closes the socket,
3151 # and sets Event `listener_gone` to let the main thread know
3152 # the socket is gone.
3153 def listener():
3154 s.listen()
3155 listener_ready.set()
3156 newsock, addr = s.accept()
3157 newsock.close()
3158 s.close()
3159 listener_gone.set()
3160
3161 def connector():
3162 listener_ready.wait()
3163 with socket.socket() as c:
3164 c.connect((HOST, port))
3165 listener_gone.wait()
Antoine Pitrou40f08742010-04-24 22:04:40 +00003166 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003167 ssl_sock = test_wrap_socket(c)
3168 except OSError:
3169 pass
3170 else:
3171 self.fail('connecting to closed SSL socket should have failed')
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00003172
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003173 t = threading.Thread(target=listener)
3174 t.start()
3175 try:
3176 connector()
3177 finally:
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003178 t.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003179
Christian Heimesb3ad0e52017-09-08 12:00:19 -07003180 def test_ssl_cert_verify_error(self):
3181 if support.verbose:
3182 sys.stdout.write("\n")
3183
3184 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
3185 server_context.load_cert_chain(SIGNED_CERTFILE)
3186
3187 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3188
3189 server = ThreadedEchoServer(context=server_context, chatty=True)
3190 with server:
3191 with context.wrap_socket(socket.socket(),
Christian Heimesa170fa12017-09-15 20:27:30 +02003192 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
Christian Heimesb3ad0e52017-09-08 12:00:19 -07003193 try:
3194 s.connect((HOST, server.port))
3195 except ssl.SSLError as e:
3196 msg = 'unable to get local issuer certificate'
3197 self.assertIsInstance(e, ssl.SSLCertVerificationError)
3198 self.assertEqual(e.verify_code, 20)
3199 self.assertEqual(e.verify_message, msg)
3200 self.assertIn(msg, repr(e))
3201 self.assertIn('certificate verify failed', repr(e))
3202
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003203 @requires_tls_version('SSLv2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003204 def test_protocol_sslv2(self):
3205 """Connecting to an SSLv2 server with various client options"""
3206 if support.verbose:
3207 sys.stdout.write("\n")
3208 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
3209 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
3210 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
Christian Heimesa170fa12017-09-15 20:27:30 +02003211 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003212 if has_tls_version('SSLv3'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003213 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
3214 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
3215 # SSLv23 client with specific SSL options
3216 if no_sslv2_implies_sslv3_hello():
3217 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003218 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003219 client_options=ssl.OP_NO_SSLv2)
Christian Heimesa170fa12017-09-15 20:27:30 +02003220 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003221 client_options=ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02003222 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003223 client_options=ssl.OP_NO_TLSv1)
Antoine Pitrou242db722013-05-01 20:52:07 +02003224
Christian Heimesa170fa12017-09-15 20:27:30 +02003225 def test_PROTOCOL_TLS(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003226 """Connecting to an SSLv23 server with various client options"""
3227 if support.verbose:
3228 sys.stdout.write("\n")
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003229 if has_tls_version('SSLv2'):
Antoine Pitrou8f85f902012-01-03 22:46:48 +01003230 try:
Christian Heimesa170fa12017-09-15 20:27:30 +02003231 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv2, True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003232 except OSError as x:
3233 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
3234 if support.verbose:
3235 sys.stdout.write(
3236 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
3237 % str(x))
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003238 if has_tls_version('SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003239 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False)
3240 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003241 if has_tls_version('TLSv1'):
3242 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1')
Antoine Pitrou8f85f902012-01-03 22:46:48 +01003243
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003244 if has_tls_version('SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003245 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL)
3246 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_OPTIONAL)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003247 if has_tls_version('TLSv1'):
3248 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003249
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003250 if has_tls_version('SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003251 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED)
3252 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_REQUIRED)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003253 if has_tls_version('TLSv1'):
3254 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003255
3256 # Server with specific SSL options
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003257 if has_tls_version('SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003258 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003259 server_options=ssl.OP_NO_SSLv3)
3260 # Will choose TLSv1
Christian Heimesa170fa12017-09-15 20:27:30 +02003261 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003262 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003263 if has_tls_version('TLSv1'):
3264 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, False,
3265 server_options=ssl.OP_NO_TLSv1)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003266
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003267 @requires_tls_version('SSLv3')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003268 def test_protocol_sslv3(self):
3269 """Connecting to an SSLv3 server with various client options"""
3270 if support.verbose:
3271 sys.stdout.write("\n")
3272 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
3273 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
3274 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003275 if has_tls_version('SSLv2'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003276 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003277 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003278 client_options=ssl.OP_NO_SSLv3)
3279 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
3280 if no_sslv2_implies_sslv3_hello():
3281 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003282 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003283 False, client_options=ssl.OP_NO_SSLv2)
3284
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003285 @requires_tls_version('TLSv1')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003286 def test_protocol_tlsv1(self):
3287 """Connecting to a TLSv1 server with various client options"""
3288 if support.verbose:
3289 sys.stdout.write("\n")
3290 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
3291 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
3292 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003293 if has_tls_version('SSLv2'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003294 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003295 if has_tls_version('SSLv3'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003296 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003297 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003298 client_options=ssl.OP_NO_TLSv1)
3299
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003300 @requires_tls_version('TLSv1_1')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003301 def test_protocol_tlsv1_1(self):
3302 """Connecting to a TLSv1.1 server with various client options.
3303 Testing against older TLS versions."""
3304 if support.verbose:
3305 sys.stdout.write("\n")
3306 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003307 if has_tls_version('SSLv2'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003308 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003309 if has_tls_version('SSLv3'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003310 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003311 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003312 client_options=ssl.OP_NO_TLSv1_1)
3313
Christian Heimesa170fa12017-09-15 20:27:30 +02003314 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003315 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
3316 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003317
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003318 @requires_tls_version('TLSv1_2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003319 def test_protocol_tlsv1_2(self):
3320 """Connecting to a TLSv1.2 server with various client options.
3321 Testing against older TLS versions."""
3322 if support.verbose:
3323 sys.stdout.write("\n")
3324 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
3325 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
3326 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003327 if has_tls_version('SSLv2'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003328 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003329 if has_tls_version('SSLv3'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003330 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003331 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003332 client_options=ssl.OP_NO_TLSv1_2)
3333
Christian Heimesa170fa12017-09-15 20:27:30 +02003334 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003335 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
3336 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
3337 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
3338 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
3339
3340 def test_starttls(self):
3341 """Switching from clear text to encrypted and back again."""
3342 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
3343
3344 server = ThreadedEchoServer(CERTFILE,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003345 starttls_server=True,
3346 chatty=True,
3347 connectionchatty=True)
3348 wrapped = False
3349 with server:
3350 s = socket.socket()
Serhiy Storchaka5eca7f32019-09-01 12:12:52 +03003351 s.setblocking(True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003352 s.connect((HOST, server.port))
Antoine Pitroud6494802011-07-21 01:11:30 +02003353 if support.verbose:
3354 sys.stdout.write("\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003355 for indata in msgs:
Antoine Pitroud6494802011-07-21 01:11:30 +02003356 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003357 sys.stdout.write(
3358 " client: sending %r...\n" % indata)
3359 if wrapped:
3360 conn.write(indata)
3361 outdata = conn.read()
3362 else:
3363 s.send(indata)
3364 outdata = s.recv(1024)
3365 msg = outdata.strip().lower()
3366 if indata == b"STARTTLS" and msg.startswith(b"ok"):
3367 # STARTTLS ok, switch to secure mode
3368 if support.verbose:
3369 sys.stdout.write(
3370 " client: read %r from server, starting TLS...\n"
3371 % msg)
Christian Heimesa170fa12017-09-15 20:27:30 +02003372 conn = test_wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003373 wrapped = True
3374 elif indata == b"ENDTLS" and msg.startswith(b"ok"):
3375 # ENDTLS ok, switch back to clear text
3376 if support.verbose:
3377 sys.stdout.write(
3378 " client: read %r from server, ending TLS...\n"
3379 % msg)
3380 s = conn.unwrap()
3381 wrapped = False
3382 else:
3383 if support.verbose:
3384 sys.stdout.write(
3385 " client: read %r from server\n" % msg)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003386 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003387 sys.stdout.write(" client: closing connection.\n")
3388 if wrapped:
3389 conn.write(b"over\n")
3390 else:
3391 s.send(b"over\n")
3392 if wrapped:
3393 conn.close()
3394 else:
3395 s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003396
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003397 def test_socketserver(self):
3398 """Using socketserver to create and manage SSL connections."""
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003399 server = make_https_server(self, certfile=SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003400 # try to connect
3401 if support.verbose:
3402 sys.stdout.write('\n')
3403 with open(CERTFILE, 'rb') as f:
3404 d1 = f.read()
3405 d2 = ''
3406 # now fetch the same data from the HTTPS server
3407 url = 'https://localhost:%d/%s' % (
3408 server.port, os.path.split(CERTFILE)[1])
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003409 context = ssl.create_default_context(cafile=SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003410 f = urllib.request.urlopen(url, context=context)
3411 try:
3412 dlen = f.info().get("content-length")
3413 if dlen and (int(dlen) > 0):
3414 d2 = f.read(int(dlen))
3415 if support.verbose:
3416 sys.stdout.write(
3417 " client: read %d bytes from remote server '%s'\n"
3418 % (len(d2), server))
3419 finally:
3420 f.close()
3421 self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003422
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003423 def test_asyncore_server(self):
3424 """Check the example asyncore integration."""
3425 if support.verbose:
3426 sys.stdout.write("\n")
Antoine Pitrou0e576f12011-12-22 10:03:38 +01003427
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003428 indata = b"FOO\n"
3429 server = AsyncoreEchoServer(CERTFILE)
3430 with server:
3431 s = test_wrap_socket(socket.socket())
3432 s.connect(('127.0.0.1', server.port))
3433 if support.verbose:
3434 sys.stdout.write(
3435 " client: sending %r...\n" % indata)
3436 s.write(indata)
3437 outdata = s.read()
3438 if support.verbose:
3439 sys.stdout.write(" client: read %r\n" % outdata)
3440 if outdata != indata.lower():
3441 self.fail(
3442 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
3443 % (outdata[:20], len(outdata),
3444 indata[:20].lower(), len(indata)))
3445 s.write(b"over\n")
3446 if support.verbose:
3447 sys.stdout.write(" client: closing connection.\n")
3448 s.close()
3449 if support.verbose:
3450 sys.stdout.write(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003451
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003452 def test_recv_send(self):
3453 """Test recv(), send() and friends."""
3454 if support.verbose:
3455 sys.stdout.write("\n")
3456
3457 server = ThreadedEchoServer(CERTFILE,
3458 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003459 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003460 cacerts=CERTFILE,
3461 chatty=True,
3462 connectionchatty=False)
3463 with server:
3464 s = test_wrap_socket(socket.socket(),
3465 server_side=False,
3466 certfile=CERTFILE,
3467 ca_certs=CERTFILE,
3468 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003469 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003470 s.connect((HOST, server.port))
3471 # helper methods for standardising recv* method signatures
3472 def _recv_into():
3473 b = bytearray(b"\0"*100)
3474 count = s.recv_into(b)
3475 return b[:count]
3476
3477 def _recvfrom_into():
3478 b = bytearray(b"\0"*100)
3479 count, addr = s.recvfrom_into(b)
3480 return b[:count]
3481
3482 # (name, method, expect success?, *args, return value func)
3483 send_methods = [
3484 ('send', s.send, True, [], len),
3485 ('sendto', s.sendto, False, ["some.address"], len),
3486 ('sendall', s.sendall, True, [], lambda x: None),
3487 ]
3488 # (name, method, whether to expect success, *args)
3489 recv_methods = [
3490 ('recv', s.recv, True, []),
3491 ('recvfrom', s.recvfrom, False, ["some.address"]),
3492 ('recv_into', _recv_into, True, []),
3493 ('recvfrom_into', _recvfrom_into, False, []),
3494 ]
3495 data_prefix = "PREFIX_"
3496
3497 for (meth_name, send_meth, expect_success, args,
3498 ret_val_meth) in send_methods:
3499 indata = (data_prefix + meth_name).encode('ascii')
3500 try:
3501 ret = send_meth(indata, *args)
3502 msg = "sending with {}".format(meth_name)
3503 self.assertEqual(ret, ret_val_meth(indata), msg=msg)
3504 outdata = s.read()
3505 if outdata != indata.lower():
3506 self.fail(
3507 "While sending with <<{name:s}>> bad data "
3508 "<<{outdata:r}>> ({nout:d}) received; "
3509 "expected <<{indata:r}>> ({nin:d})\n".format(
3510 name=meth_name, outdata=outdata[:20],
3511 nout=len(outdata),
3512 indata=indata[:20], nin=len(indata)
3513 )
3514 )
3515 except ValueError as e:
3516 if expect_success:
3517 self.fail(
3518 "Failed to send with method <<{name:s}>>; "
3519 "expected to succeed.\n".format(name=meth_name)
3520 )
3521 if not str(e).startswith(meth_name):
3522 self.fail(
3523 "Method <<{name:s}>> failed with unexpected "
3524 "exception message: {exp:s}\n".format(
3525 name=meth_name, exp=e
3526 )
3527 )
3528
3529 for meth_name, recv_meth, expect_success, args in recv_methods:
3530 indata = (data_prefix + meth_name).encode('ascii')
3531 try:
3532 s.send(indata)
3533 outdata = recv_meth(*args)
3534 if outdata != indata.lower():
3535 self.fail(
3536 "While receiving with <<{name:s}>> bad data "
3537 "<<{outdata:r}>> ({nout:d}) received; "
3538 "expected <<{indata:r}>> ({nin:d})\n".format(
3539 name=meth_name, outdata=outdata[:20],
3540 nout=len(outdata),
3541 indata=indata[:20], nin=len(indata)
3542 )
3543 )
3544 except ValueError as e:
3545 if expect_success:
3546 self.fail(
3547 "Failed to receive with method <<{name:s}>>; "
3548 "expected to succeed.\n".format(name=meth_name)
3549 )
3550 if not str(e).startswith(meth_name):
3551 self.fail(
3552 "Method <<{name:s}>> failed with unexpected "
3553 "exception message: {exp:s}\n".format(
3554 name=meth_name, exp=e
3555 )
3556 )
3557 # consume data
3558 s.read()
3559
3560 # read(-1, buffer) is supported, even though read(-1) is not
3561 data = b"data"
3562 s.send(data)
3563 buffer = bytearray(len(data))
3564 self.assertEqual(s.read(-1, buffer), len(data))
3565 self.assertEqual(buffer, data)
3566
Christian Heimes888bbdc2017-09-07 14:18:21 -07003567 # sendall accepts bytes-like objects
3568 if ctypes is not None:
3569 ubyte = ctypes.c_ubyte * len(data)
3570 byteslike = ubyte.from_buffer_copy(data)
3571 s.sendall(byteslike)
3572 self.assertEqual(s.read(), data)
3573
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003574 # Make sure sendmsg et al are disallowed to avoid
3575 # inadvertent disclosure of data and/or corruption
3576 # of the encrypted data stream
Serhiy Storchaka42b1d612018-12-06 22:36:55 +02003577 self.assertRaises(NotImplementedError, s.dup)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003578 self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
3579 self.assertRaises(NotImplementedError, s.recvmsg, 100)
3580 self.assertRaises(NotImplementedError,
Serhiy Storchaka42b1d612018-12-06 22:36:55 +02003581 s.recvmsg_into, [bytearray(100)])
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003582 s.write(b"over\n")
3583
3584 self.assertRaises(ValueError, s.recv, -1)
3585 self.assertRaises(ValueError, s.read, -1)
3586
3587 s.close()
3588
3589 def test_recv_zero(self):
3590 server = ThreadedEchoServer(CERTFILE)
3591 server.__enter__()
3592 self.addCleanup(server.__exit__, None, None)
3593 s = socket.create_connection((HOST, server.port))
3594 self.addCleanup(s.close)
3595 s = test_wrap_socket(s, suppress_ragged_eofs=False)
3596 self.addCleanup(s.close)
3597
3598 # recv/read(0) should return no data
3599 s.send(b"data")
3600 self.assertEqual(s.recv(0), b"")
3601 self.assertEqual(s.read(0), b"")
3602 self.assertEqual(s.read(), b"data")
3603
3604 # Should not block if the other end sends no data
3605 s.setblocking(False)
3606 self.assertEqual(s.recv(0), b"")
3607 self.assertEqual(s.recv_into(bytearray()), 0)
3608
3609 def test_nonblocking_send(self):
3610 server = ThreadedEchoServer(CERTFILE,
3611 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003612 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003613 cacerts=CERTFILE,
3614 chatty=True,
3615 connectionchatty=False)
3616 with server:
3617 s = test_wrap_socket(socket.socket(),
3618 server_side=False,
3619 certfile=CERTFILE,
3620 ca_certs=CERTFILE,
3621 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003622 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003623 s.connect((HOST, server.port))
3624 s.setblocking(False)
3625
3626 # If we keep sending data, at some point the buffers
3627 # will be full and the call will block
3628 buf = bytearray(8192)
3629 def fill_buffer():
3630 while True:
3631 s.send(buf)
3632 self.assertRaises((ssl.SSLWantWriteError,
3633 ssl.SSLWantReadError), fill_buffer)
3634
3635 # Now read all the output and discard it
3636 s.setblocking(True)
3637 s.close()
3638
3639 def test_handshake_timeout(self):
3640 # Issue #5103: SSL handshake must respect the socket timeout
3641 server = socket.socket(socket.AF_INET)
3642 host = "127.0.0.1"
3643 port = support.bind_port(server)
3644 started = threading.Event()
3645 finish = False
3646
3647 def serve():
3648 server.listen()
3649 started.set()
3650 conns = []
3651 while not finish:
3652 r, w, e = select.select([server], [], [], 0.1)
3653 if server in r:
3654 # Let the socket hang around rather than having
3655 # it closed by garbage collection.
3656 conns.append(server.accept()[0])
3657 for sock in conns:
3658 sock.close()
3659
3660 t = threading.Thread(target=serve)
3661 t.start()
3662 started.wait()
3663
3664 try:
3665 try:
3666 c = socket.socket(socket.AF_INET)
3667 c.settimeout(0.2)
3668 c.connect((host, port))
3669 # Will attempt handshake and time out
3670 self.assertRaisesRegex(socket.timeout, "timed out",
3671 test_wrap_socket, c)
3672 finally:
3673 c.close()
3674 try:
3675 c = socket.socket(socket.AF_INET)
3676 c = test_wrap_socket(c)
3677 c.settimeout(0.2)
3678 # Will attempt handshake and time out
3679 self.assertRaisesRegex(socket.timeout, "timed out",
3680 c.connect, (host, port))
3681 finally:
3682 c.close()
3683 finally:
3684 finish = True
3685 t.join()
3686 server.close()
3687
3688 def test_server_accept(self):
3689 # Issue #16357: accept() on a SSLSocket created through
3690 # SSLContext.wrap_socket().
Christian Heimesa170fa12017-09-15 20:27:30 +02003691 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003692 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003693 context.load_verify_locations(SIGNING_CA)
3694 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003695 server = socket.socket(socket.AF_INET)
3696 host = "127.0.0.1"
3697 port = support.bind_port(server)
3698 server = context.wrap_socket(server, server_side=True)
3699 self.assertTrue(server.server_side)
3700
3701 evt = threading.Event()
3702 remote = None
3703 peer = None
3704 def serve():
3705 nonlocal remote, peer
3706 server.listen()
3707 # Block on the accept and wait on the connection to close.
3708 evt.set()
3709 remote, peer = server.accept()
Christian Heimes529525f2018-05-23 22:24:45 +02003710 remote.send(remote.recv(4))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003711
3712 t = threading.Thread(target=serve)
3713 t.start()
3714 # Client wait until server setup and perform a connect.
3715 evt.wait()
3716 client = context.wrap_socket(socket.socket())
3717 client.connect((host, port))
Christian Heimes529525f2018-05-23 22:24:45 +02003718 client.send(b'data')
3719 client.recv()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003720 client_addr = client.getsockname()
3721 client.close()
3722 t.join()
3723 remote.close()
3724 server.close()
3725 # Sanity checks.
3726 self.assertIsInstance(remote, ssl.SSLSocket)
3727 self.assertEqual(peer, client_addr)
3728
3729 def test_getpeercert_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003730 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003731 with context.wrap_socket(socket.socket()) as sock:
3732 with self.assertRaises(OSError) as cm:
3733 sock.getpeercert()
3734 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3735
3736 def test_do_handshake_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003737 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003738 with context.wrap_socket(socket.socket()) as sock:
3739 with self.assertRaises(OSError) as cm:
3740 sock.do_handshake()
3741 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3742
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003743 def test_no_shared_ciphers(self):
3744 client_context, server_context, hostname = testing_context()
3745 # OpenSSL enables all TLS 1.3 ciphers, enforce TLS 1.2 for test
3746 client_context.options |= ssl.OP_NO_TLSv1_3
Victor Stinner5e922652018-09-07 17:30:33 +02003747 # Force different suites on client and server
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003748 client_context.set_ciphers("AES128")
3749 server_context.set_ciphers("AES256")
3750 with ThreadedEchoServer(context=server_context) as server:
3751 with client_context.wrap_socket(socket.socket(),
3752 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003753 with self.assertRaises(OSError):
3754 s.connect((HOST, server.port))
3755 self.assertIn("no shared cipher", server.conn_errors[0])
3756
3757 def test_version_basic(self):
3758 """
3759 Basic tests for SSLSocket.version().
3760 More tests are done in the test_protocol_*() methods.
3761 """
Christian Heimesa170fa12017-09-15 20:27:30 +02003762 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3763 context.check_hostname = False
3764 context.verify_mode = ssl.CERT_NONE
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003765 with ThreadedEchoServer(CERTFILE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003766 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003767 chatty=False) as server:
3768 with context.wrap_socket(socket.socket()) as s:
3769 self.assertIs(s.version(), None)
Christian Heimes141c5e82018-02-24 21:10:57 +01003770 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003771 s.connect((HOST, server.port))
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003772 if IS_OPENSSL_1_1_1 and has_tls_version('TLSv1_3'):
Christian Heimes05d9fe32018-02-27 08:55:39 +01003773 self.assertEqual(s.version(), 'TLSv1.3')
3774 elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
Christian Heimesa170fa12017-09-15 20:27:30 +02003775 self.assertEqual(s.version(), 'TLSv1.2')
3776 else: # 0.9.8 to 1.0.1
3777 self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
Christian Heimes141c5e82018-02-24 21:10:57 +01003778 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003779 self.assertIs(s.version(), None)
3780
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003781 @requires_tls_version('TLSv1_3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003782 def test_tls1_3(self):
3783 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
3784 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003785 context.options |= (
3786 ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
3787 )
3788 with ThreadedEchoServer(context=context) as server:
3789 with context.wrap_socket(socket.socket()) as s:
3790 s.connect((HOST, server.port))
Christian Heimes05d9fe32018-02-27 08:55:39 +01003791 self.assertIn(s.cipher()[0], {
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003792 'TLS_AES_256_GCM_SHA384',
3793 'TLS_CHACHA20_POLY1305_SHA256',
3794 'TLS_AES_128_GCM_SHA256',
Christian Heimes05d9fe32018-02-27 08:55:39 +01003795 })
3796 self.assertEqual(s.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003797
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003798 @requires_minimum_version
3799 @requires_tls_version('TLSv1_2')
3800 def test_min_max_version_tlsv1_2(self):
Christian Heimes698dde12018-02-27 11:54:43 +01003801 client_context, server_context, hostname = testing_context()
3802 # client TLSv1.0 to 1.2
3803 client_context.minimum_version = ssl.TLSVersion.TLSv1
3804 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
3805 # server only TLSv1.2
3806 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
3807 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
3808
3809 with ThreadedEchoServer(context=server_context) as server:
3810 with client_context.wrap_socket(socket.socket(),
3811 server_hostname=hostname) as s:
3812 s.connect((HOST, server.port))
3813 self.assertEqual(s.version(), 'TLSv1.2')
3814
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003815 @requires_minimum_version
3816 @requires_tls_version('TLSv1_1')
3817 def test_min_max_version_tlsv1_1(self):
3818 client_context, server_context, hostname = testing_context()
Christian Heimes698dde12018-02-27 11:54:43 +01003819 # client 1.0 to 1.2, server 1.0 to 1.1
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003820 client_context.minimum_version = ssl.TLSVersion.TLSv1
3821 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
Christian Heimes698dde12018-02-27 11:54:43 +01003822 server_context.minimum_version = ssl.TLSVersion.TLSv1
3823 server_context.maximum_version = ssl.TLSVersion.TLSv1_1
3824
3825 with ThreadedEchoServer(context=server_context) as server:
3826 with client_context.wrap_socket(socket.socket(),
3827 server_hostname=hostname) as s:
3828 s.connect((HOST, server.port))
3829 self.assertEqual(s.version(), 'TLSv1.1')
3830
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003831 @requires_minimum_version
3832 @requires_tls_version('TLSv1_2')
3833 def test_min_max_version_mismatch(self):
3834 client_context, server_context, hostname = testing_context()
Christian Heimes698dde12018-02-27 11:54:43 +01003835 # client 1.0, server 1.2 (mismatch)
Christian Heimes698dde12018-02-27 11:54:43 +01003836 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
Christian Heimesc9bc49c2019-09-11 19:24:47 +02003837 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
Christian Heimese35d1ba2019-06-03 20:40:15 +02003838 client_context.maximum_version = ssl.TLSVersion.TLSv1
Christian Heimesde606ea2019-09-11 19:48:58 +02003839 client_context.minimum_version = ssl.TLSVersion.TLSv1
Christian Heimes698dde12018-02-27 11:54:43 +01003840 with ThreadedEchoServer(context=server_context) as server:
3841 with client_context.wrap_socket(socket.socket(),
3842 server_hostname=hostname) as s:
3843 with self.assertRaises(ssl.SSLError) as e:
3844 s.connect((HOST, server.port))
3845 self.assertIn("alert", str(e.exception))
3846
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003847 @requires_minimum_version
3848 @requires_tls_version('SSLv3')
Christian Heimes698dde12018-02-27 11:54:43 +01003849 def test_min_max_version_sslv3(self):
3850 client_context, server_context, hostname = testing_context()
3851 server_context.minimum_version = ssl.TLSVersion.SSLv3
3852 client_context.minimum_version = ssl.TLSVersion.SSLv3
3853 client_context.maximum_version = ssl.TLSVersion.SSLv3
3854 with ThreadedEchoServer(context=server_context) as server:
3855 with client_context.wrap_socket(socket.socket(),
3856 server_hostname=hostname) as s:
3857 s.connect((HOST, server.port))
3858 self.assertEqual(s.version(), 'SSLv3')
3859
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003860 @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
3861 def test_default_ecdh_curve(self):
3862 # Issue #21015: elliptic curve-based Diffie Hellman key exchange
3863 # should be enabled by default on SSL contexts.
Christian Heimesa170fa12017-09-15 20:27:30 +02003864 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003865 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003866 # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
3867 # cipher name.
3868 context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003869 # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
3870 # explicitly using the 'ECCdraft' cipher alias. Otherwise,
3871 # our default cipher list should prefer ECDH-based ciphers
3872 # automatically.
3873 if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
3874 context.set_ciphers("ECCdraft:ECDH")
3875 with ThreadedEchoServer(context=context) as server:
3876 with context.wrap_socket(socket.socket()) as s:
3877 s.connect((HOST, server.port))
3878 self.assertIn("ECDH", s.cipher()[0])
3879
3880 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
3881 "'tls-unique' channel binding not available")
3882 def test_tls_unique_channel_binding(self):
3883 """Test tls-unique channel binding."""
3884 if support.verbose:
3885 sys.stdout.write("\n")
3886
Christian Heimes05d9fe32018-02-27 08:55:39 +01003887 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003888
3889 server = ThreadedEchoServer(context=server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003890 chatty=True,
3891 connectionchatty=False)
Christian Heimes05d9fe32018-02-27 08:55:39 +01003892
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003893 with server:
Christian Heimes05d9fe32018-02-27 08:55:39 +01003894 with client_context.wrap_socket(
3895 socket.socket(),
3896 server_hostname=hostname) as s:
3897 s.connect((HOST, server.port))
3898 # get the data
3899 cb_data = s.get_channel_binding("tls-unique")
3900 if support.verbose:
3901 sys.stdout.write(
3902 " got channel binding data: {0!r}\n".format(cb_data))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003903
Christian Heimes05d9fe32018-02-27 08:55:39 +01003904 # check if it is sane
3905 self.assertIsNotNone(cb_data)
Christian Heimes529525f2018-05-23 22:24:45 +02003906 if s.version() == 'TLSv1.3':
3907 self.assertEqual(len(cb_data), 48)
3908 else:
3909 self.assertEqual(len(cb_data), 12) # True for TLSv1
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003910
Christian Heimes05d9fe32018-02-27 08:55:39 +01003911 # and compare with the peers version
3912 s.write(b"CB tls-unique\n")
3913 peer_data_repr = s.read().strip()
3914 self.assertEqual(peer_data_repr,
3915 repr(cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003916
3917 # now, again
Christian Heimes05d9fe32018-02-27 08:55:39 +01003918 with client_context.wrap_socket(
3919 socket.socket(),
3920 server_hostname=hostname) as s:
3921 s.connect((HOST, server.port))
3922 new_cb_data = s.get_channel_binding("tls-unique")
3923 if support.verbose:
3924 sys.stdout.write(
3925 "got another channel binding data: {0!r}\n".format(
3926 new_cb_data)
3927 )
3928 # is it really unique
3929 self.assertNotEqual(cb_data, new_cb_data)
3930 self.assertIsNotNone(cb_data)
Christian Heimes529525f2018-05-23 22:24:45 +02003931 if s.version() == 'TLSv1.3':
3932 self.assertEqual(len(cb_data), 48)
3933 else:
3934 self.assertEqual(len(cb_data), 12) # True for TLSv1
Christian Heimes05d9fe32018-02-27 08:55:39 +01003935 s.write(b"CB tls-unique\n")
3936 peer_data_repr = s.read().strip()
3937 self.assertEqual(peer_data_repr,
3938 repr(new_cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003939
3940 def test_compression(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003941 client_context, server_context, hostname = testing_context()
3942 stats = server_params_test(client_context, server_context,
3943 chatty=True, connectionchatty=True,
3944 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003945 if support.verbose:
3946 sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
3947 self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
3948
3949 @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
3950 "ssl.OP_NO_COMPRESSION needed for this test")
3951 def test_compression_disabled(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003952 client_context, server_context, hostname = testing_context()
3953 client_context.options |= ssl.OP_NO_COMPRESSION
3954 server_context.options |= ssl.OP_NO_COMPRESSION
3955 stats = server_params_test(client_context, server_context,
3956 chatty=True, connectionchatty=True,
3957 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003958 self.assertIs(stats['compression'], None)
3959
Paul Monsonf3550692019-06-19 13:09:54 -07003960 @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003961 def test_dh_params(self):
3962 # Check we can get a connection with ephemeral Diffie-Hellman
Christian Heimesa170fa12017-09-15 20:27:30 +02003963 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003964 # test scenario needs TLS <= 1.2
3965 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003966 server_context.load_dh_params(DHFILE)
3967 server_context.set_ciphers("kEDH")
Christian Heimes05d9fe32018-02-27 08:55:39 +01003968 server_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003969 stats = server_params_test(client_context, server_context,
3970 chatty=True, connectionchatty=True,
3971 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003972 cipher = stats["cipher"][0]
3973 parts = cipher.split("-")
3974 if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
3975 self.fail("Non-DH cipher: " + cipher[0])
3976
Christian Heimesb7b92252018-02-25 09:49:31 +01003977 @unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
Christian Heimes05d9fe32018-02-27 08:55:39 +01003978 @unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
Christian Heimesb7b92252018-02-25 09:49:31 +01003979 def test_ecdh_curve(self):
3980 # server secp384r1, client auto
3981 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003982
Christian Heimesb7b92252018-02-25 09:49:31 +01003983 server_context.set_ecdh_curve("secp384r1")
3984 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3985 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3986 stats = server_params_test(client_context, server_context,
3987 chatty=True, connectionchatty=True,
3988 sni_name=hostname)
3989
3990 # server auto, client secp384r1
3991 client_context, server_context, hostname = testing_context()
3992 client_context.set_ecdh_curve("secp384r1")
3993 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3994 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3995 stats = server_params_test(client_context, server_context,
3996 chatty=True, connectionchatty=True,
3997 sni_name=hostname)
3998
3999 # server / client curve mismatch
4000 client_context, server_context, hostname = testing_context()
4001 client_context.set_ecdh_curve("prime256v1")
4002 server_context.set_ecdh_curve("secp384r1")
4003 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
4004 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
4005 try:
4006 stats = server_params_test(client_context, server_context,
4007 chatty=True, connectionchatty=True,
4008 sni_name=hostname)
4009 except ssl.SSLError:
4010 pass
4011 else:
4012 # OpenSSL 1.0.2 does not fail although it should.
Christian Heimes05d9fe32018-02-27 08:55:39 +01004013 if IS_OPENSSL_1_1_0:
Christian Heimesb7b92252018-02-25 09:49:31 +01004014 self.fail("mismatch curve did not fail")
4015
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004016 def test_selected_alpn_protocol(self):
4017 # selected_alpn_protocol() is None unless ALPN is used.
Christian Heimesa170fa12017-09-15 20:27:30 +02004018 client_context, server_context, hostname = testing_context()
4019 stats = server_params_test(client_context, server_context,
4020 chatty=True, connectionchatty=True,
4021 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004022 self.assertIs(stats['client_alpn_protocol'], None)
4023
4024 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
4025 def test_selected_alpn_protocol_if_server_uses_alpn(self):
4026 # selected_alpn_protocol() is None unless ALPN is used by the client.
Christian Heimesa170fa12017-09-15 20:27:30 +02004027 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004028 server_context.set_alpn_protocols(['foo', 'bar'])
4029 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02004030 chatty=True, connectionchatty=True,
4031 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004032 self.assertIs(stats['client_alpn_protocol'], None)
4033
4034 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
4035 def test_alpn_protocols(self):
4036 server_protocols = ['foo', 'bar', 'milkshake']
4037 protocol_tests = [
4038 (['foo', 'bar'], 'foo'),
4039 (['bar', 'foo'], 'foo'),
4040 (['milkshake'], 'milkshake'),
4041 (['http/3.0', 'http/4.0'], None)
4042 ]
4043 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02004044 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004045 server_context.set_alpn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004046 client_context.set_alpn_protocols(client_protocols)
4047
4048 try:
4049 stats = server_params_test(client_context,
4050 server_context,
4051 chatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02004052 connectionchatty=True,
4053 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004054 except ssl.SSLError as e:
4055 stats = e
4056
Christian Heimes05d9fe32018-02-27 08:55:39 +01004057 if (expected is None and IS_OPENSSL_1_1_0
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004058 and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)):
4059 # OpenSSL 1.1.0 to 1.1.0e raises handshake error
4060 self.assertIsInstance(stats, ssl.SSLError)
4061 else:
4062 msg = "failed trying %s (s) and %s (c).\n" \
4063 "was expecting %s, but got %%s from the %%s" \
4064 % (str(server_protocols), str(client_protocols),
4065 str(expected))
4066 client_result = stats['client_alpn_protocol']
4067 self.assertEqual(client_result, expected,
4068 msg % (client_result, "client"))
4069 server_result = stats['server_alpn_protocols'][-1] \
4070 if len(stats['server_alpn_protocols']) else 'nothing'
4071 self.assertEqual(server_result, expected,
4072 msg % (server_result, "server"))
4073
4074 def test_selected_npn_protocol(self):
4075 # selected_npn_protocol() is None unless NPN is used
Christian Heimesa170fa12017-09-15 20:27:30 +02004076 client_context, server_context, hostname = testing_context()
4077 stats = server_params_test(client_context, server_context,
4078 chatty=True, connectionchatty=True,
4079 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004080 self.assertIs(stats['client_npn_protocol'], None)
4081
4082 @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
4083 def test_npn_protocols(self):
4084 server_protocols = ['http/1.1', 'spdy/2']
4085 protocol_tests = [
4086 (['http/1.1', 'spdy/2'], 'http/1.1'),
4087 (['spdy/2', 'http/1.1'], 'http/1.1'),
4088 (['spdy/2', 'test'], 'spdy/2'),
4089 (['abc', 'def'], 'abc')
4090 ]
4091 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02004092 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004093 server_context.set_npn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004094 client_context.set_npn_protocols(client_protocols)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004095 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02004096 chatty=True, connectionchatty=True,
4097 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004098 msg = "failed trying %s (s) and %s (c).\n" \
4099 "was expecting %s, but got %%s from the %%s" \
4100 % (str(server_protocols), str(client_protocols),
4101 str(expected))
4102 client_result = stats['client_npn_protocol']
4103 self.assertEqual(client_result, expected, msg % (client_result, "client"))
4104 server_result = stats['server_npn_protocols'][-1] \
4105 if len(stats['server_npn_protocols']) else 'nothing'
4106 self.assertEqual(server_result, expected, msg % (server_result, "server"))
4107
4108 def sni_contexts(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004109 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004110 server_context.load_cert_chain(SIGNED_CERTFILE)
Christian Heimesa170fa12017-09-15 20:27:30 +02004111 other_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004112 other_context.load_cert_chain(SIGNED_CERTFILE2)
Christian Heimesa170fa12017-09-15 20:27:30 +02004113 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004114 client_context.load_verify_locations(SIGNING_CA)
4115 return server_context, other_context, client_context
4116
4117 def check_common_name(self, stats, name):
4118 cert = stats['peercert']
4119 self.assertIn((('commonName', name),), cert['subject'])
4120
4121 @needs_sni
4122 def test_sni_callback(self):
4123 calls = []
4124 server_context, other_context, client_context = self.sni_contexts()
4125
Christian Heimesa170fa12017-09-15 20:27:30 +02004126 client_context.check_hostname = False
4127
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004128 def servername_cb(ssl_sock, server_name, initial_context):
4129 calls.append((server_name, initial_context))
4130 if server_name is not None:
4131 ssl_sock.context = other_context
4132 server_context.set_servername_callback(servername_cb)
4133
4134 stats = server_params_test(client_context, server_context,
4135 chatty=True,
4136 sni_name='supermessage')
4137 # The hostname was fetched properly, and the certificate was
4138 # changed for the connection.
4139 self.assertEqual(calls, [("supermessage", server_context)])
4140 # CERTFILE4 was selected
4141 self.check_common_name(stats, 'fakehostname')
4142
4143 calls = []
4144 # The callback is called with server_name=None
4145 stats = server_params_test(client_context, server_context,
4146 chatty=True,
4147 sni_name=None)
4148 self.assertEqual(calls, [(None, server_context)])
Christian Heimesa170fa12017-09-15 20:27:30 +02004149 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004150
4151 # Check disabling the callback
4152 calls = []
4153 server_context.set_servername_callback(None)
4154
4155 stats = server_params_test(client_context, server_context,
4156 chatty=True,
4157 sni_name='notfunny')
4158 # Certificate didn't change
Christian Heimesa170fa12017-09-15 20:27:30 +02004159 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004160 self.assertEqual(calls, [])
4161
4162 @needs_sni
4163 def test_sni_callback_alert(self):
4164 # Returning a TLS alert is reflected to the connecting client
4165 server_context, other_context, client_context = self.sni_contexts()
4166
4167 def cb_returning_alert(ssl_sock, server_name, initial_context):
4168 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
4169 server_context.set_servername_callback(cb_returning_alert)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004170 with self.assertRaises(ssl.SSLError) as cm:
4171 stats = server_params_test(client_context, server_context,
4172 chatty=False,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004173 sni_name='supermessage')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004174 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004175
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004176 @needs_sni
4177 def test_sni_callback_raising(self):
4178 # Raising fails the connection with a TLS handshake failure alert.
4179 server_context, other_context, client_context = self.sni_contexts()
4180
4181 def cb_raising(ssl_sock, server_name, initial_context):
4182 1/0
4183 server_context.set_servername_callback(cb_raising)
4184
Victor Stinner00253502019-06-03 03:51:43 +02004185 with support.catch_unraisable_exception() as catch:
4186 with self.assertRaises(ssl.SSLError) as cm:
4187 stats = server_params_test(client_context, server_context,
4188 chatty=False,
4189 sni_name='supermessage')
4190
4191 self.assertEqual(cm.exception.reason,
4192 'SSLV3_ALERT_HANDSHAKE_FAILURE')
4193 self.assertEqual(catch.unraisable.exc_type, ZeroDivisionError)
Antoine Pitrou50b24d02013-04-11 20:48:42 +02004194
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004195 @needs_sni
4196 def test_sni_callback_wrong_return_type(self):
4197 # Returning the wrong return type terminates the TLS connection
4198 # with an internal error alert.
4199 server_context, other_context, client_context = self.sni_contexts()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004200
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004201 def cb_wrong_return_type(ssl_sock, server_name, initial_context):
4202 return "foo"
4203 server_context.set_servername_callback(cb_wrong_return_type)
4204
Victor Stinner00253502019-06-03 03:51:43 +02004205 with support.catch_unraisable_exception() as catch:
4206 with self.assertRaises(ssl.SSLError) as cm:
4207 stats = server_params_test(client_context, server_context,
4208 chatty=False,
4209 sni_name='supermessage')
4210
4211
4212 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
4213 self.assertEqual(catch.unraisable.exc_type, TypeError)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004214
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004215 def test_shared_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004216 client_context, server_context, hostname = testing_context()
Christian Heimese8eb6cb2018-05-22 22:50:12 +02004217 client_context.set_ciphers("AES128:AES256")
4218 server_context.set_ciphers("AES256")
4219 expected_algs = [
4220 "AES256", "AES-256",
4221 # TLS 1.3 ciphers are always enabled
4222 "TLS_CHACHA20", "TLS_AES",
4223 ]
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004224
Christian Heimesa170fa12017-09-15 20:27:30 +02004225 stats = server_params_test(client_context, server_context,
4226 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004227 ciphers = stats['server_shared_ciphers'][0]
4228 self.assertGreater(len(ciphers), 0)
4229 for name, tls_version, bits in ciphers:
Christian Heimese8eb6cb2018-05-22 22:50:12 +02004230 if not any(alg in name for alg in expected_algs):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004231 self.fail(name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004232
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004233 def test_read_write_after_close_raises_valuerror(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004234 client_context, server_context, hostname = testing_context()
4235 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004236
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004237 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02004238 s = client_context.wrap_socket(socket.socket(),
4239 server_hostname=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004240 s.connect((HOST, server.port))
4241 s.close()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004242
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004243 self.assertRaises(ValueError, s.read, 1024)
4244 self.assertRaises(ValueError, s.write, b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004245
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004246 def test_sendfile(self):
4247 TEST_DATA = b"x" * 512
4248 with open(support.TESTFN, 'wb') as f:
4249 f.write(TEST_DATA)
4250 self.addCleanup(support.unlink, support.TESTFN)
Christian Heimesa170fa12017-09-15 20:27:30 +02004251 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004252 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01004253 context.load_verify_locations(SIGNING_CA)
4254 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004255 server = ThreadedEchoServer(context=context, chatty=False)
4256 with server:
4257 with context.wrap_socket(socket.socket()) as s:
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004258 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004259 with open(support.TESTFN, 'rb') as file:
4260 s.sendfile(file)
4261 self.assertEqual(s.recv(1024), TEST_DATA)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004262
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004263 def test_session(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004264 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01004265 # TODO: sessions aren't compatible with TLSv1.3 yet
4266 client_context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004267
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004268 # first connection without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004269 stats = server_params_test(client_context, server_context,
4270 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004271 session = stats['session']
4272 self.assertTrue(session.id)
4273 self.assertGreater(session.time, 0)
4274 self.assertGreater(session.timeout, 0)
4275 self.assertTrue(session.has_ticket)
4276 if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
4277 self.assertGreater(session.ticket_lifetime_hint, 0)
4278 self.assertFalse(stats['session_reused'])
4279 sess_stat = server_context.session_stats()
4280 self.assertEqual(sess_stat['accept'], 1)
4281 self.assertEqual(sess_stat['hits'], 0)
Giampaolo Rodola'915d1412014-06-11 03:54:30 +02004282
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004283 # reuse session
Christian Heimesa170fa12017-09-15 20:27:30 +02004284 stats = server_params_test(client_context, server_context,
4285 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004286 sess_stat = server_context.session_stats()
4287 self.assertEqual(sess_stat['accept'], 2)
4288 self.assertEqual(sess_stat['hits'], 1)
4289 self.assertTrue(stats['session_reused'])
4290 session2 = stats['session']
4291 self.assertEqual(session2.id, session.id)
4292 self.assertEqual(session2, session)
4293 self.assertIsNot(session2, session)
4294 self.assertGreaterEqual(session2.time, session.time)
4295 self.assertGreaterEqual(session2.timeout, session.timeout)
Christian Heimes99a65702016-09-10 23:44:53 +02004296
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004297 # another one without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004298 stats = server_params_test(client_context, server_context,
4299 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004300 self.assertFalse(stats['session_reused'])
4301 session3 = stats['session']
4302 self.assertNotEqual(session3.id, session.id)
4303 self.assertNotEqual(session3, session)
4304 sess_stat = server_context.session_stats()
4305 self.assertEqual(sess_stat['accept'], 3)
4306 self.assertEqual(sess_stat['hits'], 1)
Christian Heimes99a65702016-09-10 23:44:53 +02004307
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004308 # reuse session again
Christian Heimesa170fa12017-09-15 20:27:30 +02004309 stats = server_params_test(client_context, server_context,
4310 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004311 self.assertTrue(stats['session_reused'])
4312 session4 = stats['session']
4313 self.assertEqual(session4.id, session.id)
4314 self.assertEqual(session4, session)
4315 self.assertGreaterEqual(session4.time, session.time)
4316 self.assertGreaterEqual(session4.timeout, session.timeout)
4317 sess_stat = server_context.session_stats()
4318 self.assertEqual(sess_stat['accept'], 4)
4319 self.assertEqual(sess_stat['hits'], 2)
Christian Heimes99a65702016-09-10 23:44:53 +02004320
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004321 def test_session_handling(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004322 client_context, server_context, hostname = testing_context()
4323 client_context2, _, _ = testing_context()
Christian Heimes99a65702016-09-10 23:44:53 +02004324
Christian Heimes05d9fe32018-02-27 08:55:39 +01004325 # TODO: session reuse does not work with TLSv1.3
Christian Heimesa170fa12017-09-15 20:27:30 +02004326 client_context.options |= ssl.OP_NO_TLSv1_3
4327 client_context2.options |= ssl.OP_NO_TLSv1_3
Christian Heimescb5b68a2017-09-07 18:07:00 -07004328
Christian Heimesa170fa12017-09-15 20:27:30 +02004329 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004330 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02004331 with client_context.wrap_socket(socket.socket(),
4332 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004333 # session is None before handshake
4334 self.assertEqual(s.session, None)
4335 self.assertEqual(s.session_reused, None)
4336 s.connect((HOST, server.port))
4337 session = s.session
4338 self.assertTrue(session)
4339 with self.assertRaises(TypeError) as e:
4340 s.session = object
Ned Deily4531ec72018-06-11 20:26:28 -04004341 self.assertEqual(str(e.exception), 'Value is not a SSLSession.')
Christian Heimes99a65702016-09-10 23:44:53 +02004342
Christian Heimesa170fa12017-09-15 20:27:30 +02004343 with client_context.wrap_socket(socket.socket(),
4344 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004345 s.connect((HOST, server.port))
4346 # cannot set session after handshake
4347 with self.assertRaises(ValueError) as e:
4348 s.session = session
4349 self.assertEqual(str(e.exception),
4350 'Cannot set session after handshake.')
Christian Heimes99a65702016-09-10 23:44:53 +02004351
Christian Heimesa170fa12017-09-15 20:27:30 +02004352 with client_context.wrap_socket(socket.socket(),
4353 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004354 # can set session before handshake and before the
4355 # connection was established
4356 s.session = session
4357 s.connect((HOST, server.port))
4358 self.assertEqual(s.session.id, session.id)
4359 self.assertEqual(s.session, session)
4360 self.assertEqual(s.session_reused, True)
Christian Heimes99a65702016-09-10 23:44:53 +02004361
Christian Heimesa170fa12017-09-15 20:27:30 +02004362 with client_context2.wrap_socket(socket.socket(),
4363 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004364 # cannot re-use session with a different SSLContext
4365 with self.assertRaises(ValueError) as e:
Christian Heimes99a65702016-09-10 23:44:53 +02004366 s.session = session
4367 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004368 self.assertEqual(str(e.exception),
4369 'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02004370
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01004371
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02004372@unittest.skipUnless(has_tls_version('TLSv1_3'), "Test needs TLS 1.3")
Christian Heimes9fb051f2018-09-23 08:32:31 +02004373class TestPostHandshakeAuth(unittest.TestCase):
4374 def test_pha_setter(self):
4375 protocols = [
4376 ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT
4377 ]
4378 for protocol in protocols:
4379 ctx = ssl.SSLContext(protocol)
4380 self.assertEqual(ctx.post_handshake_auth, False)
4381
4382 ctx.post_handshake_auth = True
4383 self.assertEqual(ctx.post_handshake_auth, True)
4384
4385 ctx.verify_mode = ssl.CERT_REQUIRED
4386 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
4387 self.assertEqual(ctx.post_handshake_auth, True)
4388
4389 ctx.post_handshake_auth = False
4390 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
4391 self.assertEqual(ctx.post_handshake_auth, False)
4392
4393 ctx.verify_mode = ssl.CERT_OPTIONAL
4394 ctx.post_handshake_auth = True
4395 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
4396 self.assertEqual(ctx.post_handshake_auth, True)
4397
4398 def test_pha_required(self):
4399 client_context, server_context, hostname = testing_context()
4400 server_context.post_handshake_auth = True
4401 server_context.verify_mode = ssl.CERT_REQUIRED
4402 client_context.post_handshake_auth = True
4403 client_context.load_cert_chain(SIGNED_CERTFILE)
4404
4405 server = ThreadedEchoServer(context=server_context, chatty=False)
4406 with server:
4407 with client_context.wrap_socket(socket.socket(),
4408 server_hostname=hostname) as s:
4409 s.connect((HOST, server.port))
4410 s.write(b'HASCERT')
4411 self.assertEqual(s.recv(1024), b'FALSE\n')
4412 s.write(b'PHA')
4413 self.assertEqual(s.recv(1024), b'OK\n')
4414 s.write(b'HASCERT')
4415 self.assertEqual(s.recv(1024), b'TRUE\n')
4416 # PHA method just returns true when cert is already available
4417 s.write(b'PHA')
4418 self.assertEqual(s.recv(1024), b'OK\n')
4419 s.write(b'GETCERT')
4420 cert_text = s.recv(4096).decode('us-ascii')
4421 self.assertIn('Python Software Foundation CA', cert_text)
4422
4423 def test_pha_required_nocert(self):
4424 client_context, server_context, hostname = testing_context()
4425 server_context.post_handshake_auth = True
4426 server_context.verify_mode = ssl.CERT_REQUIRED
4427 client_context.post_handshake_auth = True
4428
Victor Stinner73ea5462019-07-09 14:33:49 +02004429 # Ignore expected SSLError in ConnectionHandler of ThreadedEchoServer
4430 # (it is only raised sometimes on Windows)
4431 with support.catch_threading_exception() as cm:
4432 server = ThreadedEchoServer(context=server_context, chatty=False)
4433 with server:
4434 with client_context.wrap_socket(socket.socket(),
4435 server_hostname=hostname) as s:
4436 s.connect((HOST, server.port))
4437 s.write(b'PHA')
4438 # receive CertificateRequest
4439 self.assertEqual(s.recv(1024), b'OK\n')
4440 # send empty Certificate + Finish
4441 s.write(b'HASCERT')
4442 # receive alert
4443 with self.assertRaisesRegex(
4444 ssl.SSLError,
4445 'tlsv13 alert certificate required'):
4446 s.recv(1024)
Christian Heimes9fb051f2018-09-23 08:32:31 +02004447
4448 def test_pha_optional(self):
4449 if support.verbose:
4450 sys.stdout.write("\n")
4451
4452 client_context, server_context, hostname = testing_context()
4453 server_context.post_handshake_auth = True
4454 server_context.verify_mode = ssl.CERT_REQUIRED
4455 client_context.post_handshake_auth = True
4456 client_context.load_cert_chain(SIGNED_CERTFILE)
4457
4458 # check CERT_OPTIONAL
4459 server_context.verify_mode = ssl.CERT_OPTIONAL
4460 server = ThreadedEchoServer(context=server_context, chatty=False)
4461 with server:
4462 with client_context.wrap_socket(socket.socket(),
4463 server_hostname=hostname) as s:
4464 s.connect((HOST, server.port))
4465 s.write(b'HASCERT')
4466 self.assertEqual(s.recv(1024), b'FALSE\n')
4467 s.write(b'PHA')
4468 self.assertEqual(s.recv(1024), b'OK\n')
4469 s.write(b'HASCERT')
4470 self.assertEqual(s.recv(1024), b'TRUE\n')
4471
4472 def test_pha_optional_nocert(self):
4473 if support.verbose:
4474 sys.stdout.write("\n")
4475
4476 client_context, server_context, hostname = testing_context()
4477 server_context.post_handshake_auth = True
4478 server_context.verify_mode = ssl.CERT_OPTIONAL
4479 client_context.post_handshake_auth = True
4480
4481 server = ThreadedEchoServer(context=server_context, chatty=False)
4482 with server:
4483 with client_context.wrap_socket(socket.socket(),
4484 server_hostname=hostname) as s:
4485 s.connect((HOST, server.port))
4486 s.write(b'HASCERT')
4487 self.assertEqual(s.recv(1024), b'FALSE\n')
4488 s.write(b'PHA')
4489 self.assertEqual(s.recv(1024), b'OK\n')
penguindustin96466302019-05-06 14:57:17 -04004490 # optional doesn't fail when client does not have a cert
Christian Heimes9fb051f2018-09-23 08:32:31 +02004491 s.write(b'HASCERT')
4492 self.assertEqual(s.recv(1024), b'FALSE\n')
4493
4494 def test_pha_no_pha_client(self):
4495 client_context, server_context, hostname = testing_context()
4496 server_context.post_handshake_auth = True
4497 server_context.verify_mode = ssl.CERT_REQUIRED
4498 client_context.load_cert_chain(SIGNED_CERTFILE)
4499
4500 server = ThreadedEchoServer(context=server_context, chatty=False)
4501 with server:
4502 with client_context.wrap_socket(socket.socket(),
4503 server_hostname=hostname) as s:
4504 s.connect((HOST, server.port))
4505 with self.assertRaisesRegex(ssl.SSLError, 'not server'):
4506 s.verify_client_post_handshake()
4507 s.write(b'PHA')
4508 self.assertIn(b'extension not received', s.recv(1024))
4509
4510 def test_pha_no_pha_server(self):
4511 # server doesn't have PHA enabled, cert is requested in handshake
4512 client_context, server_context, hostname = testing_context()
4513 server_context.verify_mode = ssl.CERT_REQUIRED
4514 client_context.post_handshake_auth = True
4515 client_context.load_cert_chain(SIGNED_CERTFILE)
4516
4517 server = ThreadedEchoServer(context=server_context, chatty=False)
4518 with server:
4519 with client_context.wrap_socket(socket.socket(),
4520 server_hostname=hostname) as s:
4521 s.connect((HOST, server.port))
4522 s.write(b'HASCERT')
4523 self.assertEqual(s.recv(1024), b'TRUE\n')
4524 # PHA doesn't fail if there is already a cert
4525 s.write(b'PHA')
4526 self.assertEqual(s.recv(1024), b'OK\n')
4527 s.write(b'HASCERT')
4528 self.assertEqual(s.recv(1024), b'TRUE\n')
4529
4530 def test_pha_not_tls13(self):
4531 # TLS 1.2
4532 client_context, server_context, hostname = testing_context()
4533 server_context.verify_mode = ssl.CERT_REQUIRED
4534 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
4535 client_context.post_handshake_auth = True
4536 client_context.load_cert_chain(SIGNED_CERTFILE)
4537
4538 server = ThreadedEchoServer(context=server_context, chatty=False)
4539 with server:
4540 with client_context.wrap_socket(socket.socket(),
4541 server_hostname=hostname) as s:
4542 s.connect((HOST, server.port))
4543 # PHA fails for TLS != 1.3
4544 s.write(b'PHA')
4545 self.assertIn(b'WRONG_SSL_VERSION', s.recv(1024))
4546
Christian Heimesf0f59302019-07-01 08:29:17 +02004547 def test_bpo37428_pha_cert_none(self):
4548 # verify that post_handshake_auth does not implicitly enable cert
4549 # validation.
4550 hostname = SIGNED_CERTFILE_HOSTNAME
4551 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
4552 client_context.post_handshake_auth = True
4553 client_context.load_cert_chain(SIGNED_CERTFILE)
4554 # no cert validation and CA on client side
4555 client_context.check_hostname = False
4556 client_context.verify_mode = ssl.CERT_NONE
4557
4558 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
4559 server_context.load_cert_chain(SIGNED_CERTFILE)
4560 server_context.load_verify_locations(SIGNING_CA)
4561 server_context.post_handshake_auth = True
4562 server_context.verify_mode = ssl.CERT_REQUIRED
4563
4564 server = ThreadedEchoServer(context=server_context, chatty=False)
4565 with server:
4566 with client_context.wrap_socket(socket.socket(),
4567 server_hostname=hostname) as s:
4568 s.connect((HOST, server.port))
4569 s.write(b'HASCERT')
4570 self.assertEqual(s.recv(1024), b'FALSE\n')
4571 s.write(b'PHA')
4572 self.assertEqual(s.recv(1024), b'OK\n')
4573 s.write(b'HASCERT')
4574 self.assertEqual(s.recv(1024), b'TRUE\n')
4575 # server cert has not been validated
4576 self.assertEqual(s.getpeercert(), {})
4577
Christian Heimes9fb051f2018-09-23 08:32:31 +02004578
Christian Heimesc7f70692019-05-31 11:44:05 +02004579HAS_KEYLOG = hasattr(ssl.SSLContext, 'keylog_filename')
4580requires_keylog = unittest.skipUnless(
4581 HAS_KEYLOG, 'test requires OpenSSL 1.1.1 with keylog callback')
4582
4583class TestSSLDebug(unittest.TestCase):
4584
4585 def keylog_lines(self, fname=support.TESTFN):
4586 with open(fname) as f:
4587 return len(list(f))
4588
4589 @requires_keylog
Paul Monsonf3550692019-06-19 13:09:54 -07004590 @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
Christian Heimesc7f70692019-05-31 11:44:05 +02004591 def test_keylog_defaults(self):
4592 self.addCleanup(support.unlink, support.TESTFN)
4593 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
4594 self.assertEqual(ctx.keylog_filename, None)
4595
4596 self.assertFalse(os.path.isfile(support.TESTFN))
4597 ctx.keylog_filename = support.TESTFN
4598 self.assertEqual(ctx.keylog_filename, support.TESTFN)
4599 self.assertTrue(os.path.isfile(support.TESTFN))
4600 self.assertEqual(self.keylog_lines(), 1)
4601
4602 ctx.keylog_filename = None
4603 self.assertEqual(ctx.keylog_filename, None)
4604
4605 with self.assertRaises((IsADirectoryError, PermissionError)):
4606 # Windows raises PermissionError
4607 ctx.keylog_filename = os.path.dirname(
4608 os.path.abspath(support.TESTFN))
4609
4610 with self.assertRaises(TypeError):
4611 ctx.keylog_filename = 1
4612
4613 @requires_keylog
Paul Monsonf3550692019-06-19 13:09:54 -07004614 @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
Christian Heimesc7f70692019-05-31 11:44:05 +02004615 def test_keylog_filename(self):
4616 self.addCleanup(support.unlink, support.TESTFN)
4617 client_context, server_context, hostname = testing_context()
4618
4619 client_context.keylog_filename = support.TESTFN
4620 server = ThreadedEchoServer(context=server_context, chatty=False)
4621 with server:
4622 with client_context.wrap_socket(socket.socket(),
4623 server_hostname=hostname) as s:
4624 s.connect((HOST, server.port))
4625 # header, 5 lines for TLS 1.3
4626 self.assertEqual(self.keylog_lines(), 6)
4627
4628 client_context.keylog_filename = None
4629 server_context.keylog_filename = support.TESTFN
4630 server = ThreadedEchoServer(context=server_context, chatty=False)
4631 with server:
4632 with client_context.wrap_socket(socket.socket(),
4633 server_hostname=hostname) as s:
4634 s.connect((HOST, server.port))
4635 self.assertGreaterEqual(self.keylog_lines(), 11)
4636
4637 client_context.keylog_filename = support.TESTFN
4638 server_context.keylog_filename = support.TESTFN
4639 server = ThreadedEchoServer(context=server_context, chatty=False)
4640 with server:
4641 with client_context.wrap_socket(socket.socket(),
4642 server_hostname=hostname) as s:
4643 s.connect((HOST, server.port))
4644 self.assertGreaterEqual(self.keylog_lines(), 21)
4645
4646 client_context.keylog_filename = None
4647 server_context.keylog_filename = None
4648
4649 @requires_keylog
4650 @unittest.skipIf(sys.flags.ignore_environment,
4651 "test is not compatible with ignore_environment")
Paul Monsonf3550692019-06-19 13:09:54 -07004652 @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
Christian Heimesc7f70692019-05-31 11:44:05 +02004653 def test_keylog_env(self):
4654 self.addCleanup(support.unlink, support.TESTFN)
4655 with unittest.mock.patch.dict(os.environ):
4656 os.environ['SSLKEYLOGFILE'] = support.TESTFN
4657 self.assertEqual(os.environ['SSLKEYLOGFILE'], support.TESTFN)
4658
4659 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
4660 self.assertEqual(ctx.keylog_filename, None)
4661
4662 ctx = ssl.create_default_context()
4663 self.assertEqual(ctx.keylog_filename, support.TESTFN)
4664
4665 ctx = ssl._create_stdlib_context()
4666 self.assertEqual(ctx.keylog_filename, support.TESTFN)
4667
4668 def test_msg_callback(self):
4669 client_context, server_context, hostname = testing_context()
4670
4671 def msg_cb(conn, direction, version, content_type, msg_type, data):
4672 pass
4673
4674 self.assertIs(client_context._msg_callback, None)
4675 client_context._msg_callback = msg_cb
4676 self.assertIs(client_context._msg_callback, msg_cb)
4677 with self.assertRaises(TypeError):
4678 client_context._msg_callback = object()
4679
4680 def test_msg_callback_tls12(self):
4681 client_context, server_context, hostname = testing_context()
4682 client_context.options |= ssl.OP_NO_TLSv1_3
4683
4684 msg = []
4685
4686 def msg_cb(conn, direction, version, content_type, msg_type, data):
4687 self.assertIsInstance(conn, ssl.SSLSocket)
4688 self.assertIsInstance(data, bytes)
4689 self.assertIn(direction, {'read', 'write'})
4690 msg.append((direction, version, content_type, msg_type))
4691
4692 client_context._msg_callback = msg_cb
4693
4694 server = ThreadedEchoServer(context=server_context, chatty=False)
4695 with server:
4696 with client_context.wrap_socket(socket.socket(),
4697 server_hostname=hostname) as s:
4698 s.connect((HOST, server.port))
4699
Christian Heimese35d1ba2019-06-03 20:40:15 +02004700 self.assertIn(
Christian Heimesc7f70692019-05-31 11:44:05 +02004701 ("read", TLSVersion.TLSv1_2, _TLSContentType.HANDSHAKE,
4702 _TLSMessageType.SERVER_KEY_EXCHANGE),
Christian Heimese35d1ba2019-06-03 20:40:15 +02004703 msg
4704 )
4705 self.assertIn(
Christian Heimesc7f70692019-05-31 11:44:05 +02004706 ("write", TLSVersion.TLSv1_2, _TLSContentType.CHANGE_CIPHER_SPEC,
4707 _TLSMessageType.CHANGE_CIPHER_SPEC),
Christian Heimese35d1ba2019-06-03 20:40:15 +02004708 msg
4709 )
Christian Heimesc7f70692019-05-31 11:44:05 +02004710
4711
Thomas Woutersed03b412007-08-28 21:37:11 +00004712def test_main(verbose=False):
Antoine Pitrou15cee622010-08-04 16:45:21 +00004713 if support.verbose:
4714 plats = {
Antoine Pitrou15cee622010-08-04 16:45:21 +00004715 'Mac': platform.mac_ver,
4716 'Windows': platform.win32_ver,
4717 }
Petr Viktorin8b94b412018-05-16 11:51:18 -04004718 for name, func in plats.items():
4719 plat = func()
4720 if plat and plat[0]:
4721 plat = '%s %r' % (name, plat)
4722 break
4723 else:
4724 plat = repr(platform.platform())
Antoine Pitrou15cee622010-08-04 16:45:21 +00004725 print("test_ssl: testing with %r %r" %
4726 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
4727 print(" under %s" % plat)
Antoine Pitroud5323212010-10-22 18:19:07 +00004728 print(" HAS_SNI = %r" % ssl.HAS_SNI)
Antoine Pitrou609ef012013-03-29 18:09:06 +01004729 print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
4730 try:
4731 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
4732 except AttributeError:
4733 pass
Antoine Pitrou15cee622010-08-04 16:45:21 +00004734
Antoine Pitrou152efa22010-05-16 18:19:27 +00004735 for filename in [
Martin Panter3840b2a2016-03-27 01:53:46 +00004736 CERTFILE, BYTES_CERTFILE,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004737 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004738 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004739 BADCERT, BADKEY, EMPTYCERT]:
4740 if not os.path.exists(filename):
4741 raise support.TestFailed("Can't read certificate file %r" % filename)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00004742
Martin Panter3840b2a2016-03-27 01:53:46 +00004743 tests = [
4744 ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
Christian Heimes9d50ab52018-02-27 10:17:30 +01004745 SSLObjectTests, SimpleBackgroundTests, ThreadedTests,
Christian Heimesc7f70692019-05-31 11:44:05 +02004746 TestPostHandshakeAuth, TestSSLDebug
Martin Panter3840b2a2016-03-27 01:53:46 +00004747 ]
Thomas Woutersed03b412007-08-28 21:37:11 +00004748
Benjamin Petersonee8712c2008-05-20 21:35:26 +00004749 if support.is_resource_enabled('network'):
Bill Janssen6e027db2007-11-15 22:23:56 +00004750 tests.append(NetworkedTests)
Thomas Woutersed03b412007-08-28 21:37:11 +00004751
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004752 thread_info = support.threading_setup()
Antoine Pitrou480a1242010-04-28 21:37:09 +00004753 try:
4754 support.run_unittest(*tests)
4755 finally:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004756 support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00004757
4758if __name__ == "__main__":
4759 test_main()