blob: 539cb7751db89d677d6c1f660c18faa230f62632 [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Christian Heimesc7f70692019-05-31 11:44:05 +02005import unittest.mock
Benjamin Petersonee8712c2008-05-20 21:35:26 +00006from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00007import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00008import select
Thomas Woutersed03b412007-08-28 21:37:11 +00009import time
Christian Heimes9424bb42013-06-17 15:32:57 +020010import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000011import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000012import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000013import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000014import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000015import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020016import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000017import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000018import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000019import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000020import platform
Christian Heimes892d66e2018-01-29 14:10:18 +010021import sysconfig
Christian Heimes19311322019-09-26 22:53:09 +020022import functools
Christian Heimes888bbdc2017-09-07 14:18:21 -070023try:
24 import ctypes
25except ImportError:
26 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000027
Antoine Pitrou05d936d2010-10-13 11:38:36 +000028ssl = support.import_module("ssl")
29
Victor Stinner466e18e2019-07-01 19:01:52 +020030from ssl import TLSVersion, _TLSContentType, _TLSMessageType
Martin Panter3840b2a2016-03-27 01:53:46 +000031
Antoine Pitrou2463e5f2013-03-28 22:24:43 +010032PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
Benjamin Petersonee8712c2008-05-20 21:35:26 +000033HOST = support.HOST
Christian Heimes598894f2016-09-05 23:19:05 +020034IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
Christian Heimes05d9fe32018-02-27 08:55:39 +010035IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
36IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
Christian Heimes892d66e2018-01-29 14:10:18 +010037PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000038
Victor Stinner3ef63442019-02-19 18:06:03 +010039PROTOCOL_TO_TLS_VERSION = {}
40for proto, ver in (
41 ("PROTOCOL_SSLv23", "SSLv3"),
42 ("PROTOCOL_TLSv1", "TLSv1"),
43 ("PROTOCOL_TLSv1_1", "TLSv1_1"),
44):
45 try:
46 proto = getattr(ssl, proto)
47 ver = getattr(ssl.TLSVersion, ver)
48 except AttributeError:
49 continue
50 PROTOCOL_TO_TLS_VERSION[proto] = ver
51
Christian Heimesefff7062013-11-21 03:35:02 +010052def data_file(*name):
53 return os.path.join(os.path.dirname(__file__), *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000054
Antoine Pitrou81564092010-10-08 23:06:24 +000055# The custom key and certificate files used in test_ssl are generated
56# using Lib/test/make_ssl_certs.py.
57# Other certificates are simply fetched from the Internet servers they
58# are meant to authenticate.
59
Antoine Pitrou152efa22010-05-16 18:19:27 +000060CERTFILE = data_file("keycert.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000061BYTES_CERTFILE = os.fsencode(CERTFILE)
Antoine Pitrou152efa22010-05-16 18:19:27 +000062ONLYCERT = data_file("ssl_cert.pem")
63ONLYKEY = data_file("ssl_key.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000064BYTES_ONLYCERT = os.fsencode(ONLYCERT)
65BYTES_ONLYKEY = os.fsencode(ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +020066CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
67ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
68KEY_PASSWORD = "somepass"
Antoine Pitrou152efa22010-05-16 18:19:27 +000069CAPATH = data_file("capath")
Victor Stinner313a1202010-06-11 23:56:51 +000070BYTES_CAPATH = os.fsencode(CAPATH)
Christian Heimesefff7062013-11-21 03:35:02 +010071CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
72CAFILE_CACERT = data_file("capath", "5ed36f99.0")
73
Christian Heimesbd5c7d22018-01-20 15:16:30 +010074CERTFILE_INFO = {
75 'issuer': ((('countryName', 'XY'),),
76 (('localityName', 'Castle Anthrax'),),
77 (('organizationName', 'Python Software Foundation'),),
78 (('commonName', 'localhost'),)),
Christian Heimese6dac002018-08-30 07:25:49 +020079 'notAfter': 'Aug 26 14:23:15 2028 GMT',
80 'notBefore': 'Aug 29 14:23:15 2018 GMT',
81 'serialNumber': '98A7CF88C74A32ED',
Christian Heimesbd5c7d22018-01-20 15:16:30 +010082 'subject': ((('countryName', 'XY'),),
83 (('localityName', 'Castle Anthrax'),),
84 (('organizationName', 'Python Software Foundation'),),
85 (('commonName', 'localhost'),)),
86 'subjectAltName': (('DNS', 'localhost'),),
87 'version': 3
88}
Antoine Pitrou152efa22010-05-16 18:19:27 +000089
Christian Heimes22587792013-11-21 23:56:13 +010090# empty CRL
91CRLFILE = data_file("revocation.crl")
92
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010093# Two keys and certs signed by the same CA (for SNI tests)
94SIGNED_CERTFILE = data_file("keycert3.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +020095SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +010096
97SIGNED_CERTFILE_INFO = {
98 'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
99 'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
100 'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
101 'issuer': ((('countryName', 'XY'),),
102 (('organizationName', 'Python Software Foundation CA'),),
103 (('commonName', 'our-ca-server'),)),
Christian Heimese6dac002018-08-30 07:25:49 +0200104 'notAfter': 'Jul 7 14:23:16 2028 GMT',
105 'notBefore': 'Aug 29 14:23:16 2018 GMT',
106 'serialNumber': 'CB2D80995A69525C',
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100107 'subject': ((('countryName', 'XY'),),
108 (('localityName', 'Castle Anthrax'),),
109 (('organizationName', 'Python Software Foundation'),),
110 (('commonName', 'localhost'),)),
111 'subjectAltName': (('DNS', 'localhost'),),
112 'version': 3
113}
114
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100115SIGNED_CERTFILE2 = data_file("keycert4.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +0200116SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100117SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
118SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'
119
Martin Panter3840b2a2016-03-27 01:53:46 +0000120# Same certificate as pycacert.pem, but without extra text in file
121SIGNING_CA = data_file("capath", "ceff1710.0")
Christian Heimes1c03abd2016-09-06 23:25:35 +0200122# cert with all kinds of subject alt names
123ALLSANFILE = data_file("allsans.pem")
Christian Heimes66e57422018-01-29 14:25:13 +0100124IDNSANSFILE = data_file("idnsans.pem")
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100125
Martin Panter3d81d932016-01-14 09:36:00 +0000126REMOTE_HOST = "self-signed.pythontest.net"
Antoine Pitrou152efa22010-05-16 18:19:27 +0000127
128EMPTYCERT = data_file("nullcert.pem")
129BADCERT = data_file("badcert.pem")
Martin Panter407b62f2016-01-30 03:41:43 +0000130NONEXISTINGCERT = data_file("XXXnonexisting.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000131BADKEY = data_file("badkey.pem")
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200132NOKIACERT = data_file("nokia.pem")
Christian Heimes824f7f32013-08-17 00:54:47 +0200133NULLBYTECERT = data_file("nullbytecert.pem")
Christian Heimesa37f5242019-01-15 23:47:42 +0100134TALOS_INVALID_CRLDP = data_file("talos-2019-0758.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000135
Christian Heimes88bfd0b2018-08-14 12:54:19 +0200136DHFILE = data_file("ffdh3072.pem")
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100137BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000138
Christian Heimes358cfd42016-09-10 22:43:48 +0200139# Not defined in all versions of OpenSSL
140OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
141OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
142OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
143OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
Christian Heimes05d9fe32018-02-27 08:55:39 +0100144OP_ENABLE_MIDDLEBOX_COMPAT = getattr(ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
Christian Heimes358cfd42016-09-10 22:43:48 +0200145
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100146
Christian Heimes19311322019-09-26 22:53:09 +0200147def has_tls_protocol(protocol):
148 """Check if a TLS protocol is available and enabled
149
150 :param protocol: enum ssl._SSLMethod member or name
151 :return: bool
152 """
153 if isinstance(protocol, str):
154 assert protocol.startswith('PROTOCOL_')
155 protocol = getattr(ssl, protocol, None)
156 if protocol is None:
157 return False
158 if protocol in {
159 ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS_SERVER,
160 ssl.PROTOCOL_TLS_CLIENT
161 }:
162 # auto-negotiate protocols are always available
163 return True
164 name = protocol.name
165 return has_tls_version(name[len('PROTOCOL_'):])
166
167
168@functools.lru_cache
169def has_tls_version(version):
170 """Check if a TLS/SSL version is enabled
171
172 :param version: TLS version name or ssl.TLSVersion member
173 :return: bool
174 """
175 if version == "SSLv2":
176 # never supported and not even in TLSVersion enum
177 return False
178
179 if isinstance(version, str):
180 version = ssl.TLSVersion.__members__[version]
181
182 # check compile time flags like ssl.HAS_TLSv1_2
183 if not getattr(ssl, f'HAS_{version.name}'):
184 return False
185
186 # check runtime and dynamic crypto policy settings. A TLS version may
187 # be compiled in but disabled by a policy or config option.
188 ctx = ssl.SSLContext()
189 if (
190 hasattr(ctx, 'minimum_version') and
191 ctx.minimum_version != ssl.TLSVersion.MINIMUM_SUPPORTED and
192 version < ctx.minimum_version
193 ):
194 return False
195 if (
196 hasattr(ctx, 'maximum_version') and
197 ctx.maximum_version != ssl.TLSVersion.MAXIMUM_SUPPORTED and
198 version > ctx.maximum_version
199 ):
200 return False
201
202 return True
203
204
205def requires_tls_version(version):
206 """Decorator to skip tests when a required TLS version is not available
207
208 :param version: TLS version name or ssl.TLSVersion member
209 :return:
210 """
211 def decorator(func):
212 @functools.wraps(func)
213 def wrapper(*args, **kw):
214 if not has_tls_version(version):
215 raise unittest.SkipTest(f"{version} is not available.")
216 else:
217 return func(*args, **kw)
218 return wrapper
219 return decorator
220
221
222requires_minimum_version = unittest.skipUnless(
223 hasattr(ssl.SSLContext, 'minimum_version'),
224 "required OpenSSL >= 1.1.0g"
225)
226
227
Thomas Woutersed03b412007-08-28 21:37:11 +0000228def handle_error(prefix):
229 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000230 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000231 sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +0000232
Antoine Pitroub5218772010-05-21 09:56:06 +0000233def can_clear_options():
234 # 0.9.8m or higher
Antoine Pitroub9ac25d2011-07-08 18:47:06 +0200235 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
Antoine Pitroub5218772010-05-21 09:56:06 +0000236
237def no_sslv2_implies_sslv3_hello():
238 # 0.9.7h or higher
239 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
240
Christian Heimes2427b502013-11-23 11:24:32 +0100241def have_verify_flags():
242 # 0.9.8 or higher
243 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
244
Christian Heimesb7b92252018-02-25 09:49:31 +0100245def _have_secp_curves():
246 if not ssl.HAS_ECDH:
247 return False
248 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
249 try:
250 ctx.set_ecdh_curve("secp384r1")
251 except ValueError:
252 return False
253 else:
254 return True
255
256
257HAVE_SECP_CURVES = _have_secp_curves()
258
259
Antoine Pitrouc695c952014-04-28 20:57:36 +0200260def utc_offset(): #NOTE: ignore issues like #1647654
261 # local time = utc time + utc offset
262 if time.daylight and time.localtime().tm_isdst > 0:
263 return -time.altzone # seconds
264 return -time.timezone
265
Christian Heimes9424bb42013-06-17 15:32:57 +0200266def asn1time(cert_time):
267 # Some versions of OpenSSL ignore seconds, see #18207
268 # 0.9.8.i
269 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
270 fmt = "%b %d %H:%M:%S %Y GMT"
271 dt = datetime.datetime.strptime(cert_time, fmt)
272 dt = dt.replace(second=0)
273 cert_time = dt.strftime(fmt)
274 # %d adds leading zero but ASN1_TIME_print() uses leading space
275 if cert_time[4] == "0":
276 cert_time = cert_time[:4] + " " + cert_time[5:]
277
278 return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000279
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100280needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
281
Antoine Pitrou23df4832010-08-04 17:14:06 +0000282
Christian Heimesd0486372016-09-10 23:23:33 +0200283def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
284 cert_reqs=ssl.CERT_NONE, ca_certs=None,
285 ciphers=None, certfile=None, keyfile=None,
286 **kwargs):
287 context = ssl.SSLContext(ssl_version)
288 if cert_reqs is not None:
Christian Heimesa170fa12017-09-15 20:27:30 +0200289 if cert_reqs == ssl.CERT_NONE:
290 context.check_hostname = False
Christian Heimesd0486372016-09-10 23:23:33 +0200291 context.verify_mode = cert_reqs
292 if ca_certs is not None:
293 context.load_verify_locations(ca_certs)
294 if certfile is not None or keyfile is not None:
295 context.load_cert_chain(certfile, keyfile)
296 if ciphers is not None:
297 context.set_ciphers(ciphers)
298 return context.wrap_socket(sock, **kwargs)
299
Christian Heimesa170fa12017-09-15 20:27:30 +0200300
301def testing_context(server_cert=SIGNED_CERTFILE):
302 """Create context
303
304 client_context, server_context, hostname = testing_context()
305 """
306 if server_cert == SIGNED_CERTFILE:
307 hostname = SIGNED_CERTFILE_HOSTNAME
308 elif server_cert == SIGNED_CERTFILE2:
309 hostname = SIGNED_CERTFILE2_HOSTNAME
310 else:
311 raise ValueError(server_cert)
312
313 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
314 client_context.load_verify_locations(SIGNING_CA)
315
316 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
317 server_context.load_cert_chain(server_cert)
Christian Heimes9fb051f2018-09-23 08:32:31 +0200318 server_context.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +0200319
320 return client_context, server_context, hostname
321
322
Antoine Pitrou152efa22010-05-16 18:19:27 +0000323class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000324
Antoine Pitrou480a1242010-04-28 21:37:09 +0000325 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000326 ssl.CERT_NONE
327 ssl.CERT_OPTIONAL
328 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100329 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100330 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100331 if ssl.HAS_ECDH:
332 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100333 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
334 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000335 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100336 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700337 ssl.OP_NO_SSLv2
338 ssl.OP_NO_SSLv3
339 ssl.OP_NO_TLSv1
340 ssl.OP_NO_TLSv1_3
341 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
342 ssl.OP_NO_TLSv1_1
343 ssl.OP_NO_TLSv1_2
Christian Heimesa170fa12017-09-15 20:27:30 +0200344 self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000345
Christian Heimes9d50ab52018-02-27 10:17:30 +0100346 def test_private_init(self):
347 with self.assertRaisesRegex(TypeError, "public constructor"):
348 with socket.socket() as s:
349 ssl.SSLSocket(s)
350
Antoine Pitrou172f0252014-04-18 20:33:08 +0200351 def test_str_for_enums(self):
352 # Make sure that the PROTOCOL_* constants have enum-like string
353 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200354 proto = ssl.PROTOCOL_TLS
355 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200356 ctx = ssl.SSLContext(proto)
357 self.assertIs(ctx.protocol, proto)
358
Antoine Pitrou480a1242010-04-28 21:37:09 +0000359 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000360 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000361 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000362 sys.stdout.write("\n RAND_status is %d (%s)\n"
363 % (v, (v and "sufficient randomness") or
364 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200365
366 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
367 self.assertEqual(len(data), 16)
368 self.assertEqual(is_cryptographic, v == 1)
369 if v:
370 data = ssl.RAND_bytes(16)
371 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200372 else:
373 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200374
Victor Stinner1e81a392013-12-19 16:47:04 +0100375 # negative num is invalid
376 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
377 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
378
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100379 if hasattr(ssl, 'RAND_egd'):
380 self.assertRaises(TypeError, ssl.RAND_egd, 1)
381 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000382 ssl.RAND_add("this is a random string", 75.0)
Serhiy Storchaka8490f5a2015-03-20 09:00:36 +0200383 ssl.RAND_add(b"this is a random bytes object", 75.0)
384 ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000385
Christian Heimesf77b4b22013-08-21 13:26:05 +0200386 @unittest.skipUnless(os.name == 'posix', 'requires posix')
387 def test_random_fork(self):
388 status = ssl.RAND_status()
389 if not status:
390 self.fail("OpenSSL's PRNG has insufficient randomness")
391
392 rfd, wfd = os.pipe()
393 pid = os.fork()
394 if pid == 0:
395 try:
396 os.close(rfd)
397 child_random = ssl.RAND_pseudo_bytes(16)[0]
398 self.assertEqual(len(child_random), 16)
399 os.write(wfd, child_random)
400 os.close(wfd)
401 except BaseException:
402 os._exit(1)
403 else:
404 os._exit(0)
405 else:
406 os.close(wfd)
407 self.addCleanup(os.close, rfd)
408 _, status = os.waitpid(pid, 0)
409 self.assertEqual(status, 0)
410
411 child_random = os.read(rfd, 16)
412 self.assertEqual(len(child_random), 16)
413 parent_random = ssl.RAND_pseudo_bytes(16)[0]
414 self.assertEqual(len(parent_random), 16)
415
416 self.assertNotEqual(child_random, parent_random)
417
Christian Heimese6dac002018-08-30 07:25:49 +0200418 maxDiff = None
419
Antoine Pitrou480a1242010-04-28 21:37:09 +0000420 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000421 # note that this uses an 'unofficial' function in _ssl.c,
422 # provided solely for this test, to exercise the certificate
423 # parsing code
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100424 self.assertEqual(
425 ssl._ssl._test_decode_cert(CERTFILE),
426 CERTFILE_INFO
427 )
428 self.assertEqual(
429 ssl._ssl._test_decode_cert(SIGNED_CERTFILE),
430 SIGNED_CERTFILE_INFO
431 )
432
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200433 # Issue #13034: the subjectAltName in some certificates
434 # (notably projects.developer.nokia.com:443) wasn't parsed
435 p = ssl._ssl._test_decode_cert(NOKIACERT)
436 if support.verbose:
437 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
438 self.assertEqual(p['subjectAltName'],
439 (('DNS', 'projects.developer.nokia.com'),
440 ('DNS', 'projects.forum.nokia.com'))
441 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100442 # extra OCSP and AIA fields
443 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
444 self.assertEqual(p['caIssuers'],
445 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
446 self.assertEqual(p['crlDistributionPoints'],
447 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000448
Christian Heimesa37f5242019-01-15 23:47:42 +0100449 def test_parse_cert_CVE_2019_5010(self):
450 p = ssl._ssl._test_decode_cert(TALOS_INVALID_CRLDP)
451 if support.verbose:
452 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
453 self.assertEqual(
454 p,
455 {
456 'issuer': (
457 (('countryName', 'UK'),), (('commonName', 'cody-ca'),)),
458 'notAfter': 'Jun 14 18:00:58 2028 GMT',
459 'notBefore': 'Jun 18 18:00:58 2018 GMT',
460 'serialNumber': '02',
461 'subject': ((('countryName', 'UK'),),
462 (('commonName',
463 'codenomicon-vm-2.test.lal.cisco.com'),)),
464 'subjectAltName': (
465 ('DNS', 'codenomicon-vm-2.test.lal.cisco.com'),),
466 'version': 3
467 }
468 )
469
Christian Heimes824f7f32013-08-17 00:54:47 +0200470 def test_parse_cert_CVE_2013_4238(self):
471 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
472 if support.verbose:
473 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
474 subject = ((('countryName', 'US'),),
475 (('stateOrProvinceName', 'Oregon'),),
476 (('localityName', 'Beaverton'),),
477 (('organizationName', 'Python Software Foundation'),),
478 (('organizationalUnitName', 'Python Core Development'),),
479 (('commonName', 'null.python.org\x00example.org'),),
480 (('emailAddress', 'python-dev@python.org'),))
481 self.assertEqual(p['subject'], subject)
482 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200483 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
484 san = (('DNS', 'altnull.python.org\x00example.com'),
485 ('email', 'null@python.org\x00user@example.org'),
486 ('URI', 'http://null.python.org\x00http://example.org'),
487 ('IP Address', '192.0.2.1'),
488 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
489 else:
490 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
491 san = (('DNS', 'altnull.python.org\x00example.com'),
492 ('email', 'null@python.org\x00user@example.org'),
493 ('URI', 'http://null.python.org\x00http://example.org'),
494 ('IP Address', '192.0.2.1'),
495 ('IP Address', '<invalid>'))
496
497 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200498
Christian Heimes1c03abd2016-09-06 23:25:35 +0200499 def test_parse_all_sans(self):
500 p = ssl._ssl._test_decode_cert(ALLSANFILE)
501 self.assertEqual(p['subjectAltName'],
502 (
503 ('DNS', 'allsans'),
504 ('othername', '<unsupported>'),
505 ('othername', '<unsupported>'),
506 ('email', 'user@example.org'),
507 ('DNS', 'www.example.org'),
508 ('DirName',
509 ((('countryName', 'XY'),),
510 (('localityName', 'Castle Anthrax'),),
511 (('organizationName', 'Python Software Foundation'),),
512 (('commonName', 'dirname example'),))),
513 ('URI', 'https://www.python.org/'),
514 ('IP Address', '127.0.0.1'),
515 ('IP Address', '0:0:0:0:0:0:0:1\n'),
516 ('Registered ID', '1.2.3.4.5')
517 )
518 )
519
Antoine Pitrou480a1242010-04-28 21:37:09 +0000520 def test_DER_to_PEM(self):
Martin Panter3d81d932016-01-14 09:36:00 +0000521 with open(CAFILE_CACERT, 'r') as f:
Antoine Pitrou480a1242010-04-28 21:37:09 +0000522 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000523 d1 = ssl.PEM_cert_to_DER_cert(pem)
524 p2 = ssl.DER_cert_to_PEM_cert(d1)
525 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000526 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000527 if not p2.startswith(ssl.PEM_HEADER + '\n'):
528 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
529 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
530 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000531
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000532 def test_openssl_version(self):
533 n = ssl.OPENSSL_VERSION_NUMBER
534 t = ssl.OPENSSL_VERSION_INFO
535 s = ssl.OPENSSL_VERSION
536 self.assertIsInstance(n, int)
537 self.assertIsInstance(t, tuple)
538 self.assertIsInstance(s, str)
539 # Some sanity checks follow
540 # >= 0.9
541 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400542 # < 3.0
543 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000544 major, minor, fix, patch, status = t
545 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400546 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000547 self.assertGreaterEqual(minor, 0)
548 self.assertLess(minor, 256)
549 self.assertGreaterEqual(fix, 0)
550 self.assertLess(fix, 256)
551 self.assertGreaterEqual(patch, 0)
Ned Deily05784a72015-02-05 17:20:13 +1100552 self.assertLessEqual(patch, 63)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000553 self.assertGreaterEqual(status, 0)
554 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400555 # Version string as returned by {Open,Libre}SSL, the format might change
Christian Heimes598894f2016-09-05 23:19:05 +0200556 if IS_LIBRESSL:
557 self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
Victor Stinner789b8052015-01-06 11:51:06 +0100558 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400559 else:
560 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100561 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000562
Antoine Pitrou9d543662010-04-23 23:10:32 +0000563 @support.cpython_only
564 def test_refcycle(self):
565 # Issue #7943: an SSL object doesn't create reference cycles with
566 # itself.
567 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200568 ss = test_wrap_socket(s)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000569 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100570 with support.check_warnings(("", ResourceWarning)):
571 del ss
Victor Stinnere0b75b72016-03-21 17:26:04 +0100572 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000573
Antoine Pitroua468adc2010-09-14 14:43:44 +0000574 def test_wrapped_unconnected(self):
575 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200576 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000577 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200578 with test_wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100579 self.assertRaises(OSError, ss.recv, 1)
580 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
581 self.assertRaises(OSError, ss.recvfrom, 1)
582 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
583 self.assertRaises(OSError, ss.send, b'x')
584 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Serhiy Storchaka42b1d612018-12-06 22:36:55 +0200585 self.assertRaises(NotImplementedError, ss.dup)
Christian Heimes141c5e82018-02-24 21:10:57 +0100586 self.assertRaises(NotImplementedError, ss.sendmsg,
587 [b'x'], (), 0, ('0.0.0.0', 0))
Serhiy Storchaka42b1d612018-12-06 22:36:55 +0200588 self.assertRaises(NotImplementedError, ss.recvmsg, 100)
589 self.assertRaises(NotImplementedError, ss.recvmsg_into,
590 [bytearray(100)])
Antoine Pitroua468adc2010-09-14 14:43:44 +0000591
Antoine Pitrou40f08742010-04-24 22:04:40 +0000592 def test_timeout(self):
593 # Issue #8524: when creating an SSL socket, the timeout of the
594 # original socket should be retained.
595 for timeout in (None, 0.0, 5.0):
596 s = socket.socket(socket.AF_INET)
597 s.settimeout(timeout)
Christian Heimesd0486372016-09-10 23:23:33 +0200598 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100599 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000600
Christian Heimesd0486372016-09-10 23:23:33 +0200601 def test_errors_sslwrap(self):
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000602 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000603 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000604 "certfile must be specified",
605 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000606 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000607 "certfile must be specified for server-side operations",
608 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000609 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000610 "certfile must be specified for server-side operations",
Christian Heimesd0486372016-09-10 23:23:33 +0200611 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100612 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
613 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
Christian Heimesd0486372016-09-10 23:23:33 +0200614 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200615 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000616 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000617 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000618 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200619 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000620 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000621 ssl.wrap_socket(sock,
622 certfile=CERTFILE, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000623 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200624 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000625 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000626 ssl.wrap_socket(sock,
627 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000628 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000629
Martin Panter3464ea22016-02-01 21:58:11 +0000630 def bad_cert_test(self, certfile):
631 """Check that trying to use the given client certificate fails"""
632 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
633 certfile)
634 sock = socket.socket()
635 self.addCleanup(sock.close)
636 with self.assertRaises(ssl.SSLError):
Christian Heimesd0486372016-09-10 23:23:33 +0200637 test_wrap_socket(sock,
Christian Heimesa170fa12017-09-15 20:27:30 +0200638 certfile=certfile)
Martin Panter3464ea22016-02-01 21:58:11 +0000639
640 def test_empty_cert(self):
641 """Wrapping with an empty cert file"""
642 self.bad_cert_test("nullcert.pem")
643
644 def test_malformed_cert(self):
645 """Wrapping with a badly formatted certificate (syntax error)"""
646 self.bad_cert_test("badcert.pem")
647
648 def test_malformed_key(self):
649 """Wrapping with a badly formatted key (syntax error)"""
650 self.bad_cert_test("badkey.pem")
651
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000652 def test_match_hostname(self):
653 def ok(cert, hostname):
654 ssl.match_hostname(cert, hostname)
655 def fail(cert, hostname):
656 self.assertRaises(ssl.CertificateError,
657 ssl.match_hostname, cert, hostname)
658
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100659 # -- Hostname matching --
660
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000661 cert = {'subject': ((('commonName', 'example.com'),),)}
662 ok(cert, 'example.com')
663 ok(cert, 'ExAmple.cOm')
664 fail(cert, 'www.example.com')
665 fail(cert, '.example.com')
666 fail(cert, 'example.org')
667 fail(cert, 'exampleXcom')
668
669 cert = {'subject': ((('commonName', '*.a.com'),),)}
670 ok(cert, 'foo.a.com')
671 fail(cert, 'bar.foo.a.com')
672 fail(cert, 'a.com')
673 fail(cert, 'Xa.com')
674 fail(cert, '.a.com')
675
Mandeep Singhede2ac92017-11-27 04:01:27 +0530676 # only match wildcards when they are the only thing
677 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000678 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530679 fail(cert, 'foo.com')
680 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000681 fail(cert, 'bar.com')
682 fail(cert, 'foo.a.com')
683 fail(cert, 'bar.foo.com')
684
Christian Heimes824f7f32013-08-17 00:54:47 +0200685 # NULL bytes are bad, CVE-2013-4073
686 cert = {'subject': ((('commonName',
687 'null.python.org\x00example.org'),),)}
688 ok(cert, 'null.python.org\x00example.org') # or raise an error?
689 fail(cert, 'example.org')
690 fail(cert, 'null.python.org')
691
Georg Brandl72c98d32013-10-27 07:16:53 +0100692 # error cases with wildcards
693 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
694 fail(cert, 'bar.foo.a.com')
695 fail(cert, 'a.com')
696 fail(cert, 'Xa.com')
697 fail(cert, '.a.com')
698
699 cert = {'subject': ((('commonName', 'a.*.com'),),)}
700 fail(cert, 'a.foo.com')
701 fail(cert, 'a..com')
702 fail(cert, 'a.com')
703
704 # wildcard doesn't match IDNA prefix 'xn--'
705 idna = 'püthon.python.org'.encode("idna").decode("ascii")
706 cert = {'subject': ((('commonName', idna),),)}
707 ok(cert, idna)
708 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
709 fail(cert, idna)
710 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
711 fail(cert, idna)
712
713 # wildcard in first fragment and IDNA A-labels in sequent fragments
714 # are supported.
715 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
716 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530717 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
718 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100719 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
720 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
721
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000722 # Slightly fake real-world example
723 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
724 'subject': ((('commonName', 'linuxfrz.org'),),),
725 'subjectAltName': (('DNS', 'linuxfr.org'),
726 ('DNS', 'linuxfr.com'),
727 ('othername', '<unsupported>'))}
728 ok(cert, 'linuxfr.org')
729 ok(cert, 'linuxfr.com')
730 # Not a "DNS" entry
731 fail(cert, '<unsupported>')
732 # When there is a subjectAltName, commonName isn't used
733 fail(cert, 'linuxfrz.org')
734
735 # A pristine real-world example
736 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
737 'subject': ((('countryName', 'US'),),
738 (('stateOrProvinceName', 'California'),),
739 (('localityName', 'Mountain View'),),
740 (('organizationName', 'Google Inc'),),
741 (('commonName', 'mail.google.com'),))}
742 ok(cert, 'mail.google.com')
743 fail(cert, 'gmail.com')
744 # Only commonName is considered
745 fail(cert, 'California')
746
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100747 # -- IPv4 matching --
748 cert = {'subject': ((('commonName', 'example.com'),),),
749 'subjectAltName': (('DNS', 'example.com'),
750 ('IP Address', '10.11.12.13'),
Miss Islington (bot)3cba3d32019-07-02 14:06:18 -0700751 ('IP Address', '14.15.16.17'),
752 ('IP Address', '127.0.0.1'))}
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100753 ok(cert, '10.11.12.13')
754 ok(cert, '14.15.16.17')
Miss Islington (bot)3cba3d32019-07-02 14:06:18 -0700755 # socket.inet_ntoa(socket.inet_aton('127.1')) == '127.0.0.1'
756 fail(cert, '127.1')
757 fail(cert, '14.15.16.17 ')
758 fail(cert, '14.15.16.17 extra data')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100759 fail(cert, '14.15.16.18')
760 fail(cert, 'example.net')
761
762 # -- IPv6 matching --
Miss Islington (bot)c2684c62019-06-30 08:42:22 -0700763 if support.IPV6_ENABLED:
Christian Heimesaef12832018-02-24 14:35:56 +0100764 cert = {'subject': ((('commonName', 'example.com'),),),
765 'subjectAltName': (
766 ('DNS', 'example.com'),
767 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
768 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
769 ok(cert, '2001::cafe')
770 ok(cert, '2003::baba')
Miss Islington (bot)3cba3d32019-07-02 14:06:18 -0700771 fail(cert, '2003::baba ')
772 fail(cert, '2003::baba extra data')
Christian Heimesaef12832018-02-24 14:35:56 +0100773 fail(cert, '2003::bebe')
774 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100775
776 # -- Miscellaneous --
777
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000778 # Neither commonName nor subjectAltName
779 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
780 'subject': ((('countryName', 'US'),),
781 (('stateOrProvinceName', 'California'),),
782 (('localityName', 'Mountain View'),),
783 (('organizationName', 'Google Inc'),))}
784 fail(cert, 'mail.google.com')
785
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200786 # No DNS entry in subjectAltName but a commonName
787 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
788 'subject': ((('countryName', 'US'),),
789 (('stateOrProvinceName', 'California'),),
790 (('localityName', 'Mountain View'),),
791 (('commonName', 'mail.google.com'),)),
792 'subjectAltName': (('othername', 'blabla'), )}
793 ok(cert, 'mail.google.com')
794
795 # No DNS entry subjectAltName and no commonName
796 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
797 'subject': ((('countryName', 'US'),),
798 (('stateOrProvinceName', 'California'),),
799 (('localityName', 'Mountain View'),),
800 (('organizationName', 'Google Inc'),)),
801 'subjectAltName': (('othername', 'blabla'),)}
802 fail(cert, 'google.com')
803
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000804 # Empty cert / no cert
805 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
806 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
807
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200808 # Issue #17980: avoid denials of service by refusing more than one
809 # wildcard per fragment.
Christian Heimesaef12832018-02-24 14:35:56 +0100810 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
811 with self.assertRaisesRegex(
812 ssl.CertificateError,
813 "partial wildcards in leftmost label are not supported"):
814 ssl.match_hostname(cert, 'axxb.example.com')
815
816 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
817 with self.assertRaisesRegex(
818 ssl.CertificateError,
819 "wildcard can only be present in the leftmost label"):
820 ssl.match_hostname(cert, 'www.sub.example.com')
821
822 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
823 with self.assertRaisesRegex(
824 ssl.CertificateError,
825 "too many wildcards"):
826 ssl.match_hostname(cert, 'axxbxxc.example.com')
827
828 cert = {'subject': ((('commonName', '*'),),)}
829 with self.assertRaisesRegex(
830 ssl.CertificateError,
831 "sole wildcard without additional labels are not support"):
832 ssl.match_hostname(cert, 'host')
833
834 cert = {'subject': ((('commonName', '*.com'),),)}
835 with self.assertRaisesRegex(
836 ssl.CertificateError,
837 r"hostname 'com' doesn't match '\*.com'"):
838 ssl.match_hostname(cert, 'com')
839
840 # extra checks for _inet_paton()
841 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
842 with self.assertRaises(ValueError):
843 ssl._inet_paton(invalid)
844 for ipaddr in ['127.0.0.1', '192.168.0.1']:
845 self.assertTrue(ssl._inet_paton(ipaddr))
Miss Islington (bot)c2684c62019-06-30 08:42:22 -0700846 if support.IPV6_ENABLED:
Christian Heimesaef12832018-02-24 14:35:56 +0100847 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
848 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200849
Antoine Pitroud5323212010-10-22 18:19:07 +0000850 def test_server_side(self):
851 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200852 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000853 with socket.socket() as sock:
854 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
855 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000856
Antoine Pitroud6494802011-07-21 01:11:30 +0200857 def test_unknown_channel_binding(self):
858 # should raise ValueError for unknown type
Giampaolo Rodolaeb7e29f2019-04-09 00:34:02 +0200859 s = socket.create_server(('127.0.0.1', 0))
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200860 c = socket.socket(socket.AF_INET)
861 c.connect(s.getsockname())
Christian Heimesd0486372016-09-10 23:23:33 +0200862 with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100863 with self.assertRaises(ValueError):
864 ss.get_channel_binding("unknown-type")
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200865 s.close()
Antoine Pitroud6494802011-07-21 01:11:30 +0200866
867 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
868 "'tls-unique' channel binding not available")
869 def test_tls_unique_channel_binding(self):
870 # unconnected should return None for known type
871 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200872 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100873 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200874 # the same for server-side
875 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200876 with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100877 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200878
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600879 def test_dealloc_warn(self):
Christian Heimesd0486372016-09-10 23:23:33 +0200880 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600881 r = repr(ss)
882 with self.assertWarns(ResourceWarning) as cm:
883 ss = None
884 support.gc_collect()
885 self.assertIn(r, str(cm.warning.args[0]))
886
Christian Heimes6d7ad132013-06-09 18:02:55 +0200887 def test_get_default_verify_paths(self):
888 paths = ssl.get_default_verify_paths()
889 self.assertEqual(len(paths), 6)
890 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
891
892 with support.EnvironmentVarGuard() as env:
893 env["SSL_CERT_DIR"] = CAPATH
894 env["SSL_CERT_FILE"] = CERTFILE
895 paths = ssl.get_default_verify_paths()
896 self.assertEqual(paths.cafile, CERTFILE)
897 self.assertEqual(paths.capath, CAPATH)
898
Christian Heimes44109d72013-11-22 01:51:30 +0100899 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
900 def test_enum_certificates(self):
901 self.assertTrue(ssl.enum_certificates("CA"))
902 self.assertTrue(ssl.enum_certificates("ROOT"))
903
904 self.assertRaises(TypeError, ssl.enum_certificates)
905 self.assertRaises(WindowsError, ssl.enum_certificates, "")
906
Christian Heimesc2d65e12013-11-22 16:13:55 +0100907 trust_oids = set()
908 for storename in ("CA", "ROOT"):
909 store = ssl.enum_certificates(storename)
910 self.assertIsInstance(store, list)
911 for element in store:
912 self.assertIsInstance(element, tuple)
913 self.assertEqual(len(element), 3)
914 cert, enc, trust = element
915 self.assertIsInstance(cert, bytes)
916 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
Steve Dowerfdd17ab2019-09-10 02:02:04 -0700917 self.assertIsInstance(trust, (frozenset, set, bool))
918 if isinstance(trust, (frozenset, set)):
Christian Heimesc2d65e12013-11-22 16:13:55 +0100919 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100920
921 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100922 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200923
Christian Heimes46bebee2013-06-09 19:03:31 +0200924 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100925 def test_enum_crls(self):
926 self.assertTrue(ssl.enum_crls("CA"))
927 self.assertRaises(TypeError, ssl.enum_crls)
928 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200929
Christian Heimes44109d72013-11-22 01:51:30 +0100930 crls = ssl.enum_crls("CA")
931 self.assertIsInstance(crls, list)
932 for element in crls:
933 self.assertIsInstance(element, tuple)
934 self.assertEqual(len(element), 2)
935 self.assertIsInstance(element[0], bytes)
936 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200937
Christian Heimes46bebee2013-06-09 19:03:31 +0200938
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100939 def test_asn1object(self):
940 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
941 '1.3.6.1.5.5.7.3.1')
942
943 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
944 self.assertEqual(val, expected)
945 self.assertEqual(val.nid, 129)
946 self.assertEqual(val.shortname, 'serverAuth')
947 self.assertEqual(val.longname, 'TLS Web Server Authentication')
948 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
949 self.assertIsInstance(val, ssl._ASN1Object)
950 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
951
952 val = ssl._ASN1Object.fromnid(129)
953 self.assertEqual(val, expected)
954 self.assertIsInstance(val, ssl._ASN1Object)
955 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100956 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
957 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100958 for i in range(1000):
959 try:
960 obj = ssl._ASN1Object.fromnid(i)
961 except ValueError:
962 pass
963 else:
964 self.assertIsInstance(obj.nid, int)
965 self.assertIsInstance(obj.shortname, str)
966 self.assertIsInstance(obj.longname, str)
967 self.assertIsInstance(obj.oid, (str, type(None)))
968
969 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
970 self.assertEqual(val, expected)
971 self.assertIsInstance(val, ssl._ASN1Object)
972 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
973 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
974 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100975 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
976 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100977
Christian Heimes72d28502013-11-23 13:56:58 +0100978 def test_purpose_enum(self):
979 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
980 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
981 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
982 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
983 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
984 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
985 '1.3.6.1.5.5.7.3.1')
986
987 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
988 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
989 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
990 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
991 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
992 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
993 '1.3.6.1.5.5.7.3.2')
994
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100995 def test_unsupported_dtls(self):
996 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
997 self.addCleanup(s.close)
998 with self.assertRaises(NotImplementedError) as cx:
Christian Heimesd0486372016-09-10 23:23:33 +0200999 test_wrap_socket(s, cert_reqs=ssl.CERT_NONE)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +01001000 self.assertEqual(str(cx.exception), "only stream sockets are supported")
Christian Heimesa170fa12017-09-15 20:27:30 +02001001 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +01001002 with self.assertRaises(NotImplementedError) as cx:
1003 ctx.wrap_socket(s)
1004 self.assertEqual(str(cx.exception), "only stream sockets are supported")
1005
Antoine Pitrouc695c952014-04-28 20:57:36 +02001006 def cert_time_ok(self, timestring, timestamp):
1007 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
1008
1009 def cert_time_fail(self, timestring):
1010 with self.assertRaises(ValueError):
1011 ssl.cert_time_to_seconds(timestring)
1012
1013 @unittest.skipUnless(utc_offset(),
1014 'local time needs to be different from UTC')
1015 def test_cert_time_to_seconds_timezone(self):
1016 # Issue #19940: ssl.cert_time_to_seconds() returns wrong
1017 # results if local timezone is not UTC
1018 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
1019 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
1020
1021 def test_cert_time_to_seconds(self):
1022 timestring = "Jan 5 09:34:43 2018 GMT"
1023 ts = 1515144883.0
1024 self.cert_time_ok(timestring, ts)
1025 # accept keyword parameter, assert its name
1026 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
1027 # accept both %e and %d (space or zero generated by strftime)
1028 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
1029 # case-insensitive
1030 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
1031 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
1032 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
1033 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
1034 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
1035 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
1036 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
1037 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
1038
1039 newyear_ts = 1230768000.0
1040 # leap seconds
1041 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
1042 # same timestamp
1043 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
1044
1045 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
1046 # allow 60th second (even if it is not a leap second)
1047 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
1048 # allow 2nd leap second for compatibility with time.strptime()
1049 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
1050 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
1051
Mike53f7a7c2017-12-14 14:04:53 +03001052 # no special treatment for the special value:
Antoine Pitrouc695c952014-04-28 20:57:36 +02001053 # 99991231235959Z (rfc 5280)
1054 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
1055
1056 @support.run_with_locale('LC_ALL', '')
1057 def test_cert_time_to_seconds_locale(self):
1058 # `cert_time_to_seconds()` should be locale independent
1059
1060 def local_february_name():
1061 return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
1062
1063 if local_february_name().lower() == 'feb':
1064 self.skipTest("locale-specific month name needs to be "
1065 "different from C locale")
1066
1067 # locale-independent
1068 self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
1069 self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
1070
Martin Panter3840b2a2016-03-27 01:53:46 +00001071 def test_connect_ex_error(self):
1072 server = socket.socket(socket.AF_INET)
1073 self.addCleanup(server.close)
1074 port = support.bind_port(server) # Reserve port but don't listen
Christian Heimesd0486372016-09-10 23:23:33 +02001075 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001076 cert_reqs=ssl.CERT_REQUIRED)
1077 self.addCleanup(s.close)
1078 rc = s.connect_ex((HOST, port))
1079 # Issue #19919: Windows machines or VMs hosted on Windows
1080 # machines sometimes return EWOULDBLOCK.
1081 errors = (
1082 errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
1083 errno.EWOULDBLOCK,
1084 )
1085 self.assertIn(rc, errors)
1086
Christian Heimesa6bc95a2013-11-17 19:59:14 +01001087
Antoine Pitrou152efa22010-05-16 18:19:27 +00001088class ContextTests(unittest.TestCase):
1089
1090 def test_constructor(self):
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01001091 for protocol in PROTOCOLS:
1092 ssl.SSLContext(protocol)
Christian Heimes598894f2016-09-05 23:19:05 +02001093 ctx = ssl.SSLContext()
1094 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001095 self.assertRaises(ValueError, ssl.SSLContext, -1)
1096 self.assertRaises(ValueError, ssl.SSLContext, 42)
1097
1098 def test_protocol(self):
1099 for proto in PROTOCOLS:
1100 ctx = ssl.SSLContext(proto)
1101 self.assertEqual(ctx.protocol, proto)
1102
1103 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001104 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001105 ctx.set_ciphers("ALL")
1106 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +00001107 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +00001108 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +00001109
Christian Heimes892d66e2018-01-29 14:10:18 +01001110 @unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
1111 "Test applies only to Python default ciphers")
1112 def test_python_ciphers(self):
1113 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1114 ciphers = ctx.get_ciphers()
1115 for suite in ciphers:
1116 name = suite['name']
1117 self.assertNotIn("PSK", name)
1118 self.assertNotIn("SRP", name)
1119 self.assertNotIn("MD5", name)
1120 self.assertNotIn("RC4", name)
1121 self.assertNotIn("3DES", name)
1122
Christian Heimes25bfcd52016-09-06 00:04:45 +02001123 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
1124 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001125 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +02001126 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +02001127 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +02001128 self.assertIn('AES256-GCM-SHA384', names)
1129 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001130
Antoine Pitroub5218772010-05-21 09:56:06 +00001131 def test_options(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001132 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001133 # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
Christian Heimes598894f2016-09-05 23:19:05 +02001134 default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimes358cfd42016-09-10 22:43:48 +02001135 # SSLContext also enables these by default
1136 default |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
Christian Heimes05d9fe32018-02-27 08:55:39 +01001137 OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE |
1138 OP_ENABLE_MIDDLEBOX_COMPAT)
Christian Heimes598894f2016-09-05 23:19:05 +02001139 self.assertEqual(default, ctx.options)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001140 ctx.options |= ssl.OP_NO_TLSv1
Christian Heimes598894f2016-09-05 23:19:05 +02001141 self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001142 if can_clear_options():
Christian Heimes598894f2016-09-05 23:19:05 +02001143 ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
1144 self.assertEqual(default, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001145 ctx.options = 0
Matthias Klosef7c56242016-06-12 23:40:00 -07001146 # Ubuntu has OP_NO_SSLv3 forced on by default
1147 self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
Antoine Pitroub5218772010-05-21 09:56:06 +00001148 else:
1149 with self.assertRaises(ValueError):
1150 ctx.options = 0
1151
Christian Heimesa170fa12017-09-15 20:27:30 +02001152 def test_verify_mode_protocol(self):
1153 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001154 # Default value
1155 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1156 ctx.verify_mode = ssl.CERT_OPTIONAL
1157 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1158 ctx.verify_mode = ssl.CERT_REQUIRED
1159 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1160 ctx.verify_mode = ssl.CERT_NONE
1161 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1162 with self.assertRaises(TypeError):
1163 ctx.verify_mode = None
1164 with self.assertRaises(ValueError):
1165 ctx.verify_mode = 42
1166
Christian Heimesa170fa12017-09-15 20:27:30 +02001167 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1168 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1169 self.assertFalse(ctx.check_hostname)
1170
1171 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1172 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1173 self.assertTrue(ctx.check_hostname)
1174
Christian Heimes61d478c2018-01-27 15:51:38 +01001175 def test_hostname_checks_common_name(self):
1176 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1177 self.assertTrue(ctx.hostname_checks_common_name)
1178 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1179 ctx.hostname_checks_common_name = True
1180 self.assertTrue(ctx.hostname_checks_common_name)
1181 ctx.hostname_checks_common_name = False
1182 self.assertFalse(ctx.hostname_checks_common_name)
1183 ctx.hostname_checks_common_name = True
1184 self.assertTrue(ctx.hostname_checks_common_name)
1185 else:
1186 with self.assertRaises(AttributeError):
1187 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001188
Christian Heimes19311322019-09-26 22:53:09 +02001189 @requires_minimum_version
Miss Islington (bot)d6ac67f2019-09-11 10:59:13 -07001190 @unittest.skipIf(IS_LIBRESSL, "see bpo-34001")
Christian Heimes698dde12018-02-27 11:54:43 +01001191 def test_min_max_version(self):
1192 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes34de2d32019-01-18 16:09:30 +01001193 # OpenSSL default is MINIMUM_SUPPORTED, however some vendors like
1194 # Fedora override the setting to TLS 1.0.
Christian Heimes19311322019-09-26 22:53:09 +02001195 minimum_range = {
1196 # stock OpenSSL
1197 ssl.TLSVersion.MINIMUM_SUPPORTED,
1198 # Fedora 29 uses TLS 1.0 by default
1199 ssl.TLSVersion.TLSv1,
1200 # RHEL 8 uses TLS 1.2 by default
1201 ssl.TLSVersion.TLSv1_2
1202 }
Miss Islington (bot)9e728802019-12-02 08:34:44 -08001203 maximum_range = {
1204 # stock OpenSSL
1205 ssl.TLSVersion.MAXIMUM_SUPPORTED,
1206 # Fedora 32 uses TLS 1.3 by default
1207 ssl.TLSVersion.TLSv1_3
1208 }
Christian Heimes19311322019-09-26 22:53:09 +02001209
Christian Heimes34de2d32019-01-18 16:09:30 +01001210 self.assertIn(
Christian Heimes19311322019-09-26 22:53:09 +02001211 ctx.minimum_version, minimum_range
Christian Heimes698dde12018-02-27 11:54:43 +01001212 )
Miss Islington (bot)9e728802019-12-02 08:34:44 -08001213 self.assertIn(
1214 ctx.maximum_version, maximum_range
Christian Heimes698dde12018-02-27 11:54:43 +01001215 )
1216
1217 ctx.minimum_version = ssl.TLSVersion.TLSv1_1
1218 ctx.maximum_version = ssl.TLSVersion.TLSv1_2
1219 self.assertEqual(
1220 ctx.minimum_version, ssl.TLSVersion.TLSv1_1
1221 )
1222 self.assertEqual(
1223 ctx.maximum_version, ssl.TLSVersion.TLSv1_2
1224 )
1225
1226 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1227 ctx.maximum_version = ssl.TLSVersion.TLSv1
1228 self.assertEqual(
1229 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1230 )
1231 self.assertEqual(
1232 ctx.maximum_version, ssl.TLSVersion.TLSv1
1233 )
1234
1235 ctx.maximum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1236 self.assertEqual(
1237 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1238 )
1239
1240 ctx.maximum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1241 self.assertIn(
1242 ctx.maximum_version,
1243 {ssl.TLSVersion.TLSv1, ssl.TLSVersion.SSLv3}
1244 )
1245
1246 ctx.minimum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1247 self.assertIn(
1248 ctx.minimum_version,
1249 {ssl.TLSVersion.TLSv1_2, ssl.TLSVersion.TLSv1_3}
1250 )
1251
1252 with self.assertRaises(ValueError):
1253 ctx.minimum_version = 42
1254
1255 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)
1256
Christian Heimes19311322019-09-26 22:53:09 +02001257 self.assertIn(
1258 ctx.minimum_version, minimum_range
Christian Heimes698dde12018-02-27 11:54:43 +01001259 )
1260 self.assertEqual(
1261 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1262 )
1263 with self.assertRaises(ValueError):
1264 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1265 with self.assertRaises(ValueError):
1266 ctx.maximum_version = ssl.TLSVersion.TLSv1
1267
1268
Christian Heimes2427b502013-11-23 11:24:32 +01001269 @unittest.skipUnless(have_verify_flags(),
1270 "verify_flags need OpenSSL > 0.9.8")
Christian Heimes22587792013-11-21 23:56:13 +01001271 def test_verify_flags(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001272 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Benjamin Peterson990fcaa2015-03-04 22:49:41 -05001273 # default value
1274 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
1275 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf)
Christian Heimes22587792013-11-21 23:56:13 +01001276 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
1277 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
1278 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
1279 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
1280 ctx.verify_flags = ssl.VERIFY_DEFAULT
1281 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
1282 # supports any value
1283 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
1284 self.assertEqual(ctx.verify_flags,
1285 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
1286 with self.assertRaises(TypeError):
1287 ctx.verify_flags = None
1288
Antoine Pitrou152efa22010-05-16 18:19:27 +00001289 def test_load_cert_chain(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001290 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001291 # Combined key and cert in a single file
Benjamin Peterson1ea070e2014-11-03 21:05:01 -05001292 ctx.load_cert_chain(CERTFILE, keyfile=None)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001293 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
1294 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001295 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001296 ctx.load_cert_chain(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001297 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001298 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001299 ctx.load_cert_chain(BADCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001300 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001301 ctx.load_cert_chain(EMPTYCERT)
1302 # Separate key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001303 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001304 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
1305 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
1306 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001307 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001308 ctx.load_cert_chain(ONLYCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001309 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001310 ctx.load_cert_chain(ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001311 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001312 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
1313 # Mismatching key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001314 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001315 with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
Martin Panter3d81d932016-01-14 09:36:00 +00001316 ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +02001317 # Password protected key and cert
1318 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
1319 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
1320 ctx.load_cert_chain(CERTFILE_PROTECTED,
1321 password=bytearray(KEY_PASSWORD.encode()))
1322 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
1323 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
1324 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
1325 bytearray(KEY_PASSWORD.encode()))
1326 with self.assertRaisesRegex(TypeError, "should be a string"):
1327 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
1328 with self.assertRaises(ssl.SSLError):
1329 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
1330 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1331 # openssl has a fixed limit on the password buffer.
1332 # PEM_BUFSIZE is generally set to 1kb.
1333 # Return a string larger than this.
1334 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
1335 # Password callback
1336 def getpass_unicode():
1337 return KEY_PASSWORD
1338 def getpass_bytes():
1339 return KEY_PASSWORD.encode()
1340 def getpass_bytearray():
1341 return bytearray(KEY_PASSWORD.encode())
1342 def getpass_badpass():
1343 return "badpass"
1344 def getpass_huge():
1345 return b'a' * (1024 * 1024)
1346 def getpass_bad_type():
1347 return 9
1348 def getpass_exception():
1349 raise Exception('getpass error')
1350 class GetPassCallable:
1351 def __call__(self):
1352 return KEY_PASSWORD
1353 def getpass(self):
1354 return KEY_PASSWORD
1355 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
1356 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
1357 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
1358 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
1359 ctx.load_cert_chain(CERTFILE_PROTECTED,
1360 password=GetPassCallable().getpass)
1361 with self.assertRaises(ssl.SSLError):
1362 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
1363 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1364 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
1365 with self.assertRaisesRegex(TypeError, "must return a string"):
1366 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
1367 with self.assertRaisesRegex(Exception, "getpass error"):
1368 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
1369 # Make sure the password function isn't called if it isn't needed
1370 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001371
1372 def test_load_verify_locations(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001373 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001374 ctx.load_verify_locations(CERTFILE)
1375 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
1376 ctx.load_verify_locations(BYTES_CERTFILE)
1377 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
1378 self.assertRaises(TypeError, ctx.load_verify_locations)
Christian Heimesefff7062013-11-21 03:35:02 +01001379 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001380 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001381 ctx.load_verify_locations(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001382 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001383 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001384 ctx.load_verify_locations(BADCERT)
1385 ctx.load_verify_locations(CERTFILE, CAPATH)
1386 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
1387
Victor Stinner80f75e62011-01-29 11:31:20 +00001388 # Issue #10989: crash if the second argument type is invalid
1389 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
1390
Christian Heimesefff7062013-11-21 03:35:02 +01001391 def test_load_verify_cadata(self):
1392 # test cadata
1393 with open(CAFILE_CACERT) as f:
1394 cacert_pem = f.read()
1395 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
1396 with open(CAFILE_NEURONIO) as f:
1397 neuronio_pem = f.read()
1398 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
1399
1400 # test PEM
Christian Heimesa170fa12017-09-15 20:27:30 +02001401 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001402 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
1403 ctx.load_verify_locations(cadata=cacert_pem)
1404 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
1405 ctx.load_verify_locations(cadata=neuronio_pem)
1406 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1407 # cert already in hash table
1408 ctx.load_verify_locations(cadata=neuronio_pem)
1409 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1410
1411 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001412 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001413 combined = "\n".join((cacert_pem, neuronio_pem))
1414 ctx.load_verify_locations(cadata=combined)
1415 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1416
1417 # with junk around the certs
Christian Heimesa170fa12017-09-15 20:27:30 +02001418 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001419 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
1420 neuronio_pem, "tail"]
1421 ctx.load_verify_locations(cadata="\n".join(combined))
1422 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1423
1424 # test DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001425 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001426 ctx.load_verify_locations(cadata=cacert_der)
1427 ctx.load_verify_locations(cadata=neuronio_der)
1428 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1429 # cert already in hash table
1430 ctx.load_verify_locations(cadata=cacert_der)
1431 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1432
1433 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001434 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001435 combined = b"".join((cacert_der, neuronio_der))
1436 ctx.load_verify_locations(cadata=combined)
1437 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1438
1439 # error cases
Christian Heimesa170fa12017-09-15 20:27:30 +02001440 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001441 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
1442
1443 with self.assertRaisesRegex(ssl.SSLError, "no start line"):
1444 ctx.load_verify_locations(cadata="broken")
1445 with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
1446 ctx.load_verify_locations(cadata=b"broken")
1447
1448
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001449 def test_load_dh_params(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001450 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001451 ctx.load_dh_params(DHFILE)
1452 if os.name != 'nt':
1453 ctx.load_dh_params(BYTES_DHFILE)
1454 self.assertRaises(TypeError, ctx.load_dh_params)
1455 self.assertRaises(TypeError, ctx.load_dh_params, None)
1456 with self.assertRaises(FileNotFoundError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001457 ctx.load_dh_params(NONEXISTINGCERT)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001458 self.assertEqual(cm.exception.errno, errno.ENOENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001459 with self.assertRaises(ssl.SSLError) as cm:
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001460 ctx.load_dh_params(CERTFILE)
1461
Antoine Pitroub0182c82010-10-12 20:09:02 +00001462 def test_session_stats(self):
1463 for proto in PROTOCOLS:
1464 ctx = ssl.SSLContext(proto)
1465 self.assertEqual(ctx.session_stats(), {
1466 'number': 0,
1467 'connect': 0,
1468 'connect_good': 0,
1469 'connect_renegotiate': 0,
1470 'accept': 0,
1471 'accept_good': 0,
1472 'accept_renegotiate': 0,
1473 'hits': 0,
1474 'misses': 0,
1475 'timeouts': 0,
1476 'cache_full': 0,
1477 })
1478
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001479 def test_set_default_verify_paths(self):
1480 # There's not much we can do to test that it acts as expected,
1481 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001482 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001483 ctx.set_default_verify_paths()
1484
Antoine Pitrou501da612011-12-21 09:27:41 +01001485 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001486 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001487 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001488 ctx.set_ecdh_curve("prime256v1")
1489 ctx.set_ecdh_curve(b"prime256v1")
1490 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1491 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1492 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1493 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1494
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001495 @needs_sni
1496 def test_sni_callback(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001497 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001498
1499 # set_servername_callback expects a callable, or None
1500 self.assertRaises(TypeError, ctx.set_servername_callback)
1501 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
1502 self.assertRaises(TypeError, ctx.set_servername_callback, "")
1503 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
1504
1505 def dummycallback(sock, servername, ctx):
1506 pass
1507 ctx.set_servername_callback(None)
1508 ctx.set_servername_callback(dummycallback)
1509
1510 @needs_sni
1511 def test_sni_callback_refcycle(self):
1512 # Reference cycles through the servername callback are detected
1513 # and cleared.
Christian Heimesa170fa12017-09-15 20:27:30 +02001514 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001515 def dummycallback(sock, servername, ctx, cycle=ctx):
1516 pass
1517 ctx.set_servername_callback(dummycallback)
1518 wr = weakref.ref(ctx)
1519 del ctx, dummycallback
1520 gc.collect()
1521 self.assertIs(wr(), None)
1522
Christian Heimes9a5395a2013-06-17 15:44:12 +02001523 def test_cert_store_stats(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001524 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001525 self.assertEqual(ctx.cert_store_stats(),
1526 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1527 ctx.load_cert_chain(CERTFILE)
1528 self.assertEqual(ctx.cert_store_stats(),
1529 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1530 ctx.load_verify_locations(CERTFILE)
1531 self.assertEqual(ctx.cert_store_stats(),
1532 {'x509_ca': 0, 'crl': 0, 'x509': 1})
Martin Panterb55f8b72016-01-14 12:53:56 +00001533 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001534 self.assertEqual(ctx.cert_store_stats(),
1535 {'x509_ca': 1, 'crl': 0, 'x509': 2})
1536
1537 def test_get_ca_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001538 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001539 self.assertEqual(ctx.get_ca_certs(), [])
1540 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1541 ctx.load_verify_locations(CERTFILE)
1542 self.assertEqual(ctx.get_ca_certs(), [])
Martin Panterb55f8b72016-01-14 12:53:56 +00001543 # but CAFILE_CACERT is a CA cert
1544 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001545 self.assertEqual(ctx.get_ca_certs(),
1546 [{'issuer': ((('organizationName', 'Root CA'),),
1547 (('organizationalUnitName', 'http://www.cacert.org'),),
1548 (('commonName', 'CA Cert Signing Authority'),),
1549 (('emailAddress', 'support@cacert.org'),)),
1550 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1551 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1552 'serialNumber': '00',
Christian Heimesbd3a7f92013-11-21 03:40:15 +01001553 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
Christian Heimes9a5395a2013-06-17 15:44:12 +02001554 'subject': ((('organizationName', 'Root CA'),),
1555 (('organizationalUnitName', 'http://www.cacert.org'),),
1556 (('commonName', 'CA Cert Signing Authority'),),
1557 (('emailAddress', 'support@cacert.org'),)),
1558 'version': 3}])
1559
Martin Panterb55f8b72016-01-14 12:53:56 +00001560 with open(CAFILE_CACERT) as f:
Christian Heimes9a5395a2013-06-17 15:44:12 +02001561 pem = f.read()
1562 der = ssl.PEM_cert_to_DER_cert(pem)
1563 self.assertEqual(ctx.get_ca_certs(True), [der])
1564
Christian Heimes72d28502013-11-23 13:56:58 +01001565 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001566 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001567 ctx.load_default_certs()
1568
Christian Heimesa170fa12017-09-15 20:27:30 +02001569 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001570 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1571 ctx.load_default_certs()
1572
Christian Heimesa170fa12017-09-15 20:27:30 +02001573 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001574 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1575
Christian Heimesa170fa12017-09-15 20:27:30 +02001576 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001577 self.assertRaises(TypeError, ctx.load_default_certs, None)
1578 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1579
Benjamin Peterson91244e02014-10-03 18:17:15 -04001580 @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
Christian Heimes598894f2016-09-05 23:19:05 +02001581 @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001582 def test_load_default_certs_env(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001583 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001584 with support.EnvironmentVarGuard() as env:
1585 env["SSL_CERT_DIR"] = CAPATH
1586 env["SSL_CERT_FILE"] = CERTFILE
1587 ctx.load_default_certs()
1588 self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1589
Benjamin Peterson91244e02014-10-03 18:17:15 -04001590 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Steve Dower68d663c2017-07-17 11:15:48 +02001591 @unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
Benjamin Peterson91244e02014-10-03 18:17:15 -04001592 def test_load_default_certs_env_windows(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001593 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001594 ctx.load_default_certs()
1595 stats = ctx.cert_store_stats()
1596
Christian Heimesa170fa12017-09-15 20:27:30 +02001597 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001598 with support.EnvironmentVarGuard() as env:
1599 env["SSL_CERT_DIR"] = CAPATH
1600 env["SSL_CERT_FILE"] = CERTFILE
1601 ctx.load_default_certs()
1602 stats["x509"] += 1
1603 self.assertEqual(ctx.cert_store_stats(), stats)
1604
Christian Heimes358cfd42016-09-10 22:43:48 +02001605 def _assert_context_options(self, ctx):
1606 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1607 if OP_NO_COMPRESSION != 0:
1608 self.assertEqual(ctx.options & OP_NO_COMPRESSION,
1609 OP_NO_COMPRESSION)
1610 if OP_SINGLE_DH_USE != 0:
1611 self.assertEqual(ctx.options & OP_SINGLE_DH_USE,
1612 OP_SINGLE_DH_USE)
1613 if OP_SINGLE_ECDH_USE != 0:
1614 self.assertEqual(ctx.options & OP_SINGLE_ECDH_USE,
1615 OP_SINGLE_ECDH_USE)
1616 if OP_CIPHER_SERVER_PREFERENCE != 0:
1617 self.assertEqual(ctx.options & OP_CIPHER_SERVER_PREFERENCE,
1618 OP_CIPHER_SERVER_PREFERENCE)
1619
Christian Heimes4c05b472013-11-23 15:58:30 +01001620 def test_create_default_context(self):
1621 ctx = ssl.create_default_context()
Christian Heimes358cfd42016-09-10 22:43:48 +02001622
Christian Heimesa170fa12017-09-15 20:27:30 +02001623 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001624 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001625 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001626 self._assert_context_options(ctx)
1627
Christian Heimes4c05b472013-11-23 15:58:30 +01001628 with open(SIGNING_CA) as f:
1629 cadata = f.read()
1630 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1631 cadata=cadata)
Christian Heimesa170fa12017-09-15 20:27:30 +02001632 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001633 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes358cfd42016-09-10 22:43:48 +02001634 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001635
1636 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001637 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001638 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001639 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001640
Christian Heimes67986f92013-11-23 22:43:47 +01001641 def test__create_stdlib_context(self):
1642 ctx = ssl._create_stdlib_context()
Christian Heimesa170fa12017-09-15 20:27:30 +02001643 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001644 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001645 self.assertFalse(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001646 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001647
1648 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1649 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1650 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001651 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001652
1653 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
Christian Heimesa02c69a2013-12-02 20:59:28 +01001654 cert_reqs=ssl.CERT_REQUIRED,
1655 check_hostname=True)
Christian Heimes67986f92013-11-23 22:43:47 +01001656 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1657 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimesa02c69a2013-12-02 20:59:28 +01001658 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001659 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001660
1661 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001662 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001663 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001664 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001665
Christian Heimes1aa9a752013-12-02 02:41:19 +01001666 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001667 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001668 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001669 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001670
Christian Heimese82c0342017-09-15 20:29:57 +02001671 # Auto set CERT_REQUIRED
1672 ctx.check_hostname = True
1673 self.assertTrue(ctx.check_hostname)
1674 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1675 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001676 ctx.verify_mode = ssl.CERT_REQUIRED
1677 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001678 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001679
Christian Heimese82c0342017-09-15 20:29:57 +02001680 # Changing verify_mode does not affect check_hostname
1681 ctx.check_hostname = False
1682 ctx.verify_mode = ssl.CERT_NONE
1683 ctx.check_hostname = False
1684 self.assertFalse(ctx.check_hostname)
1685 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1686 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001687 ctx.check_hostname = True
1688 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001689 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1690
1691 ctx.check_hostname = False
1692 ctx.verify_mode = ssl.CERT_OPTIONAL
1693 ctx.check_hostname = False
1694 self.assertFalse(ctx.check_hostname)
1695 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1696 # keep CERT_OPTIONAL
1697 ctx.check_hostname = True
1698 self.assertTrue(ctx.check_hostname)
1699 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001700
1701 # Cannot set CERT_NONE with check_hostname enabled
1702 with self.assertRaises(ValueError):
1703 ctx.verify_mode = ssl.CERT_NONE
1704 ctx.check_hostname = False
1705 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001706 ctx.verify_mode = ssl.CERT_NONE
1707 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001708
Christian Heimes5fe668c2016-09-12 00:01:11 +02001709 def test_context_client_server(self):
1710 # PROTOCOL_TLS_CLIENT has sane defaults
1711 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1712 self.assertTrue(ctx.check_hostname)
1713 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1714
1715 # PROTOCOL_TLS_SERVER has different but also sane defaults
1716 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1717 self.assertFalse(ctx.check_hostname)
1718 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1719
Christian Heimes4df60f12017-09-15 20:26:05 +02001720 def test_context_custom_class(self):
1721 class MySSLSocket(ssl.SSLSocket):
1722 pass
1723
1724 class MySSLObject(ssl.SSLObject):
1725 pass
1726
1727 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1728 ctx.sslsocket_class = MySSLSocket
1729 ctx.sslobject_class = MySSLObject
1730
1731 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1732 self.assertIsInstance(sock, MySSLSocket)
1733 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1734 self.assertIsInstance(obj, MySSLObject)
1735
Christian Heimes78c7d522019-06-03 21:00:10 +02001736 @unittest.skipUnless(IS_OPENSSL_1_1_1, "Test requires OpenSSL 1.1.1")
1737 def test_num_tickest(self):
1738 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1739 self.assertEqual(ctx.num_tickets, 2)
1740 ctx.num_tickets = 1
1741 self.assertEqual(ctx.num_tickets, 1)
1742 ctx.num_tickets = 0
1743 self.assertEqual(ctx.num_tickets, 0)
1744 with self.assertRaises(ValueError):
1745 ctx.num_tickets = -1
1746 with self.assertRaises(TypeError):
1747 ctx.num_tickets = None
1748
1749 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1750 self.assertEqual(ctx.num_tickets, 2)
1751 with self.assertRaises(ValueError):
1752 ctx.num_tickets = 1
1753
Antoine Pitrou152efa22010-05-16 18:19:27 +00001754
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001755class SSLErrorTests(unittest.TestCase):
1756
1757 def test_str(self):
1758 # The str() of a SSLError doesn't include the errno
1759 e = ssl.SSLError(1, "foo")
1760 self.assertEqual(str(e), "foo")
1761 self.assertEqual(e.errno, 1)
1762 # Same for a subclass
1763 e = ssl.SSLZeroReturnError(1, "foo")
1764 self.assertEqual(str(e), "foo")
1765 self.assertEqual(e.errno, 1)
1766
1767 def test_lib_reason(self):
1768 # Test the library and reason attributes
Christian Heimesa170fa12017-09-15 20:27:30 +02001769 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001770 with self.assertRaises(ssl.SSLError) as cm:
1771 ctx.load_dh_params(CERTFILE)
1772 self.assertEqual(cm.exception.library, 'PEM')
1773 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1774 s = str(cm.exception)
1775 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1776
1777 def test_subclass(self):
1778 # Check that the appropriate SSLError subclass is raised
1779 # (this only tests one of them)
Christian Heimesa170fa12017-09-15 20:27:30 +02001780 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1781 ctx.check_hostname = False
1782 ctx.verify_mode = ssl.CERT_NONE
Giampaolo Rodolaeb7e29f2019-04-09 00:34:02 +02001783 with socket.create_server(("127.0.0.1", 0)) as s:
1784 c = socket.create_connection(s.getsockname())
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001785 c.setblocking(False)
1786 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001787 with self.assertRaises(ssl.SSLWantReadError) as cm:
1788 c.do_handshake()
1789 s = str(cm.exception)
1790 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1791 # For compatibility
1792 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1793
1794
Christian Heimes61d478c2018-01-27 15:51:38 +01001795 def test_bad_server_hostname(self):
1796 ctx = ssl.create_default_context()
1797 with self.assertRaises(ValueError):
1798 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1799 server_hostname="")
1800 with self.assertRaises(ValueError):
1801 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1802 server_hostname=".example.org")
Christian Heimesd02ac252018-03-25 12:36:13 +02001803 with self.assertRaises(TypeError):
1804 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1805 server_hostname="example.org\x00evil.com")
Christian Heimes61d478c2018-01-27 15:51:38 +01001806
1807
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001808class MemoryBIOTests(unittest.TestCase):
1809
1810 def test_read_write(self):
1811 bio = ssl.MemoryBIO()
1812 bio.write(b'foo')
1813 self.assertEqual(bio.read(), b'foo')
1814 self.assertEqual(bio.read(), b'')
1815 bio.write(b'foo')
1816 bio.write(b'bar')
1817 self.assertEqual(bio.read(), b'foobar')
1818 self.assertEqual(bio.read(), b'')
1819 bio.write(b'baz')
1820 self.assertEqual(bio.read(2), b'ba')
1821 self.assertEqual(bio.read(1), b'z')
1822 self.assertEqual(bio.read(1), b'')
1823
1824 def test_eof(self):
1825 bio = ssl.MemoryBIO()
1826 self.assertFalse(bio.eof)
1827 self.assertEqual(bio.read(), b'')
1828 self.assertFalse(bio.eof)
1829 bio.write(b'foo')
1830 self.assertFalse(bio.eof)
1831 bio.write_eof()
1832 self.assertFalse(bio.eof)
1833 self.assertEqual(bio.read(2), b'fo')
1834 self.assertFalse(bio.eof)
1835 self.assertEqual(bio.read(1), b'o')
1836 self.assertTrue(bio.eof)
1837 self.assertEqual(bio.read(), b'')
1838 self.assertTrue(bio.eof)
1839
1840 def test_pending(self):
1841 bio = ssl.MemoryBIO()
1842 self.assertEqual(bio.pending, 0)
1843 bio.write(b'foo')
1844 self.assertEqual(bio.pending, 3)
1845 for i in range(3):
1846 bio.read(1)
1847 self.assertEqual(bio.pending, 3-i-1)
1848 for i in range(3):
1849 bio.write(b'x')
1850 self.assertEqual(bio.pending, i+1)
1851 bio.read()
1852 self.assertEqual(bio.pending, 0)
1853
1854 def test_buffer_types(self):
1855 bio = ssl.MemoryBIO()
1856 bio.write(b'foo')
1857 self.assertEqual(bio.read(), b'foo')
1858 bio.write(bytearray(b'bar'))
1859 self.assertEqual(bio.read(), b'bar')
1860 bio.write(memoryview(b'baz'))
1861 self.assertEqual(bio.read(), b'baz')
1862
1863 def test_error_types(self):
1864 bio = ssl.MemoryBIO()
1865 self.assertRaises(TypeError, bio.write, 'foo')
1866 self.assertRaises(TypeError, bio.write, None)
1867 self.assertRaises(TypeError, bio.write, True)
1868 self.assertRaises(TypeError, bio.write, 1)
1869
1870
Christian Heimes9d50ab52018-02-27 10:17:30 +01001871class SSLObjectTests(unittest.TestCase):
1872 def test_private_init(self):
1873 bio = ssl.MemoryBIO()
1874 with self.assertRaisesRegex(TypeError, "public constructor"):
1875 ssl.SSLObject(bio, bio)
1876
Nathaniel J. Smithc0da5822018-09-21 21:44:12 -07001877 def test_unwrap(self):
1878 client_ctx, server_ctx, hostname = testing_context()
1879 c_in = ssl.MemoryBIO()
1880 c_out = ssl.MemoryBIO()
1881 s_in = ssl.MemoryBIO()
1882 s_out = ssl.MemoryBIO()
1883 client = client_ctx.wrap_bio(c_in, c_out, server_hostname=hostname)
1884 server = server_ctx.wrap_bio(s_in, s_out, server_side=True)
1885
1886 # Loop on the handshake for a bit to get it settled
1887 for _ in range(5):
1888 try:
1889 client.do_handshake()
1890 except ssl.SSLWantReadError:
1891 pass
1892 if c_out.pending:
1893 s_in.write(c_out.read())
1894 try:
1895 server.do_handshake()
1896 except ssl.SSLWantReadError:
1897 pass
1898 if s_out.pending:
1899 c_in.write(s_out.read())
1900 # Now the handshakes should be complete (don't raise WantReadError)
1901 client.do_handshake()
1902 server.do_handshake()
1903
1904 # Now if we unwrap one side unilaterally, it should send close-notify
1905 # and raise WantReadError:
1906 with self.assertRaises(ssl.SSLWantReadError):
1907 client.unwrap()
1908
1909 # But server.unwrap() does not raise, because it reads the client's
1910 # close-notify:
1911 s_in.write(c_out.read())
1912 server.unwrap()
1913
1914 # And now that the client gets the server's close-notify, it doesn't
1915 # raise either.
1916 c_in.write(s_out.read())
1917 client.unwrap()
Christian Heimes9d50ab52018-02-27 10:17:30 +01001918
Martin Panter3840b2a2016-03-27 01:53:46 +00001919class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001920 """Tests that connect to a simple server running in the background"""
1921
1922 def setUp(self):
1923 server = ThreadedEchoServer(SIGNED_CERTFILE)
1924 self.server_addr = (HOST, server.port)
1925 server.__enter__()
1926 self.addCleanup(server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001927
Antoine Pitrou480a1242010-04-28 21:37:09 +00001928 def test_connect(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001929 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001930 cert_reqs=ssl.CERT_NONE) as s:
1931 s.connect(self.server_addr)
1932 self.assertEqual({}, s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001933 self.assertFalse(s.server_side)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001934
Martin Panter3840b2a2016-03-27 01:53:46 +00001935 # this should succeed because we specify the root cert
Christian Heimesd0486372016-09-10 23:23:33 +02001936 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001937 cert_reqs=ssl.CERT_REQUIRED,
1938 ca_certs=SIGNING_CA) as s:
1939 s.connect(self.server_addr)
1940 self.assertTrue(s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001941 self.assertFalse(s.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001942
Martin Panter3840b2a2016-03-27 01:53:46 +00001943 def test_connect_fail(self):
1944 # This should fail because we have no verification certs. Connection
1945 # failure crashes ThreadedEchoServer, so run this in an independent
1946 # test method.
Christian Heimesd0486372016-09-10 23:23:33 +02001947 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001948 cert_reqs=ssl.CERT_REQUIRED)
1949 self.addCleanup(s.close)
1950 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1951 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001952
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001953 def test_connect_ex(self):
1954 # Issue #11326: check connect_ex() implementation
Christian Heimesd0486372016-09-10 23:23:33 +02001955 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001956 cert_reqs=ssl.CERT_REQUIRED,
1957 ca_certs=SIGNING_CA)
1958 self.addCleanup(s.close)
1959 self.assertEqual(0, s.connect_ex(self.server_addr))
1960 self.assertTrue(s.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001961
1962 def test_non_blocking_connect_ex(self):
1963 # Issue #11326: non-blocking connect_ex() should allow handshake
1964 # to proceed after the socket gets ready.
Christian Heimesd0486372016-09-10 23:23:33 +02001965 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001966 cert_reqs=ssl.CERT_REQUIRED,
1967 ca_certs=SIGNING_CA,
1968 do_handshake_on_connect=False)
1969 self.addCleanup(s.close)
1970 s.setblocking(False)
1971 rc = s.connect_ex(self.server_addr)
1972 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1973 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
1974 # Wait for connect to finish
1975 select.select([], [s], [], 5.0)
1976 # Non-blocking handshake
1977 while True:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001978 try:
Martin Panter3840b2a2016-03-27 01:53:46 +00001979 s.do_handshake()
1980 break
1981 except ssl.SSLWantReadError:
1982 select.select([s], [], [], 5.0)
1983 except ssl.SSLWantWriteError:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001984 select.select([], [s], [], 5.0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001985 # SSL established
1986 self.assertTrue(s.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001987
Antoine Pitrou152efa22010-05-16 18:19:27 +00001988 def test_connect_with_context(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001989 # Same as test_connect, but with a separately created context
Christian Heimesa170fa12017-09-15 20:27:30 +02001990 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001991 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1992 s.connect(self.server_addr)
1993 self.assertEqual({}, s.getpeercert())
1994 # Same with a server hostname
1995 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1996 server_hostname="dummy") as s:
1997 s.connect(self.server_addr)
1998 ctx.verify_mode = ssl.CERT_REQUIRED
1999 # This should succeed because we specify the root cert
2000 ctx.load_verify_locations(SIGNING_CA)
2001 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
2002 s.connect(self.server_addr)
2003 cert = s.getpeercert()
2004 self.assertTrue(cert)
2005
2006 def test_connect_with_context_fail(self):
2007 # This should fail because we have no verification certs. Connection
2008 # failure crashes ThreadedEchoServer, so run this in an independent
2009 # test method.
Christian Heimesa170fa12017-09-15 20:27:30 +02002010 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002011 ctx.verify_mode = ssl.CERT_REQUIRED
2012 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
2013 self.addCleanup(s.close)
2014 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
2015 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00002016
2017 def test_connect_capath(self):
2018 # Verify server certificates using the `capath` argument
Antoine Pitrou467f28d2010-05-16 19:22:44 +00002019 # NOTE: the subject hashing algorithm has been changed between
2020 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
2021 # contain both versions of each certificate (same content, different
Antoine Pitroud7e4c1c2010-05-17 14:13:10 +00002022 # filename) for this test to be portable across OpenSSL releases.
Christian Heimesa170fa12017-09-15 20:27:30 +02002023 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002024 ctx.verify_mode = ssl.CERT_REQUIRED
2025 ctx.load_verify_locations(capath=CAPATH)
2026 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
2027 s.connect(self.server_addr)
2028 cert = s.getpeercert()
2029 self.assertTrue(cert)
Christian Heimes529525f2018-05-23 22:24:45 +02002030
Martin Panter3840b2a2016-03-27 01:53:46 +00002031 # Same with a bytes `capath` argument
Christian Heimesa170fa12017-09-15 20:27:30 +02002032 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002033 ctx.verify_mode = ssl.CERT_REQUIRED
2034 ctx.load_verify_locations(capath=BYTES_CAPATH)
2035 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
2036 s.connect(self.server_addr)
2037 cert = s.getpeercert()
2038 self.assertTrue(cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002039
Christian Heimesefff7062013-11-21 03:35:02 +01002040 def test_connect_cadata(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00002041 with open(SIGNING_CA) as f:
Christian Heimesefff7062013-11-21 03:35:02 +01002042 pem = f.read()
2043 der = ssl.PEM_cert_to_DER_cert(pem)
Christian Heimesa170fa12017-09-15 20:27:30 +02002044 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002045 ctx.verify_mode = ssl.CERT_REQUIRED
2046 ctx.load_verify_locations(cadata=pem)
2047 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
2048 s.connect(self.server_addr)
2049 cert = s.getpeercert()
2050 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01002051
Martin Panter3840b2a2016-03-27 01:53:46 +00002052 # same with DER
Christian Heimesa170fa12017-09-15 20:27:30 +02002053 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002054 ctx.verify_mode = ssl.CERT_REQUIRED
2055 ctx.load_verify_locations(cadata=der)
2056 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
2057 s.connect(self.server_addr)
2058 cert = s.getpeercert()
2059 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01002060
Antoine Pitroue3220242010-04-24 11:13:53 +00002061 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
2062 def test_makefile_close(self):
2063 # Issue #5238: creating a file-like object with makefile() shouldn't
2064 # delay closing the underlying "real socket" (here tested with its
2065 # file descriptor, hence skipping the test under Windows).
Christian Heimesd0486372016-09-10 23:23:33 +02002066 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Martin Panter3840b2a2016-03-27 01:53:46 +00002067 ss.connect(self.server_addr)
2068 fd = ss.fileno()
2069 f = ss.makefile()
2070 f.close()
2071 # The fd is still open
2072 os.read(fd, 0)
2073 # Closing the SSL socket should close the fd too
2074 ss.close()
2075 gc.collect()
2076 with self.assertRaises(OSError) as e:
Antoine Pitroue3220242010-04-24 11:13:53 +00002077 os.read(fd, 0)
Martin Panter3840b2a2016-03-27 01:53:46 +00002078 self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00002079
Antoine Pitrou480a1242010-04-28 21:37:09 +00002080 def test_non_blocking_handshake(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00002081 s = socket.socket(socket.AF_INET)
2082 s.connect(self.server_addr)
2083 s.setblocking(False)
Christian Heimesd0486372016-09-10 23:23:33 +02002084 s = test_wrap_socket(s,
Martin Panter3840b2a2016-03-27 01:53:46 +00002085 cert_reqs=ssl.CERT_NONE,
2086 do_handshake_on_connect=False)
2087 self.addCleanup(s.close)
2088 count = 0
2089 while True:
2090 try:
2091 count += 1
2092 s.do_handshake()
2093 break
2094 except ssl.SSLWantReadError:
2095 select.select([s], [], [])
2096 except ssl.SSLWantWriteError:
2097 select.select([], [s], [])
2098 if support.verbose:
2099 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002100
Antoine Pitrou480a1242010-04-28 21:37:09 +00002101 def test_get_server_certificate(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00002102 _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02002103
Martin Panter3840b2a2016-03-27 01:53:46 +00002104 def test_get_server_certificate_fail(self):
2105 # Connection failure crashes ThreadedEchoServer, so run this in an
2106 # independent test method
2107 _test_get_server_certificate_fail(self, *self.server_addr)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002108
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00002109 def test_ciphers(self):
Christian Heimesd0486372016-09-10 23:23:33 +02002110 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002111 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
2112 s.connect(self.server_addr)
Christian Heimesd0486372016-09-10 23:23:33 +02002113 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002114 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
2115 s.connect(self.server_addr)
2116 # Error checking can happen at instantiation or when connecting
2117 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
2118 with socket.socket(socket.AF_INET) as sock:
Christian Heimesd0486372016-09-10 23:23:33 +02002119 s = test_wrap_socket(sock,
Martin Panter3840b2a2016-03-27 01:53:46 +00002120 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
2121 s.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00002122
Christian Heimes9a5395a2013-06-17 15:44:12 +02002123 def test_get_ca_certs_capath(self):
2124 # capath certs are loaded on request
Christian Heimesa170fa12017-09-15 20:27:30 +02002125 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Martin Panter3840b2a2016-03-27 01:53:46 +00002126 ctx.load_verify_locations(capath=CAPATH)
2127 self.assertEqual(ctx.get_ca_certs(), [])
Christian Heimesa170fa12017-09-15 20:27:30 +02002128 with ctx.wrap_socket(socket.socket(socket.AF_INET),
2129 server_hostname='localhost') as s:
Martin Panter3840b2a2016-03-27 01:53:46 +00002130 s.connect(self.server_addr)
2131 cert = s.getpeercert()
2132 self.assertTrue(cert)
2133 self.assertEqual(len(ctx.get_ca_certs()), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02002134
Christian Heimes575596e2013-12-15 21:49:17 +01002135 @needs_sni
Christian Heimes8e7f3942013-12-05 07:41:08 +01002136 def test_context_setget(self):
2137 # Check that the context of a connected socket can be replaced.
Christian Heimesa170fa12017-09-15 20:27:30 +02002138 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2139 ctx1.load_verify_locations(capath=CAPATH)
2140 ctx2 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2141 ctx2.load_verify_locations(capath=CAPATH)
Martin Panter3840b2a2016-03-27 01:53:46 +00002142 s = socket.socket(socket.AF_INET)
Christian Heimesa170fa12017-09-15 20:27:30 +02002143 with ctx1.wrap_socket(s, server_hostname='localhost') as ss:
Martin Panter3840b2a2016-03-27 01:53:46 +00002144 ss.connect(self.server_addr)
2145 self.assertIs(ss.context, ctx1)
2146 self.assertIs(ss._sslobj.context, ctx1)
2147 ss.context = ctx2
2148 self.assertIs(ss.context, ctx2)
2149 self.assertIs(ss._sslobj.context, ctx2)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002150
2151 def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
2152 # A simple IO loop. Call func(*args) depending on the error we get
2153 # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
2154 timeout = kwargs.get('timeout', 10)
Victor Stinner50a72af2017-09-11 09:34:24 -07002155 deadline = time.monotonic() + timeout
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002156 count = 0
2157 while True:
Victor Stinner50a72af2017-09-11 09:34:24 -07002158 if time.monotonic() > deadline:
2159 self.fail("timeout")
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002160 errno = None
2161 count += 1
2162 try:
2163 ret = func(*args)
2164 except ssl.SSLError as e:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002165 if e.errno not in (ssl.SSL_ERROR_WANT_READ,
Martin Panter40b97ec2016-01-14 13:05:46 +00002166 ssl.SSL_ERROR_WANT_WRITE):
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002167 raise
2168 errno = e.errno
2169 # Get any data from the outgoing BIO irrespective of any error, and
2170 # send it to the socket.
2171 buf = outgoing.read()
2172 sock.sendall(buf)
2173 # If there's no error, we're done. For WANT_READ, we need to get
2174 # data from the socket and put it in the incoming BIO.
2175 if errno is None:
2176 break
2177 elif errno == ssl.SSL_ERROR_WANT_READ:
2178 buf = sock.recv(32768)
2179 if buf:
2180 incoming.write(buf)
2181 else:
2182 incoming.write_eof()
2183 if support.verbose:
2184 sys.stdout.write("Needed %d calls to complete %s().\n"
2185 % (count, func.__name__))
2186 return ret
2187
Martin Panter3840b2a2016-03-27 01:53:46 +00002188 def test_bio_handshake(self):
2189 sock = socket.socket(socket.AF_INET)
2190 self.addCleanup(sock.close)
2191 sock.connect(self.server_addr)
2192 incoming = ssl.MemoryBIO()
2193 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002194 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2195 self.assertTrue(ctx.check_hostname)
2196 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Martin Panter3840b2a2016-03-27 01:53:46 +00002197 ctx.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +02002198 sslobj = ctx.wrap_bio(incoming, outgoing, False,
2199 SIGNED_CERTFILE_HOSTNAME)
Martin Panter3840b2a2016-03-27 01:53:46 +00002200 self.assertIs(sslobj._sslobj.owner, sslobj)
2201 self.assertIsNone(sslobj.cipher())
Christian Heimes68771112017-09-05 21:55:40 -07002202 self.assertIsNone(sslobj.version())
Christian Heimes01113fa2016-09-05 23:23:24 +02002203 self.assertIsNotNone(sslobj.shared_ciphers())
Martin Panter3840b2a2016-03-27 01:53:46 +00002204 self.assertRaises(ValueError, sslobj.getpeercert)
2205 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2206 self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
2207 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2208 self.assertTrue(sslobj.cipher())
Christian Heimes01113fa2016-09-05 23:23:24 +02002209 self.assertIsNotNone(sslobj.shared_ciphers())
Christian Heimes68771112017-09-05 21:55:40 -07002210 self.assertIsNotNone(sslobj.version())
Martin Panter3840b2a2016-03-27 01:53:46 +00002211 self.assertTrue(sslobj.getpeercert())
2212 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2213 self.assertTrue(sslobj.get_channel_binding('tls-unique'))
2214 try:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002215 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Martin Panter3840b2a2016-03-27 01:53:46 +00002216 except ssl.SSLSyscallError:
2217 # If the server shuts down the TCP connection without sending a
2218 # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
2219 pass
2220 self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
2221
2222 def test_bio_read_write_data(self):
2223 sock = socket.socket(socket.AF_INET)
2224 self.addCleanup(sock.close)
2225 sock.connect(self.server_addr)
2226 incoming = ssl.MemoryBIO()
2227 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002228 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002229 ctx.verify_mode = ssl.CERT_NONE
2230 sslobj = ctx.wrap_bio(incoming, outgoing, False)
2231 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2232 req = b'FOO\n'
2233 self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
2234 buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
2235 self.assertEqual(buf, b'foo\n')
2236 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002237
2238
Martin Panter3840b2a2016-03-27 01:53:46 +00002239class NetworkedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002240
Martin Panter3840b2a2016-03-27 01:53:46 +00002241 def test_timeout_connect_ex(self):
2242 # Issue #12065: on a timeout, connect_ex() should return the original
2243 # errno (mimicking the behaviour of non-SSL sockets).
2244 with support.transient_internet(REMOTE_HOST):
Christian Heimesd0486372016-09-10 23:23:33 +02002245 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002246 cert_reqs=ssl.CERT_REQUIRED,
2247 do_handshake_on_connect=False)
2248 self.addCleanup(s.close)
2249 s.settimeout(0.0000001)
2250 rc = s.connect_ex((REMOTE_HOST, 443))
2251 if rc == 0:
2252 self.skipTest("REMOTE_HOST responded too quickly")
2253 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
2254
2255 @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
2256 def test_get_server_certificate_ipv6(self):
2257 with support.transient_internet('ipv6.google.com'):
2258 _test_get_server_certificate(self, 'ipv6.google.com', 443)
2259 _test_get_server_certificate_fail(self, 'ipv6.google.com', 443)
2260
Martin Panter3840b2a2016-03-27 01:53:46 +00002261
2262def _test_get_server_certificate(test, host, port, cert=None):
2263 pem = ssl.get_server_certificate((host, port))
2264 if not pem:
2265 test.fail("No server certificate on %s:%s!" % (host, port))
2266
2267 pem = ssl.get_server_certificate((host, port), ca_certs=cert)
2268 if not pem:
2269 test.fail("No server certificate on %s:%s!" % (host, port))
2270 if support.verbose:
2271 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
2272
2273def _test_get_server_certificate_fail(test, host, port):
2274 try:
2275 pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
2276 except ssl.SSLError as x:
2277 #should fail
2278 if support.verbose:
2279 sys.stdout.write("%s\n" % x)
2280 else:
2281 test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2282
2283
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002284from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002285
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002286class ThreadedEchoServer(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002287
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002288 class ConnectionHandler(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002289
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002290 """A mildly complicated class, because we want it to work both
2291 with and without the SSL wrapper around the socket connection, so
2292 that we can test the STARTTLS functionality."""
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002293
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002294 def __init__(self, server, connsock, addr):
2295 self.server = server
2296 self.running = False
2297 self.sock = connsock
2298 self.addr = addr
2299 self.sock.setblocking(1)
2300 self.sslconn = None
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002301 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00002302 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002303
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002304 def wrap_conn(self):
2305 try:
2306 self.sslconn = self.server.context.wrap_socket(
2307 self.sock, server_side=True)
2308 self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
2309 self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
Paul Monsonfb7e7502019-05-15 15:38:55 -07002310 except (ConnectionResetError, BrokenPipeError, ConnectionAbortedError) as e:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002311 # We treat ConnectionResetError as though it were an
2312 # SSLError - OpenSSL on Ubuntu abruptly closes the
2313 # connection when asked to use an unsupported protocol.
2314 #
Christian Heimes529525f2018-05-23 22:24:45 +02002315 # BrokenPipeError is raised in TLS 1.3 mode, when OpenSSL
2316 # tries to send session tickets after handshake.
2317 # https://github.com/openssl/openssl/issues/6342
Paul Monsonfb7e7502019-05-15 15:38:55 -07002318 #
2319 # ConnectionAbortedError is raised in TLS 1.3 mode, when OpenSSL
2320 # tries to send session tickets after handshake when using WinSock.
Christian Heimes529525f2018-05-23 22:24:45 +02002321 self.server.conn_errors.append(str(e))
2322 if self.server.chatty:
2323 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2324 self.running = False
2325 self.close()
2326 return False
2327 except (ssl.SSLError, OSError) as e:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002328 # OSError may occur with wrong protocols, e.g. both
2329 # sides use PROTOCOL_TLS_SERVER.
2330 #
2331 # XXX Various errors can have happened here, for example
2332 # a mismatching protocol version, an invalid certificate,
2333 # or a low-level bug. This should be made more discriminating.
2334 #
2335 # bpo-31323: Store the exception as string to prevent
2336 # a reference leak: server -> conn_errors -> exception
2337 # -> traceback -> self (ConnectionHandler) -> server
2338 self.server.conn_errors.append(str(e))
2339 if self.server.chatty:
2340 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2341 self.running = False
2342 self.server.stop()
2343 self.close()
2344 return False
2345 else:
2346 self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
2347 if self.server.context.verify_mode == ssl.CERT_REQUIRED:
2348 cert = self.sslconn.getpeercert()
2349 if support.verbose and self.server.chatty:
2350 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
2351 cert_binary = self.sslconn.getpeercert(True)
2352 if support.verbose and self.server.chatty:
2353 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
2354 cipher = self.sslconn.cipher()
2355 if support.verbose and self.server.chatty:
2356 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
2357 sys.stdout.write(" server: selected protocol is now "
2358 + str(self.sslconn.selected_npn_protocol()) + "\n")
2359 return True
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002360
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002361 def read(self):
2362 if self.sslconn:
2363 return self.sslconn.read()
2364 else:
2365 return self.sock.recv(1024)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002366
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002367 def write(self, bytes):
2368 if self.sslconn:
2369 return self.sslconn.write(bytes)
2370 else:
2371 return self.sock.send(bytes)
2372
2373 def close(self):
2374 if self.sslconn:
2375 self.sslconn.close()
2376 else:
2377 self.sock.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002378
Antoine Pitrou480a1242010-04-28 21:37:09 +00002379 def run(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002380 self.running = True
2381 if not self.server.starttls_server:
2382 if not self.wrap_conn():
2383 return
2384 while self.running:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002385 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002386 msg = self.read()
2387 stripped = msg.strip()
2388 if not stripped:
2389 # eof, so quit this handler
2390 self.running = False
2391 try:
2392 self.sock = self.sslconn.unwrap()
2393 except OSError:
2394 # Many tests shut the TCP connection down
2395 # without an SSL shutdown. This causes
2396 # unwrap() to raise OSError with errno=0!
2397 pass
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002398 else:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002399 self.sslconn = None
2400 self.close()
2401 elif stripped == b'over':
2402 if support.verbose and self.server.connectionchatty:
2403 sys.stdout.write(" server: client closed connection\n")
2404 self.close()
2405 return
2406 elif (self.server.starttls_server and
2407 stripped == b'STARTTLS'):
2408 if support.verbose and self.server.connectionchatty:
2409 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
2410 self.write(b"OK\n")
2411 if not self.wrap_conn():
2412 return
2413 elif (self.server.starttls_server and self.sslconn
2414 and stripped == b'ENDTLS'):
2415 if support.verbose and self.server.connectionchatty:
2416 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
2417 self.write(b"OK\n")
2418 self.sock = self.sslconn.unwrap()
2419 self.sslconn = None
2420 if support.verbose and self.server.connectionchatty:
2421 sys.stdout.write(" server: connection is now unencrypted...\n")
2422 elif stripped == b'CB tls-unique':
2423 if support.verbose and self.server.connectionchatty:
2424 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
2425 data = self.sslconn.get_channel_binding("tls-unique")
2426 self.write(repr(data).encode("us-ascii") + b"\n")
Christian Heimes9fb051f2018-09-23 08:32:31 +02002427 elif stripped == b'PHA':
2428 if support.verbose and self.server.connectionchatty:
2429 sys.stdout.write(" server: initiating post handshake auth\n")
2430 try:
2431 self.sslconn.verify_client_post_handshake()
2432 except ssl.SSLError as e:
2433 self.write(repr(e).encode("us-ascii") + b"\n")
2434 else:
2435 self.write(b"OK\n")
2436 elif stripped == b'HASCERT':
2437 if self.sslconn.getpeercert() is not None:
2438 self.write(b'TRUE\n')
2439 else:
2440 self.write(b'FALSE\n')
2441 elif stripped == b'GETCERT':
2442 cert = self.sslconn.getpeercert()
2443 self.write(repr(cert).encode("us-ascii") + b"\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002444 else:
2445 if (support.verbose and
2446 self.server.connectionchatty):
2447 ctype = (self.sslconn and "encrypted") or "unencrypted"
2448 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
2449 % (msg, ctype, msg.lower(), ctype))
2450 self.write(msg.lower())
Paul Monsonfb7e7502019-05-15 15:38:55 -07002451 except (ConnectionResetError, ConnectionAbortedError):
Christian Heimes529525f2018-05-23 22:24:45 +02002452 # XXX: OpenSSL 1.1.1 sometimes raises ConnectionResetError
2453 # when connection is not shut down gracefully.
2454 if self.server.chatty and support.verbose:
2455 sys.stdout.write(
2456 " Connection reset by peer: {}\n".format(
2457 self.addr)
2458 )
2459 self.close()
2460 self.running = False
Paul Monsonfb7e7502019-05-15 15:38:55 -07002461 except ssl.SSLError as err:
2462 # On Windows sometimes test_pha_required_nocert receives the
2463 # PEER_DID_NOT_RETURN_A_CERTIFICATE exception
2464 # before the 'tlsv13 alert certificate required' exception.
2465 # If the server is stopped when PEER_DID_NOT_RETURN_A_CERTIFICATE
2466 # is received test_pha_required_nocert fails with ConnectionResetError
2467 # because the underlying socket is closed
2468 if 'PEER_DID_NOT_RETURN_A_CERTIFICATE' == err.reason:
2469 if self.server.chatty and support.verbose:
2470 sys.stdout.write(err.args[1])
2471 # test_pha_required_nocert is expecting this exception
2472 raise ssl.SSLError('tlsv13 alert certificate required')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002473 except OSError:
2474 if self.server.chatty:
2475 handle_error("Test server failure:\n")
Bill Janssen2f5799b2008-06-29 00:08:12 +00002476 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002477 self.running = False
Christian Heimes529525f2018-05-23 22:24:45 +02002478
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002479 # normally, we'd just stop here, but for the test
2480 # harness, we want to stop the server
2481 self.server.stop()
Bill Janssen54cc54c2007-12-14 22:08:56 +00002482
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002483 def __init__(self, certificate=None, ssl_version=None,
2484 certreqs=None, cacerts=None,
2485 chatty=True, connectionchatty=False, starttls_server=False,
2486 npn_protocols=None, alpn_protocols=None,
2487 ciphers=None, context=None):
2488 if context:
2489 self.context = context
2490 else:
2491 self.context = ssl.SSLContext(ssl_version
2492 if ssl_version is not None
Christian Heimesa170fa12017-09-15 20:27:30 +02002493 else ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002494 self.context.verify_mode = (certreqs if certreqs is not None
2495 else ssl.CERT_NONE)
2496 if cacerts:
2497 self.context.load_verify_locations(cacerts)
2498 if certificate:
2499 self.context.load_cert_chain(certificate)
2500 if npn_protocols:
2501 self.context.set_npn_protocols(npn_protocols)
2502 if alpn_protocols:
2503 self.context.set_alpn_protocols(alpn_protocols)
2504 if ciphers:
2505 self.context.set_ciphers(ciphers)
2506 self.chatty = chatty
2507 self.connectionchatty = connectionchatty
2508 self.starttls_server = starttls_server
2509 self.sock = socket.socket()
2510 self.port = support.bind_port(self.sock)
2511 self.flag = None
2512 self.active = False
2513 self.selected_npn_protocols = []
2514 self.selected_alpn_protocols = []
2515 self.shared_ciphers = []
2516 self.conn_errors = []
2517 threading.Thread.__init__(self)
2518 self.daemon = True
2519
2520 def __enter__(self):
2521 self.start(threading.Event())
2522 self.flag.wait()
2523 return self
2524
2525 def __exit__(self, *args):
2526 self.stop()
2527 self.join()
2528
2529 def start(self, flag=None):
2530 self.flag = flag
2531 threading.Thread.start(self)
2532
2533 def run(self):
2534 self.sock.settimeout(0.05)
2535 self.sock.listen()
2536 self.active = True
2537 if self.flag:
2538 # signal an event
2539 self.flag.set()
2540 while self.active:
2541 try:
2542 newconn, connaddr = self.sock.accept()
2543 if support.verbose and self.chatty:
2544 sys.stdout.write(' server: new connection from '
2545 + repr(connaddr) + '\n')
2546 handler = self.ConnectionHandler(self, newconn, connaddr)
2547 handler.start()
2548 handler.join()
2549 except socket.timeout:
2550 pass
2551 except KeyboardInterrupt:
2552 self.stop()
Christian Heimes529525f2018-05-23 22:24:45 +02002553 except BaseException as e:
2554 if support.verbose and self.chatty:
2555 sys.stdout.write(
2556 ' connection handling failed: ' + repr(e) + '\n')
2557
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002558 self.sock.close()
2559
2560 def stop(self):
2561 self.active = False
2562
2563class AsyncoreEchoServer(threading.Thread):
2564
2565 # this one's based on asyncore.dispatcher
2566
2567 class EchoServer (asyncore.dispatcher):
2568
2569 class ConnectionHandler(asyncore.dispatcher_with_send):
2570
2571 def __init__(self, conn, certfile):
2572 self.socket = test_wrap_socket(conn, server_side=True,
2573 certfile=certfile,
2574 do_handshake_on_connect=False)
2575 asyncore.dispatcher_with_send.__init__(self, self.socket)
2576 self._ssl_accepting = True
2577 self._do_ssl_handshake()
2578
2579 def readable(self):
2580 if isinstance(self.socket, ssl.SSLSocket):
2581 while self.socket.pending() > 0:
2582 self.handle_read_event()
2583 return True
2584
2585 def _do_ssl_handshake(self):
2586 try:
2587 self.socket.do_handshake()
2588 except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
2589 return
2590 except ssl.SSLEOFError:
2591 return self.handle_close()
2592 except ssl.SSLError:
Bill Janssen54cc54c2007-12-14 22:08:56 +00002593 raise
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002594 except OSError as err:
2595 if err.args[0] == errno.ECONNABORTED:
2596 return self.handle_close()
2597 else:
2598 self._ssl_accepting = False
Bill Janssen54cc54c2007-12-14 22:08:56 +00002599
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002600 def handle_read(self):
2601 if self._ssl_accepting:
2602 self._do_ssl_handshake()
2603 else:
2604 data = self.recv(1024)
2605 if support.verbose:
2606 sys.stdout.write(" server: read %s from client\n" % repr(data))
2607 if not data:
2608 self.close()
2609 else:
2610 self.send(data.lower())
Bill Janssen54cc54c2007-12-14 22:08:56 +00002611
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002612 def handle_close(self):
2613 self.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002614 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002615 sys.stdout.write(" server: closed connection %s\n" % self.socket)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002616
2617 def handle_error(self):
2618 raise
2619
Trent Nelson78520002008-04-10 20:54:35 +00002620 def __init__(self, certfile):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002621 self.certfile = certfile
2622 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2623 self.port = support.bind_port(sock, '')
2624 asyncore.dispatcher.__init__(self, sock)
2625 self.listen(5)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002626
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002627 def handle_accepted(self, sock_obj, addr):
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002628 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002629 sys.stdout.write(" server: new connection from %s:%s\n" %addr)
2630 self.ConnectionHandler(sock_obj, self.certfile)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002631
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002632 def handle_error(self):
2633 raise
Bill Janssen54cc54c2007-12-14 22:08:56 +00002634
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002635 def __init__(self, certfile):
2636 self.flag = None
2637 self.active = False
2638 self.server = self.EchoServer(certfile)
2639 self.port = self.server.port
2640 threading.Thread.__init__(self)
2641 self.daemon = True
Bill Janssen54cc54c2007-12-14 22:08:56 +00002642
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002643 def __str__(self):
2644 return "<%s %s>" % (self.__class__.__name__, self.server)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002645
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002646 def __enter__(self):
2647 self.start(threading.Event())
2648 self.flag.wait()
2649 return self
Thomas Woutersed03b412007-08-28 21:37:11 +00002650
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002651 def __exit__(self, *args):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002652 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002653 sys.stdout.write(" cleanup: stopping server.\n")
2654 self.stop()
2655 if support.verbose:
2656 sys.stdout.write(" cleanup: joining server thread.\n")
2657 self.join()
2658 if support.verbose:
2659 sys.stdout.write(" cleanup: successfully joined.\n")
2660 # make sure that ConnectionHandler is removed from socket_map
2661 asyncore.close_all(ignore_all=True)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002662
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002663 def start (self, flag=None):
2664 self.flag = flag
2665 threading.Thread.start(self)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002666
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002667 def run(self):
2668 self.active = True
2669 if self.flag:
2670 self.flag.set()
2671 while self.active:
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002672 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002673 asyncore.loop(1)
2674 except:
2675 pass
Trent Nelson6b240cd2008-04-10 20:12:06 +00002676
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002677 def stop(self):
2678 self.active = False
2679 self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002680
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002681def server_params_test(client_context, server_context, indata=b"FOO\n",
2682 chatty=True, connectionchatty=False, sni_name=None,
2683 session=None):
2684 """
2685 Launch a server, connect a client to it and try various reads
2686 and writes.
2687 """
2688 stats = {}
2689 server = ThreadedEchoServer(context=server_context,
2690 chatty=chatty,
2691 connectionchatty=False)
2692 with server:
2693 with client_context.wrap_socket(socket.socket(),
2694 server_hostname=sni_name, session=session) as s:
2695 s.connect((HOST, server.port))
2696 for arg in [indata, bytearray(indata), memoryview(indata)]:
2697 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002698 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002699 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002700 " client: sending %r...\n" % indata)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002701 s.write(arg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002702 outdata = s.read()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002703 if connectionchatty:
2704 if support.verbose:
2705 sys.stdout.write(" client: read %r\n" % outdata)
Trent Nelson6b240cd2008-04-10 20:12:06 +00002706 if outdata != indata.lower():
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002707 raise AssertionError(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002708 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2709 % (outdata[:20], len(outdata),
2710 indata[:20].lower(), len(indata)))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002711 s.write(b"over\n")
2712 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002713 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002714 sys.stdout.write(" client: closing connection.\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002715 stats.update({
2716 'compression': s.compression(),
2717 'cipher': s.cipher(),
2718 'peercert': s.getpeercert(),
2719 'client_alpn_protocol': s.selected_alpn_protocol(),
2720 'client_npn_protocol': s.selected_npn_protocol(),
2721 'version': s.version(),
2722 'session_reused': s.session_reused,
2723 'session': s.session,
2724 })
2725 s.close()
2726 stats['server_alpn_protocols'] = server.selected_alpn_protocols
2727 stats['server_npn_protocols'] = server.selected_npn_protocols
2728 stats['server_shared_ciphers'] = server.shared_ciphers
2729 return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002730
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002731def try_protocol_combo(server_protocol, client_protocol, expect_success,
2732 certsreqs=None, server_options=0, client_options=0):
2733 """
2734 Try to SSL-connect using *client_protocol* to *server_protocol*.
2735 If *expect_success* is true, assert that the connection succeeds,
2736 if it's false, assert that the connection fails.
2737 Also, if *expect_success* is a string, assert that it is the protocol
2738 version actually used by the connection.
2739 """
2740 if certsreqs is None:
2741 certsreqs = ssl.CERT_NONE
2742 certtype = {
2743 ssl.CERT_NONE: "CERT_NONE",
2744 ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
2745 ssl.CERT_REQUIRED: "CERT_REQUIRED",
2746 }[certsreqs]
2747 if support.verbose:
2748 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
2749 sys.stdout.write(formatstr %
2750 (ssl.get_protocol_name(client_protocol),
2751 ssl.get_protocol_name(server_protocol),
2752 certtype))
2753 client_context = ssl.SSLContext(client_protocol)
2754 client_context.options |= client_options
2755 server_context = ssl.SSLContext(server_protocol)
2756 server_context.options |= server_options
2757
Victor Stinner3ef63442019-02-19 18:06:03 +01002758 min_version = PROTOCOL_TO_TLS_VERSION.get(client_protocol, None)
2759 if (min_version is not None
2760 # SSLContext.minimum_version is only available on recent OpenSSL
2761 # (setter added in OpenSSL 1.1.0, getter added in OpenSSL 1.1.1)
2762 and hasattr(server_context, 'minimum_version')
2763 and server_protocol == ssl.PROTOCOL_TLS
2764 and server_context.minimum_version > min_version):
2765 # If OpenSSL configuration is strict and requires more recent TLS
2766 # version, we have to change the minimum to test old TLS versions.
2767 server_context.minimum_version = min_version
2768
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002769 # NOTE: we must enable "ALL" ciphers on the client, otherwise an
2770 # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
2771 # starting from OpenSSL 1.0.0 (see issue #8322).
Christian Heimesa170fa12017-09-15 20:27:30 +02002772 if client_context.protocol == ssl.PROTOCOL_TLS:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002773 client_context.set_ciphers("ALL")
2774
2775 for ctx in (client_context, server_context):
2776 ctx.verify_mode = certsreqs
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002777 ctx.load_cert_chain(SIGNED_CERTFILE)
2778 ctx.load_verify_locations(SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002779 try:
2780 stats = server_params_test(client_context, server_context,
2781 chatty=False, connectionchatty=False)
2782 # Protocol mismatch can result in either an SSLError, or a
2783 # "Connection reset by peer" error.
2784 except ssl.SSLError:
2785 if expect_success:
2786 raise
2787 except OSError as e:
2788 if expect_success or e.errno != errno.ECONNRESET:
2789 raise
2790 else:
2791 if not expect_success:
2792 raise AssertionError(
2793 "Client protocol %s succeeded with server protocol %s!"
2794 % (ssl.get_protocol_name(client_protocol),
2795 ssl.get_protocol_name(server_protocol)))
2796 elif (expect_success is not True
2797 and expect_success != stats['version']):
2798 raise AssertionError("version mismatch: expected %r, got %r"
2799 % (expect_success, stats['version']))
2800
2801
2802class ThreadedTests(unittest.TestCase):
2803
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002804 def test_echo(self):
2805 """Basic test of an SSL client connecting to a server"""
2806 if support.verbose:
2807 sys.stdout.write("\n")
2808 for protocol in PROTOCOLS:
2809 if protocol in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}:
2810 continue
Christian Heimes19311322019-09-26 22:53:09 +02002811 if not has_tls_protocol(protocol):
2812 continue
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002813 with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
2814 context = ssl.SSLContext(protocol)
2815 context.load_cert_chain(CERTFILE)
2816 server_params_test(context, context,
2817 chatty=True, connectionchatty=True)
2818
Christian Heimesa170fa12017-09-15 20:27:30 +02002819 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002820
2821 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
2822 server_params_test(client_context=client_context,
2823 server_context=server_context,
2824 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002825 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002826
2827 client_context.check_hostname = False
2828 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_CLIENT):
2829 with self.assertRaises(ssl.SSLError) as e:
2830 server_params_test(client_context=server_context,
2831 server_context=client_context,
2832 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002833 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002834 self.assertIn('called a function you should not call',
2835 str(e.exception))
2836
2837 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_SERVER):
2838 with self.assertRaises(ssl.SSLError) as e:
2839 server_params_test(client_context=server_context,
2840 server_context=server_context,
2841 chatty=True, connectionchatty=True)
2842 self.assertIn('called a function you should not call',
2843 str(e.exception))
2844
2845 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_CLIENT):
2846 with self.assertRaises(ssl.SSLError) as e:
2847 server_params_test(client_context=server_context,
2848 server_context=client_context,
2849 chatty=True, connectionchatty=True)
2850 self.assertIn('called a function you should not call',
2851 str(e.exception))
2852
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002853 def test_getpeercert(self):
2854 if support.verbose:
2855 sys.stdout.write("\n")
Christian Heimesa170fa12017-09-15 20:27:30 +02002856
2857 client_context, server_context, hostname = testing_context()
2858 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002859 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002860 with client_context.wrap_socket(socket.socket(),
2861 do_handshake_on_connect=False,
2862 server_hostname=hostname) as s:
2863 s.connect((HOST, server.port))
2864 # getpeercert() raise ValueError while the handshake isn't
2865 # done.
2866 with self.assertRaises(ValueError):
2867 s.getpeercert()
2868 s.do_handshake()
2869 cert = s.getpeercert()
2870 self.assertTrue(cert, "Can't get peer certificate.")
2871 cipher = s.cipher()
2872 if support.verbose:
2873 sys.stdout.write(pprint.pformat(cert) + '\n')
2874 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2875 if 'subject' not in cert:
2876 self.fail("No subject field in certificate: %s." %
2877 pprint.pformat(cert))
2878 if ((('organizationName', 'Python Software Foundation'),)
2879 not in cert['subject']):
2880 self.fail(
2881 "Missing or invalid 'organizationName' field in certificate subject; "
2882 "should be 'Python Software Foundation'.")
2883 self.assertIn('notBefore', cert)
2884 self.assertIn('notAfter', cert)
2885 before = ssl.cert_time_to_seconds(cert['notBefore'])
2886 after = ssl.cert_time_to_seconds(cert['notAfter'])
2887 self.assertLess(before, after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002888
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002889 @unittest.skipUnless(have_verify_flags(),
2890 "verify_flags need OpenSSL > 0.9.8")
2891 def test_crl_check(self):
2892 if support.verbose:
2893 sys.stdout.write("\n")
2894
Christian Heimesa170fa12017-09-15 20:27:30 +02002895 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002896
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002897 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
Christian Heimesa170fa12017-09-15 20:27:30 +02002898 self.assertEqual(client_context.verify_flags, ssl.VERIFY_DEFAULT | tf)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002899
2900 # VERIFY_DEFAULT should pass
2901 server = ThreadedEchoServer(context=server_context, chatty=True)
2902 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002903 with client_context.wrap_socket(socket.socket(),
2904 server_hostname=hostname) as s:
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002905 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002906 cert = s.getpeercert()
2907 self.assertTrue(cert, "Can't get peer certificate.")
Bill Janssen58afe4c2008-09-08 16:45:19 +00002908
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002909 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
Christian Heimesa170fa12017-09-15 20:27:30 +02002910 client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
Bill Janssen58afe4c2008-09-08 16:45:19 +00002911
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002912 server = ThreadedEchoServer(context=server_context, chatty=True)
2913 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002914 with client_context.wrap_socket(socket.socket(),
2915 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002916 with self.assertRaisesRegex(ssl.SSLError,
2917 "certificate verify failed"):
2918 s.connect((HOST, server.port))
Bill Janssen58afe4c2008-09-08 16:45:19 +00002919
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002920 # now load a CRL file. The CRL file is signed by the CA.
Christian Heimesa170fa12017-09-15 20:27:30 +02002921 client_context.load_verify_locations(CRLFILE)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002922
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002923 server = ThreadedEchoServer(context=server_context, chatty=True)
2924 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002925 with client_context.wrap_socket(socket.socket(),
2926 server_hostname=hostname) as s:
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002927 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002928 cert = s.getpeercert()
2929 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002930
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002931 def test_check_hostname(self):
2932 if support.verbose:
2933 sys.stdout.write("\n")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002934
Christian Heimesa170fa12017-09-15 20:27:30 +02002935 client_context, server_context, hostname = testing_context()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002936
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002937 # correct hostname should verify
2938 server = ThreadedEchoServer(context=server_context, chatty=True)
2939 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002940 with client_context.wrap_socket(socket.socket(),
2941 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002942 s.connect((HOST, server.port))
2943 cert = s.getpeercert()
2944 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002945
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002946 # incorrect hostname should raise an exception
2947 server = ThreadedEchoServer(context=server_context, chatty=True)
2948 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002949 with client_context.wrap_socket(socket.socket(),
2950 server_hostname="invalid") as s:
Christian Heimes61d478c2018-01-27 15:51:38 +01002951 with self.assertRaisesRegex(
2952 ssl.CertificateError,
2953 "Hostname mismatch, certificate is not valid for 'invalid'."):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002954 s.connect((HOST, server.port))
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002955
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002956 # missing server_hostname arg should cause an exception, too
2957 server = ThreadedEchoServer(context=server_context, chatty=True)
2958 with server:
2959 with socket.socket() as s:
2960 with self.assertRaisesRegex(ValueError,
2961 "check_hostname requires server_hostname"):
Christian Heimesa170fa12017-09-15 20:27:30 +02002962 client_context.wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002963
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002964 def test_ecc_cert(self):
2965 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2966 client_context.load_verify_locations(SIGNING_CA)
Christian Heimese8eb6cb2018-05-22 22:50:12 +02002967 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002968 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2969
2970 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2971 # load ECC cert
2972 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2973
2974 # correct hostname should verify
2975 server = ThreadedEchoServer(context=server_context, chatty=True)
2976 with server:
2977 with client_context.wrap_socket(socket.socket(),
2978 server_hostname=hostname) as s:
2979 s.connect((HOST, server.port))
2980 cert = s.getpeercert()
2981 self.assertTrue(cert, "Can't get peer certificate.")
2982 cipher = s.cipher()[0].split('-')
2983 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2984
2985 def test_dual_rsa_ecc(self):
2986 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2987 client_context.load_verify_locations(SIGNING_CA)
Christian Heimes05d9fe32018-02-27 08:55:39 +01002988 # TODO: fix TLSv1.3 once SSLContext can restrict signature
2989 # algorithms.
2990 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002991 # only ECDSA certs
2992 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
2993 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2994
2995 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2996 # load ECC and RSA key/cert pairs
2997 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2998 server_context.load_cert_chain(SIGNED_CERTFILE)
2999
3000 # correct hostname should verify
3001 server = ThreadedEchoServer(context=server_context, chatty=True)
3002 with server:
3003 with client_context.wrap_socket(socket.socket(),
3004 server_hostname=hostname) as s:
3005 s.connect((HOST, server.port))
3006 cert = s.getpeercert()
3007 self.assertTrue(cert, "Can't get peer certificate.")
3008 cipher = s.cipher()[0].split('-')
3009 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
3010
Christian Heimes66e57422018-01-29 14:25:13 +01003011 def test_check_hostname_idn(self):
3012 if support.verbose:
3013 sys.stdout.write("\n")
3014
Christian Heimes11a14932018-02-24 02:35:08 +01003015 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes66e57422018-01-29 14:25:13 +01003016 server_context.load_cert_chain(IDNSANSFILE)
3017
Christian Heimes11a14932018-02-24 02:35:08 +01003018 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes66e57422018-01-29 14:25:13 +01003019 context.verify_mode = ssl.CERT_REQUIRED
3020 context.check_hostname = True
3021 context.load_verify_locations(SIGNING_CA)
3022
3023 # correct hostname should verify, when specified in several
3024 # different ways
3025 idn_hostnames = [
3026 ('könig.idn.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01003027 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01003028 ('xn--knig-5qa.idn.pythontest.net',
3029 'xn--knig-5qa.idn.pythontest.net'),
3030 (b'xn--knig-5qa.idn.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01003031 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01003032
3033 ('königsgäßchen.idna2003.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01003034 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01003035 ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
3036 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
3037 (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01003038 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
3039
3040 # ('königsgäßchen.idna2008.pythontest.net',
3041 # 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
3042 ('xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
3043 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
3044 (b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
3045 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
3046
Christian Heimes66e57422018-01-29 14:25:13 +01003047 ]
3048 for server_hostname, expected_hostname in idn_hostnames:
3049 server = ThreadedEchoServer(context=server_context, chatty=True)
3050 with server:
3051 with context.wrap_socket(socket.socket(),
3052 server_hostname=server_hostname) as s:
3053 self.assertEqual(s.server_hostname, expected_hostname)
3054 s.connect((HOST, server.port))
3055 cert = s.getpeercert()
3056 self.assertEqual(s.server_hostname, expected_hostname)
3057 self.assertTrue(cert, "Can't get peer certificate.")
3058
Christian Heimes66e57422018-01-29 14:25:13 +01003059 # incorrect hostname should raise an exception
3060 server = ThreadedEchoServer(context=server_context, chatty=True)
3061 with server:
3062 with context.wrap_socket(socket.socket(),
3063 server_hostname="python.example.org") as s:
3064 with self.assertRaises(ssl.CertificateError):
3065 s.connect((HOST, server.port))
3066
Christian Heimes529525f2018-05-23 22:24:45 +02003067 def test_wrong_cert_tls12(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003068 """Connecting when the server rejects the client's certificate
3069
3070 Launch a server with CERT_REQUIRED, and check that trying to
3071 connect to it with a wrong client certificate fails.
3072 """
Christian Heimes05d9fe32018-02-27 08:55:39 +01003073 client_context, server_context, hostname = testing_context()
Christian Heimes88bfd0b2018-08-14 12:54:19 +02003074 # load client cert that is not signed by trusted CA
3075 client_context.load_cert_chain(CERTFILE)
Christian Heimes05d9fe32018-02-27 08:55:39 +01003076 # require TLS client authentication
3077 server_context.verify_mode = ssl.CERT_REQUIRED
Christian Heimes529525f2018-05-23 22:24:45 +02003078 # TLS 1.3 has different handshake
3079 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
Christian Heimes05d9fe32018-02-27 08:55:39 +01003080
3081 server = ThreadedEchoServer(
3082 context=server_context, chatty=True, connectionchatty=True,
3083 )
3084
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003085 with server, \
Christian Heimes05d9fe32018-02-27 08:55:39 +01003086 client_context.wrap_socket(socket.socket(),
3087 server_hostname=hostname) as s:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00003088 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003089 # Expect either an SSL error about the server rejecting
3090 # the connection, or a low-level connection reset (which
3091 # sometimes happens on Windows)
3092 s.connect((HOST, server.port))
3093 except ssl.SSLError as e:
3094 if support.verbose:
3095 sys.stdout.write("\nSSLError is %r\n" % e)
3096 except OSError as e:
3097 if e.errno != errno.ECONNRESET:
3098 raise
3099 if support.verbose:
3100 sys.stdout.write("\nsocket.error is %r\n" % e)
3101 else:
3102 self.fail("Use of invalid cert should have failed!")
3103
Christian Heimes19311322019-09-26 22:53:09 +02003104 @requires_tls_version('TLSv1_3')
Christian Heimes529525f2018-05-23 22:24:45 +02003105 def test_wrong_cert_tls13(self):
3106 client_context, server_context, hostname = testing_context()
Christian Heimes88bfd0b2018-08-14 12:54:19 +02003107 # load client cert that is not signed by trusted CA
3108 client_context.load_cert_chain(CERTFILE)
Christian Heimes529525f2018-05-23 22:24:45 +02003109 server_context.verify_mode = ssl.CERT_REQUIRED
3110 server_context.minimum_version = ssl.TLSVersion.TLSv1_3
3111 client_context.minimum_version = ssl.TLSVersion.TLSv1_3
3112
3113 server = ThreadedEchoServer(
3114 context=server_context, chatty=True, connectionchatty=True,
3115 )
3116 with server, \
3117 client_context.wrap_socket(socket.socket(),
3118 server_hostname=hostname) as s:
3119 # TLS 1.3 perform client cert exchange after handshake
3120 s.connect((HOST, server.port))
3121 try:
3122 s.write(b'data')
3123 s.read(4)
3124 except ssl.SSLError as e:
3125 if support.verbose:
3126 sys.stdout.write("\nSSLError is %r\n" % e)
3127 except OSError as e:
3128 if e.errno != errno.ECONNRESET:
3129 raise
3130 if support.verbose:
3131 sys.stdout.write("\nsocket.error is %r\n" % e)
3132 else:
3133 self.fail("Use of invalid cert should have failed!")
3134
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003135 def test_rude_shutdown(self):
3136 """A brutal shutdown of an SSL server should raise an OSError
3137 in the client when attempting handshake.
3138 """
3139 listener_ready = threading.Event()
3140 listener_gone = threading.Event()
3141
3142 s = socket.socket()
3143 port = support.bind_port(s, HOST)
3144
3145 # `listener` runs in a thread. It sits in an accept() until
3146 # the main thread connects. Then it rudely closes the socket,
3147 # and sets Event `listener_gone` to let the main thread know
3148 # the socket is gone.
3149 def listener():
3150 s.listen()
3151 listener_ready.set()
3152 newsock, addr = s.accept()
3153 newsock.close()
3154 s.close()
3155 listener_gone.set()
3156
3157 def connector():
3158 listener_ready.wait()
3159 with socket.socket() as c:
3160 c.connect((HOST, port))
3161 listener_gone.wait()
Antoine Pitrou40f08742010-04-24 22:04:40 +00003162 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003163 ssl_sock = test_wrap_socket(c)
3164 except OSError:
3165 pass
3166 else:
3167 self.fail('connecting to closed SSL socket should have failed')
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00003168
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003169 t = threading.Thread(target=listener)
3170 t.start()
3171 try:
3172 connector()
3173 finally:
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003174 t.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003175
Christian Heimesb3ad0e52017-09-08 12:00:19 -07003176 def test_ssl_cert_verify_error(self):
3177 if support.verbose:
3178 sys.stdout.write("\n")
3179
3180 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
3181 server_context.load_cert_chain(SIGNED_CERTFILE)
3182
3183 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3184
3185 server = ThreadedEchoServer(context=server_context, chatty=True)
3186 with server:
3187 with context.wrap_socket(socket.socket(),
Christian Heimesa170fa12017-09-15 20:27:30 +02003188 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
Christian Heimesb3ad0e52017-09-08 12:00:19 -07003189 try:
3190 s.connect((HOST, server.port))
3191 except ssl.SSLError as e:
3192 msg = 'unable to get local issuer certificate'
3193 self.assertIsInstance(e, ssl.SSLCertVerificationError)
3194 self.assertEqual(e.verify_code, 20)
3195 self.assertEqual(e.verify_message, msg)
3196 self.assertIn(msg, repr(e))
3197 self.assertIn('certificate verify failed', repr(e))
3198
Christian Heimes19311322019-09-26 22:53:09 +02003199 @requires_tls_version('SSLv2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003200 def test_protocol_sslv2(self):
3201 """Connecting to an SSLv2 server with various client options"""
3202 if support.verbose:
3203 sys.stdout.write("\n")
3204 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
3205 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
3206 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
Christian Heimesa170fa12017-09-15 20:27:30 +02003207 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False)
Christian Heimes19311322019-09-26 22:53:09 +02003208 if has_tls_version('SSLv3'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003209 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
3210 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
3211 # SSLv23 client with specific SSL options
3212 if no_sslv2_implies_sslv3_hello():
3213 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003214 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003215 client_options=ssl.OP_NO_SSLv2)
Christian Heimesa170fa12017-09-15 20:27:30 +02003216 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003217 client_options=ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02003218 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003219 client_options=ssl.OP_NO_TLSv1)
Antoine Pitrou242db722013-05-01 20:52:07 +02003220
Christian Heimesa170fa12017-09-15 20:27:30 +02003221 def test_PROTOCOL_TLS(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003222 """Connecting to an SSLv23 server with various client options"""
3223 if support.verbose:
3224 sys.stdout.write("\n")
Christian Heimes19311322019-09-26 22:53:09 +02003225 if has_tls_version('SSLv2'):
Antoine Pitrou8f85f902012-01-03 22:46:48 +01003226 try:
Christian Heimesa170fa12017-09-15 20:27:30 +02003227 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv2, True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003228 except OSError as x:
3229 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
3230 if support.verbose:
3231 sys.stdout.write(
3232 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
3233 % str(x))
Christian Heimes19311322019-09-26 22:53:09 +02003234 if has_tls_version('SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003235 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False)
3236 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True)
Christian Heimes19311322019-09-26 22:53:09 +02003237 if has_tls_version('TLSv1'):
3238 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1')
Antoine Pitrou8f85f902012-01-03 22:46:48 +01003239
Christian Heimes19311322019-09-26 22:53:09 +02003240 if has_tls_version('SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003241 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL)
3242 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_OPTIONAL)
Christian Heimes19311322019-09-26 22:53:09 +02003243 if has_tls_version('TLSv1'):
3244 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003245
Christian Heimes19311322019-09-26 22:53:09 +02003246 if has_tls_version('SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003247 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED)
3248 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_REQUIRED)
Christian Heimes19311322019-09-26 22:53:09 +02003249 if has_tls_version('TLSv1'):
3250 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003251
3252 # Server with specific SSL options
Christian Heimes19311322019-09-26 22:53:09 +02003253 if has_tls_version('SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003254 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003255 server_options=ssl.OP_NO_SSLv3)
3256 # Will choose TLSv1
Christian Heimesa170fa12017-09-15 20:27:30 +02003257 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003258 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimes19311322019-09-26 22:53:09 +02003259 if has_tls_version('TLSv1'):
3260 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, False,
3261 server_options=ssl.OP_NO_TLSv1)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003262
Christian Heimes19311322019-09-26 22:53:09 +02003263 @requires_tls_version('SSLv3')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003264 def test_protocol_sslv3(self):
3265 """Connecting to an SSLv3 server with various client options"""
3266 if support.verbose:
3267 sys.stdout.write("\n")
3268 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
3269 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
3270 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
Christian Heimes19311322019-09-26 22:53:09 +02003271 if has_tls_version('SSLv2'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003272 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003273 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003274 client_options=ssl.OP_NO_SSLv3)
3275 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
3276 if no_sslv2_implies_sslv3_hello():
3277 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003278 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003279 False, client_options=ssl.OP_NO_SSLv2)
3280
Christian Heimes19311322019-09-26 22:53:09 +02003281 @requires_tls_version('TLSv1')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003282 def test_protocol_tlsv1(self):
3283 """Connecting to a TLSv1 server with various client options"""
3284 if support.verbose:
3285 sys.stdout.write("\n")
3286 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
3287 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
3288 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Christian Heimes19311322019-09-26 22:53:09 +02003289 if has_tls_version('SSLv2'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003290 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
Christian Heimes19311322019-09-26 22:53:09 +02003291 if has_tls_version('SSLv3'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003292 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003293 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003294 client_options=ssl.OP_NO_TLSv1)
3295
Christian Heimes19311322019-09-26 22:53:09 +02003296 @requires_tls_version('TLSv1_1')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003297 def test_protocol_tlsv1_1(self):
3298 """Connecting to a TLSv1.1 server with various client options.
3299 Testing against older TLS versions."""
3300 if support.verbose:
3301 sys.stdout.write("\n")
3302 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Christian Heimes19311322019-09-26 22:53:09 +02003303 if has_tls_version('SSLv2'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003304 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
Christian Heimes19311322019-09-26 22:53:09 +02003305 if has_tls_version('SSLv3'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003306 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003307 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003308 client_options=ssl.OP_NO_TLSv1_1)
3309
Christian Heimesa170fa12017-09-15 20:27:30 +02003310 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Christian Heimes19311322019-09-26 22:53:09 +02003311 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
3312 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003313
Christian Heimes19311322019-09-26 22:53:09 +02003314 @requires_tls_version('TLSv1_2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003315 def test_protocol_tlsv1_2(self):
3316 """Connecting to a TLSv1.2 server with various client options.
3317 Testing against older TLS versions."""
3318 if support.verbose:
3319 sys.stdout.write("\n")
3320 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
3321 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
3322 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
Christian Heimes19311322019-09-26 22:53:09 +02003323 if has_tls_version('SSLv2'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003324 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
Christian Heimes19311322019-09-26 22:53:09 +02003325 if has_tls_version('SSLv3'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003326 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003327 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003328 client_options=ssl.OP_NO_TLSv1_2)
3329
Christian Heimesa170fa12017-09-15 20:27:30 +02003330 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003331 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
3332 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
3333 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
3334 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
3335
3336 def test_starttls(self):
3337 """Switching from clear text to encrypted and back again."""
3338 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
3339
3340 server = ThreadedEchoServer(CERTFILE,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003341 starttls_server=True,
3342 chatty=True,
3343 connectionchatty=True)
3344 wrapped = False
3345 with server:
3346 s = socket.socket()
3347 s.setblocking(1)
3348 s.connect((HOST, server.port))
Antoine Pitroud6494802011-07-21 01:11:30 +02003349 if support.verbose:
3350 sys.stdout.write("\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003351 for indata in msgs:
Antoine Pitroud6494802011-07-21 01:11:30 +02003352 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003353 sys.stdout.write(
3354 " client: sending %r...\n" % indata)
3355 if wrapped:
3356 conn.write(indata)
3357 outdata = conn.read()
3358 else:
3359 s.send(indata)
3360 outdata = s.recv(1024)
3361 msg = outdata.strip().lower()
3362 if indata == b"STARTTLS" and msg.startswith(b"ok"):
3363 # STARTTLS ok, switch to secure mode
3364 if support.verbose:
3365 sys.stdout.write(
3366 " client: read %r from server, starting TLS...\n"
3367 % msg)
Christian Heimesa170fa12017-09-15 20:27:30 +02003368 conn = test_wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003369 wrapped = True
3370 elif indata == b"ENDTLS" and msg.startswith(b"ok"):
3371 # ENDTLS ok, switch back to clear text
3372 if support.verbose:
3373 sys.stdout.write(
3374 " client: read %r from server, ending TLS...\n"
3375 % msg)
3376 s = conn.unwrap()
3377 wrapped = False
3378 else:
3379 if support.verbose:
3380 sys.stdout.write(
3381 " client: read %r from server\n" % msg)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003382 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003383 sys.stdout.write(" client: closing connection.\n")
3384 if wrapped:
3385 conn.write(b"over\n")
3386 else:
3387 s.send(b"over\n")
3388 if wrapped:
3389 conn.close()
3390 else:
3391 s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003392
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003393 def test_socketserver(self):
3394 """Using socketserver to create and manage SSL connections."""
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003395 server = make_https_server(self, certfile=SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003396 # try to connect
3397 if support.verbose:
3398 sys.stdout.write('\n')
3399 with open(CERTFILE, 'rb') as f:
3400 d1 = f.read()
3401 d2 = ''
3402 # now fetch the same data from the HTTPS server
3403 url = 'https://localhost:%d/%s' % (
3404 server.port, os.path.split(CERTFILE)[1])
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003405 context = ssl.create_default_context(cafile=SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003406 f = urllib.request.urlopen(url, context=context)
3407 try:
3408 dlen = f.info().get("content-length")
3409 if dlen and (int(dlen) > 0):
3410 d2 = f.read(int(dlen))
3411 if support.verbose:
3412 sys.stdout.write(
3413 " client: read %d bytes from remote server '%s'\n"
3414 % (len(d2), server))
3415 finally:
3416 f.close()
3417 self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003418
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003419 def test_asyncore_server(self):
3420 """Check the example asyncore integration."""
3421 if support.verbose:
3422 sys.stdout.write("\n")
Antoine Pitrou0e576f12011-12-22 10:03:38 +01003423
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003424 indata = b"FOO\n"
3425 server = AsyncoreEchoServer(CERTFILE)
3426 with server:
3427 s = test_wrap_socket(socket.socket())
3428 s.connect(('127.0.0.1', server.port))
3429 if support.verbose:
3430 sys.stdout.write(
3431 " client: sending %r...\n" % indata)
3432 s.write(indata)
3433 outdata = s.read()
3434 if support.verbose:
3435 sys.stdout.write(" client: read %r\n" % outdata)
3436 if outdata != indata.lower():
3437 self.fail(
3438 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
3439 % (outdata[:20], len(outdata),
3440 indata[:20].lower(), len(indata)))
3441 s.write(b"over\n")
3442 if support.verbose:
3443 sys.stdout.write(" client: closing connection.\n")
3444 s.close()
3445 if support.verbose:
3446 sys.stdout.write(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003447
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003448 def test_recv_send(self):
3449 """Test recv(), send() and friends."""
3450 if support.verbose:
3451 sys.stdout.write("\n")
3452
3453 server = ThreadedEchoServer(CERTFILE,
3454 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003455 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003456 cacerts=CERTFILE,
3457 chatty=True,
3458 connectionchatty=False)
3459 with server:
3460 s = test_wrap_socket(socket.socket(),
3461 server_side=False,
3462 certfile=CERTFILE,
3463 ca_certs=CERTFILE,
3464 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003465 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003466 s.connect((HOST, server.port))
3467 # helper methods for standardising recv* method signatures
3468 def _recv_into():
3469 b = bytearray(b"\0"*100)
3470 count = s.recv_into(b)
3471 return b[:count]
3472
3473 def _recvfrom_into():
3474 b = bytearray(b"\0"*100)
3475 count, addr = s.recvfrom_into(b)
3476 return b[:count]
3477
3478 # (name, method, expect success?, *args, return value func)
3479 send_methods = [
3480 ('send', s.send, True, [], len),
3481 ('sendto', s.sendto, False, ["some.address"], len),
3482 ('sendall', s.sendall, True, [], lambda x: None),
3483 ]
3484 # (name, method, whether to expect success, *args)
3485 recv_methods = [
3486 ('recv', s.recv, True, []),
3487 ('recvfrom', s.recvfrom, False, ["some.address"]),
3488 ('recv_into', _recv_into, True, []),
3489 ('recvfrom_into', _recvfrom_into, False, []),
3490 ]
3491 data_prefix = "PREFIX_"
3492
3493 for (meth_name, send_meth, expect_success, args,
3494 ret_val_meth) in send_methods:
3495 indata = (data_prefix + meth_name).encode('ascii')
3496 try:
3497 ret = send_meth(indata, *args)
3498 msg = "sending with {}".format(meth_name)
3499 self.assertEqual(ret, ret_val_meth(indata), msg=msg)
3500 outdata = s.read()
3501 if outdata != indata.lower():
3502 self.fail(
3503 "While sending with <<{name:s}>> bad data "
3504 "<<{outdata:r}>> ({nout:d}) received; "
3505 "expected <<{indata:r}>> ({nin:d})\n".format(
3506 name=meth_name, outdata=outdata[:20],
3507 nout=len(outdata),
3508 indata=indata[:20], nin=len(indata)
3509 )
3510 )
3511 except ValueError as e:
3512 if expect_success:
3513 self.fail(
3514 "Failed to send with method <<{name:s}>>; "
3515 "expected to succeed.\n".format(name=meth_name)
3516 )
3517 if not str(e).startswith(meth_name):
3518 self.fail(
3519 "Method <<{name:s}>> failed with unexpected "
3520 "exception message: {exp:s}\n".format(
3521 name=meth_name, exp=e
3522 )
3523 )
3524
3525 for meth_name, recv_meth, expect_success, args in recv_methods:
3526 indata = (data_prefix + meth_name).encode('ascii')
3527 try:
3528 s.send(indata)
3529 outdata = recv_meth(*args)
3530 if outdata != indata.lower():
3531 self.fail(
3532 "While receiving with <<{name:s}>> bad data "
3533 "<<{outdata:r}>> ({nout:d}) received; "
3534 "expected <<{indata:r}>> ({nin:d})\n".format(
3535 name=meth_name, outdata=outdata[:20],
3536 nout=len(outdata),
3537 indata=indata[:20], nin=len(indata)
3538 )
3539 )
3540 except ValueError as e:
3541 if expect_success:
3542 self.fail(
3543 "Failed to receive with method <<{name:s}>>; "
3544 "expected to succeed.\n".format(name=meth_name)
3545 )
3546 if not str(e).startswith(meth_name):
3547 self.fail(
3548 "Method <<{name:s}>> failed with unexpected "
3549 "exception message: {exp:s}\n".format(
3550 name=meth_name, exp=e
3551 )
3552 )
3553 # consume data
3554 s.read()
3555
3556 # read(-1, buffer) is supported, even though read(-1) is not
3557 data = b"data"
3558 s.send(data)
3559 buffer = bytearray(len(data))
3560 self.assertEqual(s.read(-1, buffer), len(data))
3561 self.assertEqual(buffer, data)
3562
Christian Heimes888bbdc2017-09-07 14:18:21 -07003563 # sendall accepts bytes-like objects
3564 if ctypes is not None:
3565 ubyte = ctypes.c_ubyte * len(data)
3566 byteslike = ubyte.from_buffer_copy(data)
3567 s.sendall(byteslike)
3568 self.assertEqual(s.read(), data)
3569
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003570 # Make sure sendmsg et al are disallowed to avoid
3571 # inadvertent disclosure of data and/or corruption
3572 # of the encrypted data stream
Serhiy Storchaka42b1d612018-12-06 22:36:55 +02003573 self.assertRaises(NotImplementedError, s.dup)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003574 self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
3575 self.assertRaises(NotImplementedError, s.recvmsg, 100)
3576 self.assertRaises(NotImplementedError,
Serhiy Storchaka42b1d612018-12-06 22:36:55 +02003577 s.recvmsg_into, [bytearray(100)])
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003578 s.write(b"over\n")
3579
3580 self.assertRaises(ValueError, s.recv, -1)
3581 self.assertRaises(ValueError, s.read, -1)
3582
3583 s.close()
3584
3585 def test_recv_zero(self):
3586 server = ThreadedEchoServer(CERTFILE)
3587 server.__enter__()
3588 self.addCleanup(server.__exit__, None, None)
3589 s = socket.create_connection((HOST, server.port))
3590 self.addCleanup(s.close)
3591 s = test_wrap_socket(s, suppress_ragged_eofs=False)
3592 self.addCleanup(s.close)
3593
3594 # recv/read(0) should return no data
3595 s.send(b"data")
3596 self.assertEqual(s.recv(0), b"")
3597 self.assertEqual(s.read(0), b"")
3598 self.assertEqual(s.read(), b"data")
3599
3600 # Should not block if the other end sends no data
3601 s.setblocking(False)
3602 self.assertEqual(s.recv(0), b"")
3603 self.assertEqual(s.recv_into(bytearray()), 0)
3604
3605 def test_nonblocking_send(self):
3606 server = ThreadedEchoServer(CERTFILE,
3607 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003608 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003609 cacerts=CERTFILE,
3610 chatty=True,
3611 connectionchatty=False)
3612 with server:
3613 s = test_wrap_socket(socket.socket(),
3614 server_side=False,
3615 certfile=CERTFILE,
3616 ca_certs=CERTFILE,
3617 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003618 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003619 s.connect((HOST, server.port))
3620 s.setblocking(False)
3621
3622 # If we keep sending data, at some point the buffers
3623 # will be full and the call will block
3624 buf = bytearray(8192)
3625 def fill_buffer():
3626 while True:
3627 s.send(buf)
3628 self.assertRaises((ssl.SSLWantWriteError,
3629 ssl.SSLWantReadError), fill_buffer)
3630
3631 # Now read all the output and discard it
3632 s.setblocking(True)
3633 s.close()
3634
3635 def test_handshake_timeout(self):
3636 # Issue #5103: SSL handshake must respect the socket timeout
3637 server = socket.socket(socket.AF_INET)
3638 host = "127.0.0.1"
3639 port = support.bind_port(server)
3640 started = threading.Event()
3641 finish = False
3642
3643 def serve():
3644 server.listen()
3645 started.set()
3646 conns = []
3647 while not finish:
3648 r, w, e = select.select([server], [], [], 0.1)
3649 if server in r:
3650 # Let the socket hang around rather than having
3651 # it closed by garbage collection.
3652 conns.append(server.accept()[0])
3653 for sock in conns:
3654 sock.close()
3655
3656 t = threading.Thread(target=serve)
3657 t.start()
3658 started.wait()
3659
3660 try:
3661 try:
3662 c = socket.socket(socket.AF_INET)
3663 c.settimeout(0.2)
3664 c.connect((host, port))
3665 # Will attempt handshake and time out
3666 self.assertRaisesRegex(socket.timeout, "timed out",
3667 test_wrap_socket, c)
3668 finally:
3669 c.close()
3670 try:
3671 c = socket.socket(socket.AF_INET)
3672 c = test_wrap_socket(c)
3673 c.settimeout(0.2)
3674 # Will attempt handshake and time out
3675 self.assertRaisesRegex(socket.timeout, "timed out",
3676 c.connect, (host, port))
3677 finally:
3678 c.close()
3679 finally:
3680 finish = True
3681 t.join()
3682 server.close()
3683
3684 def test_server_accept(self):
3685 # Issue #16357: accept() on a SSLSocket created through
3686 # SSLContext.wrap_socket().
Christian Heimesa170fa12017-09-15 20:27:30 +02003687 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003688 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003689 context.load_verify_locations(SIGNING_CA)
3690 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003691 server = socket.socket(socket.AF_INET)
3692 host = "127.0.0.1"
3693 port = support.bind_port(server)
3694 server = context.wrap_socket(server, server_side=True)
3695 self.assertTrue(server.server_side)
3696
3697 evt = threading.Event()
3698 remote = None
3699 peer = None
3700 def serve():
3701 nonlocal remote, peer
3702 server.listen()
3703 # Block on the accept and wait on the connection to close.
3704 evt.set()
3705 remote, peer = server.accept()
Christian Heimes529525f2018-05-23 22:24:45 +02003706 remote.send(remote.recv(4))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003707
3708 t = threading.Thread(target=serve)
3709 t.start()
3710 # Client wait until server setup and perform a connect.
3711 evt.wait()
3712 client = context.wrap_socket(socket.socket())
3713 client.connect((host, port))
Christian Heimes529525f2018-05-23 22:24:45 +02003714 client.send(b'data')
3715 client.recv()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003716 client_addr = client.getsockname()
3717 client.close()
3718 t.join()
3719 remote.close()
3720 server.close()
3721 # Sanity checks.
3722 self.assertIsInstance(remote, ssl.SSLSocket)
3723 self.assertEqual(peer, client_addr)
3724
3725 def test_getpeercert_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003726 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003727 with context.wrap_socket(socket.socket()) as sock:
3728 with self.assertRaises(OSError) as cm:
3729 sock.getpeercert()
3730 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3731
3732 def test_do_handshake_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003733 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003734 with context.wrap_socket(socket.socket()) as sock:
3735 with self.assertRaises(OSError) as cm:
3736 sock.do_handshake()
3737 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3738
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003739 def test_no_shared_ciphers(self):
3740 client_context, server_context, hostname = testing_context()
3741 # OpenSSL enables all TLS 1.3 ciphers, enforce TLS 1.2 for test
3742 client_context.options |= ssl.OP_NO_TLSv1_3
Victor Stinner5e922652018-09-07 17:30:33 +02003743 # Force different suites on client and server
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003744 client_context.set_ciphers("AES128")
3745 server_context.set_ciphers("AES256")
3746 with ThreadedEchoServer(context=server_context) as server:
3747 with client_context.wrap_socket(socket.socket(),
3748 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003749 with self.assertRaises(OSError):
3750 s.connect((HOST, server.port))
3751 self.assertIn("no shared cipher", server.conn_errors[0])
3752
3753 def test_version_basic(self):
3754 """
3755 Basic tests for SSLSocket.version().
3756 More tests are done in the test_protocol_*() methods.
3757 """
Christian Heimesa170fa12017-09-15 20:27:30 +02003758 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3759 context.check_hostname = False
3760 context.verify_mode = ssl.CERT_NONE
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003761 with ThreadedEchoServer(CERTFILE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003762 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003763 chatty=False) as server:
3764 with context.wrap_socket(socket.socket()) as s:
3765 self.assertIs(s.version(), None)
Christian Heimes141c5e82018-02-24 21:10:57 +01003766 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003767 s.connect((HOST, server.port))
Christian Heimes19311322019-09-26 22:53:09 +02003768 if IS_OPENSSL_1_1_1 and has_tls_version('TLSv1_3'):
Christian Heimes05d9fe32018-02-27 08:55:39 +01003769 self.assertEqual(s.version(), 'TLSv1.3')
3770 elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
Christian Heimesa170fa12017-09-15 20:27:30 +02003771 self.assertEqual(s.version(), 'TLSv1.2')
3772 else: # 0.9.8 to 1.0.1
3773 self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
Christian Heimes141c5e82018-02-24 21:10:57 +01003774 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003775 self.assertIs(s.version(), None)
3776
Christian Heimes19311322019-09-26 22:53:09 +02003777 @requires_tls_version('TLSv1_3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003778 def test_tls1_3(self):
3779 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
3780 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003781 context.options |= (
3782 ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
3783 )
3784 with ThreadedEchoServer(context=context) as server:
3785 with context.wrap_socket(socket.socket()) as s:
3786 s.connect((HOST, server.port))
Christian Heimes05d9fe32018-02-27 08:55:39 +01003787 self.assertIn(s.cipher()[0], {
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003788 'TLS_AES_256_GCM_SHA384',
3789 'TLS_CHACHA20_POLY1305_SHA256',
3790 'TLS_AES_128_GCM_SHA256',
Christian Heimes05d9fe32018-02-27 08:55:39 +01003791 })
3792 self.assertEqual(s.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003793
Christian Heimes19311322019-09-26 22:53:09 +02003794 @requires_minimum_version
3795 @requires_tls_version('TLSv1_2')
3796 def test_min_max_version_tlsv1_2(self):
Christian Heimes698dde12018-02-27 11:54:43 +01003797 client_context, server_context, hostname = testing_context()
3798 # client TLSv1.0 to 1.2
3799 client_context.minimum_version = ssl.TLSVersion.TLSv1
3800 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
3801 # server only TLSv1.2
3802 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
3803 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
3804
3805 with ThreadedEchoServer(context=server_context) as server:
3806 with client_context.wrap_socket(socket.socket(),
3807 server_hostname=hostname) as s:
3808 s.connect((HOST, server.port))
3809 self.assertEqual(s.version(), 'TLSv1.2')
3810
Christian Heimes19311322019-09-26 22:53:09 +02003811 @requires_minimum_version
3812 @requires_tls_version('TLSv1_1')
3813 def test_min_max_version_tlsv1_1(self):
3814 client_context, server_context, hostname = testing_context()
Christian Heimes698dde12018-02-27 11:54:43 +01003815 # client 1.0 to 1.2, server 1.0 to 1.1
Christian Heimes19311322019-09-26 22:53:09 +02003816 client_context.minimum_version = ssl.TLSVersion.TLSv1
3817 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
Christian Heimes698dde12018-02-27 11:54:43 +01003818 server_context.minimum_version = ssl.TLSVersion.TLSv1
3819 server_context.maximum_version = ssl.TLSVersion.TLSv1_1
3820
3821 with ThreadedEchoServer(context=server_context) as server:
3822 with client_context.wrap_socket(socket.socket(),
3823 server_hostname=hostname) as s:
3824 s.connect((HOST, server.port))
3825 self.assertEqual(s.version(), 'TLSv1.1')
3826
Christian Heimes19311322019-09-26 22:53:09 +02003827 @requires_minimum_version
3828 @requires_tls_version('TLSv1_2')
3829 def test_min_max_version_mismatch(self):
3830 client_context, server_context, hostname = testing_context()
Christian Heimes698dde12018-02-27 11:54:43 +01003831 # client 1.0, server 1.2 (mismatch)
Christian Heimes698dde12018-02-27 11:54:43 +01003832 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
Miss Islington (bot)d6ac67f2019-09-11 10:59:13 -07003833 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
Christian Heimese35d1ba2019-06-03 20:40:15 +02003834 client_context.maximum_version = ssl.TLSVersion.TLSv1
Miss Islington (bot)8af4e0c2019-09-11 11:19:12 -07003835 client_context.minimum_version = ssl.TLSVersion.TLSv1
Christian Heimes698dde12018-02-27 11:54:43 +01003836 with ThreadedEchoServer(context=server_context) as server:
3837 with client_context.wrap_socket(socket.socket(),
3838 server_hostname=hostname) as s:
3839 with self.assertRaises(ssl.SSLError) as e:
3840 s.connect((HOST, server.port))
3841 self.assertIn("alert", str(e.exception))
3842
Christian Heimes19311322019-09-26 22:53:09 +02003843 @requires_minimum_version
3844 @requires_tls_version('SSLv3')
Christian Heimes698dde12018-02-27 11:54:43 +01003845 def test_min_max_version_sslv3(self):
3846 client_context, server_context, hostname = testing_context()
3847 server_context.minimum_version = ssl.TLSVersion.SSLv3
3848 client_context.minimum_version = ssl.TLSVersion.SSLv3
3849 client_context.maximum_version = ssl.TLSVersion.SSLv3
3850 with ThreadedEchoServer(context=server_context) as server:
3851 with client_context.wrap_socket(socket.socket(),
3852 server_hostname=hostname) as s:
3853 s.connect((HOST, server.port))
3854 self.assertEqual(s.version(), 'SSLv3')
3855
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003856 @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
3857 def test_default_ecdh_curve(self):
3858 # Issue #21015: elliptic curve-based Diffie Hellman key exchange
3859 # should be enabled by default on SSL contexts.
Christian Heimesa170fa12017-09-15 20:27:30 +02003860 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003861 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003862 # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
3863 # cipher name.
3864 context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003865 # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
3866 # explicitly using the 'ECCdraft' cipher alias. Otherwise,
3867 # our default cipher list should prefer ECDH-based ciphers
3868 # automatically.
3869 if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
3870 context.set_ciphers("ECCdraft:ECDH")
3871 with ThreadedEchoServer(context=context) as server:
3872 with context.wrap_socket(socket.socket()) as s:
3873 s.connect((HOST, server.port))
3874 self.assertIn("ECDH", s.cipher()[0])
3875
3876 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
3877 "'tls-unique' channel binding not available")
3878 def test_tls_unique_channel_binding(self):
3879 """Test tls-unique channel binding."""
3880 if support.verbose:
3881 sys.stdout.write("\n")
3882
Christian Heimes05d9fe32018-02-27 08:55:39 +01003883 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003884
3885 server = ThreadedEchoServer(context=server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003886 chatty=True,
3887 connectionchatty=False)
Christian Heimes05d9fe32018-02-27 08:55:39 +01003888
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003889 with server:
Christian Heimes05d9fe32018-02-27 08:55:39 +01003890 with client_context.wrap_socket(
3891 socket.socket(),
3892 server_hostname=hostname) as s:
3893 s.connect((HOST, server.port))
3894 # get the data
3895 cb_data = s.get_channel_binding("tls-unique")
3896 if support.verbose:
3897 sys.stdout.write(
3898 " got channel binding data: {0!r}\n".format(cb_data))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003899
Christian Heimes05d9fe32018-02-27 08:55:39 +01003900 # check if it is sane
3901 self.assertIsNotNone(cb_data)
Christian Heimes529525f2018-05-23 22:24:45 +02003902 if s.version() == 'TLSv1.3':
3903 self.assertEqual(len(cb_data), 48)
3904 else:
3905 self.assertEqual(len(cb_data), 12) # True for TLSv1
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003906
Christian Heimes05d9fe32018-02-27 08:55:39 +01003907 # and compare with the peers version
3908 s.write(b"CB tls-unique\n")
3909 peer_data_repr = s.read().strip()
3910 self.assertEqual(peer_data_repr,
3911 repr(cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003912
3913 # now, again
Christian Heimes05d9fe32018-02-27 08:55:39 +01003914 with client_context.wrap_socket(
3915 socket.socket(),
3916 server_hostname=hostname) as s:
3917 s.connect((HOST, server.port))
3918 new_cb_data = s.get_channel_binding("tls-unique")
3919 if support.verbose:
3920 sys.stdout.write(
3921 "got another channel binding data: {0!r}\n".format(
3922 new_cb_data)
3923 )
3924 # is it really unique
3925 self.assertNotEqual(cb_data, new_cb_data)
3926 self.assertIsNotNone(cb_data)
Christian Heimes529525f2018-05-23 22:24:45 +02003927 if s.version() == 'TLSv1.3':
3928 self.assertEqual(len(cb_data), 48)
3929 else:
3930 self.assertEqual(len(cb_data), 12) # True for TLSv1
Christian Heimes05d9fe32018-02-27 08:55:39 +01003931 s.write(b"CB tls-unique\n")
3932 peer_data_repr = s.read().strip()
3933 self.assertEqual(peer_data_repr,
3934 repr(new_cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003935
3936 def test_compression(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003937 client_context, server_context, hostname = testing_context()
3938 stats = server_params_test(client_context, server_context,
3939 chatty=True, connectionchatty=True,
3940 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003941 if support.verbose:
3942 sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
3943 self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
3944
3945 @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
3946 "ssl.OP_NO_COMPRESSION needed for this test")
3947 def test_compression_disabled(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003948 client_context, server_context, hostname = testing_context()
3949 client_context.options |= ssl.OP_NO_COMPRESSION
3950 server_context.options |= ssl.OP_NO_COMPRESSION
3951 stats = server_params_test(client_context, server_context,
3952 chatty=True, connectionchatty=True,
3953 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003954 self.assertIs(stats['compression'], None)
3955
3956 def test_dh_params(self):
3957 # Check we can get a connection with ephemeral Diffie-Hellman
Christian Heimesa170fa12017-09-15 20:27:30 +02003958 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003959 # test scenario needs TLS <= 1.2
3960 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003961 server_context.load_dh_params(DHFILE)
3962 server_context.set_ciphers("kEDH")
Christian Heimes05d9fe32018-02-27 08:55:39 +01003963 server_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003964 stats = server_params_test(client_context, server_context,
3965 chatty=True, connectionchatty=True,
3966 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003967 cipher = stats["cipher"][0]
3968 parts = cipher.split("-")
3969 if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
3970 self.fail("Non-DH cipher: " + cipher[0])
3971
Christian Heimesb7b92252018-02-25 09:49:31 +01003972 @unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
Christian Heimes05d9fe32018-02-27 08:55:39 +01003973 @unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
Christian Heimesb7b92252018-02-25 09:49:31 +01003974 def test_ecdh_curve(self):
3975 # server secp384r1, client auto
3976 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003977
Christian Heimesb7b92252018-02-25 09:49:31 +01003978 server_context.set_ecdh_curve("secp384r1")
3979 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3980 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3981 stats = server_params_test(client_context, server_context,
3982 chatty=True, connectionchatty=True,
3983 sni_name=hostname)
3984
3985 # server auto, client secp384r1
3986 client_context, server_context, hostname = testing_context()
3987 client_context.set_ecdh_curve("secp384r1")
3988 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3989 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3990 stats = server_params_test(client_context, server_context,
3991 chatty=True, connectionchatty=True,
3992 sni_name=hostname)
3993
3994 # server / client curve mismatch
3995 client_context, server_context, hostname = testing_context()
3996 client_context.set_ecdh_curve("prime256v1")
3997 server_context.set_ecdh_curve("secp384r1")
3998 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3999 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
4000 try:
4001 stats = server_params_test(client_context, server_context,
4002 chatty=True, connectionchatty=True,
4003 sni_name=hostname)
4004 except ssl.SSLError:
4005 pass
4006 else:
4007 # OpenSSL 1.0.2 does not fail although it should.
Christian Heimes05d9fe32018-02-27 08:55:39 +01004008 if IS_OPENSSL_1_1_0:
Christian Heimesb7b92252018-02-25 09:49:31 +01004009 self.fail("mismatch curve did not fail")
4010
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004011 def test_selected_alpn_protocol(self):
4012 # selected_alpn_protocol() is None unless ALPN is used.
Christian Heimesa170fa12017-09-15 20:27:30 +02004013 client_context, server_context, hostname = testing_context()
4014 stats = server_params_test(client_context, server_context,
4015 chatty=True, connectionchatty=True,
4016 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004017 self.assertIs(stats['client_alpn_protocol'], None)
4018
4019 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
4020 def test_selected_alpn_protocol_if_server_uses_alpn(self):
4021 # selected_alpn_protocol() is None unless ALPN is used by the client.
Christian Heimesa170fa12017-09-15 20:27:30 +02004022 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004023 server_context.set_alpn_protocols(['foo', 'bar'])
4024 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02004025 chatty=True, connectionchatty=True,
4026 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004027 self.assertIs(stats['client_alpn_protocol'], None)
4028
4029 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
4030 def test_alpn_protocols(self):
4031 server_protocols = ['foo', 'bar', 'milkshake']
4032 protocol_tests = [
4033 (['foo', 'bar'], 'foo'),
4034 (['bar', 'foo'], 'foo'),
4035 (['milkshake'], 'milkshake'),
4036 (['http/3.0', 'http/4.0'], None)
4037 ]
4038 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02004039 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004040 server_context.set_alpn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004041 client_context.set_alpn_protocols(client_protocols)
4042
4043 try:
4044 stats = server_params_test(client_context,
4045 server_context,
4046 chatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02004047 connectionchatty=True,
4048 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004049 except ssl.SSLError as e:
4050 stats = e
4051
Christian Heimes05d9fe32018-02-27 08:55:39 +01004052 if (expected is None and IS_OPENSSL_1_1_0
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004053 and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)):
4054 # OpenSSL 1.1.0 to 1.1.0e raises handshake error
4055 self.assertIsInstance(stats, ssl.SSLError)
4056 else:
4057 msg = "failed trying %s (s) and %s (c).\n" \
4058 "was expecting %s, but got %%s from the %%s" \
4059 % (str(server_protocols), str(client_protocols),
4060 str(expected))
4061 client_result = stats['client_alpn_protocol']
4062 self.assertEqual(client_result, expected,
4063 msg % (client_result, "client"))
4064 server_result = stats['server_alpn_protocols'][-1] \
4065 if len(stats['server_alpn_protocols']) else 'nothing'
4066 self.assertEqual(server_result, expected,
4067 msg % (server_result, "server"))
4068
4069 def test_selected_npn_protocol(self):
4070 # selected_npn_protocol() is None unless NPN is used
Christian Heimesa170fa12017-09-15 20:27:30 +02004071 client_context, server_context, hostname = testing_context()
4072 stats = server_params_test(client_context, server_context,
4073 chatty=True, connectionchatty=True,
4074 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004075 self.assertIs(stats['client_npn_protocol'], None)
4076
4077 @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
4078 def test_npn_protocols(self):
4079 server_protocols = ['http/1.1', 'spdy/2']
4080 protocol_tests = [
4081 (['http/1.1', 'spdy/2'], 'http/1.1'),
4082 (['spdy/2', 'http/1.1'], 'http/1.1'),
4083 (['spdy/2', 'test'], 'spdy/2'),
4084 (['abc', 'def'], 'abc')
4085 ]
4086 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02004087 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004088 server_context.set_npn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004089 client_context.set_npn_protocols(client_protocols)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004090 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02004091 chatty=True, connectionchatty=True,
4092 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004093 msg = "failed trying %s (s) and %s (c).\n" \
4094 "was expecting %s, but got %%s from the %%s" \
4095 % (str(server_protocols), str(client_protocols),
4096 str(expected))
4097 client_result = stats['client_npn_protocol']
4098 self.assertEqual(client_result, expected, msg % (client_result, "client"))
4099 server_result = stats['server_npn_protocols'][-1] \
4100 if len(stats['server_npn_protocols']) else 'nothing'
4101 self.assertEqual(server_result, expected, msg % (server_result, "server"))
4102
4103 def sni_contexts(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004104 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004105 server_context.load_cert_chain(SIGNED_CERTFILE)
Christian Heimesa170fa12017-09-15 20:27:30 +02004106 other_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004107 other_context.load_cert_chain(SIGNED_CERTFILE2)
Christian Heimesa170fa12017-09-15 20:27:30 +02004108 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004109 client_context.load_verify_locations(SIGNING_CA)
4110 return server_context, other_context, client_context
4111
4112 def check_common_name(self, stats, name):
4113 cert = stats['peercert']
4114 self.assertIn((('commonName', name),), cert['subject'])
4115
4116 @needs_sni
4117 def test_sni_callback(self):
4118 calls = []
4119 server_context, other_context, client_context = self.sni_contexts()
4120
Christian Heimesa170fa12017-09-15 20:27:30 +02004121 client_context.check_hostname = False
4122
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004123 def servername_cb(ssl_sock, server_name, initial_context):
4124 calls.append((server_name, initial_context))
4125 if server_name is not None:
4126 ssl_sock.context = other_context
4127 server_context.set_servername_callback(servername_cb)
4128
4129 stats = server_params_test(client_context, server_context,
4130 chatty=True,
4131 sni_name='supermessage')
4132 # The hostname was fetched properly, and the certificate was
4133 # changed for the connection.
4134 self.assertEqual(calls, [("supermessage", server_context)])
4135 # CERTFILE4 was selected
4136 self.check_common_name(stats, 'fakehostname')
4137
4138 calls = []
4139 # The callback is called with server_name=None
4140 stats = server_params_test(client_context, server_context,
4141 chatty=True,
4142 sni_name=None)
4143 self.assertEqual(calls, [(None, server_context)])
Christian Heimesa170fa12017-09-15 20:27:30 +02004144 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004145
4146 # Check disabling the callback
4147 calls = []
4148 server_context.set_servername_callback(None)
4149
4150 stats = server_params_test(client_context, server_context,
4151 chatty=True,
4152 sni_name='notfunny')
4153 # Certificate didn't change
Christian Heimesa170fa12017-09-15 20:27:30 +02004154 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004155 self.assertEqual(calls, [])
4156
4157 @needs_sni
4158 def test_sni_callback_alert(self):
4159 # Returning a TLS alert is reflected to the connecting client
4160 server_context, other_context, client_context = self.sni_contexts()
4161
4162 def cb_returning_alert(ssl_sock, server_name, initial_context):
4163 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
4164 server_context.set_servername_callback(cb_returning_alert)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004165 with self.assertRaises(ssl.SSLError) as cm:
4166 stats = server_params_test(client_context, server_context,
4167 chatty=False,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004168 sni_name='supermessage')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004169 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004170
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004171 @needs_sni
4172 def test_sni_callback_raising(self):
4173 # Raising fails the connection with a TLS handshake failure alert.
4174 server_context, other_context, client_context = self.sni_contexts()
4175
4176 def cb_raising(ssl_sock, server_name, initial_context):
4177 1/0
4178 server_context.set_servername_callback(cb_raising)
4179
Victor Stinner00253502019-06-03 03:51:43 +02004180 with support.catch_unraisable_exception() as catch:
4181 with self.assertRaises(ssl.SSLError) as cm:
4182 stats = server_params_test(client_context, server_context,
4183 chatty=False,
4184 sni_name='supermessage')
4185
4186 self.assertEqual(cm.exception.reason,
4187 'SSLV3_ALERT_HANDSHAKE_FAILURE')
4188 self.assertEqual(catch.unraisable.exc_type, ZeroDivisionError)
Antoine Pitrou50b24d02013-04-11 20:48:42 +02004189
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004190 @needs_sni
4191 def test_sni_callback_wrong_return_type(self):
4192 # Returning the wrong return type terminates the TLS connection
4193 # with an internal error alert.
4194 server_context, other_context, client_context = self.sni_contexts()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004195
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004196 def cb_wrong_return_type(ssl_sock, server_name, initial_context):
4197 return "foo"
4198 server_context.set_servername_callback(cb_wrong_return_type)
4199
Victor Stinner00253502019-06-03 03:51:43 +02004200 with support.catch_unraisable_exception() as catch:
4201 with self.assertRaises(ssl.SSLError) as cm:
4202 stats = server_params_test(client_context, server_context,
4203 chatty=False,
4204 sni_name='supermessage')
4205
4206
4207 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
4208 self.assertEqual(catch.unraisable.exc_type, TypeError)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004209
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004210 def test_shared_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004211 client_context, server_context, hostname = testing_context()
Christian Heimese8eb6cb2018-05-22 22:50:12 +02004212 client_context.set_ciphers("AES128:AES256")
4213 server_context.set_ciphers("AES256")
4214 expected_algs = [
4215 "AES256", "AES-256",
4216 # TLS 1.3 ciphers are always enabled
4217 "TLS_CHACHA20", "TLS_AES",
4218 ]
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004219
Christian Heimesa170fa12017-09-15 20:27:30 +02004220 stats = server_params_test(client_context, server_context,
4221 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004222 ciphers = stats['server_shared_ciphers'][0]
4223 self.assertGreater(len(ciphers), 0)
4224 for name, tls_version, bits in ciphers:
Christian Heimese8eb6cb2018-05-22 22:50:12 +02004225 if not any(alg in name for alg in expected_algs):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004226 self.fail(name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004227
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004228 def test_read_write_after_close_raises_valuerror(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004229 client_context, server_context, hostname = testing_context()
4230 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004231
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004232 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02004233 s = client_context.wrap_socket(socket.socket(),
4234 server_hostname=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004235 s.connect((HOST, server.port))
4236 s.close()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004237
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004238 self.assertRaises(ValueError, s.read, 1024)
4239 self.assertRaises(ValueError, s.write, b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004240
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004241 def test_sendfile(self):
4242 TEST_DATA = b"x" * 512
4243 with open(support.TESTFN, 'wb') as f:
4244 f.write(TEST_DATA)
4245 self.addCleanup(support.unlink, support.TESTFN)
Christian Heimesa170fa12017-09-15 20:27:30 +02004246 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004247 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01004248 context.load_verify_locations(SIGNING_CA)
4249 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004250 server = ThreadedEchoServer(context=context, chatty=False)
4251 with server:
4252 with context.wrap_socket(socket.socket()) as s:
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004253 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004254 with open(support.TESTFN, 'rb') as file:
4255 s.sendfile(file)
4256 self.assertEqual(s.recv(1024), TEST_DATA)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004257
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004258 def test_session(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004259 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01004260 # TODO: sessions aren't compatible with TLSv1.3 yet
4261 client_context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004262
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004263 # first connection without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004264 stats = server_params_test(client_context, server_context,
4265 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004266 session = stats['session']
4267 self.assertTrue(session.id)
4268 self.assertGreater(session.time, 0)
4269 self.assertGreater(session.timeout, 0)
4270 self.assertTrue(session.has_ticket)
4271 if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
4272 self.assertGreater(session.ticket_lifetime_hint, 0)
4273 self.assertFalse(stats['session_reused'])
4274 sess_stat = server_context.session_stats()
4275 self.assertEqual(sess_stat['accept'], 1)
4276 self.assertEqual(sess_stat['hits'], 0)
Giampaolo Rodola'915d1412014-06-11 03:54:30 +02004277
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004278 # reuse session
Christian Heimesa170fa12017-09-15 20:27:30 +02004279 stats = server_params_test(client_context, server_context,
4280 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004281 sess_stat = server_context.session_stats()
4282 self.assertEqual(sess_stat['accept'], 2)
4283 self.assertEqual(sess_stat['hits'], 1)
4284 self.assertTrue(stats['session_reused'])
4285 session2 = stats['session']
4286 self.assertEqual(session2.id, session.id)
4287 self.assertEqual(session2, session)
4288 self.assertIsNot(session2, session)
4289 self.assertGreaterEqual(session2.time, session.time)
4290 self.assertGreaterEqual(session2.timeout, session.timeout)
Christian Heimes99a65702016-09-10 23:44:53 +02004291
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004292 # another one without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004293 stats = server_params_test(client_context, server_context,
4294 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004295 self.assertFalse(stats['session_reused'])
4296 session3 = stats['session']
4297 self.assertNotEqual(session3.id, session.id)
4298 self.assertNotEqual(session3, session)
4299 sess_stat = server_context.session_stats()
4300 self.assertEqual(sess_stat['accept'], 3)
4301 self.assertEqual(sess_stat['hits'], 1)
Christian Heimes99a65702016-09-10 23:44:53 +02004302
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004303 # reuse session again
Christian Heimesa170fa12017-09-15 20:27:30 +02004304 stats = server_params_test(client_context, server_context,
4305 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004306 self.assertTrue(stats['session_reused'])
4307 session4 = stats['session']
4308 self.assertEqual(session4.id, session.id)
4309 self.assertEqual(session4, session)
4310 self.assertGreaterEqual(session4.time, session.time)
4311 self.assertGreaterEqual(session4.timeout, session.timeout)
4312 sess_stat = server_context.session_stats()
4313 self.assertEqual(sess_stat['accept'], 4)
4314 self.assertEqual(sess_stat['hits'], 2)
Christian Heimes99a65702016-09-10 23:44:53 +02004315
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004316 def test_session_handling(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004317 client_context, server_context, hostname = testing_context()
4318 client_context2, _, _ = testing_context()
Christian Heimes99a65702016-09-10 23:44:53 +02004319
Christian Heimes05d9fe32018-02-27 08:55:39 +01004320 # TODO: session reuse does not work with TLSv1.3
Christian Heimesa170fa12017-09-15 20:27:30 +02004321 client_context.options |= ssl.OP_NO_TLSv1_3
4322 client_context2.options |= ssl.OP_NO_TLSv1_3
Christian Heimescb5b68a2017-09-07 18:07:00 -07004323
Christian Heimesa170fa12017-09-15 20:27:30 +02004324 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004325 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02004326 with client_context.wrap_socket(socket.socket(),
4327 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004328 # session is None before handshake
4329 self.assertEqual(s.session, None)
4330 self.assertEqual(s.session_reused, None)
4331 s.connect((HOST, server.port))
4332 session = s.session
4333 self.assertTrue(session)
4334 with self.assertRaises(TypeError) as e:
4335 s.session = object
Ned Deily4531ec72018-06-11 20:26:28 -04004336 self.assertEqual(str(e.exception), 'Value is not a SSLSession.')
Christian Heimes99a65702016-09-10 23:44:53 +02004337
Christian Heimesa170fa12017-09-15 20:27:30 +02004338 with client_context.wrap_socket(socket.socket(),
4339 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004340 s.connect((HOST, server.port))
4341 # cannot set session after handshake
4342 with self.assertRaises(ValueError) as e:
4343 s.session = session
4344 self.assertEqual(str(e.exception),
4345 'Cannot set session after handshake.')
Christian Heimes99a65702016-09-10 23:44:53 +02004346
Christian Heimesa170fa12017-09-15 20:27:30 +02004347 with client_context.wrap_socket(socket.socket(),
4348 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004349 # can set session before handshake and before the
4350 # connection was established
4351 s.session = session
4352 s.connect((HOST, server.port))
4353 self.assertEqual(s.session.id, session.id)
4354 self.assertEqual(s.session, session)
4355 self.assertEqual(s.session_reused, True)
Christian Heimes99a65702016-09-10 23:44:53 +02004356
Christian Heimesa170fa12017-09-15 20:27:30 +02004357 with client_context2.wrap_socket(socket.socket(),
4358 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004359 # cannot re-use session with a different SSLContext
4360 with self.assertRaises(ValueError) as e:
Christian Heimes99a65702016-09-10 23:44:53 +02004361 s.session = session
4362 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004363 self.assertEqual(str(e.exception),
4364 'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02004365
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01004366
Christian Heimes19311322019-09-26 22:53:09 +02004367@unittest.skipUnless(has_tls_version('TLSv1_3'), "Test needs TLS 1.3")
Christian Heimes9fb051f2018-09-23 08:32:31 +02004368class TestPostHandshakeAuth(unittest.TestCase):
4369 def test_pha_setter(self):
4370 protocols = [
4371 ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT
4372 ]
4373 for protocol in protocols:
4374 ctx = ssl.SSLContext(protocol)
4375 self.assertEqual(ctx.post_handshake_auth, False)
4376
4377 ctx.post_handshake_auth = True
4378 self.assertEqual(ctx.post_handshake_auth, True)
4379
4380 ctx.verify_mode = ssl.CERT_REQUIRED
4381 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
4382 self.assertEqual(ctx.post_handshake_auth, True)
4383
4384 ctx.post_handshake_auth = False
4385 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
4386 self.assertEqual(ctx.post_handshake_auth, False)
4387
4388 ctx.verify_mode = ssl.CERT_OPTIONAL
4389 ctx.post_handshake_auth = True
4390 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
4391 self.assertEqual(ctx.post_handshake_auth, True)
4392
4393 def test_pha_required(self):
4394 client_context, server_context, hostname = testing_context()
4395 server_context.post_handshake_auth = True
4396 server_context.verify_mode = ssl.CERT_REQUIRED
4397 client_context.post_handshake_auth = True
4398 client_context.load_cert_chain(SIGNED_CERTFILE)
4399
4400 server = ThreadedEchoServer(context=server_context, chatty=False)
4401 with server:
4402 with client_context.wrap_socket(socket.socket(),
4403 server_hostname=hostname) as s:
4404 s.connect((HOST, server.port))
4405 s.write(b'HASCERT')
4406 self.assertEqual(s.recv(1024), b'FALSE\n')
4407 s.write(b'PHA')
4408 self.assertEqual(s.recv(1024), b'OK\n')
4409 s.write(b'HASCERT')
4410 self.assertEqual(s.recv(1024), b'TRUE\n')
4411 # PHA method just returns true when cert is already available
4412 s.write(b'PHA')
4413 self.assertEqual(s.recv(1024), b'OK\n')
4414 s.write(b'GETCERT')
4415 cert_text = s.recv(4096).decode('us-ascii')
4416 self.assertIn('Python Software Foundation CA', cert_text)
4417
4418 def test_pha_required_nocert(self):
4419 client_context, server_context, hostname = testing_context()
4420 server_context.post_handshake_auth = True
4421 server_context.verify_mode = ssl.CERT_REQUIRED
4422 client_context.post_handshake_auth = True
4423
Miss Islington (bot)4c403b82019-07-09 05:55:08 -07004424 # Ignore expected SSLError in ConnectionHandler of ThreadedEchoServer
4425 # (it is only raised sometimes on Windows)
4426 with support.catch_threading_exception() as cm:
4427 server = ThreadedEchoServer(context=server_context, chatty=False)
4428 with server:
4429 with client_context.wrap_socket(socket.socket(),
4430 server_hostname=hostname) as s:
4431 s.connect((HOST, server.port))
4432 s.write(b'PHA')
4433 # receive CertificateRequest
4434 self.assertEqual(s.recv(1024), b'OK\n')
4435 # send empty Certificate + Finish
4436 s.write(b'HASCERT')
4437 # receive alert
4438 with self.assertRaisesRegex(
4439 ssl.SSLError,
4440 'tlsv13 alert certificate required'):
4441 s.recv(1024)
Christian Heimes9fb051f2018-09-23 08:32:31 +02004442
4443 def test_pha_optional(self):
4444 if support.verbose:
4445 sys.stdout.write("\n")
4446
4447 client_context, server_context, hostname = testing_context()
4448 server_context.post_handshake_auth = True
4449 server_context.verify_mode = ssl.CERT_REQUIRED
4450 client_context.post_handshake_auth = True
4451 client_context.load_cert_chain(SIGNED_CERTFILE)
4452
4453 # check CERT_OPTIONAL
4454 server_context.verify_mode = ssl.CERT_OPTIONAL
4455 server = ThreadedEchoServer(context=server_context, chatty=False)
4456 with server:
4457 with client_context.wrap_socket(socket.socket(),
4458 server_hostname=hostname) as s:
4459 s.connect((HOST, server.port))
4460 s.write(b'HASCERT')
4461 self.assertEqual(s.recv(1024), b'FALSE\n')
4462 s.write(b'PHA')
4463 self.assertEqual(s.recv(1024), b'OK\n')
4464 s.write(b'HASCERT')
4465 self.assertEqual(s.recv(1024), b'TRUE\n')
4466
4467 def test_pha_optional_nocert(self):
4468 if support.verbose:
4469 sys.stdout.write("\n")
4470
4471 client_context, server_context, hostname = testing_context()
4472 server_context.post_handshake_auth = True
4473 server_context.verify_mode = ssl.CERT_OPTIONAL
4474 client_context.post_handshake_auth = True
4475
4476 server = ThreadedEchoServer(context=server_context, chatty=False)
4477 with server:
4478 with client_context.wrap_socket(socket.socket(),
4479 server_hostname=hostname) as s:
4480 s.connect((HOST, server.port))
4481 s.write(b'HASCERT')
4482 self.assertEqual(s.recv(1024), b'FALSE\n')
4483 s.write(b'PHA')
4484 self.assertEqual(s.recv(1024), b'OK\n')
penguindustin96466302019-05-06 14:57:17 -04004485 # optional doesn't fail when client does not have a cert
Christian Heimes9fb051f2018-09-23 08:32:31 +02004486 s.write(b'HASCERT')
4487 self.assertEqual(s.recv(1024), b'FALSE\n')
4488
4489 def test_pha_no_pha_client(self):
4490 client_context, server_context, hostname = testing_context()
4491 server_context.post_handshake_auth = True
4492 server_context.verify_mode = ssl.CERT_REQUIRED
4493 client_context.load_cert_chain(SIGNED_CERTFILE)
4494
4495 server = ThreadedEchoServer(context=server_context, chatty=False)
4496 with server:
4497 with client_context.wrap_socket(socket.socket(),
4498 server_hostname=hostname) as s:
4499 s.connect((HOST, server.port))
4500 with self.assertRaisesRegex(ssl.SSLError, 'not server'):
4501 s.verify_client_post_handshake()
4502 s.write(b'PHA')
4503 self.assertIn(b'extension not received', s.recv(1024))
4504
4505 def test_pha_no_pha_server(self):
4506 # server doesn't have PHA enabled, cert is requested in handshake
4507 client_context, server_context, hostname = testing_context()
4508 server_context.verify_mode = ssl.CERT_REQUIRED
4509 client_context.post_handshake_auth = True
4510 client_context.load_cert_chain(SIGNED_CERTFILE)
4511
4512 server = ThreadedEchoServer(context=server_context, chatty=False)
4513 with server:
4514 with client_context.wrap_socket(socket.socket(),
4515 server_hostname=hostname) as s:
4516 s.connect((HOST, server.port))
4517 s.write(b'HASCERT')
4518 self.assertEqual(s.recv(1024), b'TRUE\n')
4519 # PHA doesn't fail if there is already a cert
4520 s.write(b'PHA')
4521 self.assertEqual(s.recv(1024), b'OK\n')
4522 s.write(b'HASCERT')
4523 self.assertEqual(s.recv(1024), b'TRUE\n')
4524
4525 def test_pha_not_tls13(self):
4526 # TLS 1.2
4527 client_context, server_context, hostname = testing_context()
4528 server_context.verify_mode = ssl.CERT_REQUIRED
4529 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
4530 client_context.post_handshake_auth = True
4531 client_context.load_cert_chain(SIGNED_CERTFILE)
4532
4533 server = ThreadedEchoServer(context=server_context, chatty=False)
4534 with server:
4535 with client_context.wrap_socket(socket.socket(),
4536 server_hostname=hostname) as s:
4537 s.connect((HOST, server.port))
4538 # PHA fails for TLS != 1.3
4539 s.write(b'PHA')
4540 self.assertIn(b'WRONG_SSL_VERSION', s.recv(1024))
4541
Christian Heimesf22c4cf2019-07-01 09:25:48 +02004542 def test_bpo37428_pha_cert_none(self):
4543 # verify that post_handshake_auth does not implicitly enable cert
4544 # validation.
4545 hostname = SIGNED_CERTFILE_HOSTNAME
4546 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
4547 client_context.post_handshake_auth = True
4548 client_context.load_cert_chain(SIGNED_CERTFILE)
4549 # no cert validation and CA on client side
4550 client_context.check_hostname = False
4551 client_context.verify_mode = ssl.CERT_NONE
4552
4553 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
4554 server_context.load_cert_chain(SIGNED_CERTFILE)
4555 server_context.load_verify_locations(SIGNING_CA)
4556 server_context.post_handshake_auth = True
4557 server_context.verify_mode = ssl.CERT_REQUIRED
4558
4559 server = ThreadedEchoServer(context=server_context, chatty=False)
4560 with server:
4561 with client_context.wrap_socket(socket.socket(),
4562 server_hostname=hostname) as s:
4563 s.connect((HOST, server.port))
4564 s.write(b'HASCERT')
4565 self.assertEqual(s.recv(1024), b'FALSE\n')
4566 s.write(b'PHA')
4567 self.assertEqual(s.recv(1024), b'OK\n')
4568 s.write(b'HASCERT')
4569 self.assertEqual(s.recv(1024), b'TRUE\n')
4570 # server cert has not been validated
4571 self.assertEqual(s.getpeercert(), {})
4572
Christian Heimes9fb051f2018-09-23 08:32:31 +02004573
Christian Heimesc7f70692019-05-31 11:44:05 +02004574HAS_KEYLOG = hasattr(ssl.SSLContext, 'keylog_filename')
4575requires_keylog = unittest.skipUnless(
4576 HAS_KEYLOG, 'test requires OpenSSL 1.1.1 with keylog callback')
4577
4578class TestSSLDebug(unittest.TestCase):
4579
4580 def keylog_lines(self, fname=support.TESTFN):
4581 with open(fname) as f:
4582 return len(list(f))
4583
4584 @requires_keylog
4585 def test_keylog_defaults(self):
4586 self.addCleanup(support.unlink, support.TESTFN)
4587 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
4588 self.assertEqual(ctx.keylog_filename, None)
4589
4590 self.assertFalse(os.path.isfile(support.TESTFN))
4591 ctx.keylog_filename = support.TESTFN
4592 self.assertEqual(ctx.keylog_filename, support.TESTFN)
4593 self.assertTrue(os.path.isfile(support.TESTFN))
4594 self.assertEqual(self.keylog_lines(), 1)
4595
4596 ctx.keylog_filename = None
4597 self.assertEqual(ctx.keylog_filename, None)
4598
4599 with self.assertRaises((IsADirectoryError, PermissionError)):
4600 # Windows raises PermissionError
4601 ctx.keylog_filename = os.path.dirname(
4602 os.path.abspath(support.TESTFN))
4603
4604 with self.assertRaises(TypeError):
4605 ctx.keylog_filename = 1
4606
4607 @requires_keylog
4608 def test_keylog_filename(self):
4609 self.addCleanup(support.unlink, support.TESTFN)
4610 client_context, server_context, hostname = testing_context()
4611
4612 client_context.keylog_filename = support.TESTFN
4613 server = ThreadedEchoServer(context=server_context, chatty=False)
4614 with server:
4615 with client_context.wrap_socket(socket.socket(),
4616 server_hostname=hostname) as s:
4617 s.connect((HOST, server.port))
4618 # header, 5 lines for TLS 1.3
4619 self.assertEqual(self.keylog_lines(), 6)
4620
4621 client_context.keylog_filename = None
4622 server_context.keylog_filename = support.TESTFN
4623 server = ThreadedEchoServer(context=server_context, chatty=False)
4624 with server:
4625 with client_context.wrap_socket(socket.socket(),
4626 server_hostname=hostname) as s:
4627 s.connect((HOST, server.port))
4628 self.assertGreaterEqual(self.keylog_lines(), 11)
4629
4630 client_context.keylog_filename = support.TESTFN
4631 server_context.keylog_filename = support.TESTFN
4632 server = ThreadedEchoServer(context=server_context, chatty=False)
4633 with server:
4634 with client_context.wrap_socket(socket.socket(),
4635 server_hostname=hostname) as s:
4636 s.connect((HOST, server.port))
4637 self.assertGreaterEqual(self.keylog_lines(), 21)
4638
4639 client_context.keylog_filename = None
4640 server_context.keylog_filename = None
4641
4642 @requires_keylog
4643 @unittest.skipIf(sys.flags.ignore_environment,
4644 "test is not compatible with ignore_environment")
4645 def test_keylog_env(self):
4646 self.addCleanup(support.unlink, support.TESTFN)
4647 with unittest.mock.patch.dict(os.environ):
4648 os.environ['SSLKEYLOGFILE'] = support.TESTFN
4649 self.assertEqual(os.environ['SSLKEYLOGFILE'], support.TESTFN)
4650
4651 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
4652 self.assertEqual(ctx.keylog_filename, None)
4653
4654 ctx = ssl.create_default_context()
4655 self.assertEqual(ctx.keylog_filename, support.TESTFN)
4656
4657 ctx = ssl._create_stdlib_context()
4658 self.assertEqual(ctx.keylog_filename, support.TESTFN)
4659
4660 def test_msg_callback(self):
4661 client_context, server_context, hostname = testing_context()
4662
4663 def msg_cb(conn, direction, version, content_type, msg_type, data):
4664 pass
4665
4666 self.assertIs(client_context._msg_callback, None)
4667 client_context._msg_callback = msg_cb
4668 self.assertIs(client_context._msg_callback, msg_cb)
4669 with self.assertRaises(TypeError):
4670 client_context._msg_callback = object()
4671
4672 def test_msg_callback_tls12(self):
4673 client_context, server_context, hostname = testing_context()
4674 client_context.options |= ssl.OP_NO_TLSv1_3
4675
4676 msg = []
4677
4678 def msg_cb(conn, direction, version, content_type, msg_type, data):
4679 self.assertIsInstance(conn, ssl.SSLSocket)
4680 self.assertIsInstance(data, bytes)
4681 self.assertIn(direction, {'read', 'write'})
4682 msg.append((direction, version, content_type, msg_type))
4683
4684 client_context._msg_callback = msg_cb
4685
4686 server = ThreadedEchoServer(context=server_context, chatty=False)
4687 with server:
4688 with client_context.wrap_socket(socket.socket(),
4689 server_hostname=hostname) as s:
4690 s.connect((HOST, server.port))
4691
Christian Heimese35d1ba2019-06-03 20:40:15 +02004692 self.assertIn(
Christian Heimesc7f70692019-05-31 11:44:05 +02004693 ("read", TLSVersion.TLSv1_2, _TLSContentType.HANDSHAKE,
4694 _TLSMessageType.SERVER_KEY_EXCHANGE),
Christian Heimese35d1ba2019-06-03 20:40:15 +02004695 msg
4696 )
4697 self.assertIn(
Christian Heimesc7f70692019-05-31 11:44:05 +02004698 ("write", TLSVersion.TLSv1_2, _TLSContentType.CHANGE_CIPHER_SPEC,
4699 _TLSMessageType.CHANGE_CIPHER_SPEC),
Christian Heimese35d1ba2019-06-03 20:40:15 +02004700 msg
4701 )
Christian Heimesc7f70692019-05-31 11:44:05 +02004702
4703
Thomas Woutersed03b412007-08-28 21:37:11 +00004704def test_main(verbose=False):
Antoine Pitrou15cee622010-08-04 16:45:21 +00004705 if support.verbose:
4706 plats = {
Antoine Pitrou15cee622010-08-04 16:45:21 +00004707 'Mac': platform.mac_ver,
4708 'Windows': platform.win32_ver,
4709 }
Petr Viktorin8b94b412018-05-16 11:51:18 -04004710 for name, func in plats.items():
4711 plat = func()
4712 if plat and plat[0]:
4713 plat = '%s %r' % (name, plat)
4714 break
4715 else:
4716 plat = repr(platform.platform())
Antoine Pitrou15cee622010-08-04 16:45:21 +00004717 print("test_ssl: testing with %r %r" %
4718 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
4719 print(" under %s" % plat)
Antoine Pitroud5323212010-10-22 18:19:07 +00004720 print(" HAS_SNI = %r" % ssl.HAS_SNI)
Antoine Pitrou609ef012013-03-29 18:09:06 +01004721 print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
4722 try:
4723 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
4724 except AttributeError:
4725 pass
Antoine Pitrou15cee622010-08-04 16:45:21 +00004726
Antoine Pitrou152efa22010-05-16 18:19:27 +00004727 for filename in [
Martin Panter3840b2a2016-03-27 01:53:46 +00004728 CERTFILE, BYTES_CERTFILE,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004729 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004730 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004731 BADCERT, BADKEY, EMPTYCERT]:
4732 if not os.path.exists(filename):
4733 raise support.TestFailed("Can't read certificate file %r" % filename)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00004734
Martin Panter3840b2a2016-03-27 01:53:46 +00004735 tests = [
4736 ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
Christian Heimes9d50ab52018-02-27 10:17:30 +01004737 SSLObjectTests, SimpleBackgroundTests, ThreadedTests,
Christian Heimesc7f70692019-05-31 11:44:05 +02004738 TestPostHandshakeAuth, TestSSLDebug
Martin Panter3840b2a2016-03-27 01:53:46 +00004739 ]
Thomas Woutersed03b412007-08-28 21:37:11 +00004740
Benjamin Petersonee8712c2008-05-20 21:35:26 +00004741 if support.is_resource_enabled('network'):
Bill Janssen6e027db2007-11-15 22:23:56 +00004742 tests.append(NetworkedTests)
Thomas Woutersed03b412007-08-28 21:37:11 +00004743
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004744 thread_info = support.threading_setup()
Antoine Pitrou480a1242010-04-28 21:37:09 +00004745 try:
4746 support.run_unittest(*tests)
4747 finally:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004748 support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00004749
4750if __name__ == "__main__":
4751 test_main()