blob: 419506f4d3c072c2ab895c6eb07f7c84bdb7abfa [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Christian Heimesc7f70692019-05-31 11:44:05 +02005import unittest.mock
Benjamin Petersonee8712c2008-05-20 21:35:26 +00006from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00007import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00008import select
Thomas Woutersed03b412007-08-28 21:37:11 +00009import time
Christian Heimes9424bb42013-06-17 15:32:57 +020010import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000011import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000012import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000013import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000014import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000015import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020016import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000017import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000018import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000019import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000020import platform
Christian Heimes892d66e2018-01-29 14:10:18 +010021import sysconfig
Christian Heimes19311322019-09-26 22:53:09 +020022import functools
Christian Heimes888bbdc2017-09-07 14:18:21 -070023try:
24 import ctypes
25except ImportError:
26 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000027
Antoine Pitrou05d936d2010-10-13 11:38:36 +000028ssl = support.import_module("ssl")
29
Victor Stinner466e18e2019-07-01 19:01:52 +020030from ssl import TLSVersion, _TLSContentType, _TLSMessageType
Martin Panter3840b2a2016-03-27 01:53:46 +000031
# Sorted keys of the ssl module's protocol-name table.
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
HOST = support.HOST

# Flags describing the TLS library the interpreter is linked against.
# LibreSSL is detected from the start of the version banner; the two
# OpenSSL flags are never set when LibreSSL was detected.
IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)

# Build-time configuration variable of the same name (None when unset).
PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000038
# Map PROTOCOL_* constants of the ssl module to ssl.TLSVersion members.
# A pair is left out when this build lacks either of the two attributes.
PROTOCOL_TO_TLS_VERSION = {}
for proto, ver in (
    ("PROTOCOL_SSLv23", "SSLv3"),
    ("PROTOCOL_TLSv1", "TLSv1"),
    ("PROTOCOL_TLSv1_1", "TLSv1_1"),
):
    try:
        proto = getattr(ssl, proto)
        ver = getattr(ssl.TLSVersion, ver)
    except AttributeError:
        # constant not available in this build; nothing to record
        pass
    else:
        PROTOCOL_TO_TLS_VERSION[proto] = ver
51
def data_file(*name):
    """Return a path built from this module's directory and *name*.

    :param name: path components appended to the module directory
    :return: the joined path as a string
    """
    here = os.path.dirname(__file__)
    return os.path.join(here, *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000054
# The custom key and certificate files used in test_ssl are generated
# using Lib/test/make_ssl_certs.py.
# Other certificates are simply fetched from the Internet servers they
# are meant to authenticate.

# Key and certificate in one file, plus separate cert / key files.
# The BYTES_* variants are the same paths run through os.fsencode().
CERTFILE = data_file("keycert.pem")
BYTES_CERTFILE = os.fsencode(CERTFILE)
ONLYCERT = data_file("ssl_cert.pem")
ONLYKEY = data_file("ssl_key.pem")
BYTES_ONLYCERT = os.fsencode(ONLYCERT)
BYTES_ONLYKEY = os.fsencode(ONLYKEY)

# Password-protected variants and the password that goes with them.
CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
KEY_PASSWORD = "somepass"

# CA directory and two individual CA files stored inside it.
CAPATH = data_file("capath")
BYTES_CAPATH = os.fsencode(CAPATH)
CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
CAFILE_CACERT = data_file("capath", "5ed36f99.0")
73
# Expected decoding of CERTFILE (compared against the decoder's output in
# test_parse_cert).  Issuer and subject carry the same name entries.
CERTFILE_INFO = {
    'issuer': (
        (('countryName', 'XY'),),
        (('localityName', 'Castle Anthrax'),),
        (('organizationName', 'Python Software Foundation'),),
        (('commonName', 'localhost'),),
    ),
    'notAfter': 'Aug 26 14:23:15 2028 GMT',
    'notBefore': 'Aug 29 14:23:15 2018 GMT',
    'serialNumber': '98A7CF88C74A32ED',
    'subject': (
        (('countryName', 'XY'),),
        (('localityName', 'Castle Anthrax'),),
        (('organizationName', 'Python Software Foundation'),),
        (('commonName', 'localhost'),),
    ),
    'subjectAltName': (('DNS', 'localhost'),),
    'version': 3,
}
Antoine Pitrou152efa22010-05-16 18:19:27 +000089
# empty CRL
CRLFILE = data_file("revocation.crl")

# Two keys and certs signed by the same CA (for SNI tests); this is the
# first one, together with the hostname used alongside it.
SIGNED_CERTFILE = data_file("keycert3.pem")
SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +010096
# Expected decoding of SIGNED_CERTFILE (compared against the decoder's
# output in test_parse_cert).
SIGNED_CERTFILE_INFO = {
    'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
    'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
    'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
    'issuer': ((('countryName', 'XY'),),
               (('organizationName', 'Python Software Foundation CA'),),
               (('commonName', 'our-ca-server'),)),
    # ASN1_TIME_print() pads a single-digit day with a leading space, so
    # there are two spaces between the month and the day here.
    'notAfter': 'Jul  7 14:23:16 2028 GMT',
    'notBefore': 'Aug 29 14:23:16 2018 GMT',
    'serialNumber': 'CB2D80995A69525C',
    'subject': ((('countryName', 'XY'),),
                (('localityName', 'Castle Anthrax'),),
                (('organizationName', 'Python Software Foundation'),),
                (('commonName', 'localhost'),)),
    'subjectAltName': (('DNS', 'localhost'),),
    'version': 3
}
114
# Second signed key/cert pair, and an ECC one, each with its hostname.
SIGNED_CERTFILE2 = data_file("keycert4.pem")
SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'

# Same certificate as pycacert.pem, but without extra text in file
SIGNING_CA = data_file("capath", "ceff1710.0")
# cert with all kinds of subject alt names
ALLSANFILE = data_file("allsans.pem")
IDNSANSFILE = data_file("idnsans.pem")

REMOTE_HOST = "self-signed.pythontest.net"

# Certificate / key fixtures used by the negative and parsing tests.
EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
NONEXISTINGCERT = data_file("XXXnonexisting.pem")
BADKEY = data_file("badkey.pem")
NOKIACERT = data_file("nokia.pem")
NULLBYTECERT = data_file("nullbytecert.pem")
TALOS_INVALID_CRLDP = data_file("talos-2019-0758.pem")

# DH parameter file, as str and as an os.fsencode()'d path.
DHFILE = data_file("ffdh3072.pem")
BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000138
# Not defined in all versions of OpenSSL; each option falls back to 0
# when the ssl module does not export it.
OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
OP_CIPHER_SERVER_PREFERENCE = getattr(
    ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
OP_ENABLE_MIDDLEBOX_COMPAT = getattr(
    ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
Christian Heimes358cfd42016-09-10 22:43:48 +0200145
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100146
def has_tls_protocol(protocol):
    """Tell whether a PROTOCOL_* constant is usable.

    A string is looked up on the ssl module first; a name the module does
    not export yields False.  The auto-negotiating protocols always yield
    True; anything else is delegated to has_tls_version().

    :param protocol: enum ssl._SSLMethod member or name
    :return: bool
    """
    prefix = 'PROTOCOL_'
    if isinstance(protocol, str):
        assert protocol.startswith('PROTOCOL_')
        protocol = getattr(ssl, protocol, None)
        if protocol is None:
            return False

    # auto-negotiate protocols are always available
    auto_negotiating = {
        ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS_SERVER,
        ssl.PROTOCOL_TLS_CLIENT
    }
    if protocol in auto_negotiating:
        return True

    # strip the prefix to obtain the TLS version name
    return has_tls_version(protocol.name[len(prefix):])
166
167
@functools.lru_cache
def has_tls_version(version):
    """Check if a TLS/SSL version is enabled

    The result is memoized per argument by functools.lru_cache.

    :param version: TLS version name or ssl.TLSVersion member
    :return: bool
    :raises KeyError: if a name other than "SSLv2" is not a TLSVersion member
    """
    if version == "SSLv2":
        # never supported and not even in TLSVersion enum
        return False

    if isinstance(version, str):
        version = ssl.TLSVersion.__members__[version]

    # check compile time flags like ssl.HAS_TLSv1_2
    if not getattr(ssl, f'HAS_{version.name}'):
        return False

    # check runtime and dynamic crypto policy settings. A TLS version may
    # be compiled in but disabled by a policy or config option.
    # The protocol is passed explicitly: calling SSLContext() without one
    # is deprecated and emits a DeprecationWarning on newer Pythons.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    if (
            hasattr(ctx, 'minimum_version') and
            ctx.minimum_version != ssl.TLSVersion.MINIMUM_SUPPORTED and
            version < ctx.minimum_version
    ):
        return False
    if (
            hasattr(ctx, 'maximum_version') and
            ctx.maximum_version != ssl.TLSVersion.MAXIMUM_SUPPORTED and
            version > ctx.maximum_version
    ):
        return False

    return True
203
204
def requires_tls_version(version):
    """Decorator to skip tests when a required TLS version is not available

    The availability check runs each time the decorated callable is
    invoked, not when the decorator is applied.

    :param version: TLS version name or ssl.TLSVersion member
    :return: a decorator that wraps the test callable
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kw):
            if has_tls_version(version):
                return func(*args, **kw)
            raise unittest.SkipTest(f"{version} is not available.")
        return wrapper
    return decorator
220
221
# Skip decorator for tests that need SSLContext.minimum_version.
requires_minimum_version = unittest.skipIf(
    not hasattr(ssl.SSLContext, 'minimum_version'),
    "required OpenSSL >= 1.1.0g"
)
226
227
def handle_error(prefix):
    """Write *prefix* followed by the formatted current exception to stdout.

    Nothing is written unless support.verbose is set.
    """
    formatted = traceback.format_exception(*sys.exc_info())
    exc_format = ' '.join(formatted)
    if not support.verbose:
        return
    sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +0000232
def can_clear_options():
    """Return True when the OpenSSL API version is 0.9.8m or higher."""
    # 0.9.8m or higher
    minimum = (0, 9, 8, 13, 15)
    return ssl._OPENSSL_API_VERSION >= minimum
Antoine Pitroub5218772010-05-21 09:56:06 +0000236
def no_sslv2_implies_sslv3_hello():
    """Return True when the linked OpenSSL is 0.9.7h or higher."""
    # 0.9.7h or higher
    minimum = (0, 9, 7, 8, 15)
    return ssl.OPENSSL_VERSION_INFO >= minimum
240
def have_verify_flags():
    """Return True when the linked OpenSSL is 0.9.8 or higher."""
    # 0.9.8 or higher
    minimum = (0, 9, 8, 0, 15)
    return ssl.OPENSSL_VERSION_INFO >= minimum
244
def _have_secp_curves():
    """Probe whether the "secp384r1" ECDH curve can be selected.

    Returns False when ssl.HAS_ECDH is false or when set_ecdh_curve()
    rejects the curve name with ValueError; True otherwise.
    """
    if not ssl.HAS_ECDH:
        return False
    probe_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    try:
        probe_ctx.set_ecdh_curve("secp384r1")
    except ValueError:
        return False
    return True


# Evaluated once at import time.
HAVE_SECP_CURVES = _have_secp_curves()
258
259
def utc_offset(): #NOTE: ignore issues like #1647654
    """Return the local UTC offset in seconds.

    Uses time.altzone while daylight saving time is in effect and
    time.timezone otherwise; both are negated.
    """
    # local time = utc time + utc offset
    dst_active = time.daylight and time.localtime().tm_isdst > 0
    seconds_west = time.altzone if dst_active else time.timezone
    return -seconds_west
265
def asn1time(cert_time):
    """Adjust an expected certificate time string for OpenSSL 0.9.8i.

    For that one API version the seconds are zeroed and a zero-padded day
    is rewritten with a leading space.  For every other version the
    string is returned unchanged.
    """
    # Some versions of OpenSSL ignore seconds, see #18207
    # 0.9.8.i
    if ssl._OPENSSL_API_VERSION != (0, 9, 8, 9, 15):
        return cert_time

    fmt = "%b %d %H:%M:%S %Y GMT"
    parsed = datetime.datetime.strptime(cert_time, fmt).replace(second=0)
    cert_time = parsed.strftime(fmt)
    # %d adds leading zero but ASN1_TIME_print() uses leading space
    if cert_time[4] == "0":
        cert_time = cert_time[:4] + " " + cert_time[5:]
    return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000279
# Skip decorator for tests that depend on ssl.HAS_SNI.
needs_sni = unittest.skipIf(
    not ssl.HAS_SNI, "SNI support needed for this test")
281
Antoine Pitrou23df4832010-08-04 17:14:06 +0000282
def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
                     cert_reqs=ssl.CERT_NONE, ca_certs=None,
                     ciphers=None, certfile=None, keyfile=None,
                     **kwargs):
    """Wrap *sock* using a freshly created SSLContext.

    The context is built for *ssl_version* and configured from the
    keyword arguments; each of them is skipped when it is None.  Extra
    keyword arguments are forwarded to SSLContext.wrap_socket().

    :return: whatever context.wrap_socket() returns
    """
    context = ssl.SSLContext(ssl_version)

    if cert_reqs is not None:
        # check_hostname is switched off before verify_mode is set to
        # CERT_NONE
        if cert_reqs == ssl.CERT_NONE:
            context.check_hostname = False
        context.verify_mode = cert_reqs

    if ca_certs is not None:
        context.load_verify_locations(ca_certs)

    wants_chain = not (certfile is None and keyfile is None)
    if wants_chain:
        context.load_cert_chain(certfile, keyfile)

    if ciphers is not None:
        context.set_ciphers(ciphers)

    return context.wrap_socket(sock, **kwargs)
299
Christian Heimesa170fa12017-09-15 20:27:30 +0200300
def testing_context(server_cert=SIGNED_CERTFILE):
    """Create a matching pair of client and server contexts.

    Usage::

        client_context, server_context, hostname = testing_context()

    :param server_cert: SIGNED_CERTFILE or SIGNED_CERTFILE2
    :return: (client_context, server_context, hostname) tuple
    :raises ValueError: for any other *server_cert*
    """
    if server_cert == SIGNED_CERTFILE:
        hostname = SIGNED_CERTFILE_HOSTNAME
    elif server_cert == SIGNED_CERTFILE2:
        hostname = SIGNED_CERTFILE2_HOSTNAME
    else:
        raise ValueError(server_cert)

    # client side: only the signing CA is loaded for verification
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)

    # server side: its own cert chain plus the signing CA
    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(server_cert)
    server_context.load_verify_locations(SIGNING_CA)

    return (client_context, server_context, hostname)
321
322
Antoine Pitrou152efa22010-05-16 18:19:27 +0000323class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000324
Antoine Pitrou480a1242010-04-28 21:37:09 +0000325 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000326 ssl.CERT_NONE
327 ssl.CERT_OPTIONAL
328 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100329 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100330 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100331 if ssl.HAS_ECDH:
332 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100333 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
334 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000335 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100336 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700337 ssl.OP_NO_SSLv2
338 ssl.OP_NO_SSLv3
339 ssl.OP_NO_TLSv1
340 ssl.OP_NO_TLSv1_3
341 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
342 ssl.OP_NO_TLSv1_1
343 ssl.OP_NO_TLSv1_2
Christian Heimesa170fa12017-09-15 20:27:30 +0200344 self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000345
Christian Heimes9d50ab52018-02-27 10:17:30 +0100346 def test_private_init(self):
347 with self.assertRaisesRegex(TypeError, "public constructor"):
348 with socket.socket() as s:
349 ssl.SSLSocket(s)
350
Antoine Pitrou172f0252014-04-18 20:33:08 +0200351 def test_str_for_enums(self):
352 # Make sure that the PROTOCOL_* constants have enum-like string
353 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200354 proto = ssl.PROTOCOL_TLS
355 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200356 ctx = ssl.SSLContext(proto)
357 self.assertIs(ctx.protocol, proto)
358
Antoine Pitrou480a1242010-04-28 21:37:09 +0000359 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000360 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000361 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000362 sys.stdout.write("\n RAND_status is %d (%s)\n"
363 % (v, (v and "sufficient randomness") or
364 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200365
366 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
367 self.assertEqual(len(data), 16)
368 self.assertEqual(is_cryptographic, v == 1)
369 if v:
370 data = ssl.RAND_bytes(16)
371 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200372 else:
373 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200374
Victor Stinner1e81a392013-12-19 16:47:04 +0100375 # negative num is invalid
376 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
377 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
378
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100379 if hasattr(ssl, 'RAND_egd'):
380 self.assertRaises(TypeError, ssl.RAND_egd, 1)
381 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000382 ssl.RAND_add("this is a random string", 75.0)
Serhiy Storchaka8490f5a2015-03-20 09:00:36 +0200383 ssl.RAND_add(b"this is a random bytes object", 75.0)
384 ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000385
Christian Heimesf77b4b22013-08-21 13:26:05 +0200386 @unittest.skipUnless(os.name == 'posix', 'requires posix')
387 def test_random_fork(self):
388 status = ssl.RAND_status()
389 if not status:
390 self.fail("OpenSSL's PRNG has insufficient randomness")
391
392 rfd, wfd = os.pipe()
393 pid = os.fork()
394 if pid == 0:
395 try:
396 os.close(rfd)
397 child_random = ssl.RAND_pseudo_bytes(16)[0]
398 self.assertEqual(len(child_random), 16)
399 os.write(wfd, child_random)
400 os.close(wfd)
401 except BaseException:
402 os._exit(1)
403 else:
404 os._exit(0)
405 else:
406 os.close(wfd)
407 self.addCleanup(os.close, rfd)
408 _, status = os.waitpid(pid, 0)
409 self.assertEqual(status, 0)
410
411 child_random = os.read(rfd, 16)
412 self.assertEqual(len(child_random), 16)
413 parent_random = ssl.RAND_pseudo_bytes(16)[0]
414 self.assertEqual(len(parent_random), 16)
415
416 self.assertNotEqual(child_random, parent_random)
417
Christian Heimese6dac002018-08-30 07:25:49 +0200418 maxDiff = None
419
Antoine Pitrou480a1242010-04-28 21:37:09 +0000420 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000421 # note that this uses an 'unofficial' function in _ssl.c,
422 # provided solely for this test, to exercise the certificate
423 # parsing code
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100424 self.assertEqual(
425 ssl._ssl._test_decode_cert(CERTFILE),
426 CERTFILE_INFO
427 )
428 self.assertEqual(
429 ssl._ssl._test_decode_cert(SIGNED_CERTFILE),
430 SIGNED_CERTFILE_INFO
431 )
432
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200433 # Issue #13034: the subjectAltName in some certificates
434 # (notably projects.developer.nokia.com:443) wasn't parsed
435 p = ssl._ssl._test_decode_cert(NOKIACERT)
436 if support.verbose:
437 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
438 self.assertEqual(p['subjectAltName'],
439 (('DNS', 'projects.developer.nokia.com'),
440 ('DNS', 'projects.forum.nokia.com'))
441 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100442 # extra OCSP and AIA fields
443 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
444 self.assertEqual(p['caIssuers'],
445 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
446 self.assertEqual(p['crlDistributionPoints'],
447 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000448
Christian Heimesa37f5242019-01-15 23:47:42 +0100449 def test_parse_cert_CVE_2019_5010(self):
450 p = ssl._ssl._test_decode_cert(TALOS_INVALID_CRLDP)
451 if support.verbose:
452 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
453 self.assertEqual(
454 p,
455 {
456 'issuer': (
457 (('countryName', 'UK'),), (('commonName', 'cody-ca'),)),
458 'notAfter': 'Jun 14 18:00:58 2028 GMT',
459 'notBefore': 'Jun 18 18:00:58 2018 GMT',
460 'serialNumber': '02',
461 'subject': ((('countryName', 'UK'),),
462 (('commonName',
463 'codenomicon-vm-2.test.lal.cisco.com'),)),
464 'subjectAltName': (
465 ('DNS', 'codenomicon-vm-2.test.lal.cisco.com'),),
466 'version': 3
467 }
468 )
469
Christian Heimes824f7f32013-08-17 00:54:47 +0200470 def test_parse_cert_CVE_2013_4238(self):
471 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
472 if support.verbose:
473 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
474 subject = ((('countryName', 'US'),),
475 (('stateOrProvinceName', 'Oregon'),),
476 (('localityName', 'Beaverton'),),
477 (('organizationName', 'Python Software Foundation'),),
478 (('organizationalUnitName', 'Python Core Development'),),
479 (('commonName', 'null.python.org\x00example.org'),),
480 (('emailAddress', 'python-dev@python.org'),))
481 self.assertEqual(p['subject'], subject)
482 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200483 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
484 san = (('DNS', 'altnull.python.org\x00example.com'),
485 ('email', 'null@python.org\x00user@example.org'),
486 ('URI', 'http://null.python.org\x00http://example.org'),
487 ('IP Address', '192.0.2.1'),
488 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
489 else:
490 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
491 san = (('DNS', 'altnull.python.org\x00example.com'),
492 ('email', 'null@python.org\x00user@example.org'),
493 ('URI', 'http://null.python.org\x00http://example.org'),
494 ('IP Address', '192.0.2.1'),
495 ('IP Address', '<invalid>'))
496
497 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200498
Christian Heimes1c03abd2016-09-06 23:25:35 +0200499 def test_parse_all_sans(self):
500 p = ssl._ssl._test_decode_cert(ALLSANFILE)
501 self.assertEqual(p['subjectAltName'],
502 (
503 ('DNS', 'allsans'),
504 ('othername', '<unsupported>'),
505 ('othername', '<unsupported>'),
506 ('email', 'user@example.org'),
507 ('DNS', 'www.example.org'),
508 ('DirName',
509 ((('countryName', 'XY'),),
510 (('localityName', 'Castle Anthrax'),),
511 (('organizationName', 'Python Software Foundation'),),
512 (('commonName', 'dirname example'),))),
513 ('URI', 'https://www.python.org/'),
514 ('IP Address', '127.0.0.1'),
515 ('IP Address', '0:0:0:0:0:0:0:1\n'),
516 ('Registered ID', '1.2.3.4.5')
517 )
518 )
519
Antoine Pitrou480a1242010-04-28 21:37:09 +0000520 def test_DER_to_PEM(self):
Martin Panter3d81d932016-01-14 09:36:00 +0000521 with open(CAFILE_CACERT, 'r') as f:
Antoine Pitrou480a1242010-04-28 21:37:09 +0000522 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000523 d1 = ssl.PEM_cert_to_DER_cert(pem)
524 p2 = ssl.DER_cert_to_PEM_cert(d1)
525 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000526 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000527 if not p2.startswith(ssl.PEM_HEADER + '\n'):
528 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
529 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
530 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000531
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000532 def test_openssl_version(self):
533 n = ssl.OPENSSL_VERSION_NUMBER
534 t = ssl.OPENSSL_VERSION_INFO
535 s = ssl.OPENSSL_VERSION
536 self.assertIsInstance(n, int)
537 self.assertIsInstance(t, tuple)
538 self.assertIsInstance(s, str)
539 # Some sanity checks follow
540 # >= 0.9
541 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400542 # < 3.0
543 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000544 major, minor, fix, patch, status = t
545 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400546 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000547 self.assertGreaterEqual(minor, 0)
548 self.assertLess(minor, 256)
549 self.assertGreaterEqual(fix, 0)
550 self.assertLess(fix, 256)
551 self.assertGreaterEqual(patch, 0)
Ned Deily05784a72015-02-05 17:20:13 +1100552 self.assertLessEqual(patch, 63)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000553 self.assertGreaterEqual(status, 0)
554 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400555 # Version string as returned by {Open,Libre}SSL, the format might change
Christian Heimes598894f2016-09-05 23:19:05 +0200556 if IS_LIBRESSL:
557 self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
Victor Stinner789b8052015-01-06 11:51:06 +0100558 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400559 else:
560 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100561 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000562
Antoine Pitrou9d543662010-04-23 23:10:32 +0000563 @support.cpython_only
564 def test_refcycle(self):
565 # Issue #7943: an SSL object doesn't create reference cycles with
566 # itself.
567 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200568 ss = test_wrap_socket(s)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000569 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100570 with support.check_warnings(("", ResourceWarning)):
571 del ss
Victor Stinnere0b75b72016-03-21 17:26:04 +0100572 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000573
Antoine Pitroua468adc2010-09-14 14:43:44 +0000574 def test_wrapped_unconnected(self):
575 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200576 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000577 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200578 with test_wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100579 self.assertRaises(OSError, ss.recv, 1)
580 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
581 self.assertRaises(OSError, ss.recvfrom, 1)
582 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
583 self.assertRaises(OSError, ss.send, b'x')
584 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Serhiy Storchaka42b1d612018-12-06 22:36:55 +0200585 self.assertRaises(NotImplementedError, ss.dup)
Christian Heimes141c5e82018-02-24 21:10:57 +0100586 self.assertRaises(NotImplementedError, ss.sendmsg,
587 [b'x'], (), 0, ('0.0.0.0', 0))
Serhiy Storchaka42b1d612018-12-06 22:36:55 +0200588 self.assertRaises(NotImplementedError, ss.recvmsg, 100)
589 self.assertRaises(NotImplementedError, ss.recvmsg_into,
590 [bytearray(100)])
Antoine Pitroua468adc2010-09-14 14:43:44 +0000591
Antoine Pitrou40f08742010-04-24 22:04:40 +0000592 def test_timeout(self):
593 # Issue #8524: when creating an SSL socket, the timeout of the
594 # original socket should be retained.
595 for timeout in (None, 0.0, 5.0):
596 s = socket.socket(socket.AF_INET)
597 s.settimeout(timeout)
Christian Heimesd0486372016-09-10 23:23:33 +0200598 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100599 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000600
Christian Heimesd0486372016-09-10 23:23:33 +0200601 def test_errors_sslwrap(self):
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000602 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000603 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000604 "certfile must be specified",
605 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000606 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000607 "certfile must be specified for server-side operations",
608 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000609 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000610 "certfile must be specified for server-side operations",
Christian Heimesd0486372016-09-10 23:23:33 +0200611 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100612 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
613 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
Christian Heimesd0486372016-09-10 23:23:33 +0200614 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200615 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000616 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000617 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000618 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200619 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000620 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000621 ssl.wrap_socket(sock,
622 certfile=CERTFILE, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000623 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200624 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000625 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000626 ssl.wrap_socket(sock,
627 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000628 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000629
Martin Panter3464ea22016-02-01 21:58:11 +0000630 def bad_cert_test(self, certfile):
631 """Check that trying to use the given client certificate fails"""
632 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
633 certfile)
634 sock = socket.socket()
635 self.addCleanup(sock.close)
636 with self.assertRaises(ssl.SSLError):
Christian Heimesd0486372016-09-10 23:23:33 +0200637 test_wrap_socket(sock,
Christian Heimesa170fa12017-09-15 20:27:30 +0200638 certfile=certfile)
Martin Panter3464ea22016-02-01 21:58:11 +0000639
640 def test_empty_cert(self):
641 """Wrapping with an empty cert file"""
642 self.bad_cert_test("nullcert.pem")
643
644 def test_malformed_cert(self):
645 """Wrapping with a badly formatted certificate (syntax error)"""
646 self.bad_cert_test("badcert.pem")
647
648 def test_malformed_key(self):
649 """Wrapping with a badly formatted key (syntax error)"""
650 self.bad_cert_test("badkey.pem")
651
def test_match_hostname(self):
    # Drive ssl.match_hostname() with hand-built certificate dicts:
    # commonName and wildcard matching, IDNA labels, embedded NUL bytes,
    # subjectAltName DNS/IP entries, missing certificates, the dedicated
    # wildcard error messages, and ssl._inet_paton() validation.
    def cn_cert(common_name):
        # Smallest certificate dict: a subject holding one commonName.
        return {'subject': ((('commonName', common_name),),)}

    def accepts(cert, *hostnames):
        # match_hostname() simply returns when the name is acceptable.
        for hostname in hostnames:
            ssl.match_hostname(cert, hostname)

    def rejects(cert, *hostnames):
        for hostname in hostnames:
            with self.assertRaises(ssl.CertificateError):
                ssl.match_hostname(cert, hostname)

    def rejects_with(message, common_name, hostname):
        # The CertificateError text must match the ``message`` regex.
        with self.assertRaisesRegex(ssl.CertificateError, message):
            ssl.match_hostname(cn_cert(common_name), hostname)

    # -- Hostname matching --

    cert = cn_cert('example.com')
    accepts(cert, 'example.com', 'ExAmple.cOm')
    rejects(cert, 'www.example.com', '.example.com', 'example.org',
            'exampleXcom')

    cert = cn_cert('*.a.com')
    accepts(cert, 'foo.a.com')
    rejects(cert, 'bar.foo.a.com', 'a.com', 'Xa.com', '.a.com')

    # a wildcard only counts when it is the entire left-most label
    cert = cn_cert('f*.com')
    rejects(cert, 'foo.com', 'f.com', 'bar.com', 'foo.a.com',
            'bar.foo.com')

    # embedded NUL bytes must not truncate the name (CVE-2013-4073)
    nul_name = 'null.python.org\x00example.org'
    cert = cn_cert(nul_name)
    accepts(cert, 'null.python.org\x00example.org')  # or raise an error?
    rejects(cert, 'example.org', 'null.python.org')

    # wildcards in unsupported positions
    cert = cn_cert('*.*.a.com')
    rejects(cert, 'bar.foo.a.com', 'a.com', 'Xa.com', '.a.com')

    cert = cn_cert('a.*.com')
    rejects(cert, 'a.foo.com', 'a..com', 'a.com')

    # a wildcard never matches into the IDNA prefix 'xn--'
    idna = 'püthon.python.org'.encode("idna").decode("ascii")
    accepts(cn_cert(idna), idna)
    rejects(cn_cert('x*.python.org'), idna)
    rejects(cn_cert('xn--p*.python.org'), idna)

    # partial wildcard in the first label combined with IDNA A-labels in
    # the following labels: every candidate below is rejected
    idna = 'www*.pythön.org'.encode("idna").decode("ascii")
    cert = cn_cert(idna)
    for unicode_name in ('www.pythön.org', 'www1.pythön.org',
                         'ftp.pythön.org', 'pythön.org'):
        rejects(cert, unicode_name.encode("idna").decode("ascii"))

    # Slightly fake real-world example
    cert = {
        'notAfter': 'Jun 26 21:41:46 2011 GMT',
        'subject': ((('commonName', 'linuxfrz.org'),),),
        'subjectAltName': (
            ('DNS', 'linuxfr.org'),
            ('DNS', 'linuxfr.com'),
            ('othername', '<unsupported>'),
        ),
    }
    accepts(cert, 'linuxfr.org', 'linuxfr.com')
    # Not a "DNS" entry
    rejects(cert, '<unsupported>')
    # With a subjectAltName present the commonName is ignored
    rejects(cert, 'linuxfrz.org')

    # Building blocks shared by the Google-style subjects below.
    location = (
        (('countryName', 'US'),),
        (('stateOrProvinceName', 'California'),),
        (('localityName', 'Mountain View'),),
    )
    organization = (('organizationName', 'Google Inc'),)
    mail_common_name = (('commonName', 'mail.google.com'),)

    # A pristine real-world example
    cert = {
        'notAfter': 'Dec 18 23:59:59 2011 GMT',
        'subject': location + (organization, mail_common_name),
    }
    accepts(cert, 'mail.google.com')
    rejects(cert, 'gmail.com')
    # Only commonName is considered
    rejects(cert, 'California')

    # -- IPv4 matching --
    cert = {
        'subject': ((('commonName', 'example.com'),),),
        'subjectAltName': (
            ('DNS', 'example.com'),
            ('IP Address', '10.11.12.13'),
            ('IP Address', '14.15.16.17'),
            ('IP Address', '127.0.0.1'),
        ),
    }
    accepts(cert, '10.11.12.13', '14.15.16.17')
    # socket.inet_ntoa(socket.inet_aton('127.1')) == '127.0.0.1'
    rejects(cert, '127.1')
    rejects(cert, '14.15.16.17 ', '14.15.16.17 extra data',
            '14.15.16.18', 'example.net')

    # -- IPv6 matching --
    if support.IPV6_ENABLED:
        cert = {
            'subject': ((('commonName', 'example.com'),),),
            'subjectAltName': (
                ('DNS', 'example.com'),
                ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
                ('IP Address', '2003:0:0:0:0:0:0:BABA\n'),
            ),
        }
        accepts(cert, '2001::cafe', '2003::baba')
        rejects(cert, '2003::baba ', '2003::baba extra data',
                '2003::bebe', 'example.net')

    # -- Miscellaneous --

    # Neither commonName nor subjectAltName
    cert = {
        'notAfter': 'Dec 18 23:59:59 2011 GMT',
        'subject': location + (organization,),
    }
    rejects(cert, 'mail.google.com')

    # No DNS entry in subjectAltName but a commonName
    cert = {
        'notAfter': 'Dec 18 23:59:59 2099 GMT',
        'subject': location + (mail_common_name,),
        'subjectAltName': (('othername', 'blabla'),),
    }
    accepts(cert, 'mail.google.com')

    # No DNS entry subjectAltName and no commonName
    cert = {
        'notAfter': 'Dec 18 23:59:59 2099 GMT',
        'subject': location + (organization,),
        'subjectAltName': (('othername', 'blabla'),),
    }
    rejects(cert, 'google.com')

    # Empty cert / no cert
    for missing_cert in (None, {}):
        with self.assertRaises(ValueError):
            ssl.match_hostname(missing_cert, 'example.com')

    # Issue #17980: avoid denials of service by refusing more than one
    # wildcard per fragment.
    rejects_with(
        "partial wildcards in leftmost label are not supported",
        'a*b.example.com', 'axxb.example.com')
    rejects_with(
        "wildcard can only be present in the leftmost label",
        'www.*.example.com', 'www.sub.example.com')
    rejects_with(
        "too many wildcards",
        'a*b*.example.com', 'axxbxxc.example.com')
    rejects_with(
        "sole wildcard without additional labels are not support",
        '*', 'host')
    rejects_with(
        r"hostname 'com' doesn't match '\*.com'",
        '*.com', 'com')

    # extra checks for _inet_paton()
    for invalid in ('1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24'):
        with self.assertRaises(ValueError):
            ssl._inet_paton(invalid)
    parseable = ['127.0.0.1', '192.168.0.1']
    if support.IPV6_ENABLED:
        parseable.extend(['::1', '2001:db8:85a3::8a2e:370:7334'])
    for ipaddr in parseable:
        self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200849
Antoine Pitroud5323212010-10-22 18:19:07 +0000850 def test_server_side(self):
851 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200852 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000853 with socket.socket() as sock:
854 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
855 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000856
def test_unknown_channel_binding(self):
    # get_channel_binding() should raise ValueError for an unknown type.
    #
    # Both sockets are registered for cleanup as soon as they exist, so
    # neither the listener nor the client leaks when connect(), the wrap
    # or the assertion fails part-way through.
    s = socket.create_server(('127.0.0.1', 0))
    self.addCleanup(s.close)
    c = socket.socket(socket.AF_INET)
    self.addCleanup(c.close)
    c.connect(s.getsockname())
    with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
        with self.assertRaises(ValueError):
            ss.get_channel_binding("unknown-type")
Antoine Pitroud6494802011-07-21 01:11:30 +0200866
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    # A known binding type yields None while the socket is unconnected,
    # first for a client-side wrap and then for a server-side one.
    wrap_variants = (
        {},
        {'server_side': True, 'certfile': CERTFILE},
    )
    for wrap_kwargs in wrap_variants:
        plain = socket.socket(socket.AF_INET)
        with test_wrap_socket(plain, **wrap_kwargs) as wrapped:
            binding = wrapped.get_channel_binding("tls-unique")
            self.assertIsNone(binding)
Antoine Pitroud6494802011-07-21 01:11:30 +0200878
def test_dealloc_warn(self):
    # Dropping the last reference to an unclosed wrapped socket must
    # emit a ResourceWarning that mentions the socket's repr.
    wrapped = test_wrap_socket(socket.socket(socket.AF_INET))
    expected_repr = repr(wrapped)
    with self.assertWarns(ResourceWarning) as caught:
        del wrapped
        support.gc_collect()
    warning_text = str(caught.warning.args[0])
    self.assertIn(expected_repr, warning_text)
886
def test_get_default_verify_paths(self):
    # The result is a six-field DefaultVerifyPaths record.
    default_paths = ssl.get_default_verify_paths()
    self.assertEqual(len(default_paths), 6)
    self.assertIsInstance(default_paths, ssl.DefaultVerifyPaths)

    # SSL_CERT_DIR / SSL_CERT_FILE set in the environment are reflected
    # in the capath / cafile fields.
    overrides = {"SSL_CERT_DIR": CAPATH, "SSL_CERT_FILE": CERTFILE}
    with support.EnvironmentVarGuard() as env:
        for variable, value in overrides.items():
            env[variable] = value
        overridden = ssl.get_default_verify_paths()
        self.assertEqual(overridden.cafile, CERTFILE)
        self.assertEqual(overridden.capath, CAPATH)
898
Christian Heimes44109d72013-11-22 01:51:30 +0100899 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
900 def test_enum_certificates(self):
901 self.assertTrue(ssl.enum_certificates("CA"))
902 self.assertTrue(ssl.enum_certificates("ROOT"))
903
904 self.assertRaises(TypeError, ssl.enum_certificates)
905 self.assertRaises(WindowsError, ssl.enum_certificates, "")
906
Christian Heimesc2d65e12013-11-22 16:13:55 +0100907 trust_oids = set()
908 for storename in ("CA", "ROOT"):
909 store = ssl.enum_certificates(storename)
910 self.assertIsInstance(store, list)
911 for element in store:
912 self.assertIsInstance(element, tuple)
913 self.assertEqual(len(element), 3)
914 cert, enc, trust = element
915 self.assertIsInstance(cert, bytes)
916 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
Steve Dowerfdd17ab2019-09-10 02:02:04 -0700917 self.assertIsInstance(trust, (frozenset, set, bool))
918 if isinstance(trust, (frozenset, set)):
Christian Heimesc2d65e12013-11-22 16:13:55 +0100919 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100920
921 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100922 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200923
Christian Heimes46bebee2013-06-09 19:03:31 +0200924 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100925 def test_enum_crls(self):
926 self.assertTrue(ssl.enum_crls("CA"))
927 self.assertRaises(TypeError, ssl.enum_crls)
928 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200929
Christian Heimes44109d72013-11-22 01:51:30 +0100930 crls = ssl.enum_crls("CA")
931 self.assertIsInstance(crls, list)
932 for element in crls:
933 self.assertIsInstance(element, tuple)
934 self.assertEqual(len(element), 2)
935 self.assertIsInstance(element[0], bytes)
936 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200937
Christian Heimes46bebee2013-06-09 19:03:31 +0200938
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100939 def test_asn1object(self):
940 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
941 '1.3.6.1.5.5.7.3.1')
942
943 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
944 self.assertEqual(val, expected)
945 self.assertEqual(val.nid, 129)
946 self.assertEqual(val.shortname, 'serverAuth')
947 self.assertEqual(val.longname, 'TLS Web Server Authentication')
948 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
949 self.assertIsInstance(val, ssl._ASN1Object)
950 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
951
952 val = ssl._ASN1Object.fromnid(129)
953 self.assertEqual(val, expected)
954 self.assertIsInstance(val, ssl._ASN1Object)
955 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100956 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
957 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100958 for i in range(1000):
959 try:
960 obj = ssl._ASN1Object.fromnid(i)
961 except ValueError:
962 pass
963 else:
964 self.assertIsInstance(obj.nid, int)
965 self.assertIsInstance(obj.shortname, str)
966 self.assertIsInstance(obj.longname, str)
967 self.assertIsInstance(obj.oid, (str, type(None)))
968
969 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
970 self.assertEqual(val, expected)
971 self.assertIsInstance(val, ssl._ASN1Object)
972 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
973 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
974 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100975 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
976 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100977
Christian Heimes72d28502013-11-23 13:56:58 +0100978 def test_purpose_enum(self):
979 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
980 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
981 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
982 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
983 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
984 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
985 '1.3.6.1.5.5.7.3.1')
986
987 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
988 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
989 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
990 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
991 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
992 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
993 '1.3.6.1.5.5.7.3.2')
994
def test_unsupported_dtls(self):
    # Datagram sockets cannot be wrapped, neither through the module
    # helper nor through an explicit context.
    expected_message = "only stream sockets are supported"
    dgram_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.addCleanup(dgram_sock.close)

    with self.assertRaises(NotImplementedError) as caught:
        test_wrap_socket(dgram_sock, cert_reqs=ssl.CERT_NONE)
    self.assertEqual(str(caught.exception), expected_message)

    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with self.assertRaises(NotImplementedError) as caught:
        client_ctx.wrap_socket(dgram_sock)
    self.assertEqual(str(caught.exception), expected_message)
1005
def cert_time_ok(self, timestring, timestamp):
    """Assert that ``timestring`` converts to exactly ``timestamp``."""
    converted = ssl.cert_time_to_seconds(timestring)
    self.assertEqual(converted, timestamp)
1008
def cert_time_fail(self, timestring):
    """Assert that converting ``timestring`` raises ValueError."""
    self.assertRaises(ValueError, ssl.cert_time_to_seconds, timestring)
1012
@unittest.skipUnless(utc_offset(),
                     'local time needs to be different from UTC')
def test_cert_time_to_seconds_timezone(self):
    # Issue #19940: ssl.cert_time_to_seconds() returns wrong
    # results if local timezone is not UTC
    known_conversions = (
        ("May 9 00:00:00 2007 GMT", 1178668800.0),
        ("Jan 5 09:34:43 2018 GMT", 1515144883.0),
    )
    for cert_time, epoch_seconds in known_conversions:
        self.cert_time_ok(cert_time, epoch_seconds)
1020
def test_cert_time_to_seconds(self):
    # Parsing rules of ssl.cert_time_to_seconds(): accepted spellings,
    # rejected malformed fields, and leap-second handling.
    reference = "Jan 5 09:34:43 2018 GMT"
    reference_ts = 1515144883.0
    self.cert_time_ok(reference, reference_ts)
    # the argument may also be passed by keyword, named cert_time
    self.assertEqual(ssl.cert_time_to_seconds(cert_time=reference),
                     reference_ts)
    for spelling in (
        # %d as well as %e day-of-month (zero or space from strftime)
        "Jan 05 09:34:43 2018 GMT",
        # case does not matter
        "JaN 5 09:34:43 2018 GmT",
    ):
        self.cert_time_ok(spelling, reference_ts)
    for malformed in (
        "Jan 5 09:34 2018 GMT",      # no seconds
        "Jan 5 09:34:43 2018",       # no GMT
        "Jan 5 09:34:43 2018 UTC",   # not GMT timezone
        "Jan 35 09:34:43 2018 GMT",  # invalid day
        "Jon 5 09:34:43 2018 GMT",   # invalid month
        "Jan 5 24:00:00 2018 GMT",   # invalid hour
        "Jan 5 09:60:43 2018 GMT",   # invalid minute
    ):
        self.cert_time_fail(malformed)

    # a leap second and the following midnight share one timestamp
    newyear_ts = 1230768000.0
    for cert_time in ("Dec 31 23:59:60 2008 GMT",
                      "Jan 1 00:00:00 2009 GMT"):
        self.cert_time_ok(cert_time, newyear_ts)

    self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
    # second 60 is allowed (even if it is not a leap second)
    self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
    # second 61 is allowed for compatibility with time.strptime()
    self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
    self.cert_time_fail("Jan 5 09:34:62 2018 GMT")  # invalid seconds

    # no special treatment for the special value:
    #   99991231235959Z (rfc 5280)
    self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
1055
@support.run_with_locale('LC_ALL', '')
def test_cert_time_to_seconds_locale(self):
    # `cert_time_to_seconds()` should be locale independent

    # Abbreviated name of February as the active locale spells it.
    localized_feb = time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))

    if localized_feb.lower() == 'feb':
        self.skipTest("locale-specific month name needs to be "
                      "different from C locale")

    # The C-locale spelling parses; the localized spelling does not.
    self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
    self.cert_time_fail(localized_feb + " 9 00:00:00 2007 GMT")
1070
def test_connect_ex_error(self):
    # connect_ex() towards a bound-but-not-listening port must hand back
    # one of the expected errno values.
    listener = socket.socket(socket.AF_INET)
    self.addCleanup(listener.close)
    port = support.bind_port(listener)  # Reserve port but don't listen
    client = test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(client.close)
    result_code = client.connect_ex((HOST, port))
    # Issue #19919: Windows machines or VMs hosted on Windows
    # machines sometimes return EWOULDBLOCK.
    acceptable = (
        errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
        errno.EWOULDBLOCK,
    )
    self.assertIn(result_code, acceptable)
1086
Christian Heimesa6bc95a2013-11-17 19:59:14 +01001087
Antoine Pitrou152efa22010-05-16 18:19:27 +00001088class ContextTests(unittest.TestCase):
1089
def test_constructor(self):
    # Every known protocol constant builds a context.
    for known_protocol in PROTOCOLS:
        ssl.SSLContext(known_protocol)
    # With no argument the protocol is PROTOCOL_TLS.
    default_ctx = ssl.SSLContext()
    self.assertEqual(default_ctx.protocol, ssl.PROTOCOL_TLS)
    # Numbers that are not protocol constants are rejected.
    for bogus_protocol in (-1, 42):
        with self.assertRaises(ValueError):
            ssl.SSLContext(bogus_protocol)
1097
def test_protocol(self):
    # A context reports back the protocol it was created with.
    for requested in PROTOCOLS:
        created = ssl.SSLContext(requested)
        self.assertEqual(created.protocol, requested)
1102
1103 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001104 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001105 ctx.set_ciphers("ALL")
1106 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +00001107 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +00001108 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +00001109
@unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
                     "Test applies only to Python default ciphers")
def test_python_ciphers(self):
    # None of these markers may appear in a default cipher suite name.
    banned_markers = ("PSK", "SRP", "MD5", "RC4", "3DES")
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    for suite in client_ctx.get_ciphers():
        suite_name = suite['name']
        for marker in banned_markers:
            self.assertNotIn(marker, suite_name)
1122
Christian Heimes25bfcd52016-09-06 00:04:45 +02001123 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
1124 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001125 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +02001126 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +02001127 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +02001128 self.assertIn('AES256-GCM-SHA384', names)
1129 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001130
def test_options(self):
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
    openssl_defaults = ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
    # SSLContext also enables these by default
    context_extras = (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
                      OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE |
                      OP_ENABLE_MIDDLEBOX_COMPAT)
    expected = openssl_defaults | context_extras
    self.assertEqual(expected, client_ctx.options)

    # Setting an extra option bit is always possible.
    client_ctx.options = client_ctx.options | ssl.OP_NO_TLSv1
    self.assertEqual(expected | ssl.OP_NO_TLSv1, client_ctx.options)

    if not can_clear_options():
        # Clearing bits is unsupported here, so the assignment fails.
        with self.assertRaises(ValueError):
            client_ctx.options = 0
        return

    client_ctx.options = client_ctx.options & ~ssl.OP_NO_TLSv1
    self.assertEqual(expected, client_ctx.options)
    client_ctx.options = 0
    # Ubuntu has OP_NO_SSLv3 forced on by default
    self.assertEqual(0, client_ctx.options & ~ssl.OP_NO_SSLv3)
1151
Christian Heimesa170fa12017-09-15 20:27:30 +02001152 def test_verify_mode_protocol(self):
1153 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001154 # Default value
1155 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1156 ctx.verify_mode = ssl.CERT_OPTIONAL
1157 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1158 ctx.verify_mode = ssl.CERT_REQUIRED
1159 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1160 ctx.verify_mode = ssl.CERT_NONE
1161 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1162 with self.assertRaises(TypeError):
1163 ctx.verify_mode = None
1164 with self.assertRaises(ValueError):
1165 ctx.verify_mode = 42
1166
Christian Heimesa170fa12017-09-15 20:27:30 +02001167 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1168 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1169 self.assertFalse(ctx.check_hostname)
1170
1171 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1172 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1173 self.assertTrue(ctx.check_hostname)
1174
Christian Heimes61d478c2018-01-27 15:51:38 +01001175 def test_hostname_checks_common_name(self):
1176 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1177 self.assertTrue(ctx.hostname_checks_common_name)
1178 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1179 ctx.hostname_checks_common_name = True
1180 self.assertTrue(ctx.hostname_checks_common_name)
1181 ctx.hostname_checks_common_name = False
1182 self.assertFalse(ctx.hostname_checks_common_name)
1183 ctx.hostname_checks_common_name = True
1184 self.assertTrue(ctx.hostname_checks_common_name)
1185 else:
1186 with self.assertRaises(AttributeError):
1187 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001188
@requires_minimum_version
@unittest.skipIf(IS_LIBRESSL, "see bpo-34001")
def test_min_max_version(self):
    version = ssl.TLSVersion
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # OpenSSL default is MINIMUM_SUPPORTED, however some vendors like
    # Fedora override the setting to TLS 1.0.
    minimum_range = {
        # stock OpenSSL
        version.MINIMUM_SUPPORTED,
        # Fedora 29 uses TLS 1.0 by default
        version.TLSv1,
        # RHEL 8 uses TLS 1.2 by default
        version.TLSv1_2,
    }

    # Defaults of a generic server context.
    self.assertIn(server_ctx.minimum_version, minimum_range)
    self.assertEqual(server_ctx.maximum_version, version.MAXIMUM_SUPPORTED)

    # Explicit bounds are read back exactly as assigned.
    explicit_bounds = (
        (version.TLSv1_1, version.TLSv1_2),
        (version.MINIMUM_SUPPORTED, version.TLSv1),
    )
    for lower, upper in explicit_bounds:
        server_ctx.minimum_version = lower
        server_ctx.maximum_version = upper
        self.assertEqual(server_ctx.minimum_version, lower)
        self.assertEqual(server_ctx.maximum_version, upper)

    server_ctx.maximum_version = version.MAXIMUM_SUPPORTED
    self.assertEqual(server_ctx.maximum_version, version.MAXIMUM_SUPPORTED)

    # Assigning the opposite sentinel reads back as a concrete version.
    server_ctx.maximum_version = version.MINIMUM_SUPPORTED
    self.assertIn(server_ctx.maximum_version,
                  {version.TLSv1, version.SSLv3})

    server_ctx.minimum_version = version.MAXIMUM_SUPPORTED
    self.assertIn(server_ctx.minimum_version,
                  {version.TLSv1_2, version.TLSv1_3})

    # Arbitrary integers are not valid versions.
    with self.assertRaises(ValueError):
        server_ctx.minimum_version = 42

    # A context bound to one protocol version reports the same defaults
    # but refuses to have its bounds changed.
    pinned_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)

    self.assertIn(pinned_ctx.minimum_version, minimum_range)
    self.assertEqual(pinned_ctx.maximum_version, version.MAXIMUM_SUPPORTED)
    with self.assertRaises(ValueError):
        pinned_ctx.minimum_version = version.MINIMUM_SUPPORTED
    with self.assertRaises(ValueError):
        pinned_ctx.maximum_version = version.TLSv1
1260 ctx.maximum_version = ssl.TLSVersion.TLSv1
1261
1262
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_verify_flags(self):
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # default value; the trusted-first bit counts only when ssl has it
    trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(server_ctx.verify_flags,
                     ssl.VERIFY_DEFAULT | trusted_first)
    # Single flags and a combination (any value is supported) are read
    # back exactly as assigned.
    assignable = (
        ssl.VERIFY_CRL_CHECK_LEAF,
        ssl.VERIFY_CRL_CHECK_CHAIN,
        ssl.VERIFY_DEFAULT,
        ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT,
    )
    for flags in assignable:
        server_ctx.verify_flags = flags
        self.assertEqual(server_ctx.verify_flags, flags)
    # Something that is not a number is rejected.
    with self.assertRaises(TypeError):
        server_ctx.verify_flags = None
1282
Antoine Pitrou152efa22010-05-16 18:19:27 +00001283 def test_load_cert_chain(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001284 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001285 # Combined key and cert in a single file
Benjamin Peterson1ea070e2014-11-03 21:05:01 -05001286 ctx.load_cert_chain(CERTFILE, keyfile=None)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001287 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
1288 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001289 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001290 ctx.load_cert_chain(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001291 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001292 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001293 ctx.load_cert_chain(BADCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001294 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001295 ctx.load_cert_chain(EMPTYCERT)
1296 # Separate key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001297 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001298 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
1299 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
1300 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001301 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001302 ctx.load_cert_chain(ONLYCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001303 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001304 ctx.load_cert_chain(ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001305 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001306 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
1307 # Mismatching key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001308 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001309 with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
Martin Panter3d81d932016-01-14 09:36:00 +00001310 ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +02001311 # Password protected key and cert
1312 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
1313 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
1314 ctx.load_cert_chain(CERTFILE_PROTECTED,
1315 password=bytearray(KEY_PASSWORD.encode()))
1316 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
1317 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
1318 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
1319 bytearray(KEY_PASSWORD.encode()))
1320 with self.assertRaisesRegex(TypeError, "should be a string"):
1321 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
1322 with self.assertRaises(ssl.SSLError):
1323 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
1324 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1325 # openssl has a fixed limit on the password buffer.
1326 # PEM_BUFSIZE is generally set to 1kb.
1327 # Return a string larger than this.
1328 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
1329 # Password callback
1330 def getpass_unicode():
1331 return KEY_PASSWORD
1332 def getpass_bytes():
1333 return KEY_PASSWORD.encode()
1334 def getpass_bytearray():
1335 return bytearray(KEY_PASSWORD.encode())
1336 def getpass_badpass():
1337 return "badpass"
1338 def getpass_huge():
1339 return b'a' * (1024 * 1024)
1340 def getpass_bad_type():
1341 return 9
1342 def getpass_exception():
1343 raise Exception('getpass error')
1344 class GetPassCallable:
1345 def __call__(self):
1346 return KEY_PASSWORD
1347 def getpass(self):
1348 return KEY_PASSWORD
1349 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
1350 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
1351 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
1352 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
1353 ctx.load_cert_chain(CERTFILE_PROTECTED,
1354 password=GetPassCallable().getpass)
1355 with self.assertRaises(ssl.SSLError):
1356 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
1357 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1358 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
1359 with self.assertRaisesRegex(TypeError, "must return a string"):
1360 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
1361 with self.assertRaisesRegex(Exception, "getpass error"):
1362 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
1363 # Make sure the password function isn't called if it isn't needed
1364 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001365
def test_load_verify_locations(self):
    """Exercise load_verify_locations() with valid and invalid arguments."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    load = context.load_verify_locations

    # str and bytes CA files, given positionally and by keyword
    for cafile in (CERTFILE, BYTES_CERTFILE):
        load(cafile)
        load(cafile=cafile, capath=None)

    # At least one usable location is required
    with self.assertRaises(TypeError):
        load()
    with self.assertRaises(TypeError):
        load(None, None, None)

    # A missing file surfaces as OSError carrying ENOENT
    with self.assertRaises(OSError) as caught:
        load(NONEXISTINGCERT)
    self.assertEqual(caught.exception.errno, errno.ENOENT)

    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        load(BADCERT)

    # CA file combined with a str or bytes CA directory
    load(CERTFILE, CAPATH)
    load(CERTFILE, capath=BYTES_CAPATH)

    # Issue #10989: crash if the second argument type is invalid
    with self.assertRaises(TypeError):
        load(None, True)
1384
def test_load_verify_cadata(self):
    """Check load_verify_locations(cadata=...) with PEM text and DER bytes."""
    def read_pem_and_der(path):
        # Return the PEM text of *path* together with its DER encoding.
        with open(path) as stream:
            pem_text = stream.read()
        return pem_text, ssl.PEM_cert_to_DER_cert(pem_text)

    def new_context():
        return ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    def ca_count(context):
        return context.cert_store_stats()["x509_ca"]

    # test cadata
    cacert_pem, cacert_der = read_pem_and_der(CAFILE_CACERT)
    neuronio_pem, neuronio_der = read_pem_and_der(CAFILE_NEURONIO)

    # test PEM
    context = new_context()
    self.assertEqual(ca_count(context), 0)
    context.load_verify_locations(cadata=cacert_pem)
    self.assertEqual(ca_count(context), 1)
    context.load_verify_locations(cadata=neuronio_pem)
    self.assertEqual(ca_count(context), 2)
    # cert already in hash table
    context.load_verify_locations(cadata=neuronio_pem)
    self.assertEqual(ca_count(context), 2)

    # combined
    context = new_context()
    pem_bundle = "\n".join((cacert_pem, neuronio_pem))
    context.load_verify_locations(cadata=pem_bundle)
    self.assertEqual(ca_count(context), 2)

    # with junk around the certs
    context = new_context()
    noisy_parts = ["head", cacert_pem, "other", neuronio_pem, "again",
                   neuronio_pem, "tail"]
    context.load_verify_locations(cadata="\n".join(noisy_parts))
    self.assertEqual(ca_count(context), 2)

    # test DER
    context = new_context()
    context.load_verify_locations(cadata=cacert_der)
    context.load_verify_locations(cadata=neuronio_der)
    self.assertEqual(ca_count(context), 2)
    # cert already in hash table
    context.load_verify_locations(cadata=cacert_der)
    self.assertEqual(ca_count(context), 2)

    # combined
    context = new_context()
    der_bundle = b"".join((cacert_der, neuronio_der))
    context.load_verify_locations(cadata=der_bundle)
    self.assertEqual(ca_count(context), 2)

    # error cases
    context = new_context()
    with self.assertRaises(TypeError):
        context.load_verify_locations(cadata=object)

    with self.assertRaisesRegex(ssl.SSLError, "no start line"):
        context.load_verify_locations(cadata="broken")
    with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
        context.load_verify_locations(cadata=b"broken")
1441
1442
def test_load_dh_params(self):
    """Check load_dh_params() with good, missing and malformed input."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    context.load_dh_params(DHFILE)
    # The bytes path is only exercised off Windows
    if not os.name == 'nt':
        context.load_dh_params(BYTES_DHFILE)

    with self.assertRaises(TypeError):
        context.load_dh_params()
    with self.assertRaises(TypeError):
        context.load_dh_params(None)

    with self.assertRaises(FileNotFoundError) as missing:
        context.load_dh_params(NONEXISTINGCERT)
    self.assertEqual(missing.exception.errno, errno.ENOENT)

    # A certificate file does not contain DH parameters
    with self.assertRaises(ssl.SSLError):
        context.load_dh_params(CERTFILE)
1455
def test_session_stats(self):
    """A fresh context of any protocol reports all-zero session statistics."""
    counters = (
        'number',
        'connect',
        'connect_good',
        'connect_renegotiate',
        'accept',
        'accept_good',
        'accept_renegotiate',
        'hits',
        'misses',
        'timeouts',
        'cache_full',
    )
    all_zero = dict.fromkeys(counters, 0)
    for protocol in PROTOCOLS:
        context = ssl.SSLContext(protocol)
        self.assertEqual(context.session_stats(), all_zero)
1472
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001473 def test_set_default_verify_paths(self):
1474 # There's not much we can do to test that it acts as expected,
1475 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001476 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001477 ctx.set_default_verify_paths()
1478
Antoine Pitrou501da612011-12-21 09:27:41 +01001479 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001480 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001481 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001482 ctx.set_ecdh_curve("prime256v1")
1483 ctx.set_ecdh_curve(b"prime256v1")
1484 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1485 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1486 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1487 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1488
@needs_sni
def test_sni_callback(self):
    """set_servername_callback() only accepts a callable or None."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    # set_servername_callback expects a callable, or None
    with self.assertRaises(TypeError):
        context.set_servername_callback()
    for not_callable in (4, "", context):
        with self.assertRaises(TypeError):
            context.set_servername_callback(not_callable)

    def noop_callback(sock, servername, ctx):
        pass

    for accepted in (None, noop_callback):
        context.set_servername_callback(accepted)
1503
@needs_sni
def test_sni_callback_refcycle(self):
    """A context <-> servername-callback reference cycle is collectable."""
    # Reference cycles through the servername callback are detected
    # and cleared.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    # The default argument makes the callback hold the context, while the
    # context holds the callback once it is registered.
    def cyclic_callback(sock, servername, ctx, cycle=context):
        pass

    context.set_servername_callback(cyclic_callback)
    context_ref = weakref.ref(context)
    # Drop every strong reference held by this frame
    del context, cyclic_callback
    gc.collect()
    self.assertIs(context_ref(), None)
1516
def test_cert_store_stats(self):
    """cert_store_stats() follows what load_verify_locations() adds."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    def expect_stats(x509_ca, x509):
        self.assertEqual(context.cert_store_stats(),
            {'x509_ca': x509_ca, 'crl': 0, 'x509': x509})

    expect_stats(x509_ca=0, x509=0)
    # Loading the context's own cert chain leaves the store untouched
    context.load_cert_chain(CERTFILE)
    expect_stats(x509_ca=0, x509=0)
    context.load_verify_locations(CERTFILE)
    expect_stats(x509_ca=0, x509=1)
    context.load_verify_locations(CAFILE_CACERT)
    expect_stats(x509_ca=1, x509=2)
1530
def test_get_ca_certs(self):
    """get_ca_certs() lists only CA certificates, as dicts or DER bytes."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertEqual(context.get_ca_certs(), [])
    # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
    context.load_verify_locations(CERTFILE)
    self.assertEqual(context.get_ca_certs(), [])
    # but CAFILE_CACERT is a CA cert
    context.load_verify_locations(CAFILE_CACERT)

    # Issuer and subject of the expected certificate are the same name
    cacert_name = (
        (('organizationName', 'Root CA'),),
        (('organizationalUnitName', 'http://www.cacert.org'),),
        (('commonName', 'CA Cert Signing Authority'),),
        (('emailAddress', 'support@cacert.org'),),
    )
    expected_cert = {
        'issuer': cacert_name,
        'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
        'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
        'serialNumber': '00',
        'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
        'subject': cacert_name,
        'version': 3,
    }
    self.assertEqual(context.get_ca_certs(), [expected_cert])

    # Binary form: the DER encoding of the same certificate
    with open(CAFILE_CACERT) as stream:
        pem_text = stream.read()
    expected_der = ssl.PEM_cert_to_DER_cert(pem_text)
    self.assertEqual(context.get_ca_certs(True), [expected_der])
1558
Christian Heimes72d28502013-11-23 13:56:58 +01001559 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001560 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001561 ctx.load_default_certs()
1562
Christian Heimesa170fa12017-09-15 20:27:30 +02001563 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001564 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1565 ctx.load_default_certs()
1566
Christian Heimesa170fa12017-09-15 20:27:30 +02001567 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001568 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1569
Christian Heimesa170fa12017-09-15 20:27:30 +02001570 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001571 self.assertRaises(TypeError, ctx.load_default_certs, None)
1572 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1573
@unittest.skipIf(sys.platform == "win32", "not-Windows specific")
@unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
def test_load_default_certs_env(self):
    """load_default_certs() with SSL_CERT_DIR / SSL_CERT_FILE overridden."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    expected_stats = {"crl": 0, "x509": 1, "x509_ca": 0}
    with support.EnvironmentVarGuard() as environ:
        environ["SSL_CERT_DIR"] = CAPATH
        environ["SSL_CERT_FILE"] = CERTFILE
        context.load_default_certs()
        self.assertEqual(context.cert_store_stats(), expected_stats)
1583
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
@unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
def test_load_default_certs_env_windows(self):
    """With the env vars set, one more x509 entry is expected than without."""
    # Baseline: what load_default_certs() yields without the overrides
    baseline_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    baseline_context.load_default_certs()
    expected_stats = baseline_context.cert_store_stats()

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with support.EnvironmentVarGuard() as environ:
        environ["SSL_CERT_DIR"] = CAPATH
        environ["SSL_CERT_FILE"] = CERTFILE
        context.load_default_certs()
        expected_stats["x509"] += 1
        self.assertEqual(context.cert_store_stats(), expected_stats)
1598
def _assert_context_options(self, ctx):
    """Assert that *ctx* has the expected option bits set.

    OP_NO_SSLv2 is always checked; each other flag is checked only when
    its constant is non-zero.
    """
    self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
    conditional_flags = (
        OP_NO_COMPRESSION,
        OP_SINGLE_DH_USE,
        OP_SINGLE_ECDH_USE,
        OP_CIPHER_SERVER_PREFERENCE,
    )
    for flag in conditional_flags:
        if flag == 0:
            continue
        self.assertEqual(ctx.options & flag, flag)
1613
def test_create_default_context(self):
    """Check protocol, verify mode and options of create_default_context()."""
    def check_common(context, verify_mode):
        self.assertEqual(context.protocol, ssl.PROTOCOL_TLS)
        self.assertEqual(context.verify_mode, verify_mode)

    # Default purpose
    context = ssl.create_default_context()
    check_common(context, ssl.CERT_REQUIRED)
    self.assertTrue(context.check_hostname)
    self._assert_context_options(context)

    # CA material supplied through all three parameters at once
    with open(SIGNING_CA) as stream:
        ca_text = stream.read()
    context = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
                                         cadata=ca_text)
    check_common(context, ssl.CERT_REQUIRED)
    self._assert_context_options(context)

    # CLIENT_AUTH purpose
    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    check_common(context, ssl.CERT_NONE)
    self._assert_context_options(context)
Christian Heimes4c05b472013-11-23 15:58:30 +01001634
def test__create_stdlib_context(self):
    """Check the contexts built by the private ssl._create_stdlib_context()."""
    def check(context, protocol, verify_mode):
        self.assertEqual(context.protocol, protocol)
        self.assertEqual(context.verify_mode, verify_mode)

    # No arguments
    context = ssl._create_stdlib_context()
    check(context, ssl.PROTOCOL_TLS, ssl.CERT_NONE)
    self.assertFalse(context.check_hostname)
    self._assert_context_options(context)

    # Explicit protocol
    context = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
    check(context, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE)
    self._assert_context_options(context)

    # Explicit protocol with verification and hostname checking requested
    context = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
                                         cert_reqs=ssl.CERT_REQUIRED,
                                         check_hostname=True)
    check(context, ssl.PROTOCOL_TLSv1, ssl.CERT_REQUIRED)
    self.assertTrue(context.check_hostname)
    self._assert_context_options(context)

    # CLIENT_AUTH purpose
    context = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
    check(context, ssl.PROTOCOL_TLS, ssl.CERT_NONE)
    self._assert_context_options(context)
Christian Heimes4c05b472013-11-23 15:58:30 +01001659
Christian Heimes1aa9a752013-12-02 02:41:19 +01001660 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001661 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001662 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001663 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001664
Christian Heimese82c0342017-09-15 20:29:57 +02001665 # Auto set CERT_REQUIRED
1666 ctx.check_hostname = True
1667 self.assertTrue(ctx.check_hostname)
1668 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1669 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001670 ctx.verify_mode = ssl.CERT_REQUIRED
1671 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001672 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001673
Christian Heimese82c0342017-09-15 20:29:57 +02001674 # Changing verify_mode does not affect check_hostname
1675 ctx.check_hostname = False
1676 ctx.verify_mode = ssl.CERT_NONE
1677 ctx.check_hostname = False
1678 self.assertFalse(ctx.check_hostname)
1679 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1680 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001681 ctx.check_hostname = True
1682 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001683 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1684
1685 ctx.check_hostname = False
1686 ctx.verify_mode = ssl.CERT_OPTIONAL
1687 ctx.check_hostname = False
1688 self.assertFalse(ctx.check_hostname)
1689 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1690 # keep CERT_OPTIONAL
1691 ctx.check_hostname = True
1692 self.assertTrue(ctx.check_hostname)
1693 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001694
1695 # Cannot set CERT_NONE with check_hostname enabled
1696 with self.assertRaises(ValueError):
1697 ctx.verify_mode = ssl.CERT_NONE
1698 ctx.check_hostname = False
1699 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001700 ctx.verify_mode = ssl.CERT_NONE
1701 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001702
Christian Heimes5fe668c2016-09-12 00:01:11 +02001703 def test_context_client_server(self):
1704 # PROTOCOL_TLS_CLIENT has sane defaults
1705 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1706 self.assertTrue(ctx.check_hostname)
1707 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1708
1709 # PROTOCOL_TLS_SERVER has different but also sane defaults
1710 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1711 self.assertFalse(ctx.check_hostname)
1712 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1713
def test_context_custom_class(self):
    """wrap_socket()/wrap_bio() use the classes configured on the context."""
    class MySSLSocket(ssl.SSLSocket):
        pass

    class MySSLObject(ssl.SSLObject):
        pass

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    context.sslsocket_class = MySSLSocket
    context.sslobject_class = MySSLObject

    plain_socket = socket.socket()
    with context.wrap_socket(plain_socket, server_side=True) as wrapped:
        self.assertIsInstance(wrapped, MySSLSocket)

    incoming, outgoing = ssl.MemoryBIO(), ssl.MemoryBIO()
    ssl_object = context.wrap_bio(incoming, outgoing)
    self.assertIsInstance(ssl_object, MySSLObject)
1729
@unittest.skipUnless(IS_OPENSSL_1_1_1, "Test requires OpenSSL 1.1.1")
def test_num_tickest(self):
    """Check reading and assigning SSLContext.num_tickets."""
    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    self.assertEqual(server_context.num_tickets, 2)

    # Valid assignments read back unchanged
    for count in (1, 0):
        server_context.num_tickets = count
        self.assertEqual(server_context.num_tickets, count)

    # Negative and non-integer values are refused
    for error, bad_value in ((ValueError, -1), (TypeError, None)):
        with self.assertRaises(error):
            server_context.num_tickets = bad_value

    # On a client context the value can be read but not assigned
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertEqual(client_context.num_tickets, 2)
    with self.assertRaises(ValueError):
        client_context.num_tickets = 1
1747
Antoine Pitrou152efa22010-05-16 18:19:27 +00001748
class SSLErrorTests(unittest.TestCase):
    """Tests for ssl.SSLError, its subclasses and related argument errors."""

    def test_str(self):
        """str() of an SSLError is the message only; errno stays available."""
        # The str() of a SSLError doesn't include the errno
        e = ssl.SSLError(1, "foo")
        self.assertEqual(str(e), "foo")
        self.assertEqual(e.errno, 1)
        # Same for a subclass
        e = ssl.SSLZeroReturnError(1, "foo")
        self.assertEqual(str(e), "foo")
        self.assertEqual(e.errno, 1)

    def test_lib_reason(self):
        """SSLError exposes the failing library and reason as attributes."""
        # Test the library and reason attributes
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        # CERTFILE is fed to load_dh_params() on purpose to provoke the error
        with self.assertRaises(ssl.SSLError) as cm:
            ctx.load_dh_params(CERTFILE)
        self.assertEqual(cm.exception.library, 'PEM')
        self.assertEqual(cm.exception.reason, 'NO_START_LINE')
        msg = str(cm.exception)
        self.assertTrue(msg.startswith("[PEM: NO_START_LINE] no start line"), msg)

    def test_subclass(self):
        """A non-blocking handshake raises the SSLWantReadError subclass."""
        # Check that the appropriate SSLError subclass is raised
        # (this only tests one of them)
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        with socket.create_server(("127.0.0.1", 0)) as server:
            # Manage the raw client socket with a context manager so it is
            # closed even if wrap_socket() raises. After a successful wrap
            # the SSL socket owns the descriptor, so the outer close should
            # be a no-op.
            with socket.create_connection(server.getsockname()) as raw:
                raw.setblocking(False)
                with ctx.wrap_socket(raw, False,
                                     do_handshake_on_connect=False) as c:
                    # The listening socket never accepts or replies, so the
                    # client handshake cannot complete.
                    with self.assertRaises(ssl.SSLWantReadError) as cm:
                        c.do_handshake()
                    msg = str(cm.exception)
                    self.assertTrue(msg.startswith("The operation did not complete (read)"), msg)
                    # For compatibility
                    self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)

    def test_bad_server_hostname(self):
        """wrap_bio() rejects empty, dot-leading and NUL-containing hostnames."""
        ctx = ssl.create_default_context()
        with self.assertRaises(ValueError):
            ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
                         server_hostname="")
        with self.assertRaises(ValueError):
            ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
                         server_hostname=".example.org")
        with self.assertRaises(TypeError):
            ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
                         server_hostname="example.org\x00evil.com")
Christian Heimes61d478c2018-01-27 15:51:38 +01001800
1801
class MemoryBIOTests(unittest.TestCase):
    """Tests for the in-memory ssl.MemoryBIO buffer."""

    def test_read_write(self):
        """Written bytes are read back in order, whole or in pieces."""
        buf = ssl.MemoryBIO()

        # Single write, then drain
        buf.write(b'foo')
        self.assertEqual(buf.read(), b'foo')
        self.assertEqual(buf.read(), b'')

        # Consecutive writes are concatenated
        for chunk in (b'foo', b'bar'):
            buf.write(chunk)
        self.assertEqual(buf.read(), b'foobar')
        self.assertEqual(buf.read(), b'')

        # Sized reads return at most what is available
        buf.write(b'baz')
        for size, expected in ((2, b'ba'), (1, b'z'), (1, b'')):
            self.assertEqual(buf.read(size), expected)

    def test_eof(self):
        """eof turns true only after write_eof() and a fully drained buffer."""
        buf = ssl.MemoryBIO()
        self.assertFalse(buf.eof)
        self.assertEqual(buf.read(), b'')
        self.assertFalse(buf.eof)
        buf.write(b'foo')
        self.assertFalse(buf.eof)
        buf.write_eof()
        # Data is still pending, so not at EOF yet
        self.assertFalse(buf.eof)
        self.assertEqual(buf.read(2), b'fo')
        self.assertFalse(buf.eof)
        self.assertEqual(buf.read(1), b'o')
        self.assertTrue(buf.eof)
        self.assertEqual(buf.read(), b'')
        self.assertTrue(buf.eof)

    def test_pending(self):
        """pending counts the bytes written but not yet read."""
        buf = ssl.MemoryBIO()
        self.assertEqual(buf.pending, 0)
        buf.write(b'foo')
        self.assertEqual(buf.pending, 3)
        # Reading one byte at a time counts down to zero
        for remaining in (2, 1, 0):
            buf.read(1)
            self.assertEqual(buf.pending, remaining)
        # Writing one byte at a time counts back up
        for expected in range(1, 4):
            buf.write(b'x')
            self.assertEqual(buf.pending, expected)
        buf.read()
        self.assertEqual(buf.pending, 0)

    def test_buffer_types(self):
        """write() accepts bytes, bytearray and memoryview."""
        buf = ssl.MemoryBIO()
        payloads = (
            (b'foo', b'foo'),
            (bytearray(b'bar'), b'bar'),
            (memoryview(b'baz'), b'baz'),
        )
        for data, expected in payloads:
            buf.write(data)
            self.assertEqual(buf.read(), expected)

    def test_error_types(self):
        """write() raises TypeError for str, None, bool and int."""
        buf = ssl.MemoryBIO()
        for not_a_buffer in ('foo', None, True, 1):
            with self.assertRaises(TypeError):
                buf.write(not_a_buffer)
1863
1864
class SSLObjectTests(unittest.TestCase):
    """Tests for ssl.SSLObject driven through memory BIOs."""

    def test_private_init(self):
        """Calling SSLObject() directly raises TypeError."""
        buf = ssl.MemoryBIO()
        with self.assertRaisesRegex(TypeError, "public constructor"):
            ssl.SSLObject(buf, buf)

    def test_unwrap(self):
        """Handshake over BIOs, then shut down with unwrap() on both sides."""
        client_ctx, server_ctx, hostname = testing_context()
        client_in = ssl.MemoryBIO()
        client_out = ssl.MemoryBIO()
        server_in = ssl.MemoryBIO()
        server_out = ssl.MemoryBIO()
        client = client_ctx.wrap_bio(client_in, client_out,
                                     server_hostname=hostname)
        server = server_ctx.wrap_bio(server_in, server_out, server_side=True)

        def advance_handshake(endpoint):
            # One handshake step; needing more input is expected here.
            try:
                endpoint.do_handshake()
            except ssl.SSLWantReadError:
                pass

        def forward(source, destination):
            # Move whatever one side produced into the other side's input.
            if source.pending:
                destination.write(source.read())

        # Loop on the handshake for a bit to get it settled
        for _ in range(5):
            advance_handshake(client)
            forward(client_out, server_in)
            advance_handshake(server)
            forward(server_out, client_in)
        # Now the handshakes should be complete (don't raise WantReadError)
        client.do_handshake()
        server.do_handshake()

        # Now if we unwrap one side unilaterally, it should send close-notify
        # and raise WantReadError:
        with self.assertRaises(ssl.SSLWantReadError):
            client.unwrap()

        # But server.unwrap() does not raise, because it reads the client's
        # close-notify:
        server_in.write(client_out.read())
        server.unwrap()

        # And now that the client gets the server's close-notify, it doesn't
        # raise either.
        client_in.write(server_out.read())
        client.unwrap()
Christian Heimes9d50ab52018-02-27 10:17:30 +01001912
Martin Panter3840b2a2016-03-27 01:53:46 +00001913class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001914 """Tests that connect to a simple server running in the background"""
1915
def setUp(self):
    """Start a ThreadedEchoServer and register its shutdown as a cleanup."""
    echo_server = ThreadedEchoServer(SIGNED_CERTFILE)
    self.server_addr = (HOST, echo_server.port)
    # Entered by hand because the server must outlive this method;
    # addCleanup() runs the matching __exit__ afterwards.
    echo_server.__enter__()
    self.addCleanup(echo_server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001921
def test_connect(self):
    """Connect without verification, then with the signing CA trusted."""
    # Without verification the peer certificate is reported as empty
    unverified_socket = socket.socket(socket.AF_INET)
    with test_wrap_socket(unverified_socket,
                          cert_reqs=ssl.CERT_NONE) as conn:
        conn.connect(self.server_addr)
        self.assertEqual({}, conn.getpeercert())
        self.assertFalse(conn.server_side)

    # this should succeed because we specify the root cert
    verified_socket = socket.socket(socket.AF_INET)
    with test_wrap_socket(verified_socket,
                          cert_reqs=ssl.CERT_REQUIRED,
                          ca_certs=SIGNING_CA) as conn:
        conn.connect(self.server_addr)
        self.assertTrue(conn.getpeercert())
        self.assertFalse(conn.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001936
def test_connect_fail(self):
    """Connecting with CERT_REQUIRED but no CA certs fails verification."""
    # This should fail because we have no verification certs. Connection
    # failure crashes ThreadedEchoServer, so run this in an independent
    # test method.
    conn = test_wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(conn.close)
    with self.assertRaisesRegex(ssl.SSLError, "certificate verify failed"):
        conn.connect(self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001946
def test_connect_ex(self):
    """connect_ex() returns 0 and leaves a peer certificate available."""
    # Issue #11326: check connect_ex() implementation
    plain_socket = socket.socket(socket.AF_INET)
    conn = test_wrap_socket(plain_socket,
                            cert_reqs=ssl.CERT_REQUIRED,
                            ca_certs=SIGNING_CA)
    self.addCleanup(conn.close)
    status = conn.connect_ex(self.server_addr)
    self.assertEqual(0, status)
    self.assertTrue(conn.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001955
def test_non_blocking_connect_ex(self):
    """Issue #11326: after a non-blocking connect_ex() the handshake must
    be able to proceed once the socket becomes ready."""
    client = test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_REQUIRED,
                              ca_certs=SIGNING_CA,
                              do_handshake_on_connect=False)
    self.addCleanup(client.close)
    client.setblocking(False)
    status = client.connect_ex(self.server_addr)
    # Windows reports EWOULDBLOCK, other platforms EINPROGRESS.
    self.assertIn(status, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
    # Block (up to 5 seconds) until the TCP connect has completed.
    select.select([], [client], [], 5.0)
    # Drive the handshake by hand, waiting on whichever direction
    # OpenSSL asks for.
    handshake_done = False
    while not handshake_done:
        try:
            client.do_handshake()
        except ssl.SSLWantReadError:
            select.select([client], [], [], 5.0)
        except ssl.SSLWantWriteError:
            select.select([], [client], [], 5.0)
        else:
            handshake_done = True
    # The TLS session is up, so a peer certificate must be available.
    self.assertTrue(client.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001981
def test_connect_with_context(self):
    """Variant of test_connect that uses an explicitly created SSLContext."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)

    # Unverified connection: the peer certificate dictionary is empty.
    with context.wrap_socket(socket.socket(socket.AF_INET)) as client:
        client.connect(self.server_addr)
        self.assertEqual({}, client.getpeercert())

    # The same connection again, this time passing a server hostname.
    with context.wrap_socket(socket.socket(socket.AF_INET),
                             server_hostname="dummy") as client:
        client.connect(self.server_addr)

    # Require verification and load the root cert: must succeed.
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    with context.wrap_socket(socket.socket(socket.AF_INET)) as client:
        client.connect(self.server_addr)
        peer_cert = client.getpeercert()
        self.assertTrue(peer_cert)
1999
def test_connect_with_context_fail(self):
    """A context requiring verification but holding no CA certs must fail."""
    # A failed connection crashes ThreadedEchoServer, which is why this
    # check lives in its own test method.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    client = context.wrap_socket(socket.socket(socket.AF_INET))
    self.addCleanup(client.close)
    with self.assertRaisesRegex(ssl.SSLError, "certificate verify failed"):
        client.connect(self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00002010
def test_connect_capath(self):
    """Verify the server certificate through the `capath` argument."""
    # NOTE: OpenSSL changed its subject hashing algorithm between 0.9.8n
    # and 1.0.0.  For this test to be portable across OpenSSL releases the
    # capath directory has to hold both flavours of every certificate
    # (identical content, different filename).
    #
    # The check runs twice: first with a str path, then with a bytes path.
    for capath in (CAPATH, BYTES_CAPATH):
        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(capath=capath)
        with context.wrap_socket(socket.socket(socket.AF_INET)) as client:
            client.connect(self.server_addr)
            peer_cert = client.getpeercert()
            self.assertTrue(peer_cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002033
def test_connect_cadata(self):
    """Verify the server certificate with CA data passed in memory."""
    with open(SIGNING_CA) as ca_file:
        pem = ca_file.read()
    der = ssl.PEM_cert_to_DER_cert(pem)

    # First the PEM text, then the same certificate in DER form.
    for cadata in (pem, der):
        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(cadata=cadata)
        with context.wrap_socket(socket.socket(socket.AF_INET)) as client:
            client.connect(self.server_addr)
            peer_cert = client.getpeercert()
            self.assertTrue(peer_cert)
Christian Heimesefff7062013-11-21 03:35:02 +01002054
@unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
def test_makefile_close(self):
    """Issue #5238: makefile() must not delay closing the real socket.

    The check is done on the raw file descriptor, which is why the test
    is skipped under Windows.
    """
    ss = test_wrap_socket(socket.socket(socket.AF_INET))
    # Make sure the socket is released even when connect() or one of the
    # checks below fails; closing an already closed socket is harmless.
    self.addCleanup(ss.close)
    ss.connect(self.server_addr)
    fd = ss.fileno()
    f = ss.makefile()
    f.close()
    # The fd is still open
    os.read(fd, 0)
    # Closing the SSL socket should close the fd too
    ss.close()
    gc.collect()
    with self.assertRaises(OSError) as e:
        os.read(fd, 0)
    self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00002073
def test_non_blocking_handshake(self):
    """Complete a TLS handshake step by step on a non-blocking socket."""
    raw_sock = socket.socket(socket.AF_INET)
    raw_sock.connect(self.server_addr)
    raw_sock.setblocking(False)
    client = test_wrap_socket(raw_sock,
                              cert_reqs=ssl.CERT_NONE,
                              do_handshake_on_connect=False)
    self.addCleanup(client.close)
    attempts = 0
    finished = False
    while not finished:
        try:
            attempts += 1
            client.do_handshake()
        except ssl.SSLWantReadError:
            # OpenSSL needs more input: wait until the socket is readable.
            select.select([client], [], [])
        except ssl.SSLWantWriteError:
            # OpenSSL has pending output: wait until the socket is writable.
            select.select([], [client], [])
        else:
            finished = True
    if support.verbose:
        sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % attempts)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002094
def test_get_server_certificate(self):
    """Fetch the local server's certificate, verifying it against the CA."""
    host, port = self.server_addr
    _test_get_server_certificate(self, host, port, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02002097
def test_get_server_certificate_fail(self):
    """Run the failing certificate fetch against the local server."""
    # A failed connection crashes ThreadedEchoServer, which is why this
    # check lives in its own test method.
    host, port = self.server_addr
    _test_get_server_certificate_fail(self, host, port)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002102
def test_ciphers(self):
    """Valid cipher strings connect; a nonsensical one raises SSLError."""
    for cipher_string in ("ALL", "DEFAULT"):
        with test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_NONE,
                              ciphers=cipher_string) as client:
            client.connect(self.server_addr)
    # The bad cipher list may be rejected either when the socket is
    # wrapped or when it connects, so both calls sit inside the check.
    with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
        with socket.socket(socket.AF_INET) as raw_sock:
            client = test_wrap_socket(raw_sock,
                                      cert_reqs=ssl.CERT_NONE,
                                      ciphers="^$:,;?*'dorothyx")
            client.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00002116
def test_get_ca_certs_capath(self):
    """Certificates found via capath are only loaded when they are needed."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.load_verify_locations(capath=CAPATH)
    # Nothing has been loaded before the first connection.
    self.assertEqual(context.get_ca_certs(), [])
    raw_sock = socket.socket(socket.AF_INET)
    with context.wrap_socket(raw_sock, server_hostname='localhost') as client:
        client.connect(self.server_addr)
        peer_cert = client.getpeercert()
        self.assertTrue(peer_cert)
    # After the connection exactly one CA certificate is loaded.
    loaded = context.get_ca_certs()
    self.assertEqual(len(loaded), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02002128
@needs_sni
def test_context_setget(self):
    """The context of an already connected socket can be swapped out."""
    original_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    original_ctx.load_verify_locations(capath=CAPATH)
    replacement_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    replacement_ctx.load_verify_locations(capath=CAPATH)
    raw_sock = socket.socket(socket.AF_INET)
    with original_ctx.wrap_socket(raw_sock, server_hostname='localhost') as client:
        client.connect(self.server_addr)
        # Both the socket and its low-level SSL object start out on the
        # original context ...
        self.assertIs(client.context, original_ctx)
        self.assertIs(client._sslobj.context, original_ctx)
        # ... and both follow when a new context is assigned.
        client.context = replacement_ctx
        self.assertIs(client.context, replacement_ctx)
        self.assertIs(client._sslobj.context, replacement_ctx)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002144
def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
    """Run func(*args) to completion, shuttling data between sock and the BIOs.

    func is called repeatedly.  While it raises SSLError with errno
    SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE, pending bytes are moved
    from the outgoing BIO to the socket and, for WANT_READ, from the
    socket into the incoming BIO.  Any other SSLError propagates.  The
    only keyword consulted is 'timeout' (seconds, default 10); when it
    expires the test fails.  Returns whatever func returned.
    """
    deadline = time.monotonic() + kwargs.get('timeout', 10)
    attempts = 0
    while True:
        if time.monotonic() > deadline:
            self.fail("timeout")
        attempts += 1
        wanted = None
        try:
            ret = func(*args)
        except ssl.SSLError as exc:
            wanted = exc.errno
            if wanted not in (ssl.SSL_ERROR_WANT_READ,
                              ssl.SSL_ERROR_WANT_WRITE):
                raise
        # Whether or not func succeeded, flush whatever it queued in the
        # outgoing BIO onto the socket.
        sock.sendall(outgoing.read())
        if wanted is None:
            # func completed without asking for more I/O: finished.
            break
        if wanted == ssl.SSL_ERROR_WANT_READ:
            # Feed the incoming BIO from the socket; an empty read means
            # the peer closed the connection.
            chunk = sock.recv(32768)
            if chunk:
                incoming.write(chunk)
            else:
                incoming.write_eof()
    if support.verbose:
        sys.stdout.write("Needed %d calls to complete %s().\n"
                         % (attempts, func.__name__))
    return ret
2181
def test_bio_handshake(self):
    """Handshake and shut down through an SSLObject built on memory BIOs."""
    sock = socket.socket(socket.AF_INET)
    self.addCleanup(sock.close)
    sock.connect(self.server_addr)
    incoming = ssl.MemoryBIO()
    outgoing = ssl.MemoryBIO()
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertTrue(context.check_hostname)
    self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
    context.load_verify_locations(SIGNING_CA)
    sslobj = context.wrap_bio(incoming, outgoing, False,
                              SIGNED_CERTFILE_HOSTNAME)

    def pump(func, *args):
        # Run one SSLObject operation over this test's socket/BIO triple.
        return self.ssl_io_loop(sock, incoming, outgoing, func, *args)

    # State before the handshake.
    self.assertIs(sslobj._sslobj.owner, sslobj)
    self.assertIsNone(sslobj.cipher())
    self.assertIsNone(sslobj.version())
    self.assertIsNotNone(sslobj.shared_ciphers())
    self.assertRaises(ValueError, sslobj.getpeercert)
    if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
        self.assertIsNone(sslobj.get_channel_binding('tls-unique'))

    pump(sslobj.do_handshake)

    # State after the handshake.
    self.assertTrue(sslobj.cipher())
    self.assertIsNotNone(sslobj.shared_ciphers())
    self.assertIsNotNone(sslobj.version())
    self.assertTrue(sslobj.getpeercert())
    if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
        self.assertTrue(sslobj.get_channel_binding('tls-unique'))

    try:
        pump(sslobj.unwrap)
    except ssl.SSLSyscallError:
        # The server may drop the TCP connection without sending a secure
        # shutdown message; that surfaces as SSL_ERROR_SYSCALL.
        pass
    # Writing after the shutdown attempt must be rejected.
    self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
2215
def test_bio_read_write_data(self):
    """Send a line through an SSLObject and read back the echoed reply."""
    sock = socket.socket(socket.AF_INET)
    self.addCleanup(sock.close)
    sock.connect(self.server_addr)
    incoming = ssl.MemoryBIO()
    outgoing = ssl.MemoryBIO()
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_NONE
    sslobj = context.wrap_bio(incoming, outgoing, False)

    def pump(func, *args):
        # Run one SSLObject operation over this test's socket/BIO triple.
        return self.ssl_io_loop(sock, incoming, outgoing, func, *args)

    pump(sslobj.do_handshake)
    request = b'FOO\n'
    pump(sslobj.write, request)
    reply = pump(sslobj.read, 1024)
    # The reply is expected to be the request in lower case.
    self.assertEqual(reply, b'foo\n')
    pump(sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002231
2232
class NetworkedTests(unittest.TestCase):
    """Tests that talk to remote hosts instead of the local echo server."""

    def test_timeout_connect_ex(self):
        """Issue #12065: on timeout connect_ex() returns the original errno,
        mimicking the behaviour of non-SSL sockets."""
        with support.transient_internet(REMOTE_HOST):
            raw_sock = socket.socket(socket.AF_INET)
            client = test_wrap_socket(raw_sock,
                                      cert_reqs=ssl.CERT_REQUIRED,
                                      do_handshake_on_connect=False)
            self.addCleanup(client.close)
            # A tiny timeout so that the connect attempt is expected to
            # time out.
            client.settimeout(0.0000001)
            status = client.connect_ex((REMOTE_HOST, 443))
            if status == 0:
                self.skipTest("REMOTE_HOST responded too quickly")
            self.assertIn(status, (errno.EAGAIN, errno.EWOULDBLOCK))

    @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
    def test_get_server_certificate_ipv6(self):
        """Run both certificate-fetch helpers against an IPv6 host."""
        with support.transient_internet('ipv6.google.com'):
            _test_get_server_certificate(self, 'ipv6.google.com', 443)
            _test_get_server_certificate_fail(self, 'ipv6.google.com', 443)
2254
Martin Panter3840b2a2016-03-27 01:53:46 +00002255
def _test_get_server_certificate(test, host, port, cert=None):
    """Fetch the certificate of host:port, first plainly and then with
    ``ca_certs=cert``; fail *test* when either fetch returns nothing."""
    address = (host, port)

    def require(pem):
        # An empty result means no certificate was obtained.
        if not pem:
            test.fail("No server certificate on %s:%s!" % (host, port))
        return pem

    require(ssl.get_server_certificate(address))
    verified_pem = require(ssl.get_server_certificate(address, ca_certs=cert))
    if support.verbose:
        sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port, verified_pem))
2266
def _test_get_server_certificate_fail(test, host, port):
    """Fetch the certificate of host:port with CERTFILE as CA file and
    fail *test* unless that raises ssl.SSLError."""
    try:
        pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
    except ssl.SSLError as exc:
        # This is the expected outcome.
        if support.verbose:
            sys.stdout.write("%s\n" % exc)
        return
    test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2276
2277
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002278from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002279
class ThreadedEchoServer(threading.Thread):
    """Echo server running in a background thread.

    The server accepts connections on a listening socket and hands each
    one to a ConnectionHandler thread, which answers ordinary messages with
    their lower-cased form and understands a few command words (see
    ConnectionHandler.run).  run() joins every handler before accepting
    again, so connections are served one at a time.

    Use it as a context manager: __enter__() starts the thread and waits
    until the socket is listening, __exit__() stops and joins it.
    """

    class ConnectionHandler(threading.Thread):

        """A mildly complicated class, because we want it to work both
        with and without the SSL wrapper around the socket connection, so
        that we can test the STARTTLS functionality."""

        def __init__(self, server, connsock, addr):
            # server is the owning ThreadedEchoServer, connsock the accepted
            # socket and addr the peer address returned by accept().
            self.server = server
            self.running = False
            self.sock = connsock
            self.addr = addr
            self.sock.setblocking(1)
            # Set by wrap_conn(); None while the connection is unencrypted.
            self.sslconn = None
            threading.Thread.__init__(self)
            self.daemon = True

        def wrap_conn(self):
            """Wrap the connection with the server's SSL context.

            Returns True on success.  On failure the error text is appended
            to server.conn_errors, the connection is closed and False is
            returned; for the second group of errors the whole server is
            stopped as well.
            """
            try:
                self.sslconn = self.server.context.wrap_socket(
                    self.sock, server_side=True)
                self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
                self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
            except (ConnectionResetError, BrokenPipeError, ConnectionAbortedError) as e:
                # We treat ConnectionResetError as though it were an
                # SSLError - OpenSSL on Ubuntu abruptly closes the
                # connection when asked to use an unsupported protocol.
                #
                # BrokenPipeError is raised in TLS 1.3 mode, when OpenSSL
                # tries to send session tickets after handshake.
                # https://github.com/openssl/openssl/issues/6342
                #
                # ConnectionAbortedError is raised in TLS 1.3 mode, when OpenSSL
                # tries to send session tickets after handshake when using WinSock.
                self.server.conn_errors.append(str(e))
                if self.server.chatty:
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                # Unlike the handler below, this one leaves the server running.
                self.running = False
                self.close()
                return False
            except (ssl.SSLError, OSError) as e:
                # OSError may occur with wrong protocols, e.g. both
                # sides use PROTOCOL_TLS_SERVER.
                #
                # XXX Various errors can have happened here, for example
                # a mismatching protocol version, an invalid certificate,
                # or a low-level bug. This should be made more discriminating.
                #
                # bpo-31323: Store the exception as string to prevent
                # a reference leak: server -> conn_errors -> exception
                # -> traceback -> self (ConnectionHandler) -> server
                self.server.conn_errors.append(str(e))
                if self.server.chatty:
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                self.server.stop()
                self.close()
                return False
            else:
                # Handshake succeeded: record the shared ciphers and, when
                # verbose output is enabled, report certificate and cipher.
                self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
                if self.server.context.verify_mode == ssl.CERT_REQUIRED:
                    cert = self.sslconn.getpeercert()
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
                    cert_binary = self.sslconn.getpeercert(True)
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
                cipher = self.sslconn.cipher()
                if support.verbose and self.server.chatty:
                    sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
                    sys.stdout.write(" server: selected protocol is now "
                            + str(self.sslconn.selected_npn_protocol()) + "\n")
                return True

        def read(self):
            # Read from the SSL connection when there is one, otherwise
            # from the plain socket.
            if self.sslconn:
                return self.sslconn.read()
            else:
                return self.sock.recv(1024)

        def write(self, bytes):
            # Write to the SSL connection when there is one, otherwise to
            # the plain socket.
            if self.sslconn:
                return self.sslconn.write(bytes)
            else:
                return self.sock.send(bytes)

        def close(self):
            # Close whichever of the two objects is currently in use.
            if self.sslconn:
                self.sslconn.close()
            else:
                self.sock.close()

        def run(self):
            """Serve the connection until it ends.

            Unless the server is in STARTTLS mode the connection is wrapped
            first.  Each message read is compared, after stripping, against
            the command words below; anything else is echoed back in lower
            case.
            """
            self.running = True
            if not self.server.starttls_server:
                if not self.wrap_conn():
                    return
            while self.running:
                try:
                    msg = self.read()
                    stripped = msg.strip()
                    if not stripped:
                        # eof, so quit this handler
                        self.running = False
                        try:
                            self.sock = self.sslconn.unwrap()
                        except OSError:
                            # Many tests shut the TCP connection down
                            # without an SSL shutdown. This causes
                            # unwrap() to raise OSError with errno=0!
                            pass
                        else:
                            self.sslconn = None
                        self.close()
                    elif stripped == b'over':
                        # The client announces that it is done.
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: client closed connection\n")
                        self.close()
                        return
                    elif (self.server.starttls_server and
                          stripped == b'STARTTLS'):
                        # Acknowledge, then switch the connection to TLS.
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        if not self.wrap_conn():
                            return
                    elif (self.server.starttls_server and self.sslconn
                          and stripped == b'ENDTLS'):
                        # Acknowledge, then drop back to the plain socket.
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        self.sock = self.sslconn.unwrap()
                        self.sslconn = None
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: connection is now unencrypted...\n")
                    elif stripped == b'CB tls-unique':
                        # Reply with the repr() of our channel binding data.
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
                        data = self.sslconn.get_channel_binding("tls-unique")
                        self.write(repr(data).encode("us-ascii") + b"\n")
                    elif stripped == b'PHA':
                        # Post-handshake authentication: reply OK, or the
                        # repr() of the SSLError raised.
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: initiating post handshake auth\n")
                        try:
                            self.sslconn.verify_client_post_handshake()
                        except ssl.SSLError as e:
                            self.write(repr(e).encode("us-ascii") + b"\n")
                        else:
                            self.write(b"OK\n")
                    elif stripped == b'HASCERT':
                        # Tell the client whether a peer certificate is known.
                        if self.sslconn.getpeercert() is not None:
                            self.write(b'TRUE\n')
                        else:
                            self.write(b'FALSE\n')
                    elif stripped == b'GETCERT':
                        # Reply with the repr() of the peer certificate.
                        cert = self.sslconn.getpeercert()
                        self.write(repr(cert).encode("us-ascii") + b"\n")
                    else:
                        # Plain echo: send the message back in lower case.
                        if (support.verbose and
                            self.server.connectionchatty):
                            ctype = (self.sslconn and "encrypted") or "unencrypted"
                            sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
                                             % (msg, ctype, msg.lower(), ctype))
                        self.write(msg.lower())
                except (ConnectionResetError, ConnectionAbortedError):
                    # XXX: OpenSSL 1.1.1 sometimes raises ConnectionResetError
                    # when connection is not shut down gracefully.
                    if self.server.chatty and support.verbose:
                        sys.stdout.write(
                            " Connection reset by peer: {}\n".format(
                                self.addr)
                        )
                    self.close()
                    self.running = False
                except ssl.SSLError as err:
                    # On Windows sometimes test_pha_required_nocert receives the
                    # PEER_DID_NOT_RETURN_A_CERTIFICATE exception
                    # before the 'tlsv13 alert certificate required' exception.
                    # If the server is stopped when PEER_DID_NOT_RETURN_A_CERTIFICATE
                    # is received test_pha_required_nocert fails with ConnectionResetError
                    # because the underlying socket is closed
                    #
                    # NOTE(review): any other SSLError is swallowed here and
                    # the loop carries on with the next read - confirm that
                    # this is intended.
                    if 'PEER_DID_NOT_RETURN_A_CERTIFICATE' == err.reason:
                        if self.server.chatty and support.verbose:
                            sys.stdout.write(err.args[1])
                        # test_pha_required_nocert is expecting this exception
                        raise ssl.SSLError('tlsv13 alert certificate required')
                except OSError:
                    if self.server.chatty:
                        handle_error("Test server failure:\n")
                    self.close()
                    self.running = False

                    # normally, we'd just stop here, but for the test
                    # harness, we want to stop the server
                    self.server.stop()

    def __init__(self, certificate=None, ssl_version=None,
                 certreqs=None, cacerts=None,
                 chatty=True, connectionchatty=False, starttls_server=False,
                 npn_protocols=None, alpn_protocols=None,
                 ciphers=None, context=None):
        """Set up the SSL context and bind the listening socket.

        When *context* is given it is used as is and *ssl_version*,
        *certreqs* and *cacerts* are ignored.  Otherwise a new context is
        created (PROTOCOL_TLS_SERVER and CERT_NONE unless overridden).  The
        remaining options are applied to whichever context is in use.
        """
        if context:
            self.context = context
        else:
            self.context = ssl.SSLContext(ssl_version
                                          if ssl_version is not None
                                          else ssl.PROTOCOL_TLS_SERVER)
            self.context.verify_mode = (certreqs if certreqs is not None
                                        else ssl.CERT_NONE)
            if cacerts:
                self.context.load_verify_locations(cacerts)
        if certificate:
            self.context.load_cert_chain(certificate)
        if npn_protocols:
            self.context.set_npn_protocols(npn_protocols)
        if alpn_protocols:
            self.context.set_alpn_protocols(alpn_protocols)
        if ciphers:
            self.context.set_ciphers(ciphers)
        self.chatty = chatty
        self.connectionchatty = connectionchatty
        self.starttls_server = starttls_server
        self.sock = socket.socket()
        # The port is fixed here, before the thread starts, so callers can
        # read it right after construction.
        self.port = support.bind_port(self.sock)
        self.flag = None
        self.active = False
        # Filled in by the connection handlers, one entry per connection.
        self.selected_npn_protocols = []
        self.selected_alpn_protocols = []
        self.shared_ciphers = []
        self.conn_errors = []
        threading.Thread.__init__(self)
        self.daemon = True

    def __enter__(self):
        # Start the thread and wait for run() to signal that the socket
        # is listening.
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        # Ask the accept loop to finish and wait for the thread to exit.
        self.stop()
        self.join()

    def start(self, flag=None):
        # flag, when given, is an event that run() sets once listening.
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Accept loop: serve connections until stop() clears self.active."""
        # The short accept() timeout lets the loop notice stop() promptly.
        self.sock.settimeout(0.05)
        self.sock.listen()
        self.active = True
        if self.flag:
            # signal an event
            self.flag.set()
        while self.active:
            try:
                newconn, connaddr = self.sock.accept()
                if support.verbose and self.chatty:
                    sys.stdout.write(' server: new connection from '
                                     + repr(connaddr) + '\n')
                handler = self.ConnectionHandler(self, newconn, connaddr)
                handler.start()
                # Wait for the handler, so connections are served serially.
                handler.join()
            except socket.timeout:
                pass
            except KeyboardInterrupt:
                self.stop()
            except BaseException as e:
                # Anything else is only reported; the loop keeps running.
                if support.verbose and self.chatty:
                    sys.stdout.write(
                        ' connection handling failed: ' + repr(e) + '\n')

        self.sock.close()

    def stop(self):
        # Makes the accept loop in run() exit after its current iteration.
        self.active = False
2556
2557class AsyncoreEchoServer(threading.Thread):
2558
2559 # this one's based on asyncore.dispatcher
2560
2561 class EchoServer (asyncore.dispatcher):
2562
2563 class ConnectionHandler(asyncore.dispatcher_with_send):
2564
2565 def __init__(self, conn, certfile):
2566 self.socket = test_wrap_socket(conn, server_side=True,
2567 certfile=certfile,
2568 do_handshake_on_connect=False)
2569 asyncore.dispatcher_with_send.__init__(self, self.socket)
2570 self._ssl_accepting = True
2571 self._do_ssl_handshake()
2572
2573 def readable(self):
2574 if isinstance(self.socket, ssl.SSLSocket):
2575 while self.socket.pending() > 0:
2576 self.handle_read_event()
2577 return True
2578
2579 def _do_ssl_handshake(self):
2580 try:
2581 self.socket.do_handshake()
2582 except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
2583 return
2584 except ssl.SSLEOFError:
2585 return self.handle_close()
2586 except ssl.SSLError:
Bill Janssen54cc54c2007-12-14 22:08:56 +00002587 raise
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002588 except OSError as err:
2589 if err.args[0] == errno.ECONNABORTED:
2590 return self.handle_close()
2591 else:
2592 self._ssl_accepting = False
Bill Janssen54cc54c2007-12-14 22:08:56 +00002593
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002594 def handle_read(self):
2595 if self._ssl_accepting:
2596 self._do_ssl_handshake()
2597 else:
2598 data = self.recv(1024)
2599 if support.verbose:
2600 sys.stdout.write(" server: read %s from client\n" % repr(data))
2601 if not data:
2602 self.close()
2603 else:
2604 self.send(data.lower())
Bill Janssen54cc54c2007-12-14 22:08:56 +00002605
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002606 def handle_close(self):
2607 self.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002608 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002609 sys.stdout.write(" server: closed connection %s\n" % self.socket)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002610
2611 def handle_error(self):
2612 raise
2613
Trent Nelson78520002008-04-10 20:54:35 +00002614 def __init__(self, certfile):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002615 self.certfile = certfile
2616 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2617 self.port = support.bind_port(sock, '')
2618 asyncore.dispatcher.__init__(self, sock)
2619 self.listen(5)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002620
def handle_accepted(self, sock_obj, addr):
    """Wrap a freshly accepted connection in a ConnectionHandler.

    *sock_obj* is the accepted socket and *addr* the peer address; in
    verbose mode the address is logged first.
    """
    if support.verbose:
        announcement = " server: new connection from %s:%s\n" % addr
        sys.stdout.write(announcement)
    self.ConnectionHandler(sock_obj, self.certfile)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002625
def handle_error(self):
    """Re-raise the active exception rather than handling it here."""
    raise
Bill Janssen54cc54c2007-12-14 22:08:56 +00002628
def __init__(self, certfile):
    """Set up the server thread around a new EchoServer.

    *certfile* is passed to the EchoServer; the server's port is exposed
    as ``self.port``.  The thread is marked as a daemon thread.
    """
    # No readiness flag yet, and the serving loop is not running.
    self.flag = None
    self.active = False
    echo_server = self.EchoServer(certfile)
    self.server = echo_server
    self.port = echo_server.port
    threading.Thread.__init__(self)
    self.daemon = True
Bill Janssen54cc54c2007-12-14 22:08:56 +00002636
def __str__(self):
    """Return ``<ClassName server>`` for this thread."""
    cls_name = self.__class__.__name__
    return "<%s %s>" % (cls_name, self.server)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002639
def __enter__(self):
    """Start the server thread and block until it signals readiness."""
    ready = threading.Event()
    self.start(ready)
    # start() stored the event in self.flag; wait on it there.
    self.flag.wait()
    return self
Thomas Woutersed03b412007-08-28 21:37:11 +00002644
def __exit__(self, *args):
    """Stop the server, join its thread and clear asyncore's socket map.

    Progress messages are written to stdout in verbose mode.
    """
    def report(text):
        # Progress output is only wanted in verbose test runs.
        if support.verbose:
            sys.stdout.write(text)

    report(" cleanup: stopping server.\n")
    self.stop()
    report(" cleanup: joining server thread.\n")
    self.join()
    report(" cleanup: successfully joined.\n")
    # make sure that ConnectionHandler is removed from socket_map
    asyncore.close_all(ignore_all=True)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002656
def start (self, flag=None):
    """Remember *flag* on the instance, then start the thread.

    *flag* is stored as ``self.flag`` before the thread is launched.
    """
    self.flag = flag
    threading.Thread.start(self)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002660
def run(self):
    """Thread body: signal readiness, then pump asyncore until stopped.

    ``self.active`` is set first; if a flag was supplied to start() it is
    set next.  The asyncore loop is then run repeatedly for as long as
    ``self.active`` stays true.
    """
    self.active = True
    ready = self.flag
    if ready:
        ready.set()
    while self.active:
        try:
            asyncore.loop(1)
        except:
            # Deliberately best-effort: whatever the loop raises is
            # discarded and the loop is retried while still active.
            pass
Trent Nelson6b240cd2008-04-10 20:12:06 +00002670
def stop(self):
    """Clear the ``active`` flag and close the listening server."""
    self.active = False
    listener = self.server
    listener.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002674
def server_params_test(client_context, server_context, indata=b"FOO\n",
                       chatty=True, connectionchatty=False, sni_name=None,
                       session=None):
    """
    Launch a server, connect a client to it and try various reads
    and writes.

    *indata* is sent three times (as bytes, bytearray and memoryview) and
    each reply must equal ``indata.lower()``.  Returns a dict of
    statistics gathered from the client socket and from the server.
    Raises AssertionError when a reply does not match.
    """
    # Client-side chatter needs both the argument and verbose mode.
    talkative = connectionchatty and support.verbose
    stats = {}
    server = ThreadedEchoServer(context=server_context,
                                chatty=chatty,
                                connectionchatty=False)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=sni_name,
                                        session=session) as s:
            s.connect((HOST, server.port))
            for arg in (indata, bytearray(indata), memoryview(indata)):
                if talkative:
                    sys.stdout.write(
                        " client: sending %r...\n" % indata)
                s.write(arg)
                outdata = s.read()
                if talkative:
                    sys.stdout.write(" client: read %r\n" % outdata)
                if outdata == indata.lower():
                    continue
                raise AssertionError(
                    "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                    % (outdata[:20], len(outdata),
                       indata[:20].lower(), len(indata)))
            s.write(b"over\n")
            if talkative:
                sys.stdout.write(" client: closing connection.\n")
            # Collect the connection facts before the socket is closed.
            stats['compression'] = s.compression()
            stats['cipher'] = s.cipher()
            stats['peercert'] = s.getpeercert()
            stats['client_alpn_protocol'] = s.selected_alpn_protocol()
            stats['client_npn_protocol'] = s.selected_npn_protocol()
            stats['version'] = s.version()
            stats['session_reused'] = s.session_reused
            stats['session'] = s.session
            s.close()
        stats['server_alpn_protocols'] = server.selected_alpn_protocols
        stats['server_npn_protocols'] = server.selected_npn_protocols
        stats['server_shared_ciphers'] = server.shared_ciphers
    return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002724
def try_protocol_combo(server_protocol, client_protocol, expect_success,
                       certsreqs=None, server_options=0, client_options=0):
    """
    Try to SSL-connect using *client_protocol* to *server_protocol*.
    If *expect_success* is true, assert that the connection succeeds,
    if it's false, assert that the connection fails.
    Also, if *expect_success* is a string, assert that it is the protocol
    version actually used by the connection.

    *certsreqs* defaults to ssl.CERT_NONE; *server_options* and
    *client_options* are OR-ed into the respective context options.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    certtype = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }[certsreqs]
    if support.verbose:
        # Combinations expected to fail are shown in braces.
        formatstr = " %s->%s %s\n" if expect_success else " {%s->%s} %s\n"
        sys.stdout.write(formatstr %
                         (ssl.get_protocol_name(client_protocol),
                          ssl.get_protocol_name(server_protocol),
                          certtype))
    client_context = ssl.SSLContext(client_protocol)
    client_context.options |= client_options
    server_context = ssl.SSLContext(server_protocol)
    server_context.options |= server_options

    min_version = PROTOCOL_TO_TLS_VERSION.get(client_protocol, None)
    # SSLContext.minimum_version is only available on recent OpenSSL
    # (setter added in OpenSSL 1.1.0, getter added in OpenSSL 1.1.1)
    if min_version is not None and hasattr(server_context, 'minimum_version'):
        if (server_protocol == ssl.PROTOCOL_TLS
                and server_context.minimum_version > min_version):
            # If OpenSSL configuration is strict and requires more recent
            # TLS version, we have to change the minimum to test old TLS
            # versions.
            server_context.minimum_version = min_version

    # NOTE: we must enable "ALL" ciphers on the client, otherwise an
    # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
    # starting from OpenSSL 1.0.0 (see issue #8322).
    if client_context.protocol == ssl.PROTOCOL_TLS:
        client_context.set_ciphers("ALL")

    for ctx in (client_context, server_context):
        ctx.verify_mode = certsreqs
        ctx.load_cert_chain(SIGNED_CERTFILE)
        ctx.load_verify_locations(SIGNING_CA)
    try:
        stats = server_params_test(client_context, server_context,
                                   chatty=False, connectionchatty=False)
    # Protocol mismatch can result in either an SSLError, or a
    # "Connection reset by peer" error.
    except ssl.SSLError:
        if expect_success:
            raise
    except OSError as e:
        if expect_success or e.errno != errno.ECONNRESET:
            raise
    else:
        if not expect_success:
            raise AssertionError(
                "Client protocol %s succeeded with server protocol %s!"
                % (ssl.get_protocol_name(client_protocol),
                   ssl.get_protocol_name(server_protocol)))
        # A string expectation names the protocol version to be negotiated.
        if expect_success is not True and expect_success != stats['version']:
            raise AssertionError("version mismatch: expected %r, got %r"
                                 % (expect_success, stats['version']))
2794
2795
2796class ThreadedTests(unittest.TestCase):
2797
def test_echo(self):
    """Basic test of an SSL client connecting to a server"""
    if support.verbose:
        sys.stdout.write("\n")
    # The role-specific protocols cannot serve both ends with one
    # context; they are exercised separately below.
    role_bound = {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}
    for protocol in PROTOCOLS:
        if protocol in role_bound or not has_tls_protocol(protocol):
            continue
        with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
            context = ssl.SSLContext(protocol)
            context.load_cert_chain(CERTFILE)
            server_params_test(context, context,
                               chatty=True, connectionchatty=True)

    client_context, server_context, hostname = testing_context()

    with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
        server_params_test(client_context=client_context,
                           server_context=server_context,
                           chatty=True, connectionchatty=True,
                           sni_name=hostname)

    client_context.check_hostname = False

    # Each case: (sub-test client label, sub-test server label,
    # context used for the server side, extra keyword arguments).
    # The client side always uses the server-only context.
    # NOTE(review): the last case is labelled TLS_CLIENT/TLS_CLIENT but
    # passes the same contexts as the first one -- confirm the intent.
    misuse_cases = (
        (ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT,
         client_context, {'sni_name': hostname}),
        (ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_SERVER,
         server_context, {}),
        (ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_CLIENT,
         client_context, {}),
    )
    for client_label, server_label, serving_context, extra in misuse_cases:
        with self.subTest(client=client_label, server=server_label):
            with self.assertRaises(ssl.SSLError) as e:
                server_params_test(client_context=server_context,
                                   server_context=serving_context,
                                   chatty=True, connectionchatty=True,
                                   **extra)
            self.assertIn('called a function you should not call',
                          str(e.exception))
2846
def test_getpeercert(self):
    """Check getpeercert() before and after the handshake.

    Before the handshake it must raise ValueError; afterwards the
    returned certificate must carry the expected subject and a validity
    interval whose start precedes its end.
    """
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()
    server = ThreadedEchoServer(context=server_context, chatty=False)
    with server, client_context.wrap_socket(
            socket.socket(),
            do_handshake_on_connect=False,
            server_hostname=hostname) as s:
        s.connect((HOST, server.port))
        # getpeercert() raise ValueError while the handshake isn't
        # done.
        with self.assertRaises(ValueError):
            s.getpeercert()
        s.do_handshake()
        cert = s.getpeercert()
        self.assertTrue(cert, "Can't get peer certificate.")
        cipher = s.cipher()
        if support.verbose:
            sys.stdout.write(pprint.pformat(cert) + '\n')
            sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
        if 'subject' not in cert:
            self.fail("No subject field in certificate: %s." %
                      pprint.pformat(cert))
        expected_org = (('organizationName', 'Python Software Foundation'),)
        if expected_org not in cert['subject']:
            self.fail(
                "Missing or invalid 'organizationName' field in certificate subject; "
                "should be 'Python Software Foundation'.")
        for field in ('notBefore', 'notAfter'):
            self.assertIn(field, cert)
        before = ssl.cert_time_to_seconds(cert['notBefore'])
        after = ssl.cert_time_to_seconds(cert['notAfter'])
        self.assertLess(before, after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002882
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_crl_check(self):
    """Exercise VERIFY_CRL_CHECK_LEAF with and without a loaded CRL."""
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(client_context.verify_flags, ssl.VERIFY_DEFAULT | tf)

    def connect_and_check_cert():
        # Fresh server each time; the connection must succeed and yield
        # a peer certificate.
        server = ThreadedEchoServer(context=server_context, chatty=True)
        with server:
            with client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname) as s:
                s.connect((HOST, server.port))
                cert = s.getpeercert()
                self.assertTrue(cert, "Can't get peer certificate.")

    # VERIFY_DEFAULT should pass
    connect_and_check_cert()

    # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
    client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF

    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            with self.assertRaisesRegex(ssl.SSLError,
                                        "certificate verify failed"):
                s.connect((HOST, server.port))

    # now load a CRL file. The CRL file is signed by the CA.
    client_context.load_verify_locations(CRLFILE)

    connect_and_check_cert()
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002924
def test_check_hostname(self):
    """Check hostname verification: matching, mismatching and missing."""
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    def new_server():
        return ThreadedEchoServer(context=server_context, chatty=True)

    # correct hostname should verify
    server = new_server()
    with server, client_context.wrap_socket(
            socket.socket(), server_hostname=hostname) as s:
        s.connect((HOST, server.port))
        cert = s.getpeercert()
        self.assertTrue(cert, "Can't get peer certificate.")

    # incorrect hostname should raise an exception
    server = new_server()
    with server, client_context.wrap_socket(
            socket.socket(), server_hostname="invalid") as s:
        with self.assertRaisesRegex(
                ssl.CertificateError,
                "Hostname mismatch, certificate is not valid for 'invalid'."):
            s.connect((HOST, server.port))

    # missing server_hostname arg should cause an exception, too
    server = new_server()
    with server, socket.socket() as s:
        with self.assertRaisesRegex(ValueError,
                                    "check_hostname requires server_hostname"):
            client_context.wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002957
def test_ecc_cert(self):
    """Connect to a server that only has an ECC certificate.

    The client is restricted to ECDHE/ECDSA cipher suites; the handshake
    must succeed, yield a peer certificate and, for pre-TLS-1.3
    connections, negotiate an ECDHE-ECDSA suite.
    """
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
    hostname = SIGNED_CERTFILE_ECC_HOSTNAME

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # load ECC cert
    server_context.load_cert_chain(SIGNED_CERTFILE_ECC)

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()[0].split('-')
            # The previous assertTrue(cipher[:2], (...)) used the tuple
            # as the failure message and never compared anything.
            # TLS 1.3 suite names (e.g. TLS_AES_256_GCM_SHA384) carry no
            # key-exchange/auth prefix, so only check older versions.
            if s.version() != 'TLSv1.3':
                self.assertEqual(cipher[:2], ['ECDHE', 'ECDSA'])
2978
def test_dual_rsa_ecc(self):
    """Connect to a server holding both an ECC and an RSA certificate.

    The client only accepts ECDSA authentication (TLS 1.3 is disabled),
    so the negotiated cipher suite must be an ECDHE-ECDSA one.
    """
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    # TODO: fix TLSv1.3 once SSLContext can restrict signature
    #       algorithms.
    client_context.options |= ssl.OP_NO_TLSv1_3
    # only ECDSA certs
    client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
    hostname = SIGNED_CERTFILE_ECC_HOSTNAME

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # load ECC and RSA key/cert pairs
    server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()[0].split('-')
            # assertTrue(cipher[:2], (...)) treated the tuple as the
            # failure message and compared nothing; str.split() returns
            # a list, so compare against a list.
            self.assertEqual(cipher[:2], ['ECDHE', 'ECDSA'])
3004
def test_check_hostname_idn(self):
    """Check hostname verification with internationalized names.

    Every (server_hostname, expected_hostname) pair must connect and
    expose the expected value as ``server_hostname``; an unrelated
    hostname must be rejected with CertificateError.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(IDNSANSFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.verify_mode = ssl.CERT_REQUIRED
    context.check_hostname = True
    context.load_verify_locations(SIGNING_CA)

    # correct hostname should verify, when specified in several
    # different ways
    idn_hostnames = [
        ('könig.idn.pythontest.net',
         'xn--knig-5qa.idn.pythontest.net'),
        ('xn--knig-5qa.idn.pythontest.net',
         'xn--knig-5qa.idn.pythontest.net'),
        (b'xn--knig-5qa.idn.pythontest.net',
         'xn--knig-5qa.idn.pythontest.net'),

        ('königsgäßchen.idna2003.pythontest.net',
         'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
        ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
         'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
        (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
         'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),

        # ('königsgäßchen.idna2008.pythontest.net',
        #  'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
        ('xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
         'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
        (b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
         'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),

    ]
    for requested, expected in idn_hostnames:
        server = ThreadedEchoServer(context=server_context, chatty=True)
        with server, context.wrap_socket(
                socket.socket(), server_hostname=requested) as s:
            # The name is checked both before and after connecting.
            self.assertEqual(s.server_hostname, expected)
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertEqual(s.server_hostname, expected)
            self.assertTrue(cert, "Can't get peer certificate.")

    # incorrect hostname should raise an exception
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, context.wrap_socket(
            socket.socket(), server_hostname="python.example.org") as s:
        with self.assertRaises(ssl.CertificateError):
            s.connect((HOST, server.port))
3060
def test_wrong_cert_tls12(self):
    """Connecting when the server rejects the client's certificate

    Launch a server with CERT_REQUIRED, and check that trying to
    connect to it with a wrong client certificate fails.
    """
    client_context, server_context, hostname = testing_context()
    # load client cert that is not signed by trusted CA
    client_context.load_cert_chain(CERTFILE)
    # require TLS client authentication
    server_context.verify_mode = ssl.CERT_REQUIRED
    # TLS 1.3 has different handshake
    client_context.maximum_version = ssl.TLSVersion.TLSv1_2

    server = ThreadedEchoServer(
        context=server_context, chatty=True, connectionchatty=True,
    )

    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            try:
                # Expect either an SSL error about the server rejecting
                # the connection, or a low-level connection reset (which
                # sometimes happens on Windows)
                s.connect((HOST, server.port))
            except ssl.SSLError as e:
                if support.verbose:
                    sys.stdout.write("\nSSLError is %r\n" % e)
            except OSError as e:
                if e.errno != errno.ECONNRESET:
                    raise
                if support.verbose:
                    sys.stdout.write("\nsocket.error is %r\n" % e)
            else:
                self.fail("Use of invalid cert should have failed!")
3097
@requires_tls_version('TLSv1_3')
def test_wrong_cert_tls13(self):
    """Check that an untrusted client certificate fails under TLS 1.3.

    Both contexts are pinned to TLS 1.3 as the minimum version.  The
    connect itself is expected to work; the failure must surface on the
    first write/read that follows.
    """
    client_context, server_context, hostname = testing_context()
    # load client cert that is not signed by trusted CA
    client_context.load_cert_chain(CERTFILE)
    server_context.verify_mode = ssl.CERT_REQUIRED
    for ctx in (server_context, client_context):
        ctx.minimum_version = ssl.TLSVersion.TLSv1_3

    server = ThreadedEchoServer(
        context=server_context, chatty=True, connectionchatty=True,
    )
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            # TLS 1.3 perform client cert exchange after handshake
            s.connect((HOST, server.port))
            try:
                s.write(b'data')
                s.read(4)
            except ssl.SSLError as e:
                if support.verbose:
                    sys.stdout.write("\nSSLError is %r\n" % e)
            except OSError as e:
                if e.errno != errno.ECONNRESET:
                    raise
                if support.verbose:
                    sys.stdout.write("\nsocket.error is %r\n" % e)
            else:
                self.fail("Use of invalid cert should have failed!")
3128
def test_rude_shutdown(self):
    """A brutal shutdown of an SSL server should raise an OSError
    in the client when attempting handshake.
    """
    ready = threading.Event()
    gone = threading.Event()

    s = socket.socket()
    port = support.bind_port(s, HOST)

    # `listener` runs in a thread.  It sits in an accept() until
    # the main thread connects.  Then it rudely closes the socket,
    # and sets Event `gone` to let the main thread know
    # the socket is gone.
    def listener():
        s.listen()
        ready.set()
        accepted, _peer = s.accept()
        accepted.close()
        s.close()
        gone.set()

    t = threading.Thread(target=listener)
    t.start()
    try:
        # Client side, run in the main thread: connect once the listener
        # is ready, wait for it to vanish, then try to wrap the socket.
        ready.wait()
        with socket.socket() as c:
            c.connect((HOST, port))
            gone.wait()
            try:
                ssl_sock = test_wrap_socket(c)
            except OSError:
                pass
            else:
                self.fail('connecting to closed SSL socket should have failed')
    finally:
        t.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003169
def test_ssl_cert_verify_error(self):
    """Check the details of a certificate verification failure.

    The client context has no CA certificates loaded, so connecting must
    fail with an SSLCertVerificationError carrying verify code 20 and
    the matching message.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with context.wrap_socket(socket.socket(),
                                 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
            # assertRaises makes the test fail when the handshake
            # unexpectedly succeeds; the earlier try/except without an
            # else branch silently passed in that case.
            with self.assertRaises(ssl.SSLError) as cm:
                s.connect((HOST, server.port))
            e = cm.exception
            msg = 'unable to get local issuer certificate'
            self.assertIsInstance(e, ssl.SSLCertVerificationError)
            self.assertEqual(e.verify_code, 20)
            self.assertEqual(e.verify_message, msg)
            self.assertIn(msg, repr(e))
            self.assertIn('certificate verify failed', repr(e))
3192
@requires_tls_version('SSLv2')
def test_protocol_sslv2(self):
    """Connecting to an SSLv2 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    server = ssl.PROTOCOL_SSLv2
    # SSLv2 <-> SSLv2 must work for every certificate requirement;
    # passing None selects the helper's default.
    for reqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(server, ssl.PROTOCOL_SSLv2, True, reqs)
    try_protocol_combo(server, ssl.PROTOCOL_TLS, False)
    if has_tls_version('SSLv3'):
        try_protocol_combo(server, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(server, ssl.PROTOCOL_TLSv1, False)
    # SSLv23 client with specific SSL options
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(server, ssl.PROTOCOL_TLS, False,
                           client_options=ssl.OP_NO_SSLv2)
    for option in (ssl.OP_NO_SSLv3, ssl.OP_NO_TLSv1):
        try_protocol_combo(server, ssl.PROTOCOL_TLS, False,
                           client_options=option)
Antoine Pitrou242db722013-05-01 20:52:07 +02003214
def test_PROTOCOL_TLS(self):
    """Connecting to an SSLv23 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    server = ssl.PROTOCOL_TLS
    if has_tls_version('SSLv2'):
        try:
            try_protocol_combo(server, ssl.PROTOCOL_SSLv2, True)
        except OSError as x:
            # this fails on some older versions of OpenSSL (0.9.7l, for instance)
            if support.verbose:
                sys.stdout.write(
                    " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
                    % str(x))

    # The same three client protocols are tried for each certificate
    # requirement; None selects the helper's default.
    for reqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        if has_tls_version('SSLv3'):
            try_protocol_combo(server, ssl.PROTOCOL_SSLv3, False, reqs)
        try_protocol_combo(server, ssl.PROTOCOL_TLS, True, reqs)
        if has_tls_version('TLSv1'):
            try_protocol_combo(server, ssl.PROTOCOL_TLSv1, 'TLSv1', reqs)

    # Server with specific SSL options
    if has_tls_version('SSLv3'):
        try_protocol_combo(server, ssl.PROTOCOL_SSLv3, False,
                           server_options=ssl.OP_NO_SSLv3)
    # Will choose TLSv1
    try_protocol_combo(server, ssl.PROTOCOL_TLS, True,
                       server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
    if has_tls_version('TLSv1'):
        try_protocol_combo(server, ssl.PROTOCOL_TLSv1, False,
                           server_options=ssl.OP_NO_TLSv1)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003256
@requires_tls_version('SSLv3')
def test_protocol_sslv3(self):
    """Connecting to an SSLv3 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    sslv3 = ssl.PROTOCOL_SSLv3
    # Matching protocols must negotiate SSLv3 whatever the certificate
    # requirement passed positionally (none, optional, required).
    for cert_args in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        try_protocol_combo(sslv3, sslv3, 'SSLv3', *cert_args)
    # Every other client protocol is expected to be rejected.
    if has_tls_version('SSLv2'):
        try_protocol_combo(sslv3, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(sslv3, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_SSLv3)
    try_protocol_combo(sslv3, ssl.PROTOCOL_TLSv1, False)
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv3, ssl.PROTOCOL_TLS, False,
                           client_options=ssl.OP_NO_SSLv2)
3274
@requires_tls_version('TLSv1')
def test_protocol_tlsv1(self):
    """Connecting to a TLSv1 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1 = ssl.PROTOCOL_TLSv1
    # Same protocol on both ends: must succeed for each certificate
    # requirement (none given, optional, required).
    for cert_args in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        try_protocol_combo(tlsv1, tlsv1, 'TLSv1', *cert_args)
    # Legacy SSL clients are only exercised when the build supports them,
    # and must fail against a TLSv1-only server.
    for legacy in ('SSLv2', 'SSLv3'):
        if has_tls_version(legacy):
            try_protocol_combo(tlsv1, getattr(ssl, 'PROTOCOL_' + legacy),
                               False)
    try_protocol_combo(tlsv1, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1)
3289
@requires_tls_version('TLSv1_1')
def test_protocol_tlsv1_1(self):
    """Connecting to a TLSv1.1 server with various client options.
    Testing against older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1_1 = ssl.PROTOCOL_TLSv1_1
    try_protocol_combo(tlsv1_1, tlsv1_1, 'TLSv1.1')
    # Legacy SSL clients (when available in this build) must be refused.
    for legacy in ('SSLv2', 'SSLv3'):
        if has_tls_version(legacy):
            try_protocol_combo(tlsv1_1, getattr(ssl, 'PROTOCOL_' + legacy),
                               False)
    try_protocol_combo(tlsv1_1, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1_1)

    # An auto-negotiating server accepts a TLSv1.1 client, while pinned
    # TLSv1.1 and TLSv1.2 endpoints cannot talk to each other.
    try_protocol_combo(ssl.PROTOCOL_TLS, tlsv1_1, 'TLSv1.1')
    try_protocol_combo(tlsv1_1, ssl.PROTOCOL_TLSv1_2, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1_2, tlsv1_1, False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003307
@requires_tls_version('TLSv1_2')
def test_protocol_tlsv1_2(self):
    """Connecting to a TLSv1.2 server with various client options.
    Testing against older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1_2 = ssl.PROTOCOL_TLSv1_2
    try_protocol_combo(
        tlsv1_2, tlsv1_2, 'TLSv1.2',
        server_options=ssl.OP_NO_SSLv3 | ssl.OP_NO_SSLv2,
        client_options=ssl.OP_NO_SSLv3 | ssl.OP_NO_SSLv2,
    )
    # Legacy SSL clients (when available in this build) must be refused.
    for legacy in ('SSLv2', 'SSLv3'):
        if has_tls_version(legacy):
            try_protocol_combo(tlsv1_2, getattr(ssl, 'PROTOCOL_' + legacy),
                               False)
    try_protocol_combo(tlsv1_2, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1_2)

    # An auto-negotiating server accepts a TLSv1.2 client; endpoints pinned
    # to different TLS versions must fail in both directions.
    try_protocol_combo(ssl.PROTOCOL_TLS, tlsv1_2, 'TLSv1.2')
    for older in (ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1):
        try_protocol_combo(tlsv1_2, older, False)
        try_protocol_combo(older, tlsv1_2, False)
3329
def test_starttls(self):
    """Switching from clear text to encrypted and back again."""
    payloads = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4",
                b"ENDTLS", b"msg 5", b"msg 6")

    echo_server = ThreadedEchoServer(CERTFILE,
                                     starttls_server=True,
                                     chatty=True,
                                     connectionchatty=True)
    secure = False
    with echo_server:
        plain = socket.socket()
        plain.setblocking(1)
        plain.connect((HOST, echo_server.port))
        if support.verbose:
            sys.stdout.write("\n")
        for payload in payloads:
            if support.verbose:
                sys.stdout.write(
                    " client: sending %r...\n" % payload)
            # Talk over whichever channel is currently active.
            if secure:
                tls.write(payload)
                reply = tls.read()
            else:
                plain.send(payload)
                reply = plain.recv(1024)
            status = reply.strip().lower()
            acknowledged = status.startswith(b"ok")
            if payload == b"STARTTLS" and acknowledged:
                # STARTTLS ok, switch to secure mode
                if support.verbose:
                    sys.stdout.write(
                        " client: read %r from server, starting TLS...\n"
                        % status)
                tls = test_wrap_socket(plain)
                secure = True
            elif payload == b"ENDTLS" and acknowledged:
                # ENDTLS ok, switch back to clear text
                if support.verbose:
                    sys.stdout.write(
                        " client: read %r from server, ending TLS...\n"
                        % status)
                plain = tls.unwrap()
                secure = False
            elif support.verbose:
                sys.stdout.write(
                    " client: read %r from server\n" % status)
        if support.verbose:
            sys.stdout.write(" client: closing connection.\n")
        # Say goodbye and close on the channel that is active at the end.
        if secure:
            tls.write(b"over\n")
            tls.close()
        else:
            plain.send(b"over\n")
            plain.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003386
def test_socketserver(self):
    """Using socketserver to create and manage SSL connections.

    Serves CERTFILE through the HTTPS test server and checks that the
    bytes fetched over TLS are identical to the bytes read from disk.
    """
    server = make_https_server(self, certfile=SIGNED_CERTFILE)
    # try to connect
    if support.verbose:
        sys.stdout.write('\n')
    with open(CERTFILE, 'rb') as f:
        d1 = f.read()
    # Bytes sentinel: d1 is bytes, so a str default could never compare
    # equal and would raise BytesWarning when running with -bb.
    d2 = b''
    # now fetch the same data from the HTTPS server
    url = 'https://localhost:%d/%s' % (
        server.port, os.path.split(CERTFILE)[1])
    context = ssl.create_default_context(cafile=SIGNING_CA)
    # The response object is a context manager; it is closed on exit even
    # if reading fails.
    with urllib.request.urlopen(url, context=context) as f:
        dlen = f.info().get("content-length")
        if dlen and (int(dlen) > 0):
            d2 = f.read(int(dlen))
            if support.verbose:
                sys.stdout.write(
                    " client: read %d bytes from remote server '%s'\n"
                    % (len(d2), server))
    self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003412
def test_asyncore_server(self):
    """Check the example asyncore integration."""
    verbose = support.verbose
    if verbose:
        sys.stdout.write("\n")

    request = b"FOO\n"
    echo_server = AsyncoreEchoServer(CERTFILE)
    with echo_server:
        client = test_wrap_socket(socket.socket())
        client.connect(('127.0.0.1', echo_server.port))
        if verbose:
            sys.stdout.write(
                " client: sending %r...\n" % request)
        client.write(request)
        reply = client.read()
        if verbose:
            sys.stdout.write(" client: read %r\n" % reply)
        # The server is expected to echo the request back lower-cased.
        expected = request.lower()
        if reply != expected:
            self.fail(
                "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                % (reply[:20], len(reply),
                   request[:20].lower(), len(request)))
        client.write(b"over\n")
        if verbose:
            sys.stdout.write(" client: closing connection.\n")
        client.close()
        if verbose:
            sys.stdout.write(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003441
def test_recv_send(self):
    """Test recv(), send() and friends.

    Each send/recv flavour is tried against an echo server.  Flavours
    that SSLSocket supports must round-trip the data (echoed lower-cased);
    the others must raise ValueError with a message that starts with the
    method name.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = test_wrap_socket(socket.socket(),
                             server_side=False,
                             certfile=CERTFILE,
                             ca_certs=CERTFILE,
                             cert_reqs=ssl.CERT_NONE,
                             ssl_version=ssl.PROTOCOL_TLS_CLIENT)
        s.connect((HOST, server.port))

        # helper methods for standardising recv* method signatures
        def _recv_into():
            b = bytearray(b"\0"*100)
            count = s.recv_into(b)
            return b[:count]

        def _recvfrom_into():
            b = bytearray(b"\0"*100)
            count, _addr = s.recvfrom_into(b)
            return b[:count]

        # (name, method, expect success?, *args, return value func)
        send_methods = [
            ('send', s.send, True, [], len),
            ('sendto', s.sendto, False, ["some.address"], len),
            ('sendall', s.sendall, True, [], lambda x: None),
        ]
        # (name, method, whether to expect success, *args)
        recv_methods = [
            ('recv', s.recv, True, []),
            ('recvfrom', s.recvfrom, False, ["some.address"]),
            ('recv_into', _recv_into, True, []),
            ('recvfrom_into', _recvfrom_into, False, []),
        ]
        data_prefix = "PREFIX_"

        # NOTE: the failure messages below use the "!r" / "!s" conversions.
        # A ":r" or ":s" format *spec* is rejected by bytes, bytearray and
        # exception objects (TypeError), which would mask the real failure.
        for (meth_name, send_meth, expect_success, args,
                ret_val_meth) in send_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                ret = send_meth(indata, *args)
                msg = "sending with {}".format(meth_name)
                self.assertEqual(ret, ret_val_meth(indata), msg=msg)
                outdata = s.read()
                if outdata != indata.lower():
                    self.fail(
                        "While sending with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to send with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )

        for meth_name, recv_meth, expect_success, args in recv_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                s.send(indata)
                outdata = recv_meth(*args)
                if outdata != indata.lower():
                    self.fail(
                        "While receiving with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to receive with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )
                # consume data: the echo of what was sent before the
                # unsupported receive call is still pending
                s.read()

        # read(-1, buffer) is supported, even though read(-1) is not
        data = b"data"
        s.send(data)
        buffer = bytearray(len(data))
        self.assertEqual(s.read(-1, buffer), len(data))
        self.assertEqual(buffer, data)

        # sendall accepts bytes-like objects
        if ctypes is not None:
            ubyte = ctypes.c_ubyte * len(data)
            byteslike = ubyte.from_buffer_copy(data)
            s.sendall(byteslike)
            self.assertEqual(s.read(), data)

        # Make sure sendmsg et al are disallowed to avoid
        # inadvertent disclosure of data and/or corruption
        # of the encrypted data stream
        self.assertRaises(NotImplementedError, s.dup)
        self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
        self.assertRaises(NotImplementedError, s.recvmsg, 100)
        self.assertRaises(NotImplementedError,
                          s.recvmsg_into, [bytearray(100)])
        s.write(b"over\n")

        self.assertRaises(ValueError, s.recv, -1)
        self.assertRaises(ValueError, s.read, -1)

        s.close()
3578
def test_recv_zero(self):
    """Zero-length reads return empty data and never block."""
    echo_server = ThreadedEchoServer(CERTFILE)
    echo_server.__enter__()
    self.addCleanup(echo_server.__exit__, None, None)
    raw_sock = socket.create_connection((HOST, echo_server.port))
    self.addCleanup(raw_sock.close)
    tls_sock = test_wrap_socket(raw_sock, suppress_ragged_eofs=False)
    self.addCleanup(tls_sock.close)

    # recv/read(0) should return no data
    tls_sock.send(b"data")
    for zero_read in (tls_sock.recv, tls_sock.read):
        self.assertEqual(zero_read(0), b"")
    self.assertEqual(tls_sock.read(), b"data")

    # Should not block if the other end sends no data
    tls_sock.setblocking(False)
    self.assertEqual(tls_sock.recv(0), b"")
    self.assertEqual(tls_sock.recv_into(bytearray()), 0)
3598
def test_nonblocking_send(self):
    """A non-blocking send() raises SSLWant*Error once buffers are full."""
    echo_server = ThreadedEchoServer(CERTFILE,
                                     certreqs=ssl.CERT_NONE,
                                     ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                     cacerts=CERTFILE,
                                     chatty=True,
                                     connectionchatty=False)
    with echo_server:
        client = test_wrap_socket(socket.socket(),
                                  server_side=False,
                                  certfile=CERTFILE,
                                  ca_certs=CERTFILE,
                                  cert_reqs=ssl.CERT_NONE,
                                  ssl_version=ssl.PROTOCOL_TLS_CLIENT)
        client.connect((HOST, echo_server.port))
        client.setblocking(False)

        # If we keep sending data, at some point the buffers
        # will be full and the call will block
        chunk = bytearray(8192)
        would_block = (ssl.SSLWantWriteError, ssl.SSLWantReadError)
        with self.assertRaises(would_block):
            while True:
                client.send(chunk)

        # Restore blocking mode before closing the connection
        client.setblocking(True)
        client.close()
3628
def test_handshake_timeout(self):
    # Issue #5103: SSL handshake must respect the socket timeout
    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listener)
    started = threading.Event()
    finish = False

    def serve():
        # Accept TCP connections but never speak TLS, so that any client
        # handshake stalls until its own timeout fires.
        listener.listen()
        started.set()
        accepted = []
        while not finish:
            readable, _, _ = select.select([listener], [], [], 0.1)
            if listener in readable:
                # Let the socket hang around rather than having
                # it closed by garbage collection.
                accepted.append(listener.accept()[0])
        for held in accepted:
            held.close()

    worker = threading.Thread(target=serve)
    worker.start()
    started.wait()

    address = (host, port)
    try:
        # Case 1: wrap an already connected socket.
        try:
            c = socket.socket(socket.AF_INET)
            c.settimeout(0.2)
            c.connect(address)
            # Will attempt handshake and time out
            self.assertRaisesRegex(socket.timeout, "timed out",
                                   test_wrap_socket, c)
        finally:
            c.close()
        # Case 2: wrap first, then connect.
        try:
            c = socket.socket(socket.AF_INET)
            c = test_wrap_socket(c)
            c.settimeout(0.2)
            # Will attempt handshake and time out
            self.assertRaisesRegex(socket.timeout, "timed out",
                                   c.connect, address)
        finally:
            c.close()
    finally:
        finish = True
        worker.join()
        listener.close()
3677
def test_server_accept(self):
    # Issue #16357: accept() on a SSLSocket created through
    # SSLContext.wrap_socket().
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    ctx.verify_mode = ssl.CERT_REQUIRED
    ctx.load_verify_locations(SIGNING_CA)
    ctx.load_cert_chain(SIGNED_CERTFILE)
    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listener)
    listener = ctx.wrap_socket(listener, server_side=True)
    self.assertTrue(listener.server_side)

    ready = threading.Event()
    remote = None
    peer = None

    def serve():
        nonlocal remote, peer
        listener.listen()
        # Block on the accept and wait on the connection to close.
        ready.set()
        remote, peer = listener.accept()
        # Echo four bytes back to the client.
        remote.send(remote.recv(4))

    worker = threading.Thread(target=serve)
    worker.start()
    # Client wait until server setup and perform a connect.
    ready.wait()
    client = ctx.wrap_socket(socket.socket())
    client.connect((host, port))
    client.send(b'data')
    client.recv()
    client_addr = client.getsockname()
    client.close()
    worker.join()
    remote.close()
    listener.close()
    # Sanity checks.
    self.assertIsInstance(remote, ssl.SSLSocket)
    self.assertEqual(peer, client_addr)
3718
def test_getpeercert_enotconn(self):
    """getpeercert() on an unconnected socket fails with ENOTCONN."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    with ctx.wrap_socket(socket.socket()) as unconnected:
        with self.assertRaises(OSError) as caught:
            unconnected.getpeercert()
        self.assertEqual(caught.exception.errno, errno.ENOTCONN)
3725
def test_do_handshake_enotconn(self):
    """do_handshake() on an unconnected socket fails with ENOTCONN."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    with ctx.wrap_socket(socket.socket()) as unconnected:
        with self.assertRaises(OSError) as caught:
            unconnected.do_handshake()
        self.assertEqual(caught.exception.errno, errno.ENOTCONN)
3732
def test_no_shared_ciphers(self):
    """Handshake fails when client and server share no cipher suite."""
    client_ctx, server_ctx, hostname = testing_context()
    # OpenSSL enables all TLS 1.3 ciphers, enforce TLS 1.2 for test
    client_ctx.options |= ssl.OP_NO_TLSv1_3
    # Force different suites on client and server
    client_ctx.set_ciphers("AES128")
    server_ctx.set_ciphers("AES256")
    with ThreadedEchoServer(context=server_ctx) as server, \
            client_ctx.wrap_socket(socket.socket(),
                                   server_hostname=hostname) as client:
        with self.assertRaises(OSError):
            client.connect((HOST, server.port))
    # Checked after the server context has exited.
    self.assertIn("no shared cipher", server.conn_errors[0])
3746
def test_version_basic(self):
    """
    Basic tests for SSLSocket.version().
    More tests are done in the test_protocol_*() methods.
    """
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    echo_server = ThreadedEchoServer(CERTFILE,
                                     ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                     chatty=False)
    with echo_server as server:
        with ctx.wrap_socket(socket.socket()) as s:
            # Nothing has been negotiated before connecting.
            self.assertIs(s.version(), None)
            self.assertIs(s._sslobj, None)
            s.connect((HOST, server.port))
            # The negotiated version depends on the linked OpenSSL.
            if IS_OPENSSL_1_1_1 and has_tls_version('TLSv1_3'):
                self.assertEqual(s.version(), 'TLSv1.3')
            elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
                self.assertEqual(s.version(), 'TLSv1.2')
            else:  # 0.9.8 to 1.0.1
                self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
        # Once the socket is closed the SSL state is gone again.
        self.assertIs(s._sslobj, None)
        self.assertIs(s.version(), None)
3770
@requires_tls_version('TLSv1_3')
def test_tls1_3(self):
    """With TLS 1.0-1.2 disabled, a TLS 1.3 suite is negotiated."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    ctx.load_cert_chain(CERTFILE)
    older_tls = ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
    ctx.options |= older_tls
    tls13_suites = {
        'TLS_AES_256_GCM_SHA384',
        'TLS_CHACHA20_POLY1305_SHA256',
        'TLS_AES_128_GCM_SHA256',
    }
    # The same context serves as both server and client configuration.
    with ThreadedEchoServer(context=ctx) as server:
        with ctx.wrap_socket(socket.socket()) as s:
            s.connect((HOST, server.port))
            self.assertIn(s.cipher()[0], tls13_suites)
            self.assertEqual(s.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003787
@requires_minimum_version
@requires_tls_version('TLSv1_2')
def test_min_max_version_tlsv1_2(self):
    """A 1.0-1.2 client and a 1.2-only server settle on TLSv1.2."""
    client_ctx, server_ctx, hostname = testing_context()
    version = ssl.TLSVersion
    # client TLSv1.0 to 1.2
    client_ctx.minimum_version = version.TLSv1
    client_ctx.maximum_version = version.TLSv1_2
    # server only TLSv1.2
    server_ctx.minimum_version = version.TLSv1_2
    server_ctx.maximum_version = version.TLSv1_2

    with ThreadedEchoServer(context=server_ctx) as server:
        client = client_ctx.wrap_socket(socket.socket(),
                                        server_hostname=hostname)
        with client as s:
            s.connect((HOST, server.port))
            self.assertEqual(s.version(), 'TLSv1.2')
3804
@requires_minimum_version
@requires_tls_version('TLSv1_1')
def test_min_max_version_tlsv1_1(self):
    """A 1.0-1.2 client and a 1.0-1.1 server settle on TLSv1.1."""
    client_ctx, server_ctx, hostname = testing_context()
    version = ssl.TLSVersion
    # client 1.0 to 1.2, server 1.0 to 1.1
    client_ctx.minimum_version = version.TLSv1
    client_ctx.maximum_version = version.TLSv1_2
    server_ctx.minimum_version = version.TLSv1
    server_ctx.maximum_version = version.TLSv1_1

    with ThreadedEchoServer(context=server_ctx) as server:
        client = client_ctx.wrap_socket(socket.socket(),
                                        server_hostname=hostname)
        with client as s:
            s.connect((HOST, server.port))
            self.assertEqual(s.version(), 'TLSv1.1')
3820
@requires_minimum_version
@requires_tls_version('TLSv1_2')
def test_min_max_version_mismatch(self):
    """Disjoint version ranges make the handshake fail with an alert."""
    client_ctx, server_ctx, hostname = testing_context()
    version = ssl.TLSVersion
    # client 1.0, server 1.2 (mismatch)
    # The maximum is assigned before the minimum on each context.
    server_ctx.maximum_version = version.TLSv1_2
    server_ctx.minimum_version = version.TLSv1_2
    client_ctx.maximum_version = version.TLSv1
    client_ctx.minimum_version = version.TLSv1
    with ThreadedEchoServer(context=server_ctx) as server:
        client = client_ctx.wrap_socket(socket.socket(),
                                        server_hostname=hostname)
        with client as s:
            with self.assertRaises(ssl.SSLError) as caught:
                s.connect((HOST, server.port))
            self.assertIn("alert", str(caught.exception))
3836
@requires_minimum_version
@requires_tls_version('SSLv3')
def test_min_max_version_sslv3(self):
    """A client pinned to SSLv3 negotiates SSLv3 when the server allows it."""
    client_ctx, server_ctx, hostname = testing_context()
    sslv3 = ssl.TLSVersion.SSLv3
    server_ctx.minimum_version = sslv3
    client_ctx.minimum_version = sslv3
    client_ctx.maximum_version = sslv3
    with ThreadedEchoServer(context=server_ctx) as server:
        client = client_ctx.wrap_socket(socket.socket(),
                                        server_hostname=hostname)
        with client as s:
            s.connect((HOST, server.port))
            self.assertEqual(s.version(), 'SSLv3')
3849
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
def test_default_ecdh_curve(self):
    # Issue #21015: elliptic curve-based Diffie Hellman key exchange
    # should be enabled by default on SSL contexts.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    ctx.load_cert_chain(CERTFILE)
    # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
    # cipher name.
    ctx.options |= ssl.OP_NO_TLSv1_3
    # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
    # explicitly using the 'ECCdraft' cipher alias.  Otherwise,
    # our default cipher list should prefer ECDH-based ciphers
    # automatically.
    needs_explicit_ecdh = ssl.OPENSSL_VERSION_INFO < (1, 0, 0)
    if needs_explicit_ecdh:
        ctx.set_ciphers("ECCdraft:ECDH")
    with ThreadedEchoServer(context=ctx) as server:
        with ctx.wrap_socket(socket.socket()) as s:
            s.connect((HOST, server.port))
            cipher_name = s.cipher()[0]
            self.assertIn("ECDH", cipher_name)
3869
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """Test tls-unique channel binding.

    Two consecutive connections are made to the same server.  For each
    one the client-side binding data must be present, have the expected
    length, and match what the server reports for the "CB tls-unique"
    request.  The two connections must yield different binding data.
    """
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    server = ThreadedEchoServer(context=server_context,
                                chatty=True,
                                connectionchatty=False)

    def check_binding(sock, binding):
        # Sanity-check the binding obtained on *sock* and compare it with
        # the peer's view of the same connection.
        self.assertIsNotNone(binding)
        if sock.version() == 'TLSv1.3':
            self.assertEqual(len(binding), 48)
        else:
            self.assertEqual(len(binding), 12)  # True for TLSv1
        sock.write(b"CB tls-unique\n")
        peer_data_repr = sock.read().strip()
        self.assertEqual(peer_data_repr,
                         repr(binding).encode("us-ascii"))

    with server:
        with client_context.wrap_socket(
                socket.socket(),
                server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            # get the data
            cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(
                    " got channel binding data: {0!r}\n".format(cb_data))

            # check if it is sane and compare with the peers version
            check_binding(s, cb_data)

        # now, again
        with client_context.wrap_socket(
                socket.socket(),
                server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            new_cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(
                    "got another channel binding data: {0!r}\n".format(
                        new_cb_data)
                )
            # is it really unique
            self.assertNotEqual(cb_data, new_cb_data)
            # Validate the binding of *this* connection; checking
            # cb_data again here would leave new_cb_data unverified.
            check_binding(s, new_cb_data)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003929
def test_compression(self):
    """The reported compression is None or a known method name."""
    client_ctx, server_ctx, hostname = testing_context()
    stats = server_params_test(client_ctx, server_ctx,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    if support.verbose:
        sys.stdout.write(
            " got compression: {!r}\n".format(stats['compression']))
    known_methods = {None, 'ZLIB', 'RLE'}
    self.assertIn(stats['compression'], known_methods)
3938
@unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
                     "ssl.OP_NO_COMPRESSION needed for this test")
def test_compression_disabled(self):
    """With OP_NO_COMPRESSION on both ends no compression is reported."""
    client_ctx, server_ctx, hostname = testing_context()
    for ctx in (client_ctx, server_ctx):
        ctx.options |= ssl.OP_NO_COMPRESSION
    stats = server_params_test(client_ctx, server_ctx,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    self.assertIs(stats['compression'], None)
3949
def test_dh_params(self):
    # Check we can get a connection with ephemeral Diffie-Hellman
    client_context, server_context, hostname = testing_context()
    # test scenario needs TLS <= 1.2
    client_context.options |= ssl.OP_NO_TLSv1_3
    server_context.load_dh_params(DHFILE)
    server_context.set_ciphers("kEDH")
    server_context.options |= ssl.OP_NO_TLSv1_3
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    # First element of the cipher entry is the cipher suite name (a str,
    # since it is split on "-" below).
    cipher = stats["cipher"][0]
    parts = cipher.split("-")
    if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
        # Report the full cipher name; indexing it again (cipher[0])
        # would only show its first character.
        self.fail("Non-DH cipher: " + cipher)
3965
@unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
@unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
def test_ecdh_curve(self):
    # Exercise set_ecdh_curve() on either side of the connection and
    # with deliberately mismatching curves.

    def restrict_server(ctx):
        # ECDHE-only cipher list, TLS 1.0 and 1.1 disabled.
        ctx.set_ciphers("ECDHE:!eNULL:!aNULL")
        ctx.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1

    # server secp384r1, client auto
    ctx_client, ctx_server, sni = testing_context()
    ctx_server.set_ecdh_curve("secp384r1")
    restrict_server(ctx_server)
    server_params_test(ctx_client, ctx_server,
                       chatty=True, connectionchatty=True,
                       sni_name=sni)

    # server auto, client secp384r1
    ctx_client, ctx_server, sni = testing_context()
    ctx_client.set_ecdh_curve("secp384r1")
    restrict_server(ctx_server)
    server_params_test(ctx_client, ctx_server,
                       chatty=True, connectionchatty=True,
                       sni_name=sni)

    # server / client curve mismatch
    ctx_client, ctx_server, sni = testing_context()
    ctx_client.set_ecdh_curve("prime256v1")
    ctx_server.set_ecdh_curve("secp384r1")
    restrict_server(ctx_server)
    try:
        server_params_test(ctx_client, ctx_server,
                           chatty=True, connectionchatty=True,
                           sni_name=sni)
    except ssl.SSLError:
        # The handshake was rejected; nothing more to check.
        return
    # OpenSSL 1.0.2 does not fail although it should.
    if IS_OPENSSL_1_1_0:
        self.fail("mismatch curve did not fail")
4004
def test_selected_alpn_protocol(self):
    # Neither side configures ALPN here, so the client must report
    # None as its selected ALPN protocol.
    ctx_client, ctx_server, sni = testing_context()
    result = server_params_test(ctx_client, ctx_server,
                                chatty=True, connectionchatty=True,
                                sni_name=sni)
    self.assertIs(result['client_alpn_protocol'], None)
4012
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
def test_selected_alpn_protocol_if_server_uses_alpn(self):
    # Only the server advertises ALPN protocols; since the client does
    # not, its selected ALPN protocol must still be None.
    ctx_client, ctx_server, sni = testing_context()
    ctx_server.set_alpn_protocols(['foo', 'bar'])
    result = server_params_test(ctx_client, ctx_server,
                                chatty=True, connectionchatty=True,
                                sni_name=sni)
    self.assertIs(result['client_alpn_protocol'], None)
4022
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
def test_alpn_protocols(self):
    # For each client protocol list, check which protocol both peers
    # report after negotiating against a fixed server list.
    server_protocols = ['foo', 'bar', 'milkshake']
    scenarios = [
        (['foo', 'bar'], 'foo'),
        (['bar', 'foo'], 'foo'),
        (['milkshake'], 'milkshake'),
        (['http/3.0', 'http/4.0'], None)
    ]
    for client_protocols, expected in scenarios:
        ctx_client, ctx_server, sni = testing_context()
        ctx_server.set_alpn_protocols(server_protocols)
        ctx_client.set_alpn_protocols(client_protocols)

        # Keep the SSLError itself as the outcome when the handshake
        # fails, so the version-specific branch below can inspect it.
        try:
            outcome = server_params_test(ctx_client,
                                         ctx_server,
                                         chatty=True,
                                         connectionchatty=True,
                                         sni_name=sni)
        except ssl.SSLError as exc:
            outcome = exc

        handshake_error_expected = (
            expected is None and IS_OPENSSL_1_1_0
            and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6))
        if handshake_error_expected:
            # OpenSSL 1.1.0 to 1.1.0e raises handshake error
            self.assertIsInstance(outcome, ssl.SSLError)
            continue

        # The two escaped placeholders are filled in per peer below.
        msg = "failed trying %s (s) and %s (c).\n" \
              "was expecting %s, but got %%s from the %%s" \
                  % (str(server_protocols), str(client_protocols),
                     str(expected))
        client_result = outcome['client_alpn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        if len(outcome['server_alpn_protocols']):
            server_result = outcome['server_alpn_protocols'][-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
4062
def test_selected_npn_protocol(self):
    # No NPN protocols are configured, so the client must report None
    # as its selected NPN protocol.
    ctx_client, ctx_server, sni = testing_context()
    result = server_params_test(ctx_client, ctx_server,
                                chatty=True, connectionchatty=True,
                                sni_name=sni)
    self.assertIs(result['client_npn_protocol'], None)
4070
@unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
def test_npn_protocols(self):
    # For each client protocol list, check which protocol both peers
    # report after negotiating against a fixed server list.
    server_protocols = ['http/1.1', 'spdy/2']
    scenarios = [
        (['http/1.1', 'spdy/2'], 'http/1.1'),
        (['spdy/2', 'http/1.1'], 'http/1.1'),
        (['spdy/2', 'test'], 'spdy/2'),
        (['abc', 'def'], 'abc')
    ]
    for client_protocols, expected in scenarios:
        ctx_client, ctx_server, sni = testing_context()
        ctx_server.set_npn_protocols(server_protocols)
        ctx_client.set_npn_protocols(client_protocols)
        result = server_params_test(ctx_client, ctx_server,
                                    chatty=True, connectionchatty=True,
                                    sni_name=sni)
        # The two escaped placeholders are filled in per peer below.
        msg = "failed trying %s (s) and %s (c).\n" \
              "was expecting %s, but got %%s from the %%s" \
                  % (str(server_protocols), str(client_protocols),
                     str(expected))
        client_result = result['client_npn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        if len(result['server_npn_protocols']):
            server_result = result['server_npn_protocols'][-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
4096
def sni_contexts(self):
    """Build the contexts used by the SNI callback tests.

    Returns a ``(server_context, other_context, client_context)`` tuple:
    two server contexts loaded with SIGNED_CERTFILE and SIGNED_CERTFILE2
    respectively, and a client context that loads SIGNING_CA as its
    verify location.
    """
    primary = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    primary.load_cert_chain(SIGNED_CERTFILE)

    alternate = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    alternate.load_cert_chain(SIGNED_CERTFILE2)

    client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client.load_verify_locations(SIGNING_CA)

    return primary, alternate, client
4105
def check_common_name(self, stats, name):
    """Assert that the peer certificate in *stats* has commonName *name*.

    The subject is searched for a relative distinguished name that
    consists of exactly the ``('commonName', name)`` pair.
    """
    wanted_rdn = (('commonName', name),)
    subject = stats['peercert']['subject']
    self.assertIn(wanted_rdn, subject)
4109
@needs_sni
def test_sni_callback(self):
    # A servername callback can swap the context (and therefore the
    # certificate) of a connection; removing the callback restores the
    # default behaviour.
    seen = []
    server_context, other_context, client_context = self.sni_contexts()

    client_context.check_hostname = False

    def servername_cb(ssl_sock, server_name, initial_context):
        seen.append((server_name, initial_context))
        if server_name is not None:
            ssl_sock.context = other_context
    server_context.set_servername_callback(servername_cb)

    result = server_params_test(client_context, server_context,
                                chatty=True,
                                sni_name='supermessage')
    # The hostname was fetched properly, and the certificate was
    # changed for the connection.
    self.assertEqual(seen, [("supermessage", server_context)])
    # The certificate of other_context was selected
    self.check_common_name(result, 'fakehostname')

    # The callback is called with server_name=None
    seen.clear()
    result = server_params_test(client_context, server_context,
                                chatty=True,
                                sni_name=None)
    self.assertEqual(seen, [(None, server_context)])
    self.check_common_name(result, SIGNED_CERTFILE_HOSTNAME)

    # Check disabling the callback
    seen.clear()
    server_context.set_servername_callback(None)

    result = server_params_test(client_context, server_context,
                                chatty=True,
                                sni_name='notfunny')
    # Certificate didn't change
    self.check_common_name(result, SIGNED_CERTFILE_HOSTNAME)
    self.assertEqual(seen, [])
4150
@needs_sni
def test_sni_callback_alert(self):
    # Returning a TLS alert is reflected to the connecting client
    server_context, _other_context, client_context = self.sni_contexts()

    def cb_returning_alert(ssl_sock, server_name, initial_context):
        return ssl.ALERT_DESCRIPTION_ACCESS_DENIED

    server_context.set_servername_callback(cb_returning_alert)
    with self.assertRaises(ssl.SSLError) as caught:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    self.assertEqual(caught.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004164
@needs_sni
def test_sni_callback_raising(self):
    # Raising fails the connection with a TLS handshake failure alert.
    server_context, _other_context, client_context = self.sni_contexts()

    def cb_raising(ssl_sock, server_name, initial_context):
        1 / 0

    server_context.set_servername_callback(cb_raising)

    # The callback's exception is checked through the unraisable-hook
    # catcher, so the assertions stay inside its "with" block.
    with support.catch_unraisable_exception() as unraisable_cm:
        with self.assertRaises(ssl.SSLError) as caught:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')

        self.assertEqual(caught.exception.reason,
                         'SSLV3_ALERT_HANDSHAKE_FAILURE')
        self.assertEqual(unraisable_cm.unraisable.exc_type,
                         ZeroDivisionError)
Antoine Pitrou50b24d02013-04-11 20:48:42 +02004183
@needs_sni
def test_sni_callback_wrong_return_type(self):
    # Returning the wrong return type terminates the TLS connection
    # with an internal error alert.
    server_context, _other_context, client_context = self.sni_contexts()

    def cb_wrong_return_type(ssl_sock, server_name, initial_context):
        return "foo"

    server_context.set_servername_callback(cb_wrong_return_type)

    # The resulting TypeError is checked through the unraisable-hook
    # catcher, so the assertions stay inside its "with" block.
    with support.catch_unraisable_exception() as unraisable_cm:
        with self.assertRaises(ssl.SSLError) as caught:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')

        self.assertEqual(caught.exception.reason,
                         'TLSV1_ALERT_INTERNAL_ERROR')
        self.assertEqual(unraisable_cm.unraisable.exc_type, TypeError)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004203
def test_shared_ciphers(self):
    # The client offers AES128 and AES256 while the server only allows
    # AES256; every shared cipher reported by the server must match one
    # of the expected algorithm markers.
    ctx_client, ctx_server, sni = testing_context()
    ctx_client.set_ciphers("AES128:AES256")
    ctx_server.set_ciphers("AES256")
    acceptable_markers = [
        "AES256", "AES-256",
        # TLS 1.3 ciphers are always enabled
        "TLS_CHACHA20", "TLS_AES",
    ]

    result = server_params_test(ctx_client, ctx_server,
                                sni_name=sni)
    shared = result['server_shared_ciphers'][0]
    self.assertGreater(len(shared), 0)
    for cipher_name, _tls_version, _bits in shared:
        if all(marker not in cipher_name for marker in acceptable_markers):
            self.fail(cipher_name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004221
def test_read_write_after_close_raises_valuerror(self):
    # read() and write() on an SSL socket that was closed after
    # connecting must raise ValueError.
    ctx_client, ctx_server, sni = testing_context()
    echo_server = ThreadedEchoServer(context=ctx_server, chatty=False)

    with echo_server:
        sock = ctx_client.wrap_socket(socket.socket(),
                                      server_hostname=sni)
        sock.connect((HOST, echo_server.port))
        sock.close()

        with self.assertRaises(ValueError):
            sock.read(1024)
        with self.assertRaises(ValueError):
            sock.write(b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004234
def test_sendfile(self):
    # Send a file over an SSL socket with sendfile() and check that the
    # echo server returns the same bytes.
    TEST_DATA = b"x" * 512
    # Register the cleanup before creating the file so that it is
    # removed even if writing it fails part-way.
    self.addCleanup(support.unlink, support.TESTFN)
    with open(support.TESTFN, 'wb') as f:
        f.write(TEST_DATA)
    # One context is used for both the server and the client side.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    context.load_cert_chain(SIGNED_CERTFILE)
    server = ThreadedEchoServer(context=context, chatty=False)
    with server:
        with context.wrap_socket(socket.socket()) as s:
            s.connect((HOST, server.port))
            with open(support.TESTFN, 'rb') as file:
                s.sendfile(file)
                self.assertEqual(s.recv(1024), TEST_DATA)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004251
def test_session(self):
    # Check session objects and the server's session statistics across
    # four connections: new, resumed, new again, resumed again.
    ctx_client, ctx_server, sni = testing_context()
    # TODO: sessions aren't compatible with TLSv1.3 yet
    ctx_client.options |= ssl.OP_NO_TLSv1_3

    def handshake(**extra):
        # One full client/server exchange with the shared contexts.
        return server_params_test(ctx_client, ctx_server,
                                  sni_name=sni, **extra)

    # first connection without session
    first = handshake()
    session = first['session']
    self.assertTrue(session.id)
    self.assertGreater(session.time, 0)
    self.assertGreater(session.timeout, 0)
    self.assertTrue(session.has_ticket)
    if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
        self.assertGreater(session.ticket_lifetime_hint, 0)
    self.assertFalse(first['session_reused'])
    counters = ctx_server.session_stats()
    self.assertEqual(counters['accept'], 1)
    self.assertEqual(counters['hits'], 0)

    # reuse session
    second = handshake(session=session)
    counters = ctx_server.session_stats()
    self.assertEqual(counters['accept'], 2)
    self.assertEqual(counters['hits'], 1)
    self.assertTrue(second['session_reused'])
    session2 = second['session']
    self.assertEqual(session2.id, session.id)
    self.assertEqual(session2, session)
    self.assertIsNot(session2, session)
    self.assertGreaterEqual(session2.time, session.time)
    self.assertGreaterEqual(session2.timeout, session.timeout)

    # another one without session
    third = handshake()
    self.assertFalse(third['session_reused'])
    session3 = third['session']
    self.assertNotEqual(session3.id, session.id)
    self.assertNotEqual(session3, session)
    counters = ctx_server.session_stats()
    self.assertEqual(counters['accept'], 3)
    self.assertEqual(counters['hits'], 1)

    # reuse session again
    fourth = handshake(session=session)
    self.assertTrue(fourth['session_reused'])
    session4 = fourth['session']
    self.assertEqual(session4.id, session.id)
    self.assertEqual(session4, session)
    self.assertGreaterEqual(session4.time, session.time)
    self.assertGreaterEqual(session4.timeout, session.timeout)
    counters = ctx_server.session_stats()
    self.assertEqual(counters['accept'], 4)
    self.assertEqual(counters['hits'], 2)
Christian Heimes99a65702016-09-10 23:44:53 +02004309
def test_session_handling(self):
    # Check the rules for reading and assigning SSLSocket.session
    # before and after the handshake, and across SSLContext objects.
    ctx_client, ctx_server, sni = testing_context()
    ctx_client_other, _, _ = testing_context()

    # TODO: session reuse does not work with TLSv1.3
    ctx_client.options |= ssl.OP_NO_TLSv1_3
    ctx_client_other.options |= ssl.OP_NO_TLSv1_3

    echo_server = ThreadedEchoServer(context=ctx_server, chatty=False)
    with echo_server:
        address = (HOST, echo_server.port)

        with ctx_client.wrap_socket(socket.socket(),
                                    server_hostname=sni) as sock:
            # session is None before handshake
            self.assertEqual(sock.session, None)
            self.assertEqual(sock.session_reused, None)
            sock.connect(address)
            session = sock.session
            self.assertTrue(session)
            with self.assertRaises(TypeError) as caught:
                sock.session = object
            self.assertEqual(str(caught.exception),
                             'Value is not a SSLSession.')

        with ctx_client.wrap_socket(socket.socket(),
                                    server_hostname=sni) as sock:
            sock.connect(address)
            # cannot set session after handshake
            with self.assertRaises(ValueError) as caught:
                sock.session = session
            self.assertEqual(str(caught.exception),
                             'Cannot set session after handshake.')

        with ctx_client.wrap_socket(socket.socket(),
                                    server_hostname=sni) as sock:
            # can set session before handshake and before the
            # connection was established
            sock.session = session
            sock.connect(address)
            self.assertEqual(sock.session.id, session.id)
            self.assertEqual(sock.session, session)
            self.assertEqual(sock.session_reused, True)

        with ctx_client_other.wrap_socket(socket.socket(),
                                          server_hostname=sni) as sock:
            # cannot re-use session with a different SSLContext
            with self.assertRaises(ValueError) as caught:
                sock.session = session
                sock.connect(address)
            self.assertEqual(str(caught.exception),
                             'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02004359
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01004360
@unittest.skipUnless(has_tls_version('TLSv1_3'), "Test needs TLS 1.3")
class TestPostHandshakeAuth(unittest.TestCase):
    # Tests for SSLContext.post_handshake_auth. The connection tests
    # send the commands b'HASCERT', b'PHA' and b'GETCERT' to a
    # ThreadedEchoServer and check its replies.

    def _assert_reply(self, sock, command, expected):
        # Send *command* and require exactly *expected* as the reply.
        sock.write(command)
        self.assertEqual(sock.recv(1024), expected)

    def test_pha_setter(self):
        # post_handshake_auth and verify_mode can be set independently
        # of each other for every generic TLS protocol constant.
        for protocol in (ssl.PROTOCOL_TLS,
                         ssl.PROTOCOL_TLS_SERVER,
                         ssl.PROTOCOL_TLS_CLIENT):
            context = ssl.SSLContext(protocol)
            self.assertEqual(context.post_handshake_auth, False)

            context.post_handshake_auth = True
            self.assertEqual(context.post_handshake_auth, True)

            context.verify_mode = ssl.CERT_REQUIRED
            self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
            self.assertEqual(context.post_handshake_auth, True)

            context.post_handshake_auth = False
            self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
            self.assertEqual(context.post_handshake_auth, False)

            context.verify_mode = ssl.CERT_OPTIONAL
            context.post_handshake_auth = True
            self.assertEqual(context.verify_mode, ssl.CERT_OPTIONAL)
            self.assertEqual(context.post_handshake_auth, True)

    def test_pha_required(self):
        ctx_client, ctx_server, sni = testing_context()
        ctx_server.post_handshake_auth = True
        ctx_server.verify_mode = ssl.CERT_REQUIRED
        ctx_client.post_handshake_auth = True
        ctx_client.load_cert_chain(SIGNED_CERTFILE)

        echo_server = ThreadedEchoServer(context=ctx_server, chatty=False)
        with echo_server:
            with ctx_client.wrap_socket(socket.socket(),
                                        server_hostname=sni) as sock:
                sock.connect((HOST, echo_server.port))
                self._assert_reply(sock, b'HASCERT', b'FALSE\n')
                self._assert_reply(sock, b'PHA', b'OK\n')
                self._assert_reply(sock, b'HASCERT', b'TRUE\n')
                # PHA method just returns true when cert is already available
                self._assert_reply(sock, b'PHA', b'OK\n')
                sock.write(b'GETCERT')
                cert_text = sock.recv(4096).decode('us-ascii')
                self.assertIn('Python Software Foundation CA', cert_text)

    def test_pha_required_nocert(self):
        ctx_client, ctx_server, sni = testing_context()
        ctx_server.post_handshake_auth = True
        ctx_server.verify_mode = ssl.CERT_REQUIRED
        ctx_client.post_handshake_auth = True

        # Ignore expected SSLError in ConnectionHandler of ThreadedEchoServer
        # (it is only raised sometimes on Windows)
        with support.catch_threading_exception():
            echo_server = ThreadedEchoServer(context=ctx_server,
                                             chatty=False)
            with echo_server:
                with ctx_client.wrap_socket(socket.socket(),
                                            server_hostname=sni) as sock:
                    sock.connect((HOST, echo_server.port))
                    # receive CertificateRequest
                    self._assert_reply(sock, b'PHA', b'OK\n')
                    # send empty Certificate + Finish
                    sock.write(b'HASCERT')
                    # receive alert
                    with self.assertRaisesRegex(
                            ssl.SSLError,
                            'tlsv13 alert certificate required'):
                        sock.recv(1024)

    def test_pha_optional(self):
        if support.verbose:
            sys.stdout.write("\n")

        ctx_client, ctx_server, sni = testing_context()
        ctx_server.post_handshake_auth = True
        ctx_server.verify_mode = ssl.CERT_REQUIRED
        ctx_client.post_handshake_auth = True
        ctx_client.load_cert_chain(SIGNED_CERTFILE)

        # check CERT_OPTIONAL
        ctx_server.verify_mode = ssl.CERT_OPTIONAL
        echo_server = ThreadedEchoServer(context=ctx_server, chatty=False)
        with echo_server:
            with ctx_client.wrap_socket(socket.socket(),
                                        server_hostname=sni) as sock:
                sock.connect((HOST, echo_server.port))
                self._assert_reply(sock, b'HASCERT', b'FALSE\n')
                self._assert_reply(sock, b'PHA', b'OK\n')
                self._assert_reply(sock, b'HASCERT', b'TRUE\n')

    def test_pha_optional_nocert(self):
        if support.verbose:
            sys.stdout.write("\n")

        ctx_client, ctx_server, sni = testing_context()
        ctx_server.post_handshake_auth = True
        ctx_server.verify_mode = ssl.CERT_OPTIONAL
        ctx_client.post_handshake_auth = True

        echo_server = ThreadedEchoServer(context=ctx_server, chatty=False)
        with echo_server:
            with ctx_client.wrap_socket(socket.socket(),
                                        server_hostname=sni) as sock:
                sock.connect((HOST, echo_server.port))
                self._assert_reply(sock, b'HASCERT', b'FALSE\n')
                self._assert_reply(sock, b'PHA', b'OK\n')
                # optional doesn't fail when client does not have a cert
                self._assert_reply(sock, b'HASCERT', b'FALSE\n')

    def test_pha_no_pha_client(self):
        # Only the server enables post-handshake auth.
        ctx_client, ctx_server, sni = testing_context()
        ctx_server.post_handshake_auth = True
        ctx_server.verify_mode = ssl.CERT_REQUIRED
        ctx_client.load_cert_chain(SIGNED_CERTFILE)

        echo_server = ThreadedEchoServer(context=ctx_server, chatty=False)
        with echo_server:
            with ctx_client.wrap_socket(socket.socket(),
                                        server_hostname=sni) as sock:
                sock.connect((HOST, echo_server.port))
                with self.assertRaisesRegex(ssl.SSLError, 'not server'):
                    sock.verify_client_post_handshake()
                sock.write(b'PHA')
                self.assertIn(b'extension not received', sock.recv(1024))

    def test_pha_no_pha_server(self):
        # server doesn't have PHA enabled, cert is requested in handshake
        ctx_client, ctx_server, sni = testing_context()
        ctx_server.verify_mode = ssl.CERT_REQUIRED
        ctx_client.post_handshake_auth = True
        ctx_client.load_cert_chain(SIGNED_CERTFILE)

        echo_server = ThreadedEchoServer(context=ctx_server, chatty=False)
        with echo_server:
            with ctx_client.wrap_socket(socket.socket(),
                                        server_hostname=sni) as sock:
                sock.connect((HOST, echo_server.port))
                self._assert_reply(sock, b'HASCERT', b'TRUE\n')
                # PHA doesn't fail if there is already a cert
                self._assert_reply(sock, b'PHA', b'OK\n')
                self._assert_reply(sock, b'HASCERT', b'TRUE\n')

    def test_pha_not_tls13(self):
        # TLS 1.2
        ctx_client, ctx_server, sni = testing_context()
        ctx_server.verify_mode = ssl.CERT_REQUIRED
        ctx_client.maximum_version = ssl.TLSVersion.TLSv1_2
        ctx_client.post_handshake_auth = True
        ctx_client.load_cert_chain(SIGNED_CERTFILE)

        echo_server = ThreadedEchoServer(context=ctx_server, chatty=False)
        with echo_server:
            with ctx_client.wrap_socket(socket.socket(),
                                        server_hostname=sni) as sock:
                sock.connect((HOST, echo_server.port))
                # PHA fails for TLS != 1.3
                sock.write(b'PHA')
                self.assertIn(b'WRONG_SSL_VERSION', sock.recv(1024))

    def test_bpo37428_pha_cert_none(self):
        # verify that post_handshake_auth does not implicitly enable cert
        # validation.
        sni = SIGNED_CERTFILE_HOSTNAME
        ctx_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx_client.post_handshake_auth = True
        ctx_client.load_cert_chain(SIGNED_CERTFILE)
        # no cert validation and CA on client side
        ctx_client.check_hostname = False
        ctx_client.verify_mode = ssl.CERT_NONE

        ctx_server = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ctx_server.load_cert_chain(SIGNED_CERTFILE)
        ctx_server.load_verify_locations(SIGNING_CA)
        ctx_server.post_handshake_auth = True
        ctx_server.verify_mode = ssl.CERT_REQUIRED

        echo_server = ThreadedEchoServer(context=ctx_server, chatty=False)
        with echo_server:
            with ctx_client.wrap_socket(socket.socket(),
                                        server_hostname=sni) as sock:
                sock.connect((HOST, echo_server.port))
                self._assert_reply(sock, b'HASCERT', b'FALSE\n')
                self._assert_reply(sock, b'PHA', b'OK\n')
                self._assert_reply(sock, b'HASCERT', b'TRUE\n')
                # server cert has not been validated
                self.assertEqual(sock.getpeercert(), {})
4566
Christian Heimes9fb051f2018-09-23 08:32:31 +02004567
# Key logging is only testable when SSLContext exposes the
# keylog_filename attribute; tests that need it are skipped otherwise.
HAS_KEYLOG = hasattr(ssl.SSLContext, 'keylog_filename')
requires_keylog = unittest.skipUnless(
    HAS_KEYLOG,
    'test requires OpenSSL 1.1.1 with keylog callback',
)
4571
4572class TestSSLDebug(unittest.TestCase):
4573
def keylog_lines(self, fname=support.TESTFN):
    """Return the number of lines in the text file *fname*."""
    with open(fname) as keylog:
        return sum(1 for _line in keylog)
4577
@requires_keylog
def test_keylog_defaults(self):
    # keylog_filename is None by default, creates the log file when
    # set to a path, and rejects directories and non-path values.
    keylog_path = support.TESTFN
    self.addCleanup(support.unlink, keylog_path)
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertEqual(context.keylog_filename, None)

    self.assertFalse(os.path.isfile(keylog_path))
    context.keylog_filename = keylog_path
    self.assertEqual(context.keylog_filename, keylog_path)
    self.assertTrue(os.path.isfile(keylog_path))
    # Only one line is expected in the freshly created file.
    self.assertEqual(self.keylog_lines(), 1)

    context.keylog_filename = None
    self.assertEqual(context.keylog_filename, None)

    containing_dir = os.path.dirname(os.path.abspath(keylog_path))
    with self.assertRaises((IsADirectoryError, PermissionError)):
        # Windows raises PermissionError
        context.keylog_filename = containing_dir

    with self.assertRaises(TypeError):
        context.keylog_filename = 1
4600
4601 @requires_keylog
4602 def test_keylog_filename(self):
4603 self.addCleanup(support.unlink, support.TESTFN)
4604 client_context, server_context, hostname = testing_context()
4605
4606 client_context.keylog_filename = support.TESTFN
4607 server = ThreadedEchoServer(context=server_context, chatty=False)
4608 with server:
4609 with client_context.wrap_socket(socket.socket(),
4610 server_hostname=hostname) as s:
4611 s.connect((HOST, server.port))
4612 # header, 5 lines for TLS 1.3
4613 self.assertEqual(self.keylog_lines(), 6)
4614
4615 client_context.keylog_filename = None
4616 server_context.keylog_filename = support.TESTFN
4617 server = ThreadedEchoServer(context=server_context, chatty=False)
4618 with server:
4619 with client_context.wrap_socket(socket.socket(),
4620 server_hostname=hostname) as s:
4621 s.connect((HOST, server.port))
4622 self.assertGreaterEqual(self.keylog_lines(), 11)
4623
4624 client_context.keylog_filename = support.TESTFN
4625 server_context.keylog_filename = support.TESTFN
4626 server = ThreadedEchoServer(context=server_context, chatty=False)
4627 with server:
4628 with client_context.wrap_socket(socket.socket(),
4629 server_hostname=hostname) as s:
4630 s.connect((HOST, server.port))
4631 self.assertGreaterEqual(self.keylog_lines(), 21)
4632
4633 client_context.keylog_filename = None
4634 server_context.keylog_filename = None
4635
4636 @requires_keylog
4637 @unittest.skipIf(sys.flags.ignore_environment,
4638 "test is not compatible with ignore_environment")
4639 def test_keylog_env(self):
4640 self.addCleanup(support.unlink, support.TESTFN)
4641 with unittest.mock.patch.dict(os.environ):
4642 os.environ['SSLKEYLOGFILE'] = support.TESTFN
4643 self.assertEqual(os.environ['SSLKEYLOGFILE'], support.TESTFN)
4644
4645 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
4646 self.assertEqual(ctx.keylog_filename, None)
4647
4648 ctx = ssl.create_default_context()
4649 self.assertEqual(ctx.keylog_filename, support.TESTFN)
4650
4651 ctx = ssl._create_stdlib_context()
4652 self.assertEqual(ctx.keylog_filename, support.TESTFN)
4653
4654 def test_msg_callback(self):
4655 client_context, server_context, hostname = testing_context()
4656
4657 def msg_cb(conn, direction, version, content_type, msg_type, data):
4658 pass
4659
4660 self.assertIs(client_context._msg_callback, None)
4661 client_context._msg_callback = msg_cb
4662 self.assertIs(client_context._msg_callback, msg_cb)
4663 with self.assertRaises(TypeError):
4664 client_context._msg_callback = object()
4665
4666 def test_msg_callback_tls12(self):
4667 client_context, server_context, hostname = testing_context()
4668 client_context.options |= ssl.OP_NO_TLSv1_3
4669
4670 msg = []
4671
4672 def msg_cb(conn, direction, version, content_type, msg_type, data):
4673 self.assertIsInstance(conn, ssl.SSLSocket)
4674 self.assertIsInstance(data, bytes)
4675 self.assertIn(direction, {'read', 'write'})
4676 msg.append((direction, version, content_type, msg_type))
4677
4678 client_context._msg_callback = msg_cb
4679
4680 server = ThreadedEchoServer(context=server_context, chatty=False)
4681 with server:
4682 with client_context.wrap_socket(socket.socket(),
4683 server_hostname=hostname) as s:
4684 s.connect((HOST, server.port))
4685
Christian Heimese35d1ba2019-06-03 20:40:15 +02004686 self.assertIn(
Christian Heimesc7f70692019-05-31 11:44:05 +02004687 ("read", TLSVersion.TLSv1_2, _TLSContentType.HANDSHAKE,
4688 _TLSMessageType.SERVER_KEY_EXCHANGE),
Christian Heimese35d1ba2019-06-03 20:40:15 +02004689 msg
4690 )
4691 self.assertIn(
Christian Heimesc7f70692019-05-31 11:44:05 +02004692 ("write", TLSVersion.TLSv1_2, _TLSContentType.CHANGE_CIPHER_SPEC,
4693 _TLSMessageType.CHANGE_CIPHER_SPEC),
Christian Heimese35d1ba2019-06-03 20:40:15 +02004694 msg
4695 )
Christian Heimesc7f70692019-05-31 11:44:05 +02004696
4697
def test_main(verbose=False):
    """Run the test_ssl suites.

    When ``support.verbose`` is set, first print the OpenSSL version, the
    platform and a few ``ssl`` constants.  Raises ``support.TestFailed`` if
    any required certificate file does not exist.  The *verbose* parameter
    itself is not read; only ``support.verbose`` is consulted.
    """
    if support.verbose:
        version_probes = {
            'Mac': platform.mac_ver,
            'Windows': platform.win32_ver,
        }
        # Use the first probe that reports a non-empty first field; fall
        # back to the generic platform string when none of them does.
        for label, probe in version_probes.items():
            info = probe()
            if info and info[0]:
                plat = '%s %r' % (label, info)
                break
        else:
            plat = repr(platform.platform())

        print("test_ssl: testing with %r %r" %
              (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
        print(" under %s" % plat)
        print(" HAS_SNI = %r" % ssl.HAS_SNI)
        print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
        try:
            print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
        except AttributeError:
            # The constant may be missing from the ssl module; skip the line.
            pass

    # Fail early if any of the certificate fixtures is missing.
    required_files = (
        CERTFILE, BYTES_CERTFILE,
        ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
        SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
        BADCERT, BADKEY, EMPTYCERT,
    )
    for path in required_files:
        if not os.path.exists(path):
            raise support.TestFailed("Can't read certificate file %r" % path)

    suites = [
        ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
        SSLObjectTests, SimpleBackgroundTests, ThreadedTests,
        TestPostHandshakeAuth, TestSSLDebug,
    ]

    # NetworkedTests only runs when the 'network' resource is enabled.
    if support.is_resource_enabled('network'):
        suites.append(NetworkedTests)

    saved_threads = support.threading_setup()
    try:
        support.run_unittest(*suites)
    finally:
        support.threading_cleanup(*saved_threads)
Thomas Woutersed03b412007-08-28 21:37:11 +00004743
# Run the whole suite when this file is executed directly as a script.
if __name__ == '__main__':
    test_main()